Skip to content

Commit c578c52

Browse files
xudong963avantgardnerio
authored andcommitted
Cherry-pick topk limit pushdown fix (apache#14192) v45
1 parent 843b19a commit c578c52

File tree

5 files changed

+43
-24
lines changed

5 files changed

+43
-24
lines changed

benchmarks/queries/clickbench/queries.sql

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits;
44
SELECT AVG("UserID") FROM hits;
55
SELECT COUNT(DISTINCT "UserID") FROM hits;
66
SELECT COUNT(DISTINCT "SearchPhrase") FROM hits;
7-
SELECT MIN("EventDate"::INT::DATE), MAX("EventDate"::INT::DATE) FROM hits;
7+
SELECT MIN("EventDate"), MAX("EventDate") FROM hits;
88
SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC;
99
SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;
1010
SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10;
@@ -21,10 +21,10 @@ SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449;
2121
SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';
2222
SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
2323
SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
24-
SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
25-
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
24+
SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10;
25+
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime" LIMIT 10;
2626
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
27-
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10;
27+
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime", "SearchPhrase" LIMIT 10;
2828
SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
2929
SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
3030
SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1), SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" + 4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6), SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" + 9), SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11), SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth" + 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("ResolutionWidth" + 17), SUM("ResolutionWidth" + 18), SUM("ResolutionWidth" + 19), SUM("ResolutionWidth" + 20), SUM("ResolutionWidth" + 21), SUM("ResolutionWidth" + 22), SUM("ResolutionWidth" + 23), SUM("ResolutionWidth" + 24), SUM("ResolutionWidth" + 25), SUM("ResolutionWidth" + 26), SUM("ResolutionWidth" + 27), SUM("ResolutionWidth" + 28), SUM("ResolutionWidth" + 29), SUM("ResolutionWidth" + 30), SUM("ResolutionWidth" + 31), SUM("ResolutionWidth" + 32), SUM("ResolutionWidth" + 33), SUM("ResolutionWidth" + 34), SUM("ResolutionWidth" + 35), SUM("ResolutionWidth" + 36), SUM("ResolutionWidth" + 37), SUM("ResolutionWidth" + 38), SUM("ResolutionWidth" + 39), SUM("ResolutionWidth" + 40), SUM("ResolutionWidth" + 41), SUM("ResolutionWidth" + 42), SUM("ResolutionWidth" + 43), SUM("ResolutionWidth" + 44), SUM("ResolutionWidth" + 45), SUM("ResolutionWidth" + 46), SUM("ResolutionWidth" + 47), SUM("ResolutionWidth" + 48), SUM("ResolutionWidth" + 49), SUM("ResolutionWidth" + 50), SUM("ResolutionWidth" + 51), SUM("ResolutionWidth" + 52), SUM("ResolutionWidth" + 53), SUM("ResolutionWidth" + 54), SUM("ResolutionWidth" + 55), SUM("ResolutionWidth" + 56), SUM("ResolutionWidth" + 57), SUM("ResolutionWidth" + 58), SUM("ResolutionWidth" + 59), SUM("ResolutionWidth" + 60), SUM("ResolutionWidth" + 61), SUM("ResolutionWidth" + 62), SUM("ResolutionWidth" + 63), SUM("ResolutionWidth" + 64), SUM("ResolutionWidth" + 65), SUM("ResolutionWidth" + 66), SUM("ResolutionWidth" + 67), SUM("ResolutionWidth" + 68), SUM("ResolutionWidth" + 69), SUM("ResolutionWidth" + 70), SUM("ResolutionWidth" + 71), SUM("ResolutionWidth" + 72), SUM("ResolutionWidth" + 73), SUM("ResolutionWidth" + 74), SUM("ResolutionWidth" + 75), SUM("ResolutionWidth" + 76), SUM("ResolutionWidth" + 77), SUM("ResolutionWidth" + 78), SUM("ResolutionWidth" + 79), SUM("ResolutionWidth" + 80), SUM("ResolutionWidth" + 81), SUM("ResolutionWidth" + 82), SUM("ResolutionWidth" + 83), SUM("ResolutionWidth" + 84), SUM("ResolutionWidth" + 85), SUM("ResolutionWidth" + 86), SUM("ResolutionWidth" + 87), SUM("ResolutionWidth" + 88), SUM("ResolutionWidth" + 89) FROM hits;
@@ -34,10 +34,10 @@ SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWi
3434
SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
3535
SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC LIMIT 10;
3636
SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
37-
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10;
38-
SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
39-
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
40-
SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
41-
SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
42-
SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
43-
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
37+
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10;
38+
SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
39+
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
40+
SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
41+
SELECT "URLHash", "EventDate", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate" ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
42+
SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
43+
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-14' AND "EventDate" <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,11 @@ impl PhysicalOptimizerRule for EnforceSorting {
186186
)
187187
})
188188
.data()?;
189-
190189
// Execute a top-down traversal to exploit sort push-down opportunities
191190
// missed by the bottom-up traversal:
192191
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
193192
assign_initial_requirements(&mut sort_pushdown);
194193
let adjusted = pushdown_sorts(sort_pushdown)?;
195-
196194
adjusted
197195
.plan
198196
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
@@ -403,7 +401,10 @@ fn analyze_immediate_sort_removal(
403401
{
404402
// Replace the sort with a sort-preserving merge:
405403
let expr = LexOrdering::new(sort_exec.expr().to_vec());
406-
Arc::new(SortPreservingMergeExec::new(expr, Arc::clone(sort_input))) as _
404+
Arc::new(
405+
SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
406+
.with_fetch(sort_exec.fetch()),
407+
) as _
407408
} else {
408409
// Remove the sort:
409410
node.children = node.children.swap_remove(0).children;
@@ -626,11 +627,12 @@ fn remove_corresponding_sort_from_sub_plan(
626627
// If there is existing ordering, to preserve ordering use
627628
// `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
628629
let plan = Arc::clone(&node.plan);
630+
let fetch = plan.fetch();
629631
let plan = if let Some(ordering) = plan.output_ordering() {
630-
Arc::new(SortPreservingMergeExec::new(
631-
LexOrdering::new(ordering.to_vec()),
632-
plan,
633-
)) as _
632+
Arc::new(
633+
SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
634+
.with_fetch(fetch),
635+
) as _
634636
} else {
635637
Arc::new(CoalescePartitionsExec::new(plan)) as _
636638
};

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,20 @@ fn plan_with_order_preserving_variants(
100100
// Flag indicating that it is desirable to replace `CoalescePartitionsExec`s
101101
// with `SortPreservingMergeExec`s:
102102
is_spm_better: bool,
103+
fetch: Option<usize>,
103104
) -> Result<OrderPreservationContext> {
104105
sort_input.children = sort_input
105106
.children
106107
.into_iter()
107108
.map(|node| {
108109
// Update descendants in the given tree if there is a connection:
109110
if node.data {
110-
plan_with_order_preserving_variants(node, is_spr_better, is_spm_better)
111+
plan_with_order_preserving_variants(
112+
node,
113+
is_spr_better,
114+
is_spm_better,
115+
fetch,
116+
)
111117
} else {
112118
Ok(node)
113119
}
@@ -136,7 +142,8 @@ fn plan_with_order_preserving_variants(
136142
let spm = SortPreservingMergeExec::new(
137143
LexOrdering::new(ordering.inner.clone()),
138144
Arc::clone(child),
139-
);
145+
)
146+
.with_fetch(fetch);
140147
sort_input.plan = Arc::new(spm) as _;
141148
sort_input.children[0].data = true;
142149
return Ok(sort_input);
@@ -255,6 +262,7 @@ pub(crate) fn replace_with_order_preserving_variants(
255262
requirements.children.swap_remove(0),
256263
is_spr_better || use_order_preserving_variant,
257264
is_spm_better || use_order_preserving_variant,
265+
requirements.plan.fetch(),
258266
)?;
259267

260268
// If the alternate plan makes this sort unnecessary, accept the alternate:

datafusion/core/src/physical_optimizer/sort_pushdown.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
6363
for (child, requirement) in node.children.iter_mut().zip(reqs) {
6464
child.data = ParentRequirements {
6565
ordering_requirement: requirement,
66-
fetch: None,
66+
// If the parent has a fetch value, assign it to the children
67+
// Or use the fetch value of the child.
68+
fetch: child.plan.fetch(),
6769
};
6870
}
6971
}
@@ -95,6 +97,7 @@ fn pushdown_sorts_helper(
9597
.ordering_satisfy_requirement(&parent_reqs);
9698

9799
if is_sort(plan) {
100+
let sort_fetch = plan.fetch();
98101
let required_ordering = plan
99102
.output_ordering()
100103
.cloned()
@@ -103,7 +106,8 @@ fn pushdown_sorts_helper(
103106
if !satisfy_parent {
104107
// Make sure this `SortExec` satisfies parent requirements:
105108
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
106-
let fetch = requirements.data.fetch;
109+
// It's possible current plan (`SortExec`) has a fetch value.
110+
let fetch = requirements.data.fetch.or(sort_fetch);
107111
requirements = requirements.children.swap_remove(0);
108112
requirements = add_sort_above(requirements, sort_reqs, fetch);
109113
};
@@ -113,7 +117,7 @@ fn pushdown_sorts_helper(
113117
if let Some(adjusted) =
114118
pushdown_requirement_to_children(&child.plan, &required_ordering)?
115119
{
116-
let fetch = child.plan.fetch();
120+
let fetch = sort_fetch.or_else(|| child.plan.fetch());
117121
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
118122
grand_child.data = ParentRequirements {
119123
ordering_requirement: order,

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ select * from topk order by x desc limit 3;
4949
8
5050
5
5151

52-
52+
query I
53+
select * from (select * from topk limit 8) order by x limit 3;
54+
----
55+
0
56+
1
57+
2
5358

5459

5560
statement ok

0 commit comments

Comments
 (0)