Skip to content

Commit 02eab80

Browse files
mertak-synnada, mustafasrepo, ozankabak
authored
Do not add redundant subquery ordering into plan (#12003)
* do not add redundant subquery ordering into plan
* format code
* add license
* fix test cases with sort plan removing
* fix comment
* keep sorting on ordering mode test cases
* protect test intentions with order + limit
* protect test intentions with order + limit
* Tmp
* Minor changes
* Minor changes
* Minor changes
* Implement top down recursion with delete check
* Minor changes
* Minor changes
* initialize fetch() api for execution plan
  remove unnecessary limit plans when used with sort + fetch
  add test case for Sort and Limit with offset
  push down limit even if a child with no fetch appears when the child supports push down
* Address reviews
* Update comments
* Minor changes
* Make test deterministic
* add supports limit push down to union exec
* support limit push down with multi children cases
* fix typos
  Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
* Add fetch info to the statistics
* optimize tpch test plans
* Enforce distribution use inexact count estimate also.
* Minor changes
* Minor changes
* merge with apache main
  add pushes_global_limit_into_multiple_fetch_plans test case
  change limit_pushdown.rs as manual top down operator and simplify algorithm by supporting most parent node remove and other pushdown cases
* format code
* fix doc paths
* fix doc paths
* remove redundant code block
* if partition count is 1 put GlobalLimitExec
* fix test cases
* Apply suggestions from code review
* fix syntax errors
* Simplify branches
* remove redundant limit plans from merge
---------
Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 574dfeb commit 02eab80

File tree

39 files changed

+1415
-1040
lines changed

39 files changed

+1415
-1040
lines changed

datafusion/common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
mod column;
2121
mod dfschema;
22-
mod error;
2322
mod functional_dependencies;
2423
mod join_type;
2524
mod param_value;
@@ -33,6 +32,7 @@ pub mod alias;
3332
pub mod cast;
3433
pub mod config;
3534
pub mod display;
35+
pub mod error;
3636
pub mod file_options;
3737
pub mod format;
3838
pub mod hash_utils;

datafusion/core/src/dataframe/mod.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3046,13 +3046,12 @@ mod tests {
30463046
assert_eq!(
30473047
"\
30483048
Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
3049-
\n Limit: skip=0, fetch=1\
3050-
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
3051-
\n Inner Join: t1.c1 = t2.c1\
3052-
\n SubqueryAlias: t1\
3053-
\n TableScan: aggregate_test_100 projection=[c1]\
3054-
\n SubqueryAlias: t2\
3055-
\n TableScan: aggregate_test_100 projection=[c1]",
3049+
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
3050+
\n Inner Join: t1.c1 = t2.c1\
3051+
\n SubqueryAlias: t1\
3052+
\n TableScan: aggregate_test_100 projection=[c1]\
3053+
\n SubqueryAlias: t2\
3054+
\n TableScan: aggregate_test_100 projection=[c1]",
30563055
format!("{}", df_with_column.clone().into_optimized_plan()?)
30573056
);
30583057

@@ -3240,13 +3239,12 @@ mod tests {
32403239

32413240
assert_eq!("\
32423241
Projection: t1.c1 AS AAA, t1.c2, t1.c3, t2.c1, t2.c2, t2.c3\
3243-
\n Limit: skip=0, fetch=1\
3244-
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
3245-
\n Inner Join: t1.c1 = t2.c1\
3246-
\n SubqueryAlias: t1\
3247-
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
3248-
\n SubqueryAlias: t2\
3249-
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
3242+
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
3243+
\n Inner Join: t1.c1 = t2.c1\
3244+
\n SubqueryAlias: t1\
3245+
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
3246+
\n SubqueryAlias: t2\
3247+
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
32503248
format!("{}", df_renamed.clone().into_optimized_plan()?)
32513249
);
32523250

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ impl ExecutionPlan for ArrowExec {
197197
Ok(self.projected_statistics.clone())
198198
}
199199

200+
fn fetch(&self) -> Option<usize> {
201+
self.base_config.limit
202+
}
203+
200204
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
201205
let new_config = self.base_config.clone().with_limit(limit);
202206

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ impl ExecutionPlan for AvroExec {
165165
Some(self.metrics.clone_inner())
166166
}
167167

168+
fn fetch(&self) -> Option<usize> {
169+
self.base_config.limit
170+
}
171+
168172
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
169173
let new_config = self.base_config.clone().with_limit(limit);
170174

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,10 @@ impl ExecutionPlan for CsvExec {
427427
Some(self.metrics.clone_inner())
428428
}
429429

430+
fn fetch(&self) -> Option<usize> {
431+
self.base_config.limit
432+
}
433+
430434
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
431435
let new_config = self.base_config.clone().with_limit(limit);
432436

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ impl ExecutionPlan for NdJsonExec {
207207
Some(self.metrics.clone_inner())
208208
}
209209

210+
fn fetch(&self) -> Option<usize> {
211+
self.base_config.limit
212+
}
213+
210214
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
211215
let new_config = self.base_config.clone().with_limit(limit);
212216

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,10 @@ impl ExecutionPlan for ParquetExec {
745745
Ok(self.projected_statistics.clone())
746746
}
747747

748+
fn fetch(&self) -> Option<usize> {
749+
self.base_config.limit
750+
}
751+
748752
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
749753
let new_config = self.base_config.clone().with_limit(limit);
750754

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};
6262
use datafusion_common::plan_err;
6363
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
6464
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr, PhysicalSortRequirement};
65-
use datafusion_physical_plan::limit::LocalLimitExec;
65+
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
6666
use datafusion_physical_plan::repartition::RepartitionExec;
6767
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
6868
use datafusion_physical_plan::ExecutionPlanProperties;
@@ -405,7 +405,16 @@ fn analyze_immediate_sort_removal(
405405
node.children = node.children.swap_remove(0).children;
406406
if let Some(fetch) = sort_exec.fetch() {
407407
// If the sort has a fetch, we need to add a limit:
408-
Arc::new(LocalLimitExec::new(sort_input.clone(), fetch))
408+
if sort_exec
409+
.properties()
410+
.output_partitioning()
411+
.partition_count()
412+
== 1
413+
{
414+
Arc::new(GlobalLimitExec::new(sort_input.clone(), 0, Some(fetch)))
415+
} else {
416+
Arc::new(LocalLimitExec::new(sort_input.clone(), fetch))
417+
}
409418
} else {
410419
sort_input.clone()
411420
}
@@ -1124,7 +1133,7 @@ mod tests {
11241133
" MemoryExec: partitions=1, partition_sizes=[0]",
11251134
];
11261135
let expected_optimized = [
1127-
"LocalLimitExec: fetch=2",
1136+
"GlobalLimitExec: skip=0, fetch=2",
11281137
" SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
11291138
" MemoryExec: partitions=1, partition_sizes=[0]",
11301139
];

datafusion/core/src/physical_planner.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2177,9 +2177,6 @@ mod tests {
21772177
assert!(format!("{plan:?}").contains("GlobalLimitExec"));
21782178
assert!(format!("{plan:?}").contains("skip: 3, fetch: Some(5)"));
21792179

2180-
// LocalLimitExec adjusts the `fetch`
2181-
assert!(format!("{plan:?}").contains("LocalLimitExec"));
2182-
assert!(format!("{plan:?}").contains("fetch: 8"));
21832180
Ok(())
21842181
}
21852182

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -238,18 +238,15 @@ async fn sort_preserving_merge() {
238238
// SortPreservingMergeExec (not a Sort which would compete
239239
// with the SortPreservingMergeExec for memory)
240240
&[
241-
"+---------------+---------------------------------------------------------------------------------------------------------------+",
242-
"| plan_type | plan |",
243-
"+---------------+---------------------------------------------------------------------------------------------------------------+",
244-
"| logical_plan | Limit: skip=0, fetch=10 |",
245-
"| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
246-
"| | TableScan: t projection=[a, b] |",
247-
"| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
248-
"| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
249-
"| | LocalLimitExec: fetch=10 |",
250-
"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
251-
"| | |",
252-
"+---------------+---------------------------------------------------------------------------------------------------------------+",
241+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
242+
"| plan_type | plan |",
243+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
244+
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
245+
"| | TableScan: t projection=[a, b] |",
246+
"| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
247+
"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
248+
"| | |",
249+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
253250
]
254251
)
255252
.run()

0 commit comments

Comments
 (0)