perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input (10x speedup on top10 bench) #15563
Conversation
(force-pushed from b212690 to 5bfb191)
```rust
impl TopK {
    /// Create a new [`TopK`] that stores the top `k` values, as
    /// defined by the sort expressions in `expr`.
    // TODO: make a builder or some other nicer API
    #[allow(clippy::too_many_arguments)]
```
The `TopK` code could benefit from a refactoring for better readability. It's probably grown a bit too much out of its original compact format... But leaving this out of the scope of this PR.
(force-pushed from 5bfb191 to 195e2cb)
Very nicely done. Clear use-case description, with examples in the request ticket as well as in this PR, which makes the review a pleasure. The code is refactored and organized in a way that is easy to review. ❤️ 🚀
I only have one request: modify the data of column `b` in the unit test to also verify the correct result in a different corner case.
```diff
@@ -1652,7 +1652,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> {
     ) as Arc<dyn ExecutionPlan>;

     let expected_input = [
-        "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
+        "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false], sort_prefix=[non_nullable_col@1 ASC]",
```
Very nice and clear addition to the `explain` output.
If we just keep the common prefix count, it will simplify the displays too
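A minimal sketch of what that count-based display might look like (the `common_prefix_length` label and the `fmt_topk` helper are hypothetical, not the PR's actual output):

```rust
use std::fmt;

// Hypothetical count-based rendering: print how many leading sort
// expressions the input ordering already satisfies, instead of repeating
// the normalized prefix expressions verbatim.
fn fmt_topk(
    f: &mut fmt::Formatter<'_>,
    fetch: usize,
    expr: &str,
    common_prefix_length: usize,
) -> fmt::Result {
    write!(
        f,
        "SortExec: TopK(fetch={fetch}), expr=[{expr}], common_prefix_length=[{common_prefix_length}]"
    )
}
```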
Careful: the `expr` is unnormalized, while the `sort_prefix` is normalized. I agree this is a bit confusing, but `normalized_common_sort_prefix` seems a bit too verbose. Any suggestions?
```rust
/// Checks whether the given sort requirements are satisfied by any of the
/// existing orderings.
pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
    self.extract_matching_prefix(reqs).1
}
```
I have reviewed this mod.rs file carefully. Very nice work refactoring and adding functions to get the sorted prefix.
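For context, a sketch of how a caller might consume both parts of the tuple that `extract_matching_prefix` appears to return, assumed from the snippet above to be (matching prefix, fully-satisfied flag); the types below are simplified stand-ins:

```rust
// Simplified: the real code works with PhysicalSortExpr, not String.
fn choose_sort_strategy(matching_prefix: &[String], fully_satisfied: bool) -> &'static str {
    if fully_satisfied {
        // The existing ordering satisfies the whole requirement: no sort needed.
        "elide sort"
    } else if !matching_prefix.is_empty() {
        // Only a leading prefix matches: a TopK sort can still exit early.
        "sort with common prefix early-exit"
    } else {
        // Nothing matches: full sort required.
        "full sort"
    }
}
```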
```rust
        emission_type,
        boundedness,
    ),
    LexOrdering::from(sort_prefix),
```
Very nice addition to the logic in this `compute_properties` function.
```rust
        ))
    })
    .collect::<Result<_>>()?;
let sort_fields: Vec<_> = build_sort_fields(&expr, &schema)?;
```
Nice refactor
```rust
// Evaluate the prefix for the last row of the current batch.
let last_row_idx = batch.num_rows() - 1;
let mut batch_prefix_scratch = prefix_converter.empty_rows(1, 20);
```
Can you add a comment explaining why 1 and 20? Can they be defined as constants, or at least variables, and used here?
1 here is the number of rows, and 20 is the initial capacity in bytes for storing the raw arrow data that represents the rows (just 1 row in this case). This does not mean that only 20 bytes of data can be stored; IIUC it is just the amount of data that can be stored without dynamically resizing the underlying buffer. Upon storing more data, the underlying buffer will simply get resized.
This might be worth a comment.
⇒ Moved to `const ESTIMATED_BYTES_PER_ROW: usize = 20;`
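A minimal sketch of the constant-based version, using arrow's `RowConverter::empty_rows(row_capacity, data_capacity)` API; the helper function name is made up for illustration:

```rust
use arrow::row::{RowConverter, Rows};

/// Initial scratch capacity in bytes per row. This is only a
/// resize-avoidance hint: the underlying buffer grows automatically
/// if the converted row needs more space.
const ESTIMATED_BYTES_PER_ROW: usize = 20;

// Allocate scratch space for converting a single row (the last row of
// the incoming batch) into the row format.
fn prefix_scratch_for_one_row(prefix_converter: &RowConverter) -> Rows {
    prefix_converter.empty_rows(1, ESTIMATED_BYTES_PER_ROW)
}
```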
```
explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3;
----
physical_plan
01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 ASC NULLS LAST, column3@2 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC, column2@1 ASC NULLS LAST]
```
Nice two-column `sort_prefix`.
```
explain select column1, column2, column3 from partial_sorted order by column3 desc limit 3;
----
physical_plan
01)SortExec: TopK(fetch=3), expr=[column3@2 DESC], preserve_partitioning=[false]
```
👍 no `sort_prefix`
```
explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 desc limit 3;
----
physical_plan
01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC]
```
👍 one-column sort prefix
```
DROP TABLE partial_sorted;

statement ok
set datafusion.explain.physical_plan_only = false
```
Very nice explain tests. 🚀
```rust
common_prefix_converter: _,
common_sort_prefix: _,
finished: _,
```
🤔 I wonder why the `..` rest pattern was just not used here in the first place. For consistency it might make more sense to leave it as is.
```diff
- common_prefix_converter: _,
- common_sort_prefix: _,
- finished: _,
+ ..
```
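A small illustration of the tradeoff being discussed, using a hypothetical struct: naming every ignored field keeps the destructuring exhaustive, so adding a field to the struct becomes a compile error here (useful for code that must account for every field, such as memory accounting), while the `..` rest pattern is shorter but silently ignores future fields:

```rust
// Hypothetical stand-in for the TopK state; not the actual struct.
struct TopKState {
    heap: Vec<u64>,
    finished: bool,
}

fn heap_len_explicit(state: &TopKState) -> usize {
    // Explicit style: the compiler flags this pattern if a field is added.
    let TopKState { heap, finished: _ } = state;
    heap.len()
}

fn heap_len_rest(state: &TopKState) -> usize {
    // Rest-pattern style, as in the suggestion above.
    let TopKState { heap, .. } = state;
    heap.len()
}
```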
```
# DataFusion should use the common prefix optimization
# and return the correct top 3 rows when ordering by all columns.
query TII
select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3;
```
How about adding some tests that try to mess around with `NULLS LAST` and `NULLS FIRST`?
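To illustrate why null placement matters for prefix matching, a self-contained sketch in plain Rust (assumed semantics, not DataFusion types): the same `DESC` ordering with different null placement produces different row orders, so an input sorted `DESC NULLS LAST` cannot serve as a sort prefix for a requested `DESC NULLS FIRST`:

```rust
use std::cmp::Ordering;

fn sort_desc(vals: &mut [Option<i32>], nulls_first: bool) {
    vals.sort_by(|a, b| match (a, b) {
        (None, None) => Ordering::Equal,
        (None, _) => if nulls_first { Ordering::Less } else { Ordering::Greater },
        (_, None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
        (Some(x), Some(y)) => y.cmp(x), // descending
    });
}

fn main() {
    let mut nulls_last = [Some(3), None, Some(1), Some(2)];
    let mut nulls_first = nulls_last;
    sort_desc(&mut nulls_last, false); // [3, 2, 1, NULL]
    sort_desc(&mut nulls_first, true); // [NULL, 3, 2, 1]
    assert_ne!(nulls_last, nulls_first);
}
```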
Thank you @geoffreyclaude, this is a really nice PR. I have some further suggestions, and I'm thinking about the same questions as @NGA-TRAN in the `attempt_early_completion()` code.
```diff
@@ -966,6 +966,8 @@ pub struct SortExec {
     preserve_partitioning: bool,
     /// Fetch highest/lowest n results
     fetch: Option<usize>,
+    /// Common sort prefix between the input and the sort expressions (only used with fetch)
+    common_sort_prefix: LexOrdering,
```
Here, can we just keep the common prefix expression count of the sort expressions (`expr: LexOrdering`)? I think it'll be simpler and avoid duplication.
I'm not sure we can do that simply, as the `common_sort_prefix` is normalized and the `expr` is not. It would mean we'd need to re-normalize `expr` every time we need `common_sort_prefix`.
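An illustrative walk-through (with hypothetical columns) of why a bare count into `expr` wouldn't work without re-normalization:

```rust
// Suppose the plan knows the equivalence a = b, the input is sorted on
// [a ASC], and the query asks for ORDER BY b ASC, c ASC. Then:
//
//   expr (unnormalized):      [b ASC, c ASC]
//   expr (normalized):        [a ASC, c ASC]   // b rewritten via a = b
//   common_sort_prefix:       [a ASC]          // matches the input ordering
//
// A prefix count of 1 interpreted against the unnormalized expr would
// select `b ASC`, not `a ASC`, so expr would have to be re-normalized
// every time the prefix is actually needed.
```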
```rust
// - If not sorted, we must wait until all data is processed to emit results (Final)
let emission_type = if sort_satisfied {
    input.pipeline_behavior()
} else if sort_partially_satisfied {
    if input.pipeline_behavior() == EmissionType::Incremental {
        EmissionType::Both
```
You can just say `input.pipeline_behavior()`, as `Both` means there is internal accumulation within the operator and those accumulated results are emitted after the input is exhausted. Here, however, the emission depends mainly on changes in the common prefix values, not on input termination.
What if the common prefix never changes though? The operator will then accumulate and only emit once done.
That risk is always there. For example, aggregation groups might never close, but we don't mark that as `Both`. `Both` is currently used for joins, where there is exact accumulation inside the operator and the accumulated data can only be output once the input is consumed.
But maybe those should be `Both` as well. Let me think on that a bit 😅
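A simplified sketch of where this discussion eventually landed (see the `EmissionType`-related commit further down): the fully sorted case mirrors the input, while partially sorted input still yields `Final`, because results are only emitted after all necessary input is processed. The function is a stand-in for logic that lives inside `SortExec`, and the enum import path may vary by DataFusion version:

```rust
use datafusion::physical_plan::execution_plan::EmissionType;

fn emission_for_sort(sort_satisfied: bool, input_behavior: EmissionType) -> EmissionType {
    if sort_satisfied {
        // Fully sorted input: rows can stream through, so mirror the input.
        input_behavior
    } else {
        // Unsorted or only partially sorted: the common prefix allows an
        // early *exit* from scanning, but emission still happens at the end.
        EmissionType::Final
    }
}
```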
```rust
/// row converter, for common keys between the sort keys and the input ordering
common_prefix_converter: Option<RowConverter>,
/// Common sort prefix between the input and the sort expressions to allow early exit optimization
common_sort_prefix: Arc<[PhysicalSortExpr]>,
```
If we decide to keep only the count, this field simplifies as well.
```rust
// flag the topK as finished if we know that all
// subsequent batches are guaranteed to be worse than the
// current topK
self.attempt_early_completion(&batch)?;
```
If you can somehow check this attempt before registering the batch, we won't need to clone the batch
I don't think that's possible, as we need to insert the batch into the heap to update its max value first.
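For readers following along, a hedged sketch of the early-exit idea behind `attempt_early_completion` (not the actual implementation; the real code compares arrow row-format prefixes, which are byte-comparable):

```rust
// Once the heap holds k rows, compare the common-prefix bytes of the
// batch's last row with those of the current heap maximum. Because the
// input is sorted on that prefix, a strictly greater last row means every
// subsequent row is also worse, so the operator can finish early. (On a
// tie, later rows could still win on the non-prefix columns, so we must
// keep going.)
fn can_finish_early(
    heap_len: usize,
    k: usize,
    last_row_prefix: &[u8],
    heap_max_prefix: &[u8],
) -> bool {
    heap_len == k && last_row_prefix > heap_max_prefix
}
```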
…and @gabotechs shared valuable suggestions while I was reviewing the PR. I've approved it as it's good enough to go, but let's go over the given ideas.
(force-pushed from 195e2cb to 30e5fcd)
🤖: Benchmark completed
Thank you. This is a great optimization, and I really appreciate the detailed write-up. I suggest also adding high-level overview documentation for this optimization at the top of `struct TopK`.
```
# Insert test data into the external table.
query I
COPY (VALUES
```
I think `CREATE EXTERNAL TABLE ... WITH ORDER` only acts as a metadata hint: when writing data into the external table, it doesn't enforce the specified order. Instead, the `VALUES` here should be declared in the order specified in the `CREATE EXTERNAL TABLE` statement.
The test case didn't fail because the whole Parquet file fits in one batch; maybe we can have the test execute with a smaller batch size config:
`set datafusion.execution.batch_size = 2;`
I've set the batch size to 2 and am now inserting from an `ORDER BY` query:
```
# Insert test data into the external table.
query I
COPY (
  SELECT *
  FROM (
    VALUES
      (1, 'F', 100),
      (1, 'B', 50),
      (2, 'C', 70),
      (2, 'D', 80),
      (3, 'A', 60),
      (3, 'E', 90)
  ) AS t(number, letter, age)
  ORDER BY number DESC, letter ASC
)
TO 'test_files/scratch/topk/partial_sorted/1.parquet';
----
6
```
Thanks @geoffreyclaude for modifying the unit tests to cover more cases. Looks great
```rust
// Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0].
let array_a1: ArrayRef =
    Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)]));
let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0]));
```
Nice test to cover both general and corner cases
```rust
// Create the second batch with two columns:
// Column "a": [2, 3], Column "b": [10.0, 20.0].
let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3)]));
let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0]));
```
👍
"| 1 | 15.0 |", | ||
"| 1 | 20.0 |", | ||
"| 2 | 10.0 |", | ||
"+---+------+", |
👍
(force-pushed from a184dd0 to d27d969)
(force-pushed from d27d969 to 1f94f80)
@geoffreyclaude I'm going to merge if you're done with this?
@berkaysynnada: Fine by me! We resolved the (unfortunately necessary) semi-redundant common prefix field. The only remaining question is the emission type: whether it makes sense to leave it as `Both`.
Let's make it `Incremental` for consistency. We're thinking about this, and will likely figure out a refactor to make all of this clearer. (If you're wondering, there is also an issue here: #15479 (comment). These two cases seem to be related, and I believe we will solve them all.)
@berkaysynnada: Looking at this again, I'm pretty sure it should actually be `Final`. It could be moved back to `Incremental` once the operator can actually emit results incrementally on partially sorted input.
(force-pushed from b481f16 to 0bb26a1)
The PartiallySorted case is actually the same as AggregateExec with an ordered group-by key. `Final` means the operator can only emit results after its inputs are fully consumed.
Exactly, except that for now the SortExec without a fetch does not incrementally emit results on partially sorted input: it behaves the same as on completely unsorted input. And with a fetch (so TopK), it still buffers everything in memory before emitting its results.
fix: Reset `SortExec`'s `EmissionType` to `Final` on partially sorted input
- Without a fetch, the entire input data must be sorted before emitting results
- With a fetch, we can optimize for an early exit, but the results will still be emitted once all the necessary input data has been processed
(force-pushed from 0bb26a1 to 3e2e7ed)
Oh, you are right. I was thinking of that case as if it runs like PartialSortExec. I'm sorry, I only realized it just now. You are right that the partial sort case is indeed `Final` in SortExec, whether there is a fetch or not. I think it's ready to go now?
All ready :) I need to look at PartialSortExec; I wasn't aware of it, and it seems like there could be some code duplication?
No need to keep this open anymore. If you see any possible improvements, we can work on those in new PRs. Thank you @geoffreyclaude, happy to collaborate with you.
I just had a chance to skim this PR and it looks very nicely documented and tested -- thank you @geoffreyclaude @berkaysynnada @NGA-TRAN and @gabotechs
```diff
@@ -70,6 +70,25 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
 /// The same answer can be produced by simply keeping track of the top
 /// K=3 elements, reducing the total amount of required buffer memory.
 ///
+/// # Partial Sort Optimization
+///
+/// This implementation additionally optimizes queries where the input is already
```
💯 this is great
…ion on partially sorted input (10x speedup on top10 bench) (apache#15563)

* perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input
* perf: Use same `common_sort_prefix` nomenclature everywhere
* perf: Remove redundant argument to `sort_partially_satisfied`
* perf: Clarify that the `common_sort_prefix` is normalized
* perf: Update the topk tests for normalized projections
* perf: Rename `worst` to `max` to keep naming consistent with heap nomenclature
* perf: Add `NULLS FIRST` and `NULLS LAST` TopK sql logic tests
* perf: Rename sqllogic topk test columns and reduce batch size
* perf: Update TopK header doc with "Partial Sort Optimization" section
* fix: Reset `SortExec`'s `EmissionType` to `Final` on partially sorted input
  - Without a fetch, the entire input data must be sorted before emitting results
  - With a fetch, we can optimize for an early exit, but the results will still be emitted once all the necessary input data has been processed
Which issue does this PR close?
Rationale for this change
DataFusion currently implements an early termination optimization (TopK) for fully sorted data. However, many real-world datasets are only partially sorted (for example, a time-series dataset sorted by day but not within each day). This PR extends the TopK early termination optimization to recognize and leverage a common prefix between the input ordering and the query’s requested ordering. With this enhancement, once the top‑K buffer is full and subsequent rows are guaranteed to be worse based on the shared sort prefix, DataFusion can safely terminate scanning early. This change is expected to reduce unnecessary data scans and sorts, thereby significantly improving query performance and reducing resource consumption.
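As a hedged, end-to-end illustration of the use case described above (the table name, path, and columns are made up; the `WITH ORDER` clause declares the partial ordering the optimization relies on):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Declare a Parquet-backed table that is known to be sorted by `day`
    // (but not within each day).
    ctx.sql(
        "CREATE EXTERNAL TABLE events (day DATE, temperature DOUBLE) \
         STORED AS PARQUET WITH ORDER (day ASC) LOCATION './events/'",
    )
    .await?;
    // The requested ordering shares the prefix [day ASC] with the declared
    // table order, so once 10 rows are buffered and `day` advances past the
    // current heap maximum, the scan can terminate early.
    ctx.sql("SELECT * FROM events ORDER BY day ASC, temperature DESC LIMIT 10")
        .await?
        .show()
        .await
}
```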
What changes are included in this PR?
Sort Prefix Computation:
- Modifications in `SortExec`: a new field (`common_sort_prefix`) to track the common prefix between the input and sort expressions.

Updates in TopK Execution:
- A `common_prefix_converter` to handle the conversion for common prefix columns.

Test Enhancements:
Are these changes tested?
Yes, the changes include comprehensive tests.
I've also run the newly introduced TopK benchmarks with the following commands, which validate the performance improvements on partially sorted input.
Benchmark:
```sh
cd benchmarks
git checkout perf/topk_benchmarks
cargo run --release --bin tpch -- convert --input ./data/tpch_sf1 --output ./data/tpch_sf1_sorted --format parquet --sort
cargo run --release --bin dfbench -- sort-tpch --sorted --limit 10 --iterations 5 --path ./data/tpch_sf1_sorted -o ./results/main/top10_sorted_tpch.json
git cherry-pick perf/partitioned_topk
cargo run --release --bin dfbench -- sort-tpch --sorted --limit 10 --iterations 5 --path ./data/tpch_sf1_sorted -o ./results/partitioned_topk/top10_sorted_tpch.json
```
Results:
Are there any user-facing changes?
There are no breaking user-facing changes to the external API. The changes improve internal performance optimizations for queries using `ORDER BY` with `LIMIT` on partially sorted datasets. Users may see significant performance improvements for relevant queries, but the output and overall query semantics remain unchanged.