Skip to content

Commit 734f916

Browse files
Push limits past windows (#337) (apache#17347) v50
1 parent b0bbaa8 commit 734f916

File tree

10 files changed

+246
-9
lines changed

10 files changed

+246
-9
lines changed

datafusion/common/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,10 @@ config_namespace! {
549549
/// during aggregations, if possible
550550
pub enable_topk_aggregation: bool, default = true
551551

552+
/// When set to true, the optimizer will attempt to push limit operations
553+
/// past window functions, if possible
554+
pub enable_window_limits: bool, default = true
555+
552556
/// When set to true, the optimizer will insert filters before a join between
553557
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
554558
/// filter can add additional overhead when the file format does not fully support

datafusion/common/src/tree_node.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,11 @@ impl<T> Transformed<T> {
680680
Self::new(data, true, TreeNodeRecursion::Continue)
681681
}
682682

683+
/// Wrapper for transformed data with [`TreeNodeRecursion::Stop`] statement.
684+
pub fn complete(data: T) -> Self {
685+
Self::new(data, true, TreeNodeRecursion::Stop)
686+
}
687+
683688
/// Wrapper for unchanged data with [`TreeNodeRecursion::Continue`] statement.
684689
pub fn no(data: T) -> Self {
685690
Self::new(data, false, TreeNodeRecursion::Continue)

datafusion/core/src/physical_optimizer/optimizer.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
//! Physical optimizer traits
1919
20-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
21-
use std::sync::Arc;
22-
2320
use super::projection_pushdown::ProjectionPushdown;
2421
use super::update_aggr_exprs::OptimizeAggregateOrder;
2522
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -33,6 +30,9 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr
3330
use crate::physical_optimizer::output_requirements::OutputRequirements;
3431
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
3532
use crate::physical_optimizer::topk_aggregation::TopKAggregation;
33+
use datafusion_physical_optimizer::limit_pushdown_past_window::LimitPushPastWindows;
34+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
35+
use std::sync::Arc;
3636

3737
/// A rule-based physical optimizer.
3838
#[derive(Clone, Debug)]
@@ -92,6 +92,10 @@ impl PhysicalOptimizer {
9292
// into an `order by max(x) limit y`. In this case it will copy the limit value down
9393
// to the aggregation, allowing it to use only y number of accumulators.
9494
Arc::new(TopKAggregation::new()),
95+
// Tries to push limits down through window functions, growing as appropriate
96+
// This can possibly be combined with [LimitPushdown]
97+
// It needs to come after [EnforceSorting]
98+
Arc::new(LimitPushPastWindows::new()),
9599
// The ProjectionPushdown rule tries to push projections towards
96100
// the sources in the execution plan. As a result of this process,
97101
// a projection can disappear if it reaches the source providers, and

datafusion/physical-optimizer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ recursive_protection = ["dep:recursive"]
3838
arrow = { workspace = true }
3939
datafusion-common = { workspace = true, default-features = true }
4040
datafusion-execution = { workspace = true }
41+
datafusion-expr.workspace = true
4142
datafusion-expr-common = { workspace = true, default-features = true }
4243
datafusion-physical-expr = { workspace = true }
4344
datafusion-physical-plan = { workspace = true }

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod aggregate_statistics;
2222
pub mod coalesce_batches;
2323
pub mod combine_partial_final_agg;
2424
pub mod limit_pushdown;
25+
pub mod limit_pushdown_past_window;
2526
pub mod limited_distinct_aggregation;
2627
mod optimizer;
2728
pub mod output_requirements;
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::PhysicalOptimizerRule;
19+
use datafusion_common::config::ConfigOptions;
20+
use datafusion_common::tree_node::{Transformed, TreeNode};
21+
use datafusion_common::ScalarValue;
22+
use datafusion_expr::{WindowFrameBound, WindowFrameUnits};
23+
use datafusion_physical_plan::execution_plan::CardinalityEffect;
24+
use datafusion_physical_plan::limit::GlobalLimitExec;
25+
use datafusion_physical_plan::sorts::sort::SortExec;
26+
use datafusion_physical_plan::windows::BoundedWindowAggExec;
27+
use datafusion_physical_plan::ExecutionPlan;
28+
use std::cmp;
29+
use std::sync::Arc;
30+
31+
/// This rule inspects [`ExecutionPlan`]'s attempting to find fetch limits that were not pushed
32+
/// down by `LimitPushdown` because [BoundedWindowAggExec]s were "in the way". If the window is
33+
/// bounded by [WindowFrameUnits::Rows] then we calculate the adjustment needed to grow the limit
34+
/// and continue pushdown.
35+
#[derive(Default, Clone, Debug)]
36+
pub struct LimitPushPastWindows;
37+
38+
impl LimitPushPastWindows {
39+
pub fn new() -> Self {
40+
Self
41+
}
42+
}
43+
44+
impl PhysicalOptimizerRule for LimitPushPastWindows {
45+
fn optimize(
46+
&self,
47+
original: Arc<dyn ExecutionPlan>,
48+
config: &ConfigOptions,
49+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
50+
if !config.optimizer.enable_window_limits {
51+
return Ok(original);
52+
}
53+
let mut latest_limit: Option<usize> = None;
54+
let mut latest_max = 0;
55+
let result = original.transform_down(|node| {
56+
// helper closure to DRY out most the early return cases
57+
let mut reset = |node,
58+
max: &mut usize|
59+
-> datafusion_common::Result<
60+
Transformed<Arc<dyn ExecutionPlan>>,
61+
> {
62+
latest_limit = None;
63+
*max = 0;
64+
Ok(Transformed::no(node))
65+
};
66+
67+
// traversing sides of joins will require more thought
68+
if node.children().len() > 1 {
69+
return reset(node, &mut latest_max);
70+
}
71+
72+
// grab the latest limit we see
73+
if let Some(limit) = node.as_any().downcast_ref::<GlobalLimitExec>() {
74+
latest_limit = limit.fetch().map(|fetch| fetch + limit.skip());
75+
latest_max = 0;
76+
return Ok(Transformed::no(node));
77+
}
78+
79+
// grow the limit if we hit a window function
80+
if let Some(window) = node.as_any().downcast_ref::<BoundedWindowAggExec>() {
81+
for expr in window.window_expr().iter() {
82+
let frame = expr.get_window_frame();
83+
if frame.units != WindowFrameUnits::Rows {
84+
return reset(node, &mut latest_max); // expression-based limits?
85+
}
86+
let Some(end_bound) = bound_to_usize(&frame.end_bound) else {
87+
return reset(node, &mut latest_max);
88+
};
89+
latest_max = cmp::max(end_bound, latest_max);
90+
}
91+
return Ok(Transformed::no(node));
92+
}
93+
94+
// Apply the limit if we hit a sort node
95+
if let Some(sort) = node.as_any().downcast_ref::<SortExec>() {
96+
let latest = latest_limit.take();
97+
let Some(fetch) = latest else {
98+
latest_max = 0;
99+
return Ok(Transformed::no(node));
100+
};
101+
let fetch = match sort.fetch() {
102+
None => fetch + latest_max,
103+
Some(existing) => cmp::min(existing, fetch + latest_max),
104+
};
105+
let sort: Arc<dyn ExecutionPlan> = Arc::new(sort.with_fetch(Some(fetch)));
106+
latest_max = 0;
107+
return Ok(Transformed::complete(sort));
108+
}
109+
110+
// we can't push the limit past nodes that decrease row count
111+
match node.cardinality_effect() {
112+
CardinalityEffect::Equal => {}
113+
_ => return reset(node, &mut latest_max),
114+
}
115+
116+
Ok(Transformed::no(node))
117+
})?;
118+
Ok(result.data)
119+
}
120+
121+
fn name(&self) -> &str {
122+
"LimitPushPastWindows"
123+
}
124+
125+
fn schema_check(&self) -> bool {
126+
false // we don't change the schema
127+
}
128+
}
129+
130+
fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
131+
match bound {
132+
WindowFrameBound::Preceding(_) => Some(0),
133+
WindowFrameBound::CurrentRow => Some(0),
134+
WindowFrameBound::Following(ScalarValue::UInt64(Some(scalar))) => {
135+
Some(*scalar as usize)
136+
}
137+
_ => None,
138+
}
139+
}
140+
141+
// tests: all branches are covered by sqllogictests

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
248248
physical_plan after coalesce_batches SAME TEXT AS ABOVE
249249
physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
250250
physical_plan after LimitAggregation SAME TEXT AS ABOVE
251+
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
251252
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
252253
physical_plan after LimitPushdown SAME TEXT AS ABOVE
253254
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -324,6 +325,7 @@ physical_plan after OutputRequirements
324325
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
325326
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
326327
physical_plan after LimitAggregation SAME TEXT AS ABOVE
328+
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
327329
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
328330
physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
329331
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -364,6 +366,7 @@ physical_plan after OutputRequirements
364366
01)GlobalLimitExec: skip=0, fetch=10
365367
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
366368
physical_plan after LimitAggregation SAME TEXT AS ABOVE
369+
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
367370
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
368371
physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
369372
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ datafusion.optimizer.default_filter_selectivity 20
240240
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
241241
datafusion.optimizer.enable_round_robin_repartition true
242242
datafusion.optimizer.enable_topk_aggregation true
243+
datafusion.optimizer.enable_window_limits true
243244
datafusion.optimizer.expand_views_at_output false
244245
datafusion.optimizer.filter_null_join_keys false
245246
datafusion.optimizer.hash_join_single_partition_threshold 1048576
@@ -333,6 +334,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit
333334
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
334335
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
335336
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
337+
datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible
336338
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
337339
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
338340
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition

0 commit comments

Comments
 (0)