fix: LimitPushdown rule incorrectly removes some GlobalLimitExec #14245
Conversation
```
----
2 2 2 2 true
3 3 2 2 true
2 2 2 2 false
```
Why this change?
According to #12963, the limit is already pushed down to both sides of a full join.
```
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]
03)----GlobalLimitExec: skip=0, fetch=2
```
Why this change?
According to #12963, the limit is already pushed down to both sides of a full join, so the limit already applies to both sides.
I don't think it is correct to push a limit below a Full join -- a FullJoin will create null values to match any missing rows 🤔 So even if you limited both sides, you'll still get rows out of there that shouldn't be ...
Thank you @alamb for the review, this is a good point. This PR does not change anything for the full-join limit case; the logical plan has already pushed down the full-join limit since #12963.
For example, the following test:
```
## Test !join.on.is_empty() && join.filter.is_none()
query TT
EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2;
----
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: t0.c1 = t1.c1
03)----Limit: skip=0, fetch=2
04)------TableScan: t0 projection=[c1, c2], fetch=2
05)----Limit: skip=0, fetch=2
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
```
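To make the correctness concern concrete, here is a minimal standalone sketch (not DataFusion code; `full_join` is a toy helper invented for illustration) of a FULL OUTER JOIN on equal keys. It shows how limiting both inputs before the join can fabricate null-padded rows that do not exist in the true join output:

```rust
// Toy FULL OUTER JOIN on equal i32 keys (hypothetical helper, for
// illustration only -- DataFusion's real join operators work on batches).
fn full_join(left: &[i32], right: &[i32]) -> Vec<(Option<i32>, Option<i32>)> {
    let mut out = Vec::new();
    let mut right_matched = vec![false; right.len()];
    for &l in left {
        let mut matched = false;
        for (i, &r) in right.iter().enumerate() {
            if l == r {
                out.push((Some(l), Some(r)));
                matched = true;
                right_matched[i] = true;
            }
        }
        if !matched {
            out.push((Some(l), None)); // null-padded left row
        }
    }
    for (i, &r) in right.iter().enumerate() {
        if !right_matched[i] {
            out.push((None, Some(r))); // null-padded right row
        }
    }
    out
}

fn main() {
    let left = vec![1, 2, 3];
    let right = vec![3, 4, 5];
    // True join: key 3 matches, so (3, 3) is a result row and
    // (NULL, 3) is not.
    let full = full_join(&left, &right);
    assert!(full.contains(&(Some(3), Some(3))));
    assert!(!full.contains(&(None, Some(3))));
    // Pushed-down plan: LIMIT 2 applied to each input before joining.
    // Key 3 is cut from the left input, so right row 3 gets null-padded,
    // producing a row that the true join never emits.
    let pushed = full_join(&left[..2], &right[..2]);
    assert!(pushed.contains(&(None, Some(3))));
    println!("ok");
}
```

Since LIMIT output order is unspecified, the fabricated `(NULL, 3)` row can surface in the final result, which is the hazard discussed above.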
And the physical plan applies the limit pushdown, but without this PR the child limit was overridden by the parent limit, so the limit did not show up in the physical plan before.
I suggest we create a follow-up issue to discuss whether we need to fall back or change the full-join limit-pushdown case:
#12963
What's your opinion? Thanks a lot!
If the current DataFusion code pushes limits down through FULL OUTER JOINs, I agree we should file a bug and fix it.
Hi @alamb:
I looked into the code that pushes limits down through joins:
```rust
/// Adds a limit to the inputs of a join, if possible
fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
    use JoinType::*;
    fn is_no_join_condition(join: &Join) -> bool {
        join.on.is_empty() && join.filter.is_none()
    }
    let (left_limit, right_limit) = if is_no_join_condition(&join) {
        match join.join_type {
            Left | Right | Full | Inner => (Some(limit), Some(limit)),
            LeftAnti | LeftSemi | LeftMark => (Some(limit), None),
            RightAnti | RightSemi => (None, Some(limit)),
        }
    } else {
        match join.join_type {
            Left => (Some(limit), None),
            Right => (None, Some(limit)),
            Full => (Some(limit), Some(limit)),
            _ => (None, None),
        }
    };
    if left_limit.is_none() && right_limit.is_none() {
        return Transformed::no(join);
    }
    if let Some(limit) = left_limit {
        join.left = make_arc_limit(0, limit, join.left);
    }
    if let Some(limit) = right_limit {
        join.right = make_arc_limit(0, limit, join.right);
    }
    Transformed::yes(join)
}
```
I think it's safe if we just want to limit the result and don't care which rows the optimization returns.
If we want the limited result to contain exactly the same rows as without the limit, I think we would need to remove this join limit-pushdown optimization?
Thanks a lot!
I think this line looks suspicious.
Full => (Some(limit), Some(limit)),
I will work on trying to file a bug tomorrow. BTW I don't think this was introduced in your PR.
I have also marked this PR as a must for the 45 release.
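For reference, one conservative direction (a sketch only, not a committed fix; `JoinType` is a stand-in enum and `limits_with_condition` is a hypothetical name mirroring the conditioned-join arm of `push_down_join` above) would be to stop pushing the limit below a FULL join when a join condition exists, since null padding is decided only after matching:

```rust
// Stand-in enum; the real JoinType lives in DataFusion. Sketch only.
#[derive(Clone, Copy)]
enum JoinType {
    Left,
    Right,
    Full,
    Inner,
}

// Conservative variant of the conditioned-join arm: never push the limit
// below a FULL join, because null-padded rows are only known post-join.
fn limits_with_condition(
    join_type: JoinType,
    limit: usize,
) -> (Option<usize>, Option<usize>) {
    match join_type {
        JoinType::Left => (Some(limit), None),
        JoinType::Right => (None, Some(limit)),
        // Differs from the suspicious line: Full no longer pushes down.
        JoinType::Full | JoinType::Inner => (None, None),
    }
}

fn main() {
    assert_eq!(limits_with_condition(JoinType::Full, 2), (None, None));
    assert_eq!(limits_with_condition(JoinType::Left, 2), (Some(2), None));
    println!("ok");
}
```

Whether this is the right trade-off (it gives up an optimization for plans that only need "any N rows") is exactly what the proposed follow-up issue would decide.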
Thanks a lot @alamb!
I am pretty sure this is a bug (not introduced by this PR):
I am now reviewing the rest of this PR more carefully
Thank you for your patience @zhuqi-lucas
```rust
let df = ctx.sql("SELECT * FROM data").await?;
let results = df.collect().await?;

let df_explain = ctx.sql("explain SELECT a FROM data").await?;
```
Nit: Remove print info
alamb left a comment
Thank you @zhuqi-lucas -- I looked at this PR carefully and it looks good to me -- both code and tests ❤️ 🦾
Thank you for the fix
```
02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]
03)----GlobalLimitExec: skip=0, fetch=2
```
This plan is incorrect due to
(not this PR)
```
03)----SubqueryAlias: t1
04)------Limit: skip=0, fetch=10
05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10
06)----Limit: skip=0, fetch=1
```
This plan looks good to me -- the Limit 1 is still here.
```rust
global_state.satisfied = true;
// If the plan's children have limit, we shouldn't change the global state to true,
// because the children limit will be overridden if the global state is changed.
if pushdown_plan.children().iter().any(|child| {
```
It looks to me like most of the rest of this rule is implemented in terms of LimitExec (defined in this file) to abstract away directly looking for GlobalLimitExec and LocalLimitExec.
I think you could do something like this instead to be more consistent with the rest of the codebase:
```rust
if pushdown_plan
    .children()
    .iter()
    .any(|&child| extract_limit(child).is_some())
{
    global_state.satisfied = false;
}
```
However, this works too and fixes the bug, so I think it is fine as is.
I think it's a good suggestion and it makes the code clearer; addressed in the latest PR. Thanks @alamb!
```rust
} else {
    // Add fetch or a `LimitExec`:
    global_state.satisfied = true;
    // If the plan's children have limit, we shouldn't change the global state to true,
```
If the children's limit is >= the global limit, can we push down the limit?
Good question @xudong963. I added slt tests for the case where the children's limit is >= the global limit; the limit should still be pushed down, consistent with the current behaviour. Thanks!
```rust
if pushdown_plan
    .children()
    .iter()
    .any(|&child| extract_limit(child).is_some())
{
    global_state.satisfied = false;
}
```
I think the logic is strange: if we reach the else branch (line 248), it means `global_state.satisfied == false`, yet here (line 257) it sets `global_state.satisfied = false` again.
Got it @xudong963. The previous logic always set it back to true even when `global_state.satisfied == false`; this logic keeps it false for some cases. I changed the logic to make it clearer: we only set it to true when we are not in the case above.
Thanks!
```rust
    }
} else {
    // Add fetch or a `LimitExec`:
    global_state.satisfied = true;
```
Here is the original logic for setting back to true.
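The guard being discussed can be condensed into a small standalone sketch (stand-in `Node` type and `can_mark_satisfied` name invented here; the real rule walks `ExecutionPlan` nodes and uses `extract_limit`): only mark the global limit as satisfied when no child already carries its own limit, because that child limit would otherwise be overridden when the parent limit node is removed.

```rust
// Toy model of a plan node: it may carry its own fetch (limit).
struct Node {
    fetch: Option<usize>,
    children: Vec<Node>,
}

// Mirror of the discussed guard: it is only safe to mark the global
// limit satisfied when no child already has a limit of its own.
fn can_mark_satisfied(node: &Node) -> bool {
    !node.children.iter().any(|c| c.fetch.is_some())
}

fn main() {
    let plain_child = Node { fetch: None, children: vec![] };
    let limited_child = Node { fetch: Some(2), children: vec![] };
    let over_plain = Node { fetch: Some(10), children: vec![plain_child] };
    let over_limited = Node { fetch: Some(10), children: vec![limited_child] };
    assert!(can_mark_satisfied(&over_plain)); // no child limit: safe
    assert!(!can_mark_satisfied(&over_limited)); // keep the child limit
    println!("ok");
}
```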
Thanks again @zhuqi-lucas and @xudong963 -- this PR took a while but I think things are good in the end
…#14245)
* fix: LimitPushdown rule incorrectly removes some GlobalLimitExec
* Fix some logic for maybe fetch
* Fix test
* Address comments
* Address comments
* Add comments
* Address comments
Which issue does this PR close?
Closes #14204
What changes are included in this PR?
Fix the LimitPushdown rule incorrectly removing some GlobalLimitExec nodes.
Are these changes tested?
Yes, added a unit test and slt tests.