Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

for partitioning_columns in [4, 7, 8] {
// It was observed in production that queries with window functions sometimes partition over more than 30 columns
for partitioning_columns in [4, 7, 8, 12, 30] {
c.bench_function(
&format!(
"physical_window_function_partition_by_{partitioning_columns}_on_values"
Expand Down
148 changes: 118 additions & 30 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,17 +371,40 @@ pub(crate) fn window_equivalence_properties(
for (i, expr) in window_exprs.iter().enumerate() {
let partitioning_exprs = expr.partition_by();
let no_partitioning = partitioning_exprs.is_empty();
// Collect columns defining partitioning, and construct all `SortOptions`
// variations for them. Then, we will check each one whether it satisfies
// the existing ordering provided by the input plan.

// Find "one" valid ordering for partition columns to avoid exponential complexity.
// see https://github.com/apache/datafusion/issues/17401
let mut all_satisfied_lexs = vec![];
for lex in partitioning_exprs
.iter()
.map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order)))
.multi_cartesian_product()
.filter_map(LexOrdering::new)
{
if window_eq_properties.ordering_satisfy(lex.clone())? {
let mut candidate_ordering = vec![];

for partition_expr in partitioning_exprs.iter() {
let sort_options =
sort_options_resolving_constant(Arc::clone(partition_expr), true);

// Try each sort option and pick the first one that works
let mut found = false;
for sort_expr in sort_options.into_iter() {
candidate_ordering.push(sort_expr);
if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) {
if window_eq_properties.ordering_satisfy(lex)? {
found = true;
break;
}
}
// This option didn't work, remove it and try the next one
candidate_ordering.pop();
}
// If no sort option works for this column, we can't build a valid ordering
if !found {
candidate_ordering.clear();
break;
}
}

// If we successfully built an ordering for all columns, use it
// When there are no partition expressions, candidate_ordering will be empty and won't be added
if candidate_ordering.len() == partitioning_exprs.len() {
if let Some(lex) = LexOrdering::new(candidate_ordering) {
all_satisfied_lexs.push(lex);
}
}
Expand Down Expand Up @@ -410,8 +433,10 @@ pub(crate) fn window_equivalence_properties(
// Window function results in a partial constant value in
// some ordering. Adjust the ordering equivalences accordingly:
let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
let new_partial_consts =
sort_options_resolving_constant(Arc::clone(&window_col));
let new_partial_consts = sort_options_resolving_constant(
Arc::clone(&window_col),
false,
);

new_partial_consts.into_iter().map(move |partial| {
let mut existing = lex.clone();
Expand Down Expand Up @@ -467,23 +492,52 @@ pub(crate) fn window_equivalence_properties(
// utilize set-monotonicity since the set shrinks as the frame
// boundary starts "touching" the end of the table.
else if frame.is_causal() {
let args_all_lexs = sliding_expr
.get_aggregate_expr()
.expressions()
.into_iter()
.map(sort_options_resolving_constant)
.multi_cartesian_product();

let (mut asc, mut satisfied) = (false, false);
for order in args_all_lexs {
if let Some(f) = order.first() {
asc = !f.options.descending;
// Find one valid ordering for aggregate arguments instead of
// checking all combinations
let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions();
let mut candidate_order = vec![];
let mut asc = false;

for (idx, expr) in aggregate_exprs.iter().enumerate() {
let mut found = false;
let sort_options =
sort_options_resolving_constant(Arc::clone(expr), false);

// Try each option and pick the first that works
for sort_expr in sort_options.into_iter() {
let is_asc = !sort_expr.options.descending;
candidate_order.push(sort_expr);

if let Some(lex) = LexOrdering::new(candidate_order.clone()) {
if window_eq_properties.ordering_satisfy(lex)? {
if idx == 0 {
// The first column's ordering direction determines the overall
// monotonicity behavior of the window result.
// - If the aggregate has increasing set monotonicity (e.g., MAX, COUNT)
// and the first arg is ascending, the window result is increasing
// - If the aggregate has decreasing set monotonicity (e.g., MIN)
// and the first arg is ascending, the window result is also increasing
// This flag is used to determine the final window column ordering.
asc = is_asc;
}
found = true;
break;
}
}
// This option didn't work, remove it and try the next one
candidate_order.pop();
}
if window_eq_properties.ordering_satisfy(order)? {
satisfied = true;

// If we couldn't extend the ordering, stop trying
if !found {
break;
}
}

// Check if we successfully built a complete ordering
let satisfied = candidate_order.len() == aggregate_exprs.len()
&& !aggregate_exprs.is_empty();

if satisfied {
let increasing =
set_monotonicity.eq(&SetMonotonicity::Increasing);
Expand Down Expand Up @@ -634,11 +688,45 @@ pub fn get_window_mode(
Ok(None)
}

fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> {
vec![
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
]
/// Generates sort option variations for a given expression.
///
/// This function is used to handle constant columns in window operations. Since constant
/// columns can be considered as having any ordering, we generate multiple sort options
/// to explore different ordering possibilities.
///
/// # Parameters
/// - `expr`: The physical expression to generate sort options for
/// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST).
/// If true, generates only 2 options that preserve set monotonicity.
///
/// # When to use `only_monotonic = false`:
/// Use for PARTITION BY columns where we want to explore all possible orderings to find
/// one that matches the existing data ordering.
///
/// # When to use `only_monotonic = true`:
/// Use for aggregate/window function arguments where set monotonicity needs to be preserved.
/// Only generates ASC NULLS LAST and DESC NULLS FIRST because:
/// - Set monotonicity is broken if data has increasing order but nulls come first
/// - Set monotonicity is broken if data has decreasing order but nulls come last
fn sort_options_resolving_constant(
expr: Arc<dyn PhysicalExpr>,
only_monotonic: bool,
) -> Vec<PhysicalSortExpr> {
if only_monotonic {
// Generate only the 2 options that preserve set monotonicity
vec![
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
]
} else {
// Generate all 4 possible sort options for partition columns
vec![
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST
PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST
PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
]
}
}

#[cfg(test)]
Expand Down
89 changes: 89 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6034,3 +6034,92 @@ LIMIT 5
0 2 NULL NULL 0 NULL NULL
0 3 NULL NULL 0 NULL NULL
0 4 NULL NULL 0 NULL NULL

# regression test for https://github.com/apache/datafusion/issues/17401
query I
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12
)
FROM source;
----
1

# regression test for https://github.com/apache/datafusion/issues/17401
query I
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
)
FROM source;
----
1

# regression test for https://github.com/apache/datafusion/issues/17401
query I
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
)
FROM (
SELECT * FROM source
ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
);
----
1

# regression test for https://github.com/apache/datafusion/issues/17401
query I
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
)
FROM (
SELECT * FROM source
WHERE a1 = '' AND a2 = '' AND a3 = '' AND a4 = '' AND a5 = '' AND a6 = '' AND a7 = '' AND a8 = ''
AND a9 = '' AND a10 = '' AND a11 = '' AND a12 = '' AND a13 = '' AND a14 = '' AND a15 = '' AND a16 = ''
AND a17 = '' AND a18 = '' AND a19 = '' AND a20 = '' AND a21 = '' AND a22 = '' AND a23 = '' AND a24 = ''
AND a25 = '' AND a26 = '' AND a27 = '' AND a28 = '' AND a29 = '' AND a30 = '' AND a31 = '' AND a32 = ''
AND a33 = '' AND a34 = '' AND a35 = '' AND a36 = '' AND a37 = '' AND a38 = '' AND a39 = '' AND a40 = ''
ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
);
----
1
Loading