Skip to content

Commit ed89a21

Browse files
committed
Consolidate projection check
1 parent d524da9 commit ed89a21

File tree

1 file changed

+7
-28
lines changed

1 file changed

+7
-28
lines changed

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ pub fn remove_unnecessary_projections(
9797
if is_projection_removable(projection) {
9898
return Ok(Transformed::Yes(projection.input().clone()));
9999
}
100+
101+
// If the projection does not narrow the the schema, we should not try
102+
// to push it down
103+
if projection.expr().len() >= projection.input().schema().fields().len() {
104+
return Ok(Transformed::No(plan));
105+
}
106+
100107
// If it does, check if we can push it under its child(ren):
101108
let input = projection.input().as_any();
102109
if let Some(csv) = input.downcast_ref::<CsvExec>() {
@@ -227,11 +234,6 @@ fn try_swapping_with_output_req(
227234
projection: &ProjectionExec,
228235
output_req: &OutputRequirementExec,
229236
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
230-
// If the projection does not narrow the the schema, we should not try to push it down:
231-
if projection.expr().len() >= projection.input().schema().fields().len() {
232-
return Ok(None);
233-
}
234-
235237
let mut updated_sort_reqs = vec![];
236238
// None or empty_vec can be treated in the same way.
237239
if let Some(reqs) = &output_req.required_input_ordering()[0] {
@@ -277,10 +279,6 @@ fn try_swapping_with_output_req(
277279
fn try_swapping_with_coalesce_partitions(
278280
projection: &ProjectionExec,
279281
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
280-
// If the projection does not narrow the the schema, we should not try to push it down:
281-
if projection.expr().len() >= projection.input().schema().fields().len() {
282-
return Ok(None);
283-
}
284282
// CoalescePartitionsExec always has a single child, so zero indexing is safe.
285283
make_with_child(projection, &projection.input().children()[0])
286284
.map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
@@ -292,10 +290,6 @@ fn try_swapping_with_filter(
292290
projection: &ProjectionExec,
293291
filter: &FilterExec,
294292
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
295-
// If the projection does not narrow the the schema, we should not try to push it down:
296-
if projection.expr().len() >= projection.input().schema().fields().len() {
297-
return Ok(None);
298-
}
299293
// Each column in the predicate expression must exist after the projection.
300294
let Some(new_predicate) = update_expr(filter.predicate(), projection.expr(), false)?
301295
else {
@@ -313,11 +307,6 @@ fn try_swapping_with_repartition(
313307
projection: &ProjectionExec,
314308
repartition: &RepartitionExec,
315309
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
316-
// If the projection does not narrow the the schema, we should not try to push it down.
317-
if projection.expr().len() >= projection.input().schema().fields().len() {
318-
return Ok(None);
319-
}
320-
321310
// If pushdown is not beneficial or applicable, break it.
322311
if projection.benefits_from_input_partitioning()[0] || !all_columns(projection.expr())
323312
{
@@ -355,11 +344,6 @@ fn try_swapping_with_sort(
355344
projection: &ProjectionExec,
356345
sort: &SortExec,
357346
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
358-
// If the projection does not narrow the the schema, we should not try to push it down.
359-
if projection.expr().len() >= projection.input().schema().fields().len() {
360-
return Ok(None);
361-
}
362-
363347
let mut updated_exprs = vec![];
364348
for sort in sort.expr() {
365349
let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)? else {
@@ -384,11 +368,6 @@ fn try_swapping_with_sort_preserving_merge(
384368
projection: &ProjectionExec,
385369
spm: &SortPreservingMergeExec,
386370
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
387-
// If the projection does not narrow the the schema, we should not try to push it down.
388-
if projection.expr().len() >= projection.input().schema().fields().len() {
389-
return Ok(None);
390-
}
391-
392371
let mut updated_exprs = vec![];
393372
for sort in spm.expr() {
394373
let Some(updated_expr) = update_expr(&sort.expr, projection.expr(), false)?

0 commit comments

Comments
 (0)