GH-41706: [C++][Acero] Enhance asof_join to work in multi-threaded execution by sequencing input #44083
I've let CI run, there are some formatting changes needed. Otherwise, this seems straightforward enough.
The most significant change would probably be the breaking change that the asof join node now requires sequenced input. However, I feel like this was always true in theory.
Perhaps @zanmato1984 @ZhangHuiGui @icexelloss or @rtpsw have thoughts? I'm not certain who all is still using this node.
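As a rough illustration of why an as-of join depends on ordered input, here is a minimal Python sketch of a backward as-of match. It is not Acero's implementation: the function name and the flat-list inputs are assumptions made for the example, and it ignores tolerance and by-keys. The binary search is only valid if the right-hand timestamps arrive in ascending order.

```python
from bisect import bisect_right


def asof_join(left_times, right_times, right_values):
    """For each left timestamp, pick the latest right value at or before it.

    Hypothetical helper: assumes right_times is sorted ascending.
    """
    out = []
    for t in left_times:
        # Number of right rows with a timestamp <= t.
        pos = bisect_right(right_times, t)
        out.append(right_values[pos - 1] if pos else None)
    return out


result = asof_join([1, 5, 10], [2, 3, 7], ["a", "b", "c"])
```

If batches from the right side were delivered out of order, the sorted-input assumption would break and the match could silently pick the wrong row.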
cpp/src/arrow/dataset/scanner.h
- bool require_sequenced_output = false)
+ bool require_sequenced_output = true)
Changing this default would be a breaking change and I'm not certain it's warranted.
if (::arrow::compute::kUnsequencedIndex == batch.index)
  return Status::Invalid("AsofJoin requires sequenced input");
This would be a breaking change I believe. However, it does seem accurate that this is required.
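To make the requirement concrete, here is a minimal Python sketch of a sequencing step that rejects unsequenced batches and releases the rest in index order. The names Batch and SequencingBuffer are hypothetical and do not mirror Acero's classes, and using -1 as the stand-in for compute::kUnsequencedIndex is an assumption of the example.

```python
from dataclasses import dataclass, field

UNSEQUENCED_INDEX = -1  # assumption: stands in for compute::kUnsequencedIndex


@dataclass
class Batch:
    index: int
    rows: list


@dataclass
class SequencingBuffer:
    """Hypothetical reorder buffer: releases batches in index order."""

    next_index: int = 0
    pending: dict = field(default_factory=dict)

    def insert(self, batch):
        # Reject batches that carry no sequence number, like the check above.
        if batch.index == UNSEQUENCED_INDEX:
            raise ValueError("AsofJoin requires sequenced input")
        self.pending[batch.index] = batch
        ready = []
        # Release every batch whose predecessors have all arrived.
        while self.next_index in self.pending:
            ready.append(self.pending.pop(self.next_index))
            self.next_index += 1
        return ready


buf = SequencingBuffer()
first = buf.insert(Batch(1, ["b"]))   # held back: batch 0 has not arrived yet
second = buf.insert(Batch(0, ["a"]))  # releases batches 0 and 1 in order
```

With a buffer like this, batches may arrive from multiple threads in any order, as long as each one carries an index.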
This CI failure seems relevant:
Let me pitch in. Disclaimer: I am working with @mroz45 on the same project using Arrow.
Without it, the Python tests fail.
It requires deeper, breaking changes, which I think are necessary. This issue gave me the idea that implicit ordering should be asserted by default, with an additional source node or option to assert no ordering, to enable some performance optimization for "don't care" ordering cases. This would fix those issues.
We are willing to contribute to fixing the ordering issue within Acero, but we have next to no experience in Python/Cython. Also, the size of the issue seems to grow with every little change. I think ordering in Acero is a somewhat bigger topic to discuss.
Adding ordering to Python was more straightforward than I initially anticipated. The current version of the pull request should fix at least some of the above-mentioned Python issues. Please review the Python changes carefully, since this was our first contact with Cython. PS. Can we also export SequencingQueue and SerialSequencingQueue?
Thanks for continuing to work on this and thank you for figuring out how to avoid the breaking change for now.
Or maybe the need for implicit ordering should propagate from nodes that need ordering (asof_join, fetch etc.) down the line to source nodes (and maybe fail if the source node cannot provide it)
This seems reasonable but might be a bit complex.
This #27651 gave me the idea that implicit ordering should be asserted by default. And additional source node/additional option to assert no ordering - to enable some performance optimization for "don't care" ordering cases. This would fix those issues:
This seems logical to me. Make the default safe but slow and add a potentially dangerous "I know what I'm doing" option to speed it up but still keep the responsibility on the user to figure out if that speedup is warranted.
I think it's probably pretty safe to change the defaults in things like acero.py. My only concern would be changing defaults lower level in C++ which might change defaults in things like dataset.py, which uses Acero and has a much wider user base than acero.py. However, it may be that the usage of Acero from datasets already sets this parameter in all cases and doesn't rely on defaults (it's been too long since I've really dug in). So if we feel that is the case then changing defaults lower down is probably fine too.
I'm +1 on this change provided CI gets to a good state. Feel free to @ me on this issue if you make any changes and need CI re-run.
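The "safe by default, opt out for speed" idea discussed above could look roughly like the following Python sketch. Everything here is hypothetical: make_source, the Ordering enum, and the -1 sentinel are illustrative names, not the Acero or pyarrow API.

```python
from enum import Enum

UNSEQUENCED_INDEX = -1  # assumption: sentinel for "no sequence number"


class Ordering(Enum):
    IMPLICIT = "implicit"    # safe default: batches keep their arrival order
    UNORDERED = "unordered"  # explicit opt-out for "don't care" cases


def make_source(batches, ordering=Ordering.IMPLICIT):
    """Hypothetical source: tags batches with an index unless the caller opts out."""
    if ordering is Ordering.UNORDERED:
        return [(UNSEQUENCED_INDEX, b) for b in batches]
    return list(enumerate(batches))


safe = make_source(["x", "y"])
fast = make_source(["x", "y"], ordering=Ordering.UNORDERED)
```

The design choice is that the caller has to ask for the unordered mode explicitly, so the responsibility for skipping sequencing stays with the user.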
It looks like all checks passed. There are probably a few issues (like this, and maybe this) that should be verified and closed by this pull request. I think some of this work also overlaps with GH-26818.
@gitmodimo thanks for the help getting this sorted out!
My pleasure. @mroz45 and I have a few more enhancements to upstream. We will post them soon.
After merging your PR, Conbench analyzed the 3 benchmarking runs that have been run so far on merge-commit 2a0f06c. There were no benchmark performance regressions. 🎉 The full Conbench report has more details.
@@ -1057,9 +1057,12 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
        batch->values.emplace_back(partial.record_batch.index);
        batch->values.emplace_back(partial.record_batch.last);
        batch->values.emplace_back(partial.fragment.value->ToString());
        if (index != compute::kUnsequencedIndex) batch->index = index++;
        return batch;
      });
Looks like you are adding the batch index here, which is already done by the SourceNode when ordering = Ordering::Implicit() (see lines 153-155):
arrow/cpp/src/arrow/acero/source_node.cc
Lines 134 to 162 in 567f9c5
plan_->query_context()->ScheduleTask(
    [this, morsel_length, use_legacy_batching, initial_batch_index, morsel,
     has_ordering = !ordering_.is_unordered()]() {
      int64_t offset = 0;
      int batch_index = initial_batch_index;
      do {
        int64_t batch_size =
            std::min<int64_t>(morsel_length - offset, ExecPlan::kMaxBatchSize);
        // In order for the legacy batching model to work we must
        // not slice batches from the source
        if (use_legacy_batching) {
          batch_size = morsel_length;
        }
        ExecBatch batch = morsel.Slice(offset, batch_size);
        UnalignedBufferHandling unaligned_buffer_handling =
            plan_->query_context()->options().unaligned_buffer_handling.value_or(
                GetDefaultUnalignedBufferHandling());
        ARROW_RETURN_NOT_OK(
            HandleUnalignedBuffers(&batch, unaligned_buffer_handling));
        if (has_ordering) {
          batch.index = batch_index;
        }
        offset += batch_size;
        batch_index++;
        ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
      } while (offset < morsel.length);
      return Status::OK();
    },
    "SourceNode::ProcessMorsel");
This seems redundant.
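For readers less familiar with the C++ excerpt, the indexing logic it performs can be paraphrased in a small Python sketch. This is a simplification under stated assumptions: a morsel is modelled as a plain list of rows, slice_morsel is a hypothetical name, -1 stands in for compute::kUnsequencedIndex, and the legacy-batching and buffer-alignment steps are omitted.

```python
UNSEQUENCED_INDEX = -1  # assumption: stands in for compute::kUnsequencedIndex


def slice_morsel(morsel, max_batch_size, initial_batch_index, has_ordering):
    """Split one morsel (a list of rows) into (index, rows) batches."""
    batches = []
    offset = 0
    batch_index = initial_batch_index
    while True:  # mirrors the do/while loop: the body runs at least once
        batch_size = min(len(morsel) - offset, max_batch_size)
        rows = morsel[offset:offset + batch_size]
        # Only tag the batch when the source declares an ordering.
        index = batch_index if has_ordering else UNSEQUENCED_INDEX
        batches.append((index, rows))
        offset += batch_size
        batch_index += 1
        if offset >= len(morsel):
            break
    return batches


ordered = slice_morsel(list(range(5)), 2, 10, has_ordering=True)
unordered = slice_morsel(list(range(5)), 2, 10, has_ordering=False)
```

Because the source already assigns consecutive indices when it has an ordering, assigning them again in the scan node duplicates that work.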
It seems so. I first added indexing and then fixed the propagation of ordering information to source_node. Not sure it is possible to update an already-merged PR.
It's not possible, I'll draft a followup PR, if you don't mind.
@@ -406,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
 struct RecordBatchReaderSourceNode : public SourceNode {
   RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
                               arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
-      : SourceNode(plan, schema, generator) {}
+      : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
Isn't this a bit too restrictive? This means every RecordBatchReader source always preserves the implicit order, even if users do not care. Shouldn't this be configurable? What about other sources? Shouldn't the behaviour be the same for any source?
Probably an option to specify ordering would be best. Or maybe automatically assign implicit ordering based on batch.index != compute::kUnsequencedIndex?
Can you elaborate on why your asof_join enhancement requires specifically the RecordBatchReaderSourceNode to have implicit ordering?
It does not. I assumed that when a user provides a generator, it has some kind of implicit order. Now I see that is not necessarily the case.
- right_source = _dataset_to_decl(right_operand, use_threads=use_threads)
+ right_source = _dataset_to_decl(
+     right_operand, use_threads=use_threads,
+     require_sequenced_output=True)
Do you really need the dataset to be sequenced? I think all you need is implicit ordering (batch indices) so you can sequence the batches at any point down the pipeline (by your asof join node).
Do you really need the dataset to be sequenced? I think all you need is implicit ordering (batch indices) so you can sequence the batches at any point down the pipeline (by your asof join node).
True. But I don't think require_sequenced_output actually means that batches should be produced by this node in order, only that batches need to be sequenced (i.e. have batch.index assigned). The current implementation does produce batches in order as a side effect of indexing.
In the context of your GH-26818: maybe the option require_sequenced_output should just be renamed to implicit_ordering? What would be the purpose of require_sequenced_output != implicit_ordering?
P.S. I think that the idea of "Dataset has implicit ordering" is not handled in Arrow. There is no way to store ordering information in a dataset, and the order is subject to alphabetical ordering of fragment filenames (which, with the default filename template, is incorrect: part_1 -> part_10 -> part_2 -> part_3, etc.).
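The filename-ordering pitfall is easy to reproduce outside Arrow. The Python sketch below uses the part_N names from the comment above purely as sample data; natural_key is a hypothetical helper, and nothing here reflects how Arrow datasets actually order fragments beyond the alphabetical behaviour described.

```python
import re


def natural_key(name):
    """Split out digit runs so numeric parts compare as integers."""
    return [int(tok) if tok.isdigit() else tok
            for tok in re.split(r"(\d+)", name)]


names = ["part_1", "part_2", "part_3", "part_10"]

# Plain string sort compares character by character, so "10" sorts before "2".
lexicographic = sorted(names)

# Natural sort restores the intended numeric order.
natural = sorted(names, key=natural_key)
```

Zero-padding the numeric part of the filename (for example part_01, part_02) would be another way to make alphabetical and numeric order agree.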
@gitmodimo I have reworked some of this PR's changes in #44616 and added some of my code from #44470. Please have a look.
Rationale for this change
This is the initial PR. I found that the test fails with specific parameters.
What changes are included in this PR?
In this PR I provoke a failure of asof_join_node_test.