Skip to content

Commit 76006f8

Browse files
xudong963 authored and
avantgardnerio committed
Cherry-pick topk limit pushdown fix (apache#14192) v45
1 parent baa8bbd commit 76006f8

File tree

5 files changed

+44
-25
lines changed

5 files changed

+44
-25
lines changed

benchmarks/queries/clickbench/queries.sql

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits;
44
SELECT AVG("UserID") FROM hits;
55
SELECT COUNT(DISTINCT "UserID") FROM hits;
66
SELECT COUNT(DISTINCT "SearchPhrase") FROM hits;
7-
SELECT MIN("EventDate"::INT::DATE), MAX("EventDate"::INT::DATE) FROM hits;
7+
SELECT MIN("EventDate"), MAX("EventDate") FROM hits;
88
SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC;
99
SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;
1010
SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10;
@@ -21,10 +21,10 @@ SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449;
2121
SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';
2222
SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
2323
SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
24-
SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
25-
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
24+
SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10;
25+
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime" LIMIT 10;
2626
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
27-
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10;
27+
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime", "SearchPhrase" LIMIT 10;
2828
SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
2929
SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
3030
SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1), SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" + 4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6), SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" + 9), SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11), SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth" + 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("ResolutionWidth" + 17), SUM("ResolutionWidth" + 18), SUM("ResolutionWidth" + 19), SUM("ResolutionWidth" + 20), SUM("ResolutionWidth" + 21), SUM("ResolutionWidth" + 22), SUM("ResolutionWidth" + 23), SUM("ResolutionWidth" + 24), SUM("ResolutionWidth" + 25), SUM("ResolutionWidth" + 26), SUM("ResolutionWidth" + 27), SUM("ResolutionWidth" + 28), SUM("ResolutionWidth" + 29), SUM("ResolutionWidth" + 30), SUM("ResolutionWidth" + 31), SUM("ResolutionWidth" + 32), SUM("ResolutionWidth" + 33), SUM("ResolutionWidth" + 34), SUM("ResolutionWidth" + 35), SUM("ResolutionWidth" + 36), SUM("ResolutionWidth" + 37), SUM("ResolutionWidth" + 38), SUM("ResolutionWidth" + 39), SUM("ResolutionWidth" + 40), SUM("ResolutionWidth" + 41), SUM("ResolutionWidth" + 42), SUM("ResolutionWidth" + 43), SUM("ResolutionWidth" + 44), SUM("ResolutionWidth" + 45), SUM("ResolutionWidth" + 46), SUM("ResolutionWidth" + 47), SUM("ResolutionWidth" + 48), SUM("ResolutionWidth" + 49), SUM("ResolutionWidth" + 50), SUM("ResolutionWidth" + 51), SUM("ResolutionWidth" + 52), SUM("ResolutionWidth" + 53), SUM("ResolutionWidth" + 54), SUM("ResolutionWidth" + 55), SUM("ResolutionWidth" + 56), SUM("ResolutionWidth" + 57), SUM("ResolutionWidth" + 58), SUM("ResolutionWidth" + 59), SUM("ResolutionWidth" + 60), SUM("ResolutionWidth" + 61), SUM("ResolutionWidth" + 62), SUM("ResolutionWidth" + 63), SUM("ResolutionWidth" + 64), SUM("ResolutionWidth" + 65), SUM("ResolutionWidth" + 66), SUM("ResolutionWidth" + 67), SUM("ResolutionWidth" + 68), 
SUM("ResolutionWidth" + 69), SUM("ResolutionWidth" + 70), SUM("ResolutionWidth" + 71), SUM("ResolutionWidth" + 72), SUM("ResolutionWidth" + 73), SUM("ResolutionWidth" + 74), SUM("ResolutionWidth" + 75), SUM("ResolutionWidth" + 76), SUM("ResolutionWidth" + 77), SUM("ResolutionWidth" + 78), SUM("ResolutionWidth" + 79), SUM("ResolutionWidth" + 80), SUM("ResolutionWidth" + 81), SUM("ResolutionWidth" + 82), SUM("ResolutionWidth" + 83), SUM("ResolutionWidth" + 84), SUM("ResolutionWidth" + 85), SUM("ResolutionWidth" + 86), SUM("ResolutionWidth" + 87), SUM("ResolutionWidth" + 88), SUM("ResolutionWidth" + 89) FROM hits;
@@ -34,10 +34,10 @@ SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWi
3434
SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
3535
SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC LIMIT 10;
3636
SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
37-
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10;
38-
SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
39-
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
40-
SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
41-
SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
42-
SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
43-
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
37+
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10;
38+
SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
39+
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
40+
SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
41+
SELECT "URLHash", "EventDate", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate" ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
42+
SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
43+
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-14' AND "EventDate" <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -186,13 +186,11 @@ impl PhysicalOptimizerRule for EnforceSorting {
186186
)
187187
})
188188
.data()?;
189-
190189
// Execute a top-down traversal to exploit sort push-down opportunities
191190
// missed by the bottom-up traversal:
192191
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
193192
assign_initial_requirements(&mut sort_pushdown);
194193
let adjusted = pushdown_sorts(sort_pushdown)?;
195-
196194
adjusted
197195
.plan
198196
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
@@ -399,7 +397,10 @@ fn analyze_immediate_sort_removal(
399397
{
400398
// Replace the sort with a sort-preserving merge:
401399
let expr = LexOrdering::new(sort_exec.expr().to_vec());
402-
Arc::new(SortPreservingMergeExec::new(expr, sort_input.clone())) as _
400+
Arc::new(
401+
SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
402+
.with_fetch(sort_exec.fetch()),
403+
) as _
403404
} else {
404405
// Remove the sort:
405406
node.children = node.children.swap_remove(0).children;
@@ -617,12 +618,13 @@ fn remove_corresponding_sort_from_sub_plan(
617618
{
618619
// If there is existing ordering, to preserve ordering use
619620
// `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
620-
let plan = node.plan.clone();
621+
let plan = Arc::clone(&node.plan);
622+
let fetch = plan.fetch();
621623
let plan = if let Some(ordering) = plan.output_ordering() {
622-
Arc::new(SortPreservingMergeExec::new(
623-
LexOrdering::new(ordering.to_vec()),
624-
plan,
625-
)) as _
624+
Arc::new(
625+
SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
626+
.with_fetch(fetch),
627+
) as _
626628
} else {
627629
Arc::new(CoalescePartitionsExec::new(plan)) as _
628630
};

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -99,14 +99,20 @@ fn plan_with_order_preserving_variants(
9999
// Flag indicating that it is desirable to replace `CoalescePartitionsExec`s
100100
// with `SortPreservingMergeExec`s:
101101
is_spm_better: bool,
102+
fetch: Option<usize>,
102103
) -> Result<OrderPreservationContext> {
103104
sort_input.children = sort_input
104105
.children
105106
.into_iter()
106107
.map(|node| {
107108
// Update descendants in the given tree if there is a connection:
108109
if node.data {
109-
plan_with_order_preserving_variants(node, is_spr_better, is_spm_better)
110+
plan_with_order_preserving_variants(
111+
node,
112+
is_spr_better,
113+
is_spm_better,
114+
fetch,
115+
)
110116
} else {
111117
Ok(node)
112118
}
@@ -133,7 +139,8 @@ fn plan_with_order_preserving_variants(
133139
// When the input of a `CoalescePartitionsExec` has an ordering,
134140
// replace it with a `SortPreservingMergeExec` if appropriate:
135141
let spm =
136-
SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone());
142+
SortPreservingMergeExec::new(LexOrdering::new(ordering), child.clone())
143+
.with_fetch(fetch);
137144
sort_input.plan = Arc::new(spm) as _;
138145
sort_input.children[0].data = true;
139146
return Ok(sort_input);
@@ -251,6 +258,7 @@ pub(crate) fn replace_with_order_preserving_variants(
251258
requirements.children.swap_remove(0),
252259
is_spr_better || use_order_preserving_variant,
253260
is_spm_better || use_order_preserving_variant,
261+
requirements.plan.fetch(),
254262
)?;
255263

256264
// If the alternate plan makes this sort unnecessary, accept the alternate:

datafusion/core/src/physical_optimizer/sort_pushdown.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,9 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
6464
for (child, requirement) in node.children.iter_mut().zip(reqs) {
6565
child.data = ParentRequirements {
6666
ordering_requirement: requirement,
67-
fetch: None,
67+
// If the parent has a fetch value, assign it to the children
68+
// Or use the fetch value of the child.
69+
fetch: child.plan.fetch(),
6870
};
6971
}
7072
}
@@ -95,14 +97,16 @@ fn pushdown_sorts_helper(
9597
.equivalence_properties()
9698
.ordering_satisfy_requirement(parent_reqs);
9799
if is_sort(plan) {
100+
let sort_fetch = plan.fetch();
98101
let required_ordering = plan
99102
.output_ordering()
100103
.map(PhysicalSortRequirement::from_sort_exprs)
101104
.unwrap_or_default();
102105
if !satisfy_parent {
103106
// Make sure this `SortExec` satisfies parent requirements:
104107
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
105-
let fetch = requirements.data.fetch;
108+
// It's possible current plan (`SortExec`) has a fetch value.
109+
let fetch = requirements.data.fetch.or(sort_fetch);
106110
requirements = requirements.children.swap_remove(0);
107111
requirements = add_sort_above(requirements, sort_reqs, fetch);
108112
};
@@ -112,7 +116,7 @@ fn pushdown_sorts_helper(
112116
if let Some(adjusted) =
113117
pushdown_requirement_to_children(&child.plan, &required_ordering)?
114118
{
115-
let fetch = child.plan.fetch();
119+
let fetch = sort_fetch.or_else(|| child.plan.fetch());
116120
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
117121
grand_child.data = ParentRequirements {
118122
ordering_requirement: order,

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,12 @@ select * from topk order by x desc limit 3;
4949
8
5050
5
5151

52-
52+
query I
53+
select * from (select * from topk limit 8) order by x limit 3;
54+
----
55+
0
56+
1
57+
2
5358

5459

5560
statement ok

0 commit comments

Comments
 (0)