Skip to content

Commit f1777ef

Browse files
committed
remove unnecessary computations
1 parent 29af731 commit f1777ef

File tree

3 files changed

+10
-25
lines changed

3 files changed

+10
-25
lines changed

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,7 @@ impl BoundedWindowAggExec {
126126
vec![]
127127
}
128128
};
129-
let cache =
130-
Self::compute_properties(&input, &schema, &window_expr, window_expr_indices);
129+
let cache = Self::compute_properties(&input, &schema, &window_expr);
131130
Ok(Self {
132131
input,
133132
window_expr,
@@ -195,15 +194,9 @@ impl BoundedWindowAggExec {
195194
input: &Arc<dyn ExecutionPlan>,
196195
schema: &SchemaRef,
197196
window_exprs: &[Arc<dyn WindowExpr>],
198-
window_expr_indices: Vec<usize>,
199197
) -> PlanProperties {
200198
// Calculate equivalence properties:
201-
let eq_properties = window_equivalence_properties(
202-
schema,
203-
input,
204-
window_exprs,
205-
window_expr_indices,
206-
);
199+
let eq_properties = window_equivalence_properties(schema, input, window_exprs);
207200

208201
// As we can have repartitioning using the partition keys, this can
209202
// be either one or more than one, depending on the presence of

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,21 +337,25 @@ pub(crate) fn window_equivalence_properties(
337337
schema: &SchemaRef,
338338
input: &Arc<dyn ExecutionPlan>,
339339
window_exprs: &[Arc<dyn WindowExpr>],
340-
window_expr_indices: Vec<usize>,
341340
) -> EquivalenceProperties {
342341
// We need to update the schema, so we can not directly use
343342
// `input.equivalence_properties()`.
344343
let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
345344
.extend(input.equivalence_properties().clone());
346345

346+
let schema_len = schema.fields.len();
347+
let window_expr_indices = (schema_len..(schema_len - window_exprs.len()))
348+
.rev()
349+
.collect::<Vec<_>>();
350+
347351
for (i, expr) in window_exprs.iter().enumerate() {
348-
let window_expr_index = window_expr_indices[i];
349352
if let Some(udf_window_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
350353
{
351354
udf_window_expr.add_equal_orderings(&mut window_eq_properties);
352355
} else if let Some(aggregate_udf_window_expr) =
353356
expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
354357
{
358+
let window_expr_index = window_expr_indices[i];
355359
aggregate_udf_window_expr
356360
.add_equal_orderings(&mut window_eq_properties, window_expr_index);
357361
}

datafusion/physical-plan/src/windows/window_agg_exec.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,10 @@ impl WindowAggExec {
7979
let old_fields_len = input.schema().fields.len();
8080
let schema = create_schema(&input.schema(), &window_expr)?;
8181
let schema = Arc::new(schema);
82-
let window_expr_indices = (old_fields_len..schema.fields.len()).collect_vec();
8382

8483
let ordered_partition_by_indices =
8584
get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
86-
let cache = Self::compute_properties(
87-
Arc::clone(&schema),
88-
&input,
89-
&window_expr,
90-
window_expr_indices,
91-
);
85+
let cache = Self::compute_properties(Arc::clone(&schema), &input, &window_expr);
9286
Ok(Self {
9387
input,
9488
window_expr,
@@ -129,15 +123,9 @@ impl WindowAggExec {
129123
schema: SchemaRef,
130124
input: &Arc<dyn ExecutionPlan>,
131125
window_exprs: &[Arc<dyn WindowExpr>],
132-
window_expr_indices: Vec<usize>,
133126
) -> PlanProperties {
134127
// Calculate equivalence properties:
135-
let eq_properties = window_equivalence_properties(
136-
&schema,
137-
input,
138-
window_exprs,
139-
window_expr_indices,
140-
);
128+
let eq_properties = window_equivalence_properties(&schema, input, window_exprs);
141129

142130
// Get output partitioning:
143131
// Because we can have repartitioning using the partition keys this

0 commit comments

Comments
 (0)