Skip to content

perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input (10x speedup on top10 bench) #15563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 9, 2025

Conversation

geoffreyclaude
Copy link
Contributor

@geoffreyclaude geoffreyclaude commented Apr 3, 2025

Which issue does this PR close?

Rationale for this change

DataFusion currently implements an early termination optimization (TopK) for fully sorted data. However, many real-world datasets are only partially sorted (for example, a time-series dataset sorted by day but not within each day). This PR extends the TopK early termination optimization to recognize and leverage a common prefix between the input ordering and the query’s requested ordering. With this enhancement, once the top‑K buffer is full and subsequent rows are guaranteed to be worse based on the shared sort prefix, DataFusion can safely terminate scanning early. This change is expected to reduce unnecessary data scans and sorts, thereby significantly improving query performance and reducing resource consumption.

What changes are included in this PR?

  • Sort Prefix Computation:

    • Enhanced the computation of equivalence properties to calculate the matching prefix length between the input ordering and the query’s sort requirements.
    • Introduced a new method to extract the common sort prefix.
  • Modifications in SortExec:

    • Added a new field (common_sort_prefix) to track the common prefix between the input and sort expressions.
    • Updated the property computation to return both the plan properties and the computed sort prefix.
    • Modified the display implementation to include the sort prefix when available.
  • Updates in TopK Execution:

    • Added a common_prefix_converter to handle the conversion for common prefix columns.
    • Integrated early termination logic in the TopK operator that checks if the last row of a batch (on the prefix columns) is strictly worse than the worst row in the current top‑K.
    • Updated the TopK insertion logic to flag early completion when applicable.
  • Test Enhancements:

    • Updated existing tests (e.g., in the physical optimizer and sqllogictest files) to validate the new sort prefix display and behavior.
    • Introduced new tests to specifically verify that early termination is triggered when the input ordering partially satisfies the full sort expression.

Are these changes tested?

Yes, the changes include comprehensive tests:

  • Existing "topK" tests in the SQL logic tests have been enhanced with partially sorted inputs.
  • New unit tests have been added to verify that the TopK operator correctly marks itself as finished when the common prefix condition is met, ensuring that the early termination logic behaves as expected.

I've also ran the newly introduced TopK benchmarks with the following code, which validate performance improvements on partially sorted input.

Benchmark:

cd benchmarks
git checkout perf/topk_benchmarks
cargo run --release --bin tpch -- convert --input ./data/tpch_sf1 --output ./data/tpch_sf1_sorted --format parquet --sort
cargo run --release --bin dfbench -- sort-tpch --sorted --limit 10 --iterations 5 --path ./data/tpch_sf1_sorted -o ./results/main/top10_sorted_tpch.json
git cherry-pick perf/partitioned_topk
cargo run --release --bin dfbench -- sort-tpch --sorted --limit 10 --iterations 5 --path ./data/tpch_sf1_sorted -o ./results/partitioned_topk/top10_sorted_tpch.json

Results:

> ./bench.sh compare main partitioned_topk
Comparing main and partitioned_topk
--------------------                                                                          
Benchmark top10_sorted_tpch.json                                                              
--------------------                                                                          
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ partitioned_topk ┃         Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ Q1           │  24.73ms │          24.86ms │      no change │
│ Q2           │   5.42ms │           5.27ms │      no change │
│ Q3           │  77.20ms │          76.83ms │      no change │
│ Q4           │  28.43ms │           5.41ms │  +5.26x faster │
│ Q5           │  18.44ms │          18.34ms │      no change │
│ Q6           │  32.91ms │          32.52ms │      no change │
│ Q7           │  74.63ms │          75.11ms │      no change │
│ Q8           │  77.22ms │           6.78ms │ +11.39x faster │
│ Q9           │  90.10ms │          10.04ms │  +8.97x faster │
│ Q10          │ 135.39ms │          14.04ms │  +9.64x faster │
│ Q11          │  72.33ms │          71.06ms │      no change │
└──────────────┴──────────┴──────────────────┴────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓ 
┃ Benchmark Summary               ┃          ┃ 
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩ 
│ Total Time (main)               │ 636.81ms │
│ Total Time (partitioned_topk)   │ 340.26ms │
│ Average Time (main)             │  57.89ms │
│ Average Time (partitioned_topk) │  30.93ms │                                                
│ Queries Faster                  │        4 │                                                
│ Queries Slower                  │        0 │                                                
│ Queries with No Change          │        7 │                                                
└─────────────────────────────────┴──────────┘

Are there any user-facing changes?

There are no breaking user-facing changes to the external API. The changes improve internal performance optimizations for queries using ORDER BY with LIMIT on partially sorted datasets. Users may see significant performance improvements for relevant queries, but the output and overall query semantics remain unchanged.


@github-actions github-actions bot added physical-expr Changes to the physical-expr crates sqllogictest SQL Logic Tests (.slt) labels Apr 3, 2025
@geoffreyclaude geoffreyclaude force-pushed the perf/partitioned_topk branch 18 times, most recently from b212690 to 5bfb191 Compare April 4, 2025 10:01
@github-actions github-actions bot added the core Core DataFusion crate label Apr 4, 2025
@geoffreyclaude geoffreyclaude marked this pull request as ready for review April 4, 2025 10:15
}

impl TopK {
/// Create a new [`TopK`] that stores the top `k` values, as
/// defined by the sort expressions in `expr`.
// TODO: make a builder or some other nicer API
#[allow(clippy::too_many_arguments)]
Copy link
Contributor Author

@geoffreyclaude geoffreyclaude Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TopK code could benefit from a refactoring for better readability. It's probably grown a bit too much out of it's original compact format... But leaving this out of the scope of this PR.

@geoffreyclaude geoffreyclaude force-pushed the perf/partitioned_topk branch from 5bfb191 to 195e2cb Compare April 4, 2025 12:21
@geoffreyclaude geoffreyclaude changed the title perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input (10x speedup on top10 bench) Apr 4, 2025
Copy link
Contributor

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nicely done. Clear use case description with example in the request ticket as well as in this PR which make the review a pleasure. The code is refactored and organized in such an easy way to review. ❤️ 🚀

I only have one request to modify the data of column b in the unit test to also verify correct result in a different corner case

@@ -1652,7 +1652,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> {
) as Arc<dyn ExecutionPlan>;

let expected_input = [
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false], sort_prefix=[non_nullable_col@1 ASC]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice and clear addition to the explain

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just keep the common prefix count, it will simplify the displays too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Careful, the expr is unnormalized, the sort_prefix is normalized. I agree this is a bit confusing, but normalized_common_sort_prefix seems a bit too verbose. Any suggestions?

/// Checks whether the given sort requirements are satisfied by any of the
/// existing orderings.
pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
self.extract_matching_prefix(reqs).1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have review carefully this mod.rs file. Very nice work to refactor and add functions to get the sorted prefix

emission_type,
boundedness,
),
LexOrdering::from(sort_prefix),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice addition logic in this compute_properties function

))
})
.collect::<Result<_>>()?;
let sort_fields: Vec<_> = build_sort_fields(&expr, &schema)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice refactor


// Evaluate the prefix for the last row of the current batch.
let last_row_idx = batch.num_rows() - 1;
let mut batch_prefix_scratch = prefix_converter.empty_rows(1, 20);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comment explaining why 1 and 20? Can they be defined and assigned to constants or, at least variables, and use the variables here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 here is the number of rows, and 20 is the initial capacity in bytes for storing the raw arrow data that represents the rows (just 1 in this case). This does not mean that only 20 bytes of data can be stored, IIUC this is just the amount of data that can be stored without dynamically resizing the underlaying buffer, but upon storing more data, the underlaying buffer will just get resized.

This might me worth a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=> Moved to const ESTIMATED_BYTES_PER_ROW: usize = 20;

explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3;
----
physical_plan
01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 ASC NULLS LAST, column3@2 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC, column2@1 ASC NULLS LAST]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicely 2-column sort_prefix

explain select column1, column2, column3 from partial_sorted order by column3 desc limit 3;
----
physical_plan
01)SortExec: TopK(fetch=3), expr=[column3@2 DESC], preserve_partitioning=[false]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 no sort_prefix

explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 desc limit 3;
----
physical_plan
01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 one-column sort prefix

DROP TABLE partial_sorted;

statement ok
set datafusion.explain.physical_plan_only = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice explain tests. 🚀


// Evaluate the prefix for the last row of the current batch.
let last_row_idx = batch.num_rows() - 1;
let mut batch_prefix_scratch = prefix_converter.empty_rows(1, 20);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 here is the number of rows, and 20 is the initial capacity in bytes for storing the raw arrow data that represents the rows (just 1 in this case). This does not mean that only 20 bytes of data can be stored, IIUC this is just the amount of data that can be stored without dynamically resizing the underlaying buffer, but upon storing more data, the underlaying buffer will just get resized.

This might me worth a comment.

Comment on lines 305 to 309
common_prefix_converter: _,
common_sort_prefix: _,
finished: _,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I wonder why the spread operator was just not used here on the first place. For consistency it might make more sense to leave it as is.

Suggested change
common_prefix_converter: _,
common_sort_prefix: _,
finished: _,
..

# DataFusion should use the common prefix optimization
# and return the correct top 3 rows when ordering by all columns.
query TII
select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding some tests that try to mess around with NULLS LAST and NULLS FIRST?

Copy link
Contributor

@berkaysynnada berkaysynnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @geoffreyclaude, this is a really nice PR. I've some further suggestion and I'm thinking on the same questions with @NGA-TRAN in attempt_early_completion() code.

@@ -1652,7 +1652,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> {
) as Arc<dyn ExecutionPlan>;

let expected_input = [
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false], sort_prefix=[non_nullable_col@1 ASC]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just keep the common prefix count, it will simplify the displays too

@@ -966,6 +966,8 @@ pub struct SortExec {
preserve_partitioning: bool,
/// Fetch highest/lowest n results
fetch: Option<usize>,
/// Common sort prefix between the input and the sort expressions (only used with fetch)
common_sort_prefix: LexOrdering,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, can we just keep the common prefix expr count of

    /// Sort expressions
    expr: LexOrdering

? I think it'll be more simplified, and avoiding duplication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can do that simply, as the common_sort_prefix is normalized, and the expr is not. It would mean we'd need to re-normalize expr everytime we need common_sort_prefix.

// - If not sorted, we must wait until all data is processed to emit results (Final)
let emission_type = if sort_satisfied {
input.pipeline_behavior()
} else if sort_partially_satisfied {
if input.pipeline_behavior() == EmissionType::Incremental {
EmissionType::Both
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just say input.pipeline_behavior(), as Both means there is an internal accumulation within the operator, and those accumulated results are emitted after input exhaust. However the emission depends on the changes of common prefix values mainly, not the input termination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the common prefix never changes though? The operator will then accumulate and only emit once done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That risk is always there. For example aggregation groups might never close, but we don't mark it as "Both".

Both is currently used for Joins, when there is an exact accumulation inside of the operator, and that accumulated data can only be output when the input is consumed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But maybe those should be Both as well. Let me think on that a bit 😅

/// row converter, for common keys between the sort keys and the input ordering
common_prefix_converter: Option<RowConverter>,
/// Common sort prefix between the input and the sort expressions to allow early exit optimization
common_sort_prefix: Arc<[PhysicalSortExpr]>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide keeping the count only, this field simplifies as well

// flag the topK as finished if we know that all
// subsequent batches are guaranteed to be worse than the
// current topK
self.attempt_early_completion(&batch)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can somehow check this attempt before registering the batch, we won't need to clone the batch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's possible as we need to insert the batch into the heap to update its max value first.

@berkaysynnada
Copy link
Contributor

and @gabotechs shared valuable suggestions while I was reviewing the PR. I've approved it as it's good enough to go, but let's go over the given ideas

@geoffreyclaude geoffreyclaude force-pushed the perf/partitioned_topk branch from 195e2cb to 30e5fcd Compare April 7, 2025 08:34
@alamb
Copy link
Contributor

alamb commented Apr 7, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct 4 22:16:29 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
Comparing perf/partitioned_topk (9e89688) to 19a1e58 diff
Benchmarks: sort_tpch clickbench_1 tpch_mem
Results will be posted here when complete

@alamb
Copy link
Contributor

alamb commented Apr 7, 2025

🤖: Benchmark completed

Details

Comparing HEAD and perf_partitioned_topk
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ perf_partitioned_topk ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     0.55ms │                0.58ms │     no change │
│ QQuery 1     │    71.87ms │               67.67ms │ +1.06x faster │
│ QQuery 2     │   124.36ms │              119.37ms │     no change │
│ QQuery 3     │   127.85ms │              129.09ms │     no change │
│ QQuery 4     │   787.41ms │              801.05ms │     no change │
│ QQuery 5     │  1045.75ms │             1013.35ms │     no change │
│ QQuery 6     │     0.63ms │                0.63ms │     no change │
│ QQuery 7     │    85.09ms │               84.11ms │     no change │
│ QQuery 8     │   980.13ms │              965.56ms │     no change │
│ QQuery 9     │  1305.97ms │             1259.63ms │     no change │
│ QQuery 10    │   307.22ms │              310.27ms │     no change │
│ QQuery 11    │   351.03ms │              352.62ms │     no change │
│ QQuery 12    │  1124.58ms │             1081.10ms │     no change │
│ QQuery 13    │  1514.46ms │             1474.48ms │     no change │
│ QQuery 14    │  1047.96ms │             1004.22ms │     no change │
│ QQuery 15    │  1123.57ms │             1121.84ms │     no change │
│ QQuery 16    │  1914.02ms │             1918.52ms │     no change │
│ QQuery 17    │  1765.11ms │             1744.91ms │     no change │
│ QQuery 18    │  3287.40ms │             3350.44ms │     no change │
│ QQuery 19    │   119.46ms │              116.30ms │     no change │
│ QQuery 20    │  1303.02ms │             1322.82ms │     no change │
│ QQuery 21    │  1680.96ms │             1670.27ms │     no change │
│ QQuery 22    │  4623.36ms │             4219.07ms │ +1.10x faster │
│ QQuery 23    │ 10483.17ms │            10245.87ms │     no change │
│ QQuery 24    │   681.01ms │              646.43ms │ +1.05x faster │
│ QQuery 25    │   611.16ms │              575.18ms │ +1.06x faster │
│ QQuery 26    │   759.73ms │              695.54ms │ +1.09x faster │
│ QQuery 27    │  1941.41ms │             1956.46ms │     no change │
│ QQuery 28    │ 13420.97ms │            13171.83ms │     no change │
│ QQuery 29    │   586.72ms │              577.07ms │     no change │
│ QQuery 30    │  1056.45ms │             1018.84ms │     no change │
│ QQuery 31    │  1129.84ms │             1069.49ms │ +1.06x faster │
│ QQuery 32    │  2757.28ms │             2830.42ms │     no change │
│ QQuery 33    │  3609.33ms │             3617.67ms │     no change │
│ QQuery 34    │  3616.82ms │             3599.22ms │     no change │
│ QQuery 35    │  1358.64ms │             1366.65ms │     no change │
│ QQuery 36    │   212.72ms │              213.59ms │     no change │
│ QQuery 37    │   188.84ms │              173.86ms │ +1.09x faster │
│ QQuery 38    │   208.16ms │              205.21ms │     no change │
│ QQuery 39    │   308.09ms │              301.35ms │     no change │
│ QQuery 40    │   113.99ms │               95.40ms │ +1.19x faster │
│ QQuery 41    │   106.37ms │              104.36ms │     no change │
│ QQuery 42    │   103.30ms │               99.06ms │     no change │
└──────────────┴────────────┴───────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                    │ 67945.74ms │
│ Total Time (perf_partitioned_topk)   │ 66691.39ms │
│ Average Time (HEAD)                  │  1580.13ms │
│ Average Time (perf_partitioned_topk) │  1550.96ms │
│ Queries Faster                       │          8 │
│ Queries Slower                       │          0 │
│ Queries with No Change               │         35 │
└──────────────────────────────────────┴────────────┘
--------------------
Benchmark sort_tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃      HEAD ┃ perf_partitioned_topk ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ Q1           │  353.11ms │              365.59ms │    no change │
│ Q2           │  295.45ms │              319.07ms │ 1.08x slower │
│ Q3           │ 1188.51ms │             1189.09ms │    no change │
│ Q4           │  424.02ms │              428.25ms │    no change │
│ Q5           │  436.02ms │              441.91ms │    no change │
│ Q6           │  475.06ms │              475.58ms │    no change │
│ Q7           │  973.21ms │              971.26ms │    no change │
│ Q8           │  810.55ms │              793.97ms │    no change │
│ Q9           │  826.30ms │              817.77ms │    no change │
│ Q10          │ 1263.00ms │             1272.90ms │    no change │
│ Q11          │  752.28ms │              761.43ms │    no change │
└──────────────┴───────────┴───────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                    │ 7797.50ms │
│ Total Time (perf_partitioned_topk)   │ 7836.81ms │
│ Average Time (HEAD)                  │  708.86ms │
│ Average Time (perf_partitioned_topk) │  712.44ms │
│ Queries Faster                       │         0 │
│ Queries Slower                       │         1 │
│ Queries with No Change               │        10 │
└──────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ perf_partitioned_topk ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ 123.41ms │              123.87ms │    no change │
│ QQuery 2     │  25.24ms │               24.42ms │    no change │
│ QQuery 3     │  35.05ms │               35.18ms │    no change │
│ QQuery 4     │  20.49ms │               20.85ms │    no change │
│ QQuery 5     │  58.48ms │               57.27ms │    no change │
│ QQuery 6     │   8.06ms │                8.16ms │    no change │
│ QQuery 7     │ 103.88ms │              106.34ms │    no change │
│ QQuery 8     │  26.11ms │               28.06ms │ 1.07x slower │
│ QQuery 9     │  62.55ms │               62.46ms │    no change │
│ QQuery 10    │  59.68ms │               59.06ms │    no change │
│ QQuery 11    │  13.01ms │               13.07ms │    no change │
│ QQuery 12    │  38.75ms │               37.78ms │    no change │
│ QQuery 13    │  30.97ms │               32.02ms │    no change │
│ QQuery 14    │   9.92ms │                9.85ms │    no change │
│ QQuery 15    │  25.88ms │               25.29ms │    no change │
│ QQuery 16    │  23.79ms │               24.08ms │    no change │
│ QQuery 17    │  98.83ms │               95.66ms │    no change │
│ QQuery 18    │ 252.93ms │              251.98ms │    no change │
│ QQuery 19    │  28.98ms │               29.12ms │    no change │
│ QQuery 20    │  39.47ms │               40.29ms │    no change │
│ QQuery 21    │ 178.84ms │              176.28ms │    no change │
│ QQuery 22    │  17.82ms │               17.51ms │    no change │
└──────────────┴──────────┴───────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                    │ 1282.15ms │
│ Total Time (perf_partitioned_topk)   │ 1278.61ms │
│ Average Time (HEAD)                  │   58.28ms │
│ Average Time (perf_partitioned_topk) │   58.12ms │
│ Queries Faster                       │         0 │
│ Queries Slower                       │         1 │
│ Queries with No Change               │        21 │
└──────────────────────────────────────┴───────────┘

Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. This is a great optimization, and I really appreciate the detailed write-up.

I suggest also adding high-level overview documentation for this optimization at the top of struct TopK


# Insert test data into the external table.
query I
COPY (VALUES
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think CREATE EXTERNAL TABLE WITH ORDER only acts as a metadata hint: when writing data into the external table, it doesn’t enforce the specified order, instead VALUES here should be declared with the order specified in CREATE EXTERNAL TABLE statement.
The test case didn't fail because the whole parquet file fits in one batch, maybe we can let test execute in a smaller batch size config:

set datafusion.execution.batch_size = 2;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've set the batch size to 2 and am now inserting from a ORDER BY query:

# Insert test data into the external table.
query I
COPY (
  SELECT *
  FROM (
    VALUES
      (1, 'F', 100),
      (1, 'B', 50),
      (2, 'C', 70),
      (2, 'D', 80),
      (3, 'A', 60),
      (3, 'E', 90)
  ) AS t(number, letter, age)
  ORDER BY number DESC, letter ASC
)
TO 'test_files/scratch/topk/partial_sorted/1.parquet';
----
6

Copy link
Contributor

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @geoffreyclaude for modifying the unit tests to cover more cases. Looks great

// Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0].
let array_a1: ArrayRef =
Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)]));
let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test to cover both general and corner cases

// Create the second batch with two columns:
// Column "a": [2, 3], Column "b": [10.0, 20.0].
let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3)]));
let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"| 1 | 15.0 |",
"| 1 | 20.0 |",
"| 2 | 10.0 |",
"+---+------+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@geoffreyclaude geoffreyclaude force-pushed the perf/partitioned_topk branch from a184dd0 to d27d969 Compare April 7, 2025 13:29
@berkaysynnada
Copy link
Contributor

berkaysynnada commented Apr 9, 2025

@geoffreyclaude I'm going to merge if you're done with this?

@geoffreyclaude
Copy link
Contributor Author

@geoffreyclaude I'm going to merge if you're done with this?

@berkaysynnada: Fine by me! We resolved the (unfortunately, necessary) semi-redundant common prefix field.

Only remaining question is the Emission type: if it makes sense to leave it as Both you can merge, but if you think Incremental makes more sense I can do a quick new commit to change it.

@berkaysynnada
Copy link
Contributor

berkaysynnada commented Apr 9, 2025

@geoffreyclaude I'm going to merge if you're done with this?

@berkaysynnada: Fine by me! We resolved the (unfortunately, necessary) semi-redundant common prefix field.

Only remaining question is the Emission type: if it makes sense to leave it as Both you can merge, but if you think Incremental makes more sense I can do a quick new commit to change it.

Let's make it incremental for consistency. We're thinking on this, and likely figure out a refactor to make all things more clear. (if you wonder, there is also an issue here #15479 (comment). These two cases seem to be related and I believe we will solve them all)

@geoffreyclaude
Copy link
Contributor Author

geoffreyclaude commented Apr 9, 2025

Let's make it incremental for consistency.

@berkaysynnada: Looking at this again, I'm pretty sure it should actually be Final! On partially sorted input, with or without fetch, the SortExec will only emit once it has processed all its necessary input data (eventually cancelling the input stream early). I've updated the PR with a new fix commit.

It could be moved back to Both or Incremental without fetch if we optimize the SortExec to emit result batches as soon as it "knows" the subsequent rows are guaranteed to be sorted after all those it already saw. But that's for a future Issue and PR.

@geoffreyclaude geoffreyclaude force-pushed the perf/partitioned_topk branch from b481f16 to 0bb26a1 Compare April 9, 2025 08:28
@berkaysynnada
Copy link
Contributor

Let's make it incremental for consistency.

@berkaysynnada: Looking at this again, I'm pretty sure it should actually be Final! On partially sorted input, with or without fetch, the SortExec will only emit once it has processed all its necessary input data (eventually cancelling the input stream early). I've updated the PR with a new fix commit.

PartiallySorted case is actually the same of AggregateExec with an ordered group_by key. Final means the operator can only emit results after its inputs are fully consumed.

@geoffreyclaude
Copy link
Contributor Author

geoffreyclaude commented Apr 9, 2025

PartiallySorted case is actually the same of AggregateExec with an ordered group_by key.

Exactly, except for now the SortExec without fetch does not incrementally emit results on partially sorted input: it behaves the same as on completely unsorted input. And with fetch (so TopK), it still buffers everything in memory before emitting its results.
I think the important thing here is that all records are emitted only once the node's execution terminates (whether all the input has been read or not should be irrelevant to the emission type.)

… input

- Without a fetch, the entire input data must be sorted before emitting results
- With a fetch, we can optimize for an early exit, but the results will still be emitted once all the necessary input data has been processed
@geoffreyclaude geoffreyclaude force-pushed the perf/partitioned_topk branch from 0bb26a1 to 3e2e7ed Compare April 9, 2025 10:00
@berkaysynnada
Copy link
Contributor

berkaysynnada commented Apr 9, 2025

Exactly, except for now the SortExec without fetch does not incrementally emit results on partially sorted input: it behaves the same as on completely unsorted input.

Oh, you are right. I was thinking that case as if it runs like PartialSortExec.

And with fetch (so TopK), it still buffers everything in memory before emitting its results. I think the important thing here is that all records are emitted only once the node's execution terminates (whether all the input has been read or not should be irrelevant to the emission type.)

I'm sorry, I can realize the case just now. You are right about partial sort case is Final indeed in SortExec, whether there is a fetch or not

I think it's ready to go now?

@geoffreyclaude
Copy link
Contributor Author

I think it's ready to go now?

All ready :) I need to look at PartialSortExec, I wasn't aware of it and it seems like there could be some code duplication?

@berkaysynnada berkaysynnada merged commit 7a4577e into apache:main Apr 9, 2025
27 checks passed
@berkaysynnada
Copy link
Contributor

All ready :) I need to look at PartialSortExec, I wasn't aware of it and it seems like there could be some code duplication?

No need to keep this open anymore. If you see any possible improvements, we can work on those in the new PR's.

Thank you @geoffreyclaude, happy to collaborate with you.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just had a chance to skim this PR and it looks very nicely documented and tested -- thank you @geoffreyclaude @berkaysynnada @NGA-TRAN and @gabotechs

@@ -70,6 +70,25 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
/// The same answer can be produced by simply keeping track of the top
/// K=3 elements, reducing the total amount of required buffer memory.
///
/// # Partial Sort Optimization
///
/// This implementation additionally optimizes queries where the input is already
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 this is great

nirnayroy pushed a commit to nirnayroy/datafusion that referenced this pull request May 2, 2025
…ion on partially sorted input (10x speedup on top10 bench) (apache#15563)

* perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input

* perf: Use same `common_sort_prefix` nomenclature everywhere

* perf: Remove redundant argument to `sort_partially_satisfied`

* perf: Clarify that the `common_sort_prefix` is normalized

* perf: Update the topk tests for normalized projections

* perf: Rename `worst` to `max` to keep naming consistent with heap nomenclature

* perf: Add `NULLS FIRST` and `NULLS LAST` TopK sql logic tests

* perf: Rename sqllogic topk test columns and reduce batch size

* perf: Update TopK header doc with "Partial Sort Optimization" section

* fix: Reset `SortExec`'s `EmissionType` to `Final` on partially sorted input

- Without a fetch, the entire input data must be sorted before emitting results
- With a fetch, we can optimize for an early exit, but the results will still be emitted once all the necessary input data has been processed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Changes to the physical-expr crates sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Extend TopK early termination to partially sorted inputs
6 participants