Skip to content

Commit 2510e34

Browse files
authored
fix: LimitPushdown rule uncorrect remove some GlobalLimitExec (#14245)
* fix: LimitPushdown rule uncorrect remove some GlobalLimitExec * Fix some logic for maybe fetch * Fix test * Address comments * Address comments * Add comments * Address comments
1 parent a93b4de commit 2510e34

File tree

3 files changed

+70
-7
lines changed

3 files changed

+70
-7
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2217,11 +2217,6 @@ async fn write_parquet_with_order() -> Result<()> {
22172217
let df = ctx.sql("SELECT * FROM data").await?;
22182218
let results = df.collect().await?;
22192219

2220-
let df_explain = ctx.sql("explain SELECT a FROM data").await?;
2221-
let explain_result = df_explain.collect().await?;
2222-
2223-
println!("explain_result {:?}", explain_result);
2224-
22252220
assert_batches_eq!(
22262221
&[
22272222
"+---+---+",

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
3131
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
3232
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
3333
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
34-
3534
/// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
3635
/// the parent to the child if applicable.
3736
#[derive(Default, Debug)]
@@ -248,7 +247,15 @@ pub fn pushdown_limit_helper(
248247
}
249248
} else {
250249
// Add fetch or a `LimitExec`:
251-
global_state.satisfied = true;
250+
// If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true,
251+
// because the children limit will be overridden if the global state is changed.
252+
if !pushdown_plan
253+
.children()
254+
.iter()
255+
.any(|&child| extract_limit(child).is_some())
256+
{
257+
global_state.satisfied = true;
258+
}
252259
pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
253260
if global_skip > 0 {
254261
add_global_limit(plan_with_fetch, global_skip, Some(global_fetch))

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,3 +711,64 @@ OFFSET 3 LIMIT 2;
711711

712712
statement ok
713713
drop table ordered_table;
714+
715+
# Test issue: https://github.com/apache/datafusion/issues/14204
716+
# Test limit pushdown with subquery
717+
statement ok
718+
create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4);
719+
720+
query IIII
721+
select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10;
722+
----
723+
1 2 1 2
724+
2 3 1 2
725+
3 4 1 2
726+
727+
query TT
728+
explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10;
729+
----
730+
logical_plan
731+
01)Limit: skip=0, fetch=10
732+
02)--Cross Join:
733+
03)----SubqueryAlias: t1
734+
04)------Limit: skip=0, fetch=10
735+
05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10
736+
06)----Limit: skip=0, fetch=1
737+
07)------TableScan: testsubquerylimit projection=[a, b], fetch=1
738+
physical_plan
739+
01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b]
740+
02)--GlobalLimitExec: skip=0, fetch=10
741+
03)----CrossJoinExec
742+
04)------GlobalLimitExec: skip=0, fetch=1
743+
05)--------MemoryExec: partitions=1, partition_sizes=[1]
744+
06)------GlobalLimitExec: skip=0, fetch=10
745+
07)--------MemoryExec: partitions=1, partition_sizes=[1]
746+
747+
748+
query IIII
749+
select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2;
750+
----
751+
1 2 1 2
752+
1 2 2 3
753+
754+
query TT
755+
explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2;
756+
----
757+
logical_plan
758+
01)Limit: skip=0, fetch=2
759+
02)--Cross Join:
760+
03)----SubqueryAlias: t1
761+
04)------Limit: skip=0, fetch=2
762+
05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2
763+
06)----Limit: skip=0, fetch=2
764+
07)------TableScan: testsubquerylimit projection=[a, b], fetch=2
765+
physical_plan
766+
01)GlobalLimitExec: skip=0, fetch=2
767+
02)--CrossJoinExec
768+
03)----GlobalLimitExec: skip=0, fetch=2
769+
04)------MemoryExec: partitions=1, partition_sizes=[1]
770+
05)----GlobalLimitExec: skip=0, fetch=2
771+
06)------MemoryExec: partitions=1, partition_sizes=[1]
772+
773+
statement ok
774+
drop table testSubQueryLimit;

0 commit comments

Comments
 (0)