Sort preserving merge (#362) #379
Conversation
datafusion/src/physical_plan/sort.rs (outdated)
@@ -99,11 +99,11 @@ impl ExecutionPlan for SortExec {

     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
+        self.input.output_partitioning()
This is the change from #377
Force-pushed from ea493f8 to f090a0a
Thank you @tustvold -- I think this is the last missing physical operator we need in DataFusion to start enabling sort-based optimizations (e.g. sort-merge-join, etc.).
I think this is pretty amazing work -- I am sure there will be more work to optimize this, but I like the overall structure and I think it is looking very cool.
I think we should let at least one other pair of eyes read it carefully, so I will hold off on clicking approve until that happens. But from what I can see at this point, this PR is basically ready to go.
@@ -113,3 +118,29 @@ fn build_file_list_recurse(
     }
     Ok(())
 }
+
+/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
+pub(crate) fn spawn_execution(
this is a nice abstraction (and we can probably use it elsewhere)
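For readers following along outside the diff, here is a minimal sketch of the pattern this helper embodies: spawn a tokio task that drives a stream to completion and forwards each item into an mpsc channel. The name spawn_forwarding_task and the generic types are placeholders of mine; the actual spawn_execution signature is truncated in the diff above, so this is an illustration of the idea rather than the PR's code.

use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;

// Placeholder sketch, not the PR's spawn_execution: forward every item of a
// stream into an mpsc channel from a spawned tokio task.
fn spawn_forwarding_task<T, S>(mut input: S, output: Sender<T>) -> JoinHandle<()>
where
    T: Send + 'static,
    S: Stream<Item = T> + Send + Unpin + 'static,
{
    tokio::spawn(async move {
        while let Some(item) = input.next().await {
            // Stop early if the receiving side of the channel was dropped.
            if output.send(item).await.is_err() {
                break;
            }
        }
    })
}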
        Partitioning::UnknownPartitioning(1)
    }

    fn required_child_distribution(&self) -> Distribution {
eventually (not as part of this PR) we should add something like required_child_sort_order so the operators can report on what sortedness they are assuming.
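Purely as a hypothetical sketch of what that suggestion might look like -- none of these names exist in this PR, and PhysicalSortExprStub is an invented stand-in for whatever sort-expression type DataFusion would use:

// Hypothetical shape of the suggested API -- not code from this PR.
struct PhysicalSortExprStub {
    column_name: String,
    descending: bool,
    nulls_first: bool,
}

trait SortAwarePlan {
    /// The sort order each child's output would need to satisfy, if any,
    /// so optimizers can see what sortedness an operator assumes.
    fn required_child_sort_order(&self) -> Option<Vec<PhysicalSortExprStub>>;
}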
            (true, false) => return Ok(Ordering::Less),
            (false, false) => {}
            (true, true) => {
                // TODO: Building the predicate each time is sub-optimal
I predict this line will be the bottleneck of this operator.
However, I feel like getting it in and working, and then optimizing as a follow-on, is the correct course of action in this case.
"+---+---+-------------------------------+", | ||
"| 1 | | 1970-01-01 00:00:00.000000008 |", | ||
"| 1 | | 1970-01-01 00:00:00.000000008 |", | ||
"| 2 | a | |", |
In order to cover the nulls_first: false case for "c", I think you need several rows here with a tie for a and b, and both a null and a non-null value for c. I didn't see any such cases (though I may have missed it).
Perhaps adding a row like the following would be enough:
"| 7 | b | NULL |",
The sort key is just b and c, so don't the lines
"| 7 | b | 1970-01-01 00:00:00.000000006 |",
"| 2 | b |                               |",
already test this?
        assert_eq!(basic, partition);
    }

    // Split the provided record batch into multiple batch_size record batches
This might be a function that we could add to RecordBatch itself? I can file a ticket to do so if you would like.
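A minimal sketch of how such a helper could look using arrow's zero-copy RecordBatch::slice; the name split_batch and the exact behaviour of the PR's test helper are assumptions of mine:

use arrow::record_batch::RecordBatch;

// Split a batch into chunks of at most `batch_size` rows; each chunk shares
// the underlying buffers rather than copying the data.
fn split_batch(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut chunks = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = batch_size.min(batch.num_rows() - offset);
        chunks.push(batch.slice(offset, len));
        offset += len;
    }
    chunks
}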
    }

    #[tokio::test]
    async fn test_partition_sort_streaming_input_output() {
I think this test covers the case where each input stream has more than one RecordBatch, right? (Each input partition has three record batches.)
Is there any value in another test that has input streams with differing numbers of input batches? I am thinking of an input with 3 partitions: 0 record batches, 1 record batch, and "many" (aka 2 or 3).
Codecov Report
@@            Coverage Diff             @@
##           master     #379      +/-   ##
==========================================
+ Coverage   74.85%   75.39%   +0.54%
==========================================
  Files         146      148       +2
  Lines       24565    25242     +677
==========================================
+ Hits        18387    19031     +644
- Misses       6178     6211      +33
Continue to review full report at Codecov.
    /// if all cursors for all streams are exhausted
    fn next_stream_idx(&self) -> Result<Option<usize>> {
        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
        for (idx, candidate) in self.cursors.iter().enumerate() {
For a larger number of partitions, storing the cursors in a BinaryHeap, sorted by their current item, would be beneficial.
A Rust implementation of that approach can be seen in this blog post and the first comment under it. I have implemented the same approach in Java before. I agree with @alamb though to make it work first, and then optimize later.
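For illustration, here is a self-contained sketch of that heap-based k-way merge over plain sorted vectors of integers, standing in for the PR's SortKeyCursor over RecordBatches:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merge k sorted inputs by keeping one (value, input index) entry per input
// in a min-heap; each pop/re-push is O(log k) instead of an O(k) scan over
// all cursors.
fn merge_sorted(inputs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap: BinaryHeap<Reverse<(i64, usize)>> = BinaryHeap::new();
    let mut positions = vec![0usize; inputs.len()];

    // Seed the heap with the first element of each non-empty input.
    for (idx, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, idx)));
        }
    }

    let mut merged = Vec::new();
    while let Some(Reverse((value, idx))) = heap.pop() {
        merged.push(value);
        positions[idx] += 1;
        // Re-insert the input with its next value, if it has one.
        if let Some(&next) = inputs[idx].get(positions[idx]) {
            heap.push(Reverse((next, idx)));
        }
    }
    merged
}

The same ordering idea extends to multi-column sort keys by making the heap entries comparable on the full key rather than a single i64.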
great suggestion @jhorstmann -- thank you -- I filed #416 so it is more visible
Will rebase to remove merges
Force-pushed from 9999e4d to 29c767b
This PR appears to need some rebasing / test fixing love: https://github.com/apache/arrow-datafusion/pull/379/checks?check_run_id=2674096854
Apologies - I stripped out the merge that fixed the logical conflict 🤦 Pushed a commit that fixes it 😄
I think this PR is ready -- thanks again @tustvold
What do you think @Dandandan / @andygrove? Any objections to merging this (as a step towards a more sorted future in DataFusion)?
I just fixed a merge conflict -- if the tests pass I plan to merge this PR in
Closes #362.
Creating as a draft, as this currently builds on top of #378 because it uses a partitioned SortExec as part of its tests.
This PR adds a SortPreservingMergeExec operator that allows merging together multiple sorted partitions into a single partition.
The main implementation is contained within SortPreservingMergeStream and SortKeyCursor:

SortKeyCursor provides the ability to compare the sort keys of the next row that could be yielded for each stream, in order to determine which one to yield.

SortPreservingMergeStream maintains a list of SortKeyCursors, one per stream, and builds up a list of sorted indices identifying rows within these cursors. When it reads the last row of a RecordBatch, it fetches another from the input. Once it has accumulated target_batch_size row indexes (or exhausted all input streams), it combines the relevant rows from the buffered RecordBatches into a single RecordBatch, drops any cursors it no longer needs, and yields the batch.
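To make that description concrete, here is a simplified, self-contained sketch of the index-accumulating loop, with an i64 standing in for the full sort key and (partition, row) pairs standing in for the buffered row indices. The real implementation works on SortKeyCursor and RecordBatch, streams its input asynchronously, and handles errors, so this is a model of the control flow only.

// Simplified model of the merge loop: repeatedly pick the partition whose
// next row has the smallest key, record its (partition, row) index, and emit
// a batch of indices whenever target_batch_size is reached.
fn merge_indices(partitions: &[Vec<i64>], target_batch_size: usize) -> Vec<Vec<(usize, usize)>> {
    let mut positions = vec![0usize; partitions.len()];
    let mut batches = Vec::new();
    let mut current: Vec<(usize, usize)> = Vec::new();

    loop {
        // Linear scan for the partition with the smallest next key
        // (the role played by comparing SortKeyCursors in the PR).
        let mut next: Option<usize> = None;
        for p in 0..partitions.len() {
            if positions[p] < partitions[p].len() {
                let is_smaller = match next {
                    None => true,
                    Some(q) => partitions[p][positions[p]] < partitions[q][positions[q]],
                };
                if is_smaller {
                    next = Some(p);
                }
            }
        }

        match next {
            Some(p) => {
                current.push((p, positions[p]));
                positions[p] += 1;
                // Emit a batch of row indices once the target size is reached.
                if current.len() == target_batch_size {
                    batches.push(std::mem::take(&mut current));
                }
            }
            // All inputs exhausted.
            None => break,
        }
    }

    if !current.is_empty() {
        batches.push(current);
    }
    batches
}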