fix: fetch is missed in the EnforceSorting #14192
Does this PR fix:
No, it's another bug, I'll complete the issue summary.
done
alamb
left a comment
Thank you @xudong963 !
This PR changes the datafusion-testing pin. I think it would be better to avoid that:

I think you can fix it like
git restore datafusion-testing
git commit -m 'restore datafusion-testing'
git reset
Other than that I think the PR is ok to merge, but I also left some suggestions for your consideration.
fetch: if let Some(fetch) = child.plan.fetch() {
    Some(fetch)
} else {
    None
},
Isn't this the same as this?
- fetch: if let Some(fetch) = child.plan.fetch() {
-     Some(fetch)
- } else {
-     None
- },
+ fetch: child.plan.fetch().clone(),
yes
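A minimal sketch of why the two forms agree, assuming `fetch()` returns an `Option<usize>` (the helper names `verbose` and `concise` are hypothetical and exist only for this illustration). Re-wrapping `Some(x)` as `Some(x)` and mapping `None` to `None` is the identity on `Option`, so the value can be passed through directly:

```rust
/// Verbose form from the diff: unwraps and re-wraps the value by hand.
fn verbose(fetch: Option<usize>) -> Option<usize> {
    if let Some(fetch) = fetch {
        Some(fetch)
    } else {
        None
    }
}

/// Concise form: the `Option` is passed through unchanged.
fn concise(fetch: Option<usize>) -> Option<usize> {
    fetch
}

fn main() {
    // Both forms agree on every shape of input.
    for input in [Some(3), None] {
        assert_eq!(verbose(input), concise(input));
    }
    println!("verbose and concise forms are equivalent");
}
```

Since `Option<usize>` is `Copy`, the `.clone()` in the suggestion is harmless but would not be required under this assumption.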
let fetch = if let Some(fetch) = requirements.data.fetch.clone() {
    Some(fetch)
} else {
    // It's possible current plan (`SortExec`) has a fetch value.
    sort_fetch
};
I think you can express this same thing more concisely
- let fetch = if let Some(fetch) = requirements.data.fetch.clone() {
-     Some(fetch)
- } else {
-     // It's possible current plan (`SortExec`) has a fetch value.
-     sort_fetch
- };
+ let fetch = requirements.data.fetch.clone()
+     // It's possible current plan (`SortExec`) has a fetch value.
+     .and(sort_fetch);
Also, it seems strange that we have two fetches here that could be different.
I reverted just this change and the test still passes. By my reading of the code, you have fixed the issue above by setting the requirements fetch.
So maybe this would be better as a check that the fetches are the same, with an internal error if they aren't 🤔
Yes, they should be the same, checking is enough.
For the suggestion, I think what you mean is requirements.data.fetch.clone().or_else(|| sort_fetch)?
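To make the difference between the two combinators concrete, here is a small sketch using plain `Option<usize>` values (the function names are hypothetical; only `Option::and` and `Option::or` come from the standard library). `and` yields the second value only when the first is `Some`, while `or` keeps the first value and falls back to the second:

```rust
/// `Option::and`: returns `sort` only if `req` is `Some`, otherwise `None`.
fn combine_with_and(req: Option<usize>, sort: Option<usize>) -> Option<usize> {
    req.and(sort)
}

/// `Option::or`: returns `req` if it is `Some`, otherwise falls back to `sort`.
fn combine_with_or(req: Option<usize>, sort: Option<usize>) -> Option<usize> {
    req.or(sort)
}

fn main() {
    // Requirement has a fetch, sort has none: `and` drops the fetch, `or` keeps it.
    assert_eq!(combine_with_and(Some(2), None), None);
    assert_eq!(combine_with_or(Some(2), None), Some(2));

    // Requirement has no fetch, sort has one: only `or` falls back to the sort's fetch.
    assert_eq!(combine_with_and(None, Some(3)), None);
    assert_eq!(combine_with_or(None, Some(3)), Some(3));

    // Both present: `and` picks the second value, `or` picks the first.
    assert_eq!(combine_with_and(Some(2), Some(3)), Some(3));
    assert_eq!(combine_with_or(Some(2), Some(3)), Some(2));
    println!("and/or behave as described");
}
```

`or_else(|| sort_fetch)` gives the same result as `or(sort_fetch)` here; the closure form only matters when computing the fallback is expensive.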
There is a test which has different values:
async fn test_remove_unnecessary_sort8() -> Result<()> {
    let schema = create_test_schema()?;
    let source = memory_exec(&schema);
    let input = Arc::new(SortExec::new(
        LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]),
        source,
    ));
    let limit = Arc::new(LocalLimitExec::new(input, 2));
    let physical_plan = sort_exec(
        vec![
            sort_expr("non_nullable_col", &schema),
            sort_expr("nullable_col", &schema),
        ],
        limit,
    );
    let expected_input = [
        "SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]",
        "  LocalLimitExec: fetch=2",
        "    SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
        "      MemoryExec: partitions=1, partition_sizes=[0]",
    ];
    let expected_optimized = [
        "LocalLimitExec: fetch=2",
        "  SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]",
        "    MemoryExec: partitions=1, partition_sizes=[0]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan, true);
    Ok(())
}
Here the requirement's fetch is Some(2), but the sort has no fetch (it is None).
akurmustafa
left a comment
Thanks @xudong963 for this PR.
Thanks, @akurmustafa!
I'll be reviewing this today
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
let fetch = requirements.data.fetch;
// It's possible current plan (`SortExec`) has a fetch value.
let fetch = requirements.data.fetch.or(sort_fetch);
Don't we need to take min() instead of or()?
I can't find a case where both the requirement fetch and the sort fetch are Some and the requirement fetch is larger than the sort fetch.
Even if there is no such case from the SQL API, a plan can be constructed like:
Limit: 10
--Sort: fetch:5
IIUC, that will set the sort fetch to 10, which would be suboptimal. Do you think there is something wrong with taking their min?
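A possible shape for the "take the min" idea, sketched on plain `Option<usize>` values (the helper `tighter_fetch` is hypothetical and not part of DataFusion). Calling `a.min(b)` directly on the options would not do what is wanted, because `Option`'s ordering treats `None` as smaller than any `Some`, so the absent limit would win; an explicit match avoids that:

```rust
/// Combine two optional row limits, keeping the tighter one.
/// A missing limit (`None`) means "unbounded", so the other side wins.
fn tighter_fetch(a: Option<usize>, b: Option<usize>) -> Option<usize> {
    match (a, b) {
        // Both limits present: the smaller one bounds the output.
        (Some(x), Some(y)) => Some(x.min(y)),
        // Only `a` present, or neither present.
        (x, None) => x,
        // Only `b` present.
        (None, y) => y,
    }
}

fn main() {
    // Limit: 10 above Sort: fetch 5 -> the sort only needs to keep 5 rows.
    assert_eq!(tighter_fetch(Some(10), Some(5)), Some(5));
    // Only one side has a limit.
    assert_eq!(tighter_fetch(Some(10), None), Some(10));
    assert_eq!(tighter_fetch(None, Some(5)), Some(5));
    // No limit anywhere.
    assert_eq!(tighter_fetch(None, None), None);
    // Pitfall: the derived ordering on `Option` puts `None` first.
    assert_eq!(Some(10).min(None::<usize>), None);
    println!("tighter_fetch picks the smaller limit");
}
```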
The PR looks good BTW. I don't want to block merging.
Maybe we can address as a follow on PR
Thanks all, I opened a follow up PR: #14221
Looks great to me -- thank you @xudong963 @berkaysynnada and @akurmustafa
Thanks @xudong963 and @berkaysynnada
* fix: fetch is missed in the EnforceSorting * fix conflict * resolve comments from alamb * update
Which issue does this PR close?
select * from (select * from topk limit 8) order by x limit 3;
will return 8 rows, which isn't expected. After debugging, I found that the fetch (limit 3) in topk is lost during EnforceSorting.
Rationale for this change
Fix the fetch that is missed during EnforceSorting.
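The expected row count can be illustrated with a tiny in-memory model of the query, independent of DataFusion (the function `inner_limit_then_top_k` and the sample data are hypothetical). It assumes the inner `limit 8` simply keeps the first 8 rows, which SQL does not guarantee without an ORDER BY, and then applies the outer sort and `limit 3`:

```rust
/// Model of `select * from (select * from t limit INNER) order by x limit OUTER`.
/// Assumption: the inner limit keeps the first `inner_limit` rows as stored.
fn inner_limit_then_top_k(rows: &[i64], inner_limit: usize, outer_limit: usize) -> Vec<i64> {
    // Inner query: keep at most `inner_limit` rows.
    let mut limited: Vec<i64> = rows.iter().copied().take(inner_limit).collect();
    // Outer query: order by x ...
    limited.sort();
    // ... then keep at most `outer_limit` rows (the fetch that must not be lost).
    limited.truncate(outer_limit);
    limited
}

fn main() {
    let rows = [9, 4, 7, 1, 8, 2, 6, 3, 5, 0];
    let result = inner_limit_then_top_k(&rows, 8, 3);
    // The outer limit bounds the output at 3 rows, not 8.
    assert_eq!(result.len(), 3);
    assert_eq!(result, vec![1, 2, 3]);
    println!("{:?}", result);
}
```

If the outer fetch were dropped, the model would return all 8 rows from the inner query, which matches the symptom described above.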
What changes are included in this PR?
Are these changes tested?
yes
Are there any user-facing changes?