Skip to content

Commit 1f02953

Browse files
committed
remove unnecessary computations
1 parent 29af731 commit 1f02953

File tree

3 files changed

+10
-30
lines changed

3 files changed

+10
-30
lines changed

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ use futures::stream::Stream;
6767
use futures::{ready, StreamExt};
6868
use hashbrown::hash_table::HashTable;
6969
use indexmap::IndexMap;
70-
use itertools::Itertools;
7170
use log::debug;
7271

7372
/// Window execution plan
@@ -104,10 +103,8 @@ impl BoundedWindowAggExec {
104103
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
105104
input_order_mode: InputOrderMode,
106105
) -> Result<Self> {
107-
let old_fields_len = input.schema().fields.len();
108106
let schema = create_schema(&input.schema(), &window_expr)?;
109107
let schema = Arc::new(schema);
110-
let window_expr_indices = (old_fields_len..schema.fields.len()).collect_vec();
111108
let partition_by_exprs = window_expr[0].partition_by();
112109
let ordered_partition_by_indices = match &input_order_mode {
113110
InputOrderMode::Sorted => {
@@ -126,8 +123,7 @@ impl BoundedWindowAggExec {
126123
vec![]
127124
}
128125
};
129-
let cache =
130-
Self::compute_properties(&input, &schema, &window_expr, window_expr_indices);
126+
let cache = Self::compute_properties(&input, &schema, &window_expr);
131127
Ok(Self {
132128
input,
133129
window_expr,
@@ -195,15 +191,9 @@ impl BoundedWindowAggExec {
195191
input: &Arc<dyn ExecutionPlan>,
196192
schema: &SchemaRef,
197193
window_exprs: &[Arc<dyn WindowExpr>],
198-
window_expr_indices: Vec<usize>,
199194
) -> PlanProperties {
200195
// Calculate equivalence properties:
201-
let eq_properties = window_equivalence_properties(
202-
schema,
203-
input,
204-
window_exprs,
205-
window_expr_indices,
206-
);
196+
let eq_properties = window_equivalence_properties(schema, input, window_exprs);
207197

208198
// As we can have repartitioning using the partition keys, this can
209199
// be either one or more than one, depending on the presence of

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,21 +337,25 @@ pub(crate) fn window_equivalence_properties(
337337
schema: &SchemaRef,
338338
input: &Arc<dyn ExecutionPlan>,
339339
window_exprs: &[Arc<dyn WindowExpr>],
340-
window_expr_indices: Vec<usize>,
341340
) -> EquivalenceProperties {
342341
// We need to update the schema, so we can not directly use
343342
// `input.equivalence_properties()`.
344343
let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
345344
.extend(input.equivalence_properties().clone());
346345

346+
let schema_len = schema.fields.len();
347+
let window_expr_indices = (schema_len..(schema_len - window_exprs.len()))
348+
.rev()
349+
.collect::<Vec<_>>();
350+
347351
for (i, expr) in window_exprs.iter().enumerate() {
348-
let window_expr_index = window_expr_indices[i];
349352
if let Some(udf_window_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
350353
{
351354
udf_window_expr.add_equal_orderings(&mut window_eq_properties);
352355
} else if let Some(aggregate_udf_window_expr) =
353356
expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
354357
{
358+
let window_expr_index = window_expr_indices[i];
355359
aggregate_udf_window_expr
356360
.add_equal_orderings(&mut window_eq_properties, window_expr_index);
357361
}

datafusion/physical-plan/src/windows/window_agg_exec.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ use datafusion_execution::TaskContext;
4747
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
4848

4949
use futures::{ready, Stream, StreamExt};
50-
use itertools::Itertools;
5150

5251
/// Window execution plan
5352
#[derive(Debug, Clone)]
@@ -76,19 +75,12 @@ impl WindowAggExec {
7675
input: Arc<dyn ExecutionPlan>,
7776
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
7877
) -> Result<Self> {
79-
let old_fields_len = input.schema().fields.len();
8078
let schema = create_schema(&input.schema(), &window_expr)?;
8179
let schema = Arc::new(schema);
82-
let window_expr_indices = (old_fields_len..schema.fields.len()).collect_vec();
8380

8481
let ordered_partition_by_indices =
8582
get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
86-
let cache = Self::compute_properties(
87-
Arc::clone(&schema),
88-
&input,
89-
&window_expr,
90-
window_expr_indices,
91-
);
83+
let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr);
9284
Ok(Self {
9385
input,
9486
window_expr,
@@ -129,15 +121,9 @@ impl WindowAggExec {
129121
schema: SchemaRef,
130122
input: &Arc<dyn ExecutionPlan>,
131123
window_exprs: &[Arc<dyn WindowExpr>],
132-
window_expr_indices: Vec<usize>,
133124
) -> PlanProperties {
134125
// Calculate equivalence properties:
135-
let eq_properties = window_equivalence_properties(
136-
&schema,
137-
input,
138-
window_exprs,
139-
window_expr_indices,
140-
);
126+
let eq_properties = window_equivalence_properties(&schema, input, window_exprs);
141127

142128
// Get output partitioning:
143129
// Because we can have repartitioning using the partition keys this

0 commit comments

Comments
 (0)