4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -727,6 +727,10 @@ config_namespace! {
    /// during aggregations, if possible
    pub enable_topk_aggregation: bool, default = true

    /// When set to true, the optimizer will attempt to push limit operations
    /// past window functions, if possible
    pub enable_window_limits: bool, default = true

    /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
    /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
    /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
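For reference, a minimal sketch of toggling this new option from Rust (an assumption on my part that users would go through the `datafusion` facade crate's SessionConfig API; the key name matches the information_schema listing further down):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Opt out of the new rewrite while keeping the other optimizer passes.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_window_limits", false);
    let _ctx = SessionContext::new_with_config(config);
}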
5 changes: 5 additions & 0 deletions datafusion/common/src/tree_node.rs
@@ -680,6 +680,11 @@ impl<T> Transformed<T> {
        Self::new(data, true, TreeNodeRecursion::Continue)
    }

    /// Wrapper for transformed data with [`TreeNodeRecursion::Stop`] statement.
    pub fn complete(data: T) -> Self {
        Self::new(data, true, TreeNodeRecursion::Stop)
    }

    /// Wrapper for unchanged data with [`TreeNodeRecursion::Continue`] statement.
    pub fn no(data: T) -> Self {
        Self::new(data, false, TreeNodeRecursion::Continue)
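A minimal sketch of how the three constructors differ (assuming only datafusion_common as in this diff; `yes` and `no` are the pre-existing constructors shown above):

use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};

fn main() {
    let changed = Transformed::yes(1); // transformed, keep traversing
    let done = Transformed::complete(2); // transformed, stop traversing here
    let same = Transformed::no(3); // unchanged, keep traversing
    assert_eq!(changed.tnr, TreeNodeRecursion::Continue);
    assert_eq!(done.tnr, TreeNodeRecursion::Stop);
    assert!(!same.transformed);
}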
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
@@ -39,6 +39,7 @@ pub mod optimizer;
pub mod output_requirements;
pub mod projection_pushdown;
pub use datafusion_pruning as pruning;
mod limit_pushdown_past_window;
pub mod sanity_checker;
pub mod topk_aggregation;
pub mod update_aggr_exprs;
141 changes: 141 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::PhysicalOptimizerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::{WindowFrameBound, WindowFrameUnits};
use datafusion_physical_plan::execution_plan::CardinalityEffect;
use datafusion_physical_plan::limit::GlobalLimitExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::windows::BoundedWindowAggExec;
use datafusion_physical_plan::ExecutionPlan;
use std::cmp;
use std::sync::Arc;

/// This rule inspects [`ExecutionPlan`]s, attempting to find fetch limits that were not pushed
/// down by `LimitPushdown` because [`BoundedWindowAggExec`]s were "in the way". If the window is
/// bounded by [`WindowFrameUnits::Rows`], we calculate the adjustment needed to grow the limit
/// and continue the pushdown.
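///
/// For example, with `LIMIT 5` above a window whose frame is
/// `ROWS BETWEEN UNBOUNDED PRECEDING AND 3 FOLLOWING`, producing output row 5
/// still requires rows 6 through 8 as frame input, so the fetch pushed into
/// the sort below grows to 5 + 3 = 8 (see `bound_to_usize` at the end of this
/// file).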
#[derive(Default, Clone, Debug)]
pub struct LimitPushPastWindows;

impl LimitPushPastWindows {
    pub fn new() -> Self {
        Self
    }
}

impl PhysicalOptimizerRule for LimitPushPastWindows {
    fn optimize(
        &self,
        original: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        if !config.optimizer.enable_window_limits {
            return Ok(original);
        }
        let mut latest_limit: Option<usize> = None;
        let mut latest_max = 0;
        let result = original.transform_down(|node| {
            // helper closure to DRY out most of the early-return cases
            let mut reset = |node,
                             max: &mut usize|
             -> datafusion_common::Result<Transformed<Arc<dyn ExecutionPlan>>> {
                latest_limit = None;
                *max = 0;
                Ok(Transformed::no(node))
            };

            // traversing sides of joins will require more thought
            if node.children().len() > 1 {
                return reset(node, &mut latest_max);
            }

            // grab the latest limit we see
            if let Some(limit) = node.as_any().downcast_ref::<GlobalLimitExec>() {
                latest_limit = limit.fetch().map(|fetch| fetch + limit.skip());
                latest_max = 0;
                return Ok(Transformed::no(node));
            }

            // grow the limit if we hit a window function
            if let Some(window) = node.as_any().downcast_ref::<BoundedWindowAggExec>() {
                for expr in window.window_expr().iter() {
                    let frame = expr.get_window_frame();
                    if frame.units != WindowFrameUnits::Rows {
Review thread (anchored at this line):

Contributor:
As far as I can see, we update latest_max only for WindowFrameUnits::Rows cases. If I am not mistaken, it is also valid to update latest_max for WindowFrameUnits::Range and WindowFrameUnits::Groups as long as end_bound is WindowFrameBound::Preceding(_). I think we can extend the checks to include this use case as well.
This should change the plan for the following kind of query:

SELECT
    c9,
    SUM(c9) OVER(ORDER BY c9 ASC RANGE BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum1
    FROM aggregate_test_100
    LIMIT 5

However, we can make this change in a subsequent PR. I think in its current form this PR is correct and we can merge it as is. Thanks @avantgardnerio for this PR.

Contributor Author (@avantgardnerio):
Good point! I will file a subsequent PR 😄

Contributor Author (@avantgardnerio, Oct 14, 2025):
I tried this and I think it's unsafe:

1. query result mismatch:
[SQL] SELECT
 SUM(c1) OVER (ORDER BY c2 DESC) as summation1
 FROM null_cases
 LIMIT 5;
[Diff] (-expected|+actual)
-   962
-   962
-   962
-   962
-   962
+   263
+   263
+   263
+   263
+   263

and

[Diff] (-expected|+actual)
    logical_plan
    01)Projection: sum(null_cases.c1) ORDER BY [null_cases.c2 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS summation1
    02)--Limit: skip=0, fetch=5
    03)----WindowAggr: windowExpr=[[sum(null_cases.c1) ORDER BY [null_cases.c2 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
    04)------TableScan: null_cases projection=[c1, c2]
    physical_plan
    01)ProjectionExec: expr=[sum(null_cases.c1) ORDER BY [null_cases.c2 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as summation1]
    02)--GlobalLimitExec: skip=0, fetch=5
    03)----BoundedWindowAggExec: wdw=[sum(null_cases.c1) ORDER BY [null_cases.c2 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "sum(null_cases.c1) ORDER BY [null_cases.c2 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
-   04)------SortExec: expr=[c2@1 DESC], preserve_partitioning=[false]
+   04)------SortExec: TopK(fetch=5), expr=[c2@1 DESC], preserve_partitioning=[false]
    05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, projection=[c1, c2], file_type=csv, has_header=true

I think this is failing because there are multiple entries with the same value, so it needs more rows than the limit.

Contributor:
However, for this case the end bound is CURRENT ROW. I think when the end bound is WindowFrameBound::Preceding(_) we can still do this optimization for WindowFrameUnits::Range and WindowFrameUnits::Groups.

                        return reset(node, &mut latest_max); // expression-based limits?
                    }
                    let Some(end_bound) = bound_to_usize(&frame.end_bound) else {
                        return reset(node, &mut latest_max);
                    };
                    latest_max = cmp::max(end_bound, latest_max);
                }
                return Ok(Transformed::no(node));
            }

            // Apply the limit if we hit a sort node
            if let Some(sort) = node.as_any().downcast_ref::<SortExec>() {
                let latest = latest_limit.take();
                let Some(fetch) = latest else {
                    latest_max = 0;
                    return Ok(Transformed::no(node));
                };
                let fetch = match sort.fetch() {
                    None => fetch + latest_max,
                    Some(existing) => cmp::min(existing, fetch + latest_max),
                };
                let sort: Arc<dyn ExecutionPlan> = Arc::new(sort.with_fetch(Some(fetch)));
                latest_max = 0;
                return Ok(Transformed::complete(sort));
            }

            // we can't push the limit past nodes that decrease row count
            match node.cardinality_effect() {
                CardinalityEffect::Equal => {}
                _ => return reset(node, &mut latest_max),
            }

            Ok(Transformed::no(node))
        })?;
        Ok(result.data)
    }

    fn name(&self) -> &str {
        "LimitPushPastWindows"
    }

    fn schema_check(&self) -> bool {
        false // we don't change the schema
    }
}

/// Maps a window frame end bound to the number of extra rows the pushed-down
/// limit must grow by: `Preceding` and `CurrentRow` frames need no extra rows,
/// `Following(n)` needs `n`, and anything else (e.g. an expression-based
/// bound) returns `None`, aborting the pushdown.
fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
    match bound {
        WindowFrameBound::Preceding(_) => Some(0),
        WindowFrameBound::CurrentRow => Some(0),
        WindowFrameBound::Following(ScalarValue::UInt64(Some(scalar))) => {
            Some(*scalar as usize)
        }
        _ => None,
    }
}

// tests: all branches are covered by sqllogictests
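A minimal end-to-end sketch of how the rule is expected to surface in a plan (assumptions on my part: the `datafusion` facade crate, a hypothetical file data.csv with columns c1 and c2, and default options, so enable_window_limits is on):

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
    // With the rule enabled, the LIMIT above the window should reach the
    // sort as a fetch, i.e. `SortExec: TopK(fetch=5)` in the physical plan,
    // mirroring the EXPLAIN diff in the review thread above.
    let df = ctx
        .sql(
            "EXPLAIN SELECT SUM(c1) OVER \
             (ORDER BY c2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s \
             FROM t LIMIT 5",
        )
        .await?;
    df.show().await?;
    Ok(())
}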
7 changes: 6 additions & 1 deletion datafusion/physical-optimizer/src/optimizer.rs
@@ -37,6 +37,7 @@ use crate::topk_aggregation::TopKAggregation;
use crate::update_aggr_exprs::OptimizeAggregateOrder;

use crate::coalesce_async_exec_input::CoalesceAsyncExecInput;
use crate::limit_pushdown_past_window::LimitPushPastWindows;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;
@@ -59,7 +60,7 @@ pub trait PhysicalOptimizerRule: Debug {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;

-/// A flag to indicate whether the physical planner should valid the rule will not
+/// A flag to indicate whether the physical planner should validate that the rule will not
/// change the schema of the plan after the rewriting.
/// Some of the optimization rules might change the nullable properties of the schema
/// and should disable the schema check.
@@ -131,6 +132,10 @@ impl PhysicalOptimizer {
// into an `order by max(x) limit y`. In this case it will copy the limit value down
// to the aggregation, allowing it to use only y number of accumulators.
Arc::new(TopKAggregation::new()),
// Tries to push limits down through window functions, growing the limit as appropriate.
// This rule can possibly be combined with [LimitPushdown].
// It needs to come after [EnforceSorting].
Arc::new(LimitPushPastWindows::new()),
// The LimitPushdown rule tries to push limits down as far as possible,
// replacing operators with fetching variants, or adding limits
// past operators that support limit pushdown.
Expand Down
3 changes: 3 additions & 0 deletions datafusion/sqllogictest/test_files/explain.slt
@@ -241,6 +241,7 @@ physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
@@ -321,6 +322,7 @@ physical_plan after OutputRequirements
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
@@ -365,6 +367,7 @@ physical_plan after OutputRequirements
01)GlobalLimitExec: skip=0, fetch=10
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -289,6 +289,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_dynamic_filter_pushdown true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.enable_window_limits true
datafusion.optimizer.expand_views_at_output false
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
@@ -402,6 +403,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition