Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41706: [C++][Acero] Enhance asof_join to work in multi-threaded execution by sequencing input #44083

Merged
merged 5 commits into from
Oct 29, 2024

Conversation

mroz45
Copy link
Contributor

@mroz45 mroz45 commented Sep 12, 2024

Rationale for this change

This is initial PR. I found that with specyfics parameters test fails.

What changes are included in this PR?

In this PR I provoke fail of asof_join_node_test.

Copy link

Thanks for opening a pull request!

If this is not a minor PR. Could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

In the case of PARQUET issues on JIRA the title also supports:

PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

See also:

@mroz45 mroz45 changed the title Modify asof_join_node test to provoke Issue 41706 GH-41706: [C++][Acero] Modify asof_join_node test to provoke Issue 41706 Sep 16, 2024
Copy link

⚠️ GitHub issue #41706 has been automatically assigned in GitHub to PR creator.

@mroz45 mroz45 changed the title GH-41706: [C++][Acero] Modify asof_join_node test to provoke Issue 41706 GH-41706: [C++][Acero] Modify asof_join_node test to provoke issue Sep 16, 2024
Copy link

⚠️ GitHub issue #41706 has been automatically assigned in GitHub to PR creator.

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've let CI run, there are some formatting changes needed. Otherwise, this seems straightforward enough.

The most significant change would probably be the breaking change that the asof join node now requires sequenced input. However, I feel like this was always true in theory.

Perhaps @zanmato1984 @ZhangHuiGui @icexelloss or @rtpsw have thoughts? I'm not certain who all is still using this node.

cpp/src/arrow/acero/asof_join_node_test.cc Outdated Show resolved Hide resolved
cpp/src/arrow/acero/source_node.cc Outdated Show resolved Hide resolved
bool require_sequenced_output = false)
bool require_sequenced_output = true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this default would be a breaking change and I'm not certain it's warranted.

Comment on lines 1415 to 1416
if(::arrow::compute::kUnsequencedIndex == batch.index)
return Status::Invalid("AsofJoin requires sequenced input");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change I believe. However, it does seem accurate that this is required.

@github-actions github-actions bot added awaiting review Awaiting review awaiting changes Awaiting changes and removed awaiting review Awaiting review labels Oct 6, 2024
@westonpace
Copy link
Member

This CI failure seems relevant:

FAILED: src/arrow/acero/CMakeFiles/arrow_acero_objlib.dir/asof_join_node.cc.o 
/usr/bin/ccache /usr/lib/ccache/clang++-14 -DADDRESS_SANITIZER -DARROW_ACERO_EXPORTING -DARROW_HAVE_RUNTIME_AVX2 -DARROW_HAVE_RUNTIME_AVX512 -DARROW_HAVE_RUNTIME_BMI2 -DARROW_HAVE_RUNTIME_SSE4_2 -DARROW_HAVE_SSE4_2 -DARROW_UBSAN -DJSON_DIAGNOSTICS=0 -DJSON_USE_IMPLICIT_CONVERSIONS=1 -I/build/cpp/src -I/arrow/cpp/src -I/arrow/cpp/src/generated -isystem /build/cpp/opentelemetry_ep-install/include -Qunused-arguments -fcolor-diagnostics  -Wall -Wextra -Wdocumentation -DARROW_WARN_DOCUMENTATION -Wshorten-64-to-32 -Wno-missing-braces -Wno-unused-parameter -Wno-constant-logical-operand -Wno-return-stack-address -Wdate-time -Wno-unknown-warning-option -Wno-pass-failed -msse4.2  -fsanitize=address -DADDRESS_SANITIZER -fsanitize=undefined -fno-sanitize=alignment,vptr,function,float-divide-by-zero -fno-sanitize-recover=all -fsanitize-coverage=pc-table,inline-8bit-counters,edge,no-prune,trace-cmp,trace-div,trace-gep -fsanitize-blacklist=/arrow/cpp/build-support/sanitizer-disallowed-entries.txt -g -Werror -O0 -ggdb -g1 -fPIC   -fsanitize-coverage=pc-table,inline-8bit-counters,edge,no-prune,trace-cmp,trace-div,trace-gep -std=c++17 -MD -MT src/arrow/acero/CMakeFiles/arrow_acero_objlib.dir/asof_join_node.cc.o -MF src/arrow/acero/CMakeFiles/arrow_acero_objlib.dir/asof_join_node.cc.o.d -o src/arrow/acero/CMakeFiles/arrow_acero_objlib.dir/asof_join_node.cc.o -c /arrow/cpp/src/arrow/acero/asof_join_node.cc
In file included from /arrow/cpp/src/arrow/acero/asof_join_node.cc:18:
In file included from /arrow/cpp/src/arrow/acero/asof_join_node.h:20:
In file included from /arrow/cpp/src/arrow/acero/options.h:21:
In file included from /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/memory:76:
/usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:85:2: error: delete called on non-final 'arrow::acero::InputState' that has virtual functions but non-virtual destructor [-Werror,-Wdelete-non-abstract-non-virtual-dtor]
        delete __ptr;
        ^
/usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:361:4: note: in instantiation of member function 'std::default_delete<arrow::acero::InputState>::operator()' requested here
          get_deleter()(std::move(__ptr));
          ^
/arrow/cpp/src/arrow/acero/asof_join_node.cc:518:12: note: in instantiation of member function 'std::unique_ptr<arrow::acero::InputState>::~unique_ptr' requested here
    return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
           ^
1 error generated.

@gitmodimo
Copy link

Let me pitch in. Disclaimer I am working @mroz45 on the same project using arrow.

                           -bool require_sequenced_output = false)
                           +bool require_sequenced_output = true)

Changing this default would be a breaking change and I'm not certain it's warranted.

Without it python tests are failing.

Should this only be Ordering::Implicit if require_sequenced_output is set?

It requires deeper and breaking changes which I think are necessary.
require_sequenced_output - means the source should give implicit ordering to produces batches ant therefore require_sequenced_output should be moved from ScanNodeOptions to ScanOptions to allow pass this option to ScannerBuilder which is used by python. But in fact I think there should be unified way to assert implicit ordering in all source nodes. Or maybe the need for implicit ordering should propagate from nodes that need ordering (asof_join, fetch etc.) down the line to source nodes (and maybe fail if the source node cannot provide it). There are few related issues:
no standardized sorting information
add ordering information to exec batches
Add AsofJoin Ordering Assertion

This issue gave me the idea that implicit ordering should be asserted by default. And additional source node/additional option to assert no ordering - to enable some performance optimization for "don't care" ordering cases. This would fix those issues:
asof_join node not working propertly
order is unstable
Preserve order when writing dataset
ordering is weird
dataset not preserving ordering
scan node not asserting ordering

We are willing to contribute to fix ordering issue within acero but we have next to none experience in python/Cython. Also the size of the issue seems to grow with every little change. I think the ordering in Acero is a little bigger topic to discuss.

@github-actions github-actions bot added Component: Python awaiting review Awaiting review and removed awaiting review Awaiting review awaiting changes Awaiting changes labels Oct 16, 2024
@mroz45 mroz45 requested a review from wgtmac as a code owner October 16, 2024 12:59
@gitmodimo
Copy link

The addition of ordering to python was more straightforward than I initially anticipated. Current version of pull request should fix at least some of above-mentioned python issues. Please review python changes carefully since it was our first contact with cython.

PS. Can we also Export SequencingQueue and SerialSequencingQueue
This is useful in implementing custom nodes that require ordering. It seems reasonable to export it for custom nodes developers. Or maybe I should create separate pull request?

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for continuing to work on this and thank you for figuring out how to avoid the breaking change for now.

Or maybe the need for implicit ordering should propagate from nodes that need ordering (asof_join, fetch etc.) down the line to source nodes (and maybe fail if the source node cannot provide it)

This seems reasonable but might be a bit complex.

This #27651 gave me the idea that implicit ordering should be asserted by default. And additional source node/additional option to assert no ordering - to enable some performance optimization for "don't care" ordering cases. This would fix those issues:

This seems logical to me. Make the default safe but slow and add a potentially dangerous "I know what I'm doing" option to speed it up but still keep the responsibility on the user to figure out if that speedup is warranted.

I think it's probably pretty safe to change the defaults in things like acero.py. My only concern would be changing defaults lower level in C++ which might change defaults in things like dataset.py which uses Acero and has a much wider user base than acero.py. However, it may be that the usage of Acero from datasets already sets this parameter in all cases and doesn't rely on defaults (it's been too long since I've really dug in). So if we feel that is the case then changing defaults lower down is probably fine too.

I'm +1 on this change provided CI gets to a good state. Feel free to @ me on this issue if you make any changes and need CI re-run.

@github-actions github-actions bot added awaiting merge Awaiting merge and removed awaiting review Awaiting review labels Oct 29, 2024
@gitmodimo
Copy link

It looks like all checks passed. There are probably few issues (like this, this maybe ) that should be verified and closed by this pull request. I think some of this work also overlaps with GH-26818

@westonpace westonpace changed the title GH-41706: [C++][Acero] Modify asof_join_node test to provoke issue GH-41706: [C++][Acero] Enhance asof_join to work in multi-threaded execution by sequencing input Oct 29, 2024
@westonpace westonpace merged commit 2a0f06c into apache:main Oct 29, 2024
41 checks passed
@westonpace westonpace removed the awaiting merge Awaiting merge label Oct 29, 2024
Copy link

⚠️ GitHub issue #41706 has been automatically assigned in GitHub to PR creator.

@westonpace
Copy link
Member

@gitmodimo thanks for the help getting this sorted out!

@gitmodimo
Copy link

@gitmodimo thanks for the help getting this sorted out!

My pleasure. Me and @mroz45 have few more enhancements for upstreaming. We will post them soon.

Copy link

After merging your PR, Conbench analyzed the 3 benchmarking runs that have been run so far on merge-commit 2a0f06c.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

@@ -1057,9 +1057,12 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
if (index != compute::kUnsequencedIndex) batch->index = index++;
return batch;
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you are adding the batch index here, which is already done by the SourceNode when ordering = Ordering::Implicit() (see line 153-155):

plan_->query_context()->ScheduleTask(
[this, morsel_length, use_legacy_batching, initial_batch_index, morsel,
has_ordering = !ordering_.is_unordered()]() {
int64_t offset = 0;
int batch_index = initial_batch_index;
do {
int64_t batch_size =
std::min<int64_t>(morsel_length - offset, ExecPlan::kMaxBatchSize);
// In order for the legacy batching model to work we must
// not slice batches from the source
if (use_legacy_batching) {
batch_size = morsel_length;
}
ExecBatch batch = morsel.Slice(offset, batch_size);
UnalignedBufferHandling unaligned_buffer_handling =
plan_->query_context()->options().unaligned_buffer_handling.value_or(
GetDefaultUnalignedBufferHandling());
ARROW_RETURN_NOT_OK(
HandleUnalignedBuffers(&batch, unaligned_buffer_handling));
if (has_ordering) {
batch.index = batch_index;
}
offset += batch_size;
batch_index++;
ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
} while (offset < morsel.length);
return Status::OK();
},
"SourceNode::ProcessMorsel");

This seems redundant.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems so. I first added indexing and then fixed ordering information propagation to source_node. Not sure it is possible to update already merged PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible, I'll draft a followup PR, if you don't mind.

@github-actions github-actions bot added the awaiting committer review Awaiting committer review label Oct 31, 2024
@@ -406,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
: SourceNode(plan, schema, generator) {}
: SourceNode(plan, schema, generator, Ordering::Implicit()) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a bit too restrictive? This means all RecordBatch reader source always preserve the implicit order, even if if users do not care? Shouldn't this be configurable? What about other sources? Shouldn't the behaviour be the same for any source?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably option to specify ordering would be best. Or maybe automatically assign implicit ordering based on
batch.index!=compute::kUnsequencedIndex?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on why your asof_join enhancement requires specifically the RecordBatchReaderSrouceNode to have implicit ordering?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not. I assumed that when user provides a generator, it has some kind of implicot order. Now I see it is not necessarily a case.

right_source = _dataset_to_decl(right_operand, use_threads=use_threads)
right_source = _dataset_to_decl(
right_operand, use_threads=use_threads,
require_sequenced_output=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need the dataset to be sequenced? I think all you need is implicit ordering (batch indices) so you can sequence the batches at any point down the pipeline (by your asof join node).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need the dataset to be sequenced? I think all you need is implicit ordering (batch indices) so you can sequence the batches at any point down the pipeline (by your asof join node).

True. But I don't think require_sequenced_output actually means batches should be produced by this node in order, but batches need to be sequenced (ie. have batch.index assigned). Current implementation does produce batches in order as a side effect of indexing.

In context of your GH-26818
Maybe the option require_sequenced_output should just be renamed to implicit_ordering?
What would be the purpose of require_sequenced_output!=implicit_ordering?

P.S. I think that the idea of "Dataset has implicit ordering" is not handled in arrow. There is no way to store ordering information in dataset and order is subjected to alphabetical ordering based on fragment filenames(which with default filename template is incorrect part_1 -> part_10 -> part2 -> part_3 etc...).

@EnricoMi
Copy link
Contributor

EnricoMi commented Nov 1, 2024

@gitmodimo I have reworked some of this PR's changes in #44616 and added some of my code from #44470.

Please have a look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants