-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Prevent exponential planning time for Window functions - v2 #17684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b545b27
f30af51
43f7592
43a323f
c4bbecb
e390836
bc56376
50a08f4
0e9e236
2b164d5
b3543c0
4579343
45cbdd9
e94a4b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -371,17 +371,40 @@ pub(crate) fn window_equivalence_properties( | |||
| for (i, expr) in window_exprs.iter().enumerate() { | ||||
| let partitioning_exprs = expr.partition_by(); | ||||
| let no_partitioning = partitioning_exprs.is_empty(); | ||||
| // Collect columns defining partitioning, and construct all `SortOptions` | ||||
| // variations for them. Then, we will check each one whether it satisfies | ||||
| // the existing ordering provided by the input plan. | ||||
|
|
||||
| // Find "one" valid ordering for partition columns to avoid exponential complexity. | ||||
| // see https://github.com/apache/datafusion/issues/17401 | ||||
| let mut all_satisfied_lexs = vec![]; | ||||
| for lex in partitioning_exprs | ||||
| .iter() | ||||
| .map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order))) | ||||
| .multi_cartesian_product() | ||||
| .filter_map(LexOrdering::new) | ||||
| { | ||||
| if window_eq_properties.ordering_satisfy(lex.clone())? { | ||||
| let mut candidate_ordering = vec![]; | ||||
|
|
||||
| for partition_expr in partitioning_exprs.iter() { | ||||
| let sort_options = | ||||
| sort_options_resolving_constant(Arc::clone(partition_expr), true); | ||||
|
|
||||
| // Try each sort option and pick the first one that works | ||||
| let mut found = false; | ||||
| for sort_expr in sort_options.into_iter() { | ||||
| candidate_ordering.push(sort_expr); | ||||
| if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) { | ||||
| if window_eq_properties.ordering_satisfy(lex)? { | ||||
| found = true; | ||||
| break; | ||||
| } | ||||
| } | ||||
| // This option didn't work, remove it and try the next one | ||||
| candidate_ordering.pop(); | ||||
| } | ||||
| // If no sort option works for this column, we can't build a valid ordering | ||||
| if !found { | ||||
| candidate_ordering.clear(); | ||||
| break; | ||||
| } | ||||
| } | ||||
|
|
||||
| // If we successfully built an ordering for all columns, use it | ||||
| // When there are no partition expressions, candidate_ordering will be empty and won't be added | ||||
| if candidate_ordering.len() == partitioning_exprs.len() { | ||||
| if let Some(lex) = LexOrdering::new(candidate_ordering) { | ||||
| all_satisfied_lexs.push(lex); | ||||
| } | ||||
| } | ||||
|
|
@@ -410,8 +433,10 @@ pub(crate) fn window_equivalence_properties( | |||
| // Window function results in a partial constant value in | ||||
| // some ordering. Adjust the ordering equivalences accordingly: | ||||
| let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| { | ||||
| let new_partial_consts = | ||||
| sort_options_resolving_constant(Arc::clone(&window_col)); | ||||
| let new_partial_consts = sort_options_resolving_constant( | ||||
| Arc::clone(&window_col), | ||||
| false, | ||||
| ); | ||||
|
|
||||
| new_partial_consts.into_iter().map(move |partial| { | ||||
| let mut existing = lex.clone(); | ||||
|
|
@@ -467,23 +492,52 @@ pub(crate) fn window_equivalence_properties( | |||
| // utilize set-monotonicity since the set shrinks as the frame | ||||
| // boundary starts "touching" the end of the table. | ||||
| else if frame.is_causal() { | ||||
| let args_all_lexs = sliding_expr | ||||
| .get_aggregate_expr() | ||||
| .expressions() | ||||
| .into_iter() | ||||
| .map(sort_options_resolving_constant) | ||||
| .multi_cartesian_product(); | ||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for solving this exponential case too! I am not sure the regression tests cover this case.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||
|
|
||||
| let (mut asc, mut satisfied) = (false, false); | ||||
| for order in args_all_lexs { | ||||
| if let Some(f) = order.first() { | ||||
| asc = !f.options.descending; | ||||
| // Find one valid ordering for aggregate arguments instead of | ||||
| // checking all combinations | ||||
| let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions(); | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW this seems very similar to the loop above, I wonder if there is some way (as a follow on PR) to factor it out to reduce the replication |
||||
| let mut candidate_order = vec![]; | ||||
| let mut asc = false; | ||||
|
|
||||
| for (idx, expr) in aggregate_exprs.iter().enumerate() { | ||||
| let mut found = false; | ||||
| let sort_options = | ||||
| sort_options_resolving_constant(Arc::clone(expr), false); | ||||
|
|
||||
| // Try each option and pick the first that works | ||||
| for sort_expr in sort_options.into_iter() { | ||||
| let is_asc = !sort_expr.options.descending; | ||||
| candidate_order.push(sort_expr); | ||||
|
|
||||
| if let Some(lex) = LexOrdering::new(candidate_order.clone()) { | ||||
| if window_eq_properties.ordering_satisfy(lex)? { | ||||
| if idx == 0 { | ||||
| // The first column's ordering direction determines the overall | ||||
| // monotonicity behavior of the window result. | ||||
| // - If the aggregate has increasing set monotonicity (e.g., MAX, COUNT) | ||||
| // and the first arg is ascending, the window result is increasing | ||||
| // - If the aggregate has decreasing set monotonicity (e.g., MIN) | ||||
| // and the first arg is ascending, the window result is also increasing | ||||
| // This flag is used to determine the final window column ordering. | ||||
| asc = is_asc; | ||||
| } | ||||
| found = true; | ||||
| break; | ||||
| } | ||||
| } | ||||
| // This option didn't work, remove it and try the next one | ||||
| candidate_order.pop(); | ||||
| } | ||||
| if window_eq_properties.ordering_satisfy(order)? { | ||||
| satisfied = true; | ||||
|
|
||||
| // If we couldn't extend the ordering, stop trying | ||||
| if !found { | ||||
| break; | ||||
| } | ||||
| } | ||||
|
|
||||
| // Check if we successfully built a complete ordering | ||||
| let satisfied = candidate_order.len() == aggregate_exprs.len() | ||||
| && !aggregate_exprs.is_empty(); | ||||
|
|
||||
| if satisfied { | ||||
| let increasing = | ||||
| set_monotonicity.eq(&SetMonotonicity::Increasing); | ||||
|
|
@@ -634,11 +688,45 @@ pub fn get_window_mode( | |||
| Ok(None) | ||||
| } | ||||
|
|
||||
| fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> { | ||||
| vec![ | ||||
| PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), | ||||
| PhysicalSortExpr::new(expr, SortOptions::new(true, true)), | ||||
| ] | ||||
| /// Generates sort option variations for a given expression. | ||||
| /// | ||||
| /// This function is used to handle constant columns in window operations. Since constant | ||||
| /// columns can be considered as having any ordering, we generate multiple sort options | ||||
| /// to explore different ordering possibilities. | ||||
| /// | ||||
| /// # Parameters | ||||
| /// - `expr`: The physical expression to generate sort options for | ||||
| /// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST). | ||||
| /// If true, generates only 2 options that preserve set monotonicity. | ||||
| /// | ||||
| /// # When to use `only_monotonic = false`: | ||||
| /// Use for PARTITION BY columns where we want to explore all possible orderings to find | ||||
| /// one that matches the existing data ordering. | ||||
| /// | ||||
| /// # When to use `only_monotonic = true`: | ||||
| /// Use for aggregate/window function arguments where set monotonicity needs to be preserved. | ||||
| /// Only generates ASC NULLS LAST and DESC NULLS FIRST because: | ||||
| /// - Set monotonicity is broken if data has increasing order but nulls come first | ||||
| /// - Set monotonicity is broken if data has decreasing order but nulls come last | ||||
| fn sort_options_resolving_constant( | ||||
| expr: Arc<dyn PhysicalExpr>, | ||||
| only_monotonic: bool, | ||||
| ) -> Vec<PhysicalSortExpr> { | ||||
| if only_monotonic { | ||||
| // Generate only the 2 options that preserve set monotonicity | ||||
| vec![ | ||||
| PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST | ||||
| PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST | ||||
| ] | ||||
| } else { | ||||
| // Generate all 4 possible sort options for partition columns | ||||
| vec![ | ||||
| PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST | ||||
| PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST | ||||
| PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST | ||||
| PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST | ||||
| ] | ||||
| } | ||||
| } | ||||
|
|
||||
| #[cfg(test)] | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6034,3 +6034,92 @@ LIMIT 5 | |
| 0 2 NULL NULL 0 NULL NULL | ||
| 0 3 NULL NULL 0 NULL NULL | ||
| 0 4 NULL NULL 0 NULL NULL | ||
|
|
||
| # regression test for https://github.com/apache/datafusion/issues/17401 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I ran this locally, and found both main and This branch -- took 2 seconds Main (venv) andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ cargo test --profile=ci --test sqllogictests -- window.slt
Finished `ci` profile [unoptimized + debuginfo] target(s) in 0.20s
Running bin/sqllogictests.rs (target/ci/deps/sqllogictests-4dcb99f83e94c047)
Completed 2 test files in 2 secondsThis branch
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Do you mean you run window.slt unmodified as currently in I tried to reproduce the latter and for me execution "hangs" at |
||
| query I | ||
| WITH source AS ( | ||
| SELECT | ||
| 1 AS n, | ||
| '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, | ||
| '' AS a9, '' AS a10, '' AS a11, '' AS a12 | ||
| ) | ||
| SELECT | ||
| sum(n) OVER (PARTITION BY | ||
| a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12 | ||
| ) | ||
| FROM source; | ||
| ---- | ||
| 1 | ||
|
|
||
| # regression test for https://github.com/apache/datafusion/issues/17401 | ||
| query I | ||
| WITH source AS ( | ||
| SELECT | ||
| 1 AS n, | ||
| '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, | ||
| '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, | ||
| '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, | ||
| '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, | ||
| '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 | ||
| ) | ||
| SELECT | ||
| sum(n) OVER (PARTITION BY | ||
| a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, | ||
| a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 | ||
| ) | ||
| FROM source; | ||
| ---- | ||
| 1 | ||
|
|
||
| # regression test for https://github.com/apache/datafusion/issues/17401 | ||
| query I | ||
| WITH source AS ( | ||
| SELECT | ||
| 1 AS n, | ||
| '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, | ||
| '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, | ||
| '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, | ||
| '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, | ||
| '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 | ||
| ) | ||
| SELECT | ||
| sum(n) OVER (PARTITION BY | ||
| a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, | ||
| a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 | ||
| ) | ||
| FROM ( | ||
| SELECT * FROM source | ||
| ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, | ||
| a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 | ||
| ); | ||
| ---- | ||
| 1 | ||
|
|
||
| # regression test for https://github.com/apache/datafusion/issues/17401 | ||
| query I | ||
| WITH source AS ( | ||
| SELECT | ||
| 1 AS n, | ||
| '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8, | ||
| '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16, | ||
| '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24, | ||
| '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32, | ||
| '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40 | ||
| ) | ||
| SELECT | ||
| sum(n) OVER (PARTITION BY | ||
| a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, | ||
| a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 | ||
| ) | ||
| FROM ( | ||
| SELECT * FROM source | ||
| WHERE a1 = '' AND a2 = '' AND a3 = '' AND a4 = '' AND a5 = '' AND a6 = '' AND a7 = '' AND a8 = '' | ||
| AND a9 = '' AND a10 = '' AND a11 = '' AND a12 = '' AND a13 = '' AND a14 = '' AND a15 = '' AND a16 = '' | ||
| AND a17 = '' AND a18 = '' AND a19 = '' AND a20 = '' AND a21 = '' AND a22 = '' AND a23 = '' AND a24 = '' | ||
| AND a25 = '' AND a26 = '' AND a27 = '' AND a28 = '' AND a29 = '' AND a30 = '' AND a31 = '' AND a32 = '' | ||
| AND a33 = '' AND a34 = '' AND a35 = '' AND a36 = '' AND a37 = '' AND a38 = '' AND a39 = '' AND a40 = '' | ||
| ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, | ||
| a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40 | ||
| ); | ||
| ---- | ||
| 1 | ||
Uh oh!
There was an error while loading. Please reload this page.