Commit 928845a

cloud-fan authored and gatorsmile committed
[SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times
## What changes were proposed in this pull request?

In `PushDownOperatorsToDataSource`, we use `transformUp` to match `PhysicalOperation` and apply pushdown. This is problematic if we have multiple `Filter` and `Project` above the data source v2 relation.

e.g. for a query
```
Project
  Filter
    DataSourceV2Relation
```

The pattern match will be triggered twice and we will do operator pushdown twice. This is unnecessary, we can use `mapChildren` to only apply pushdown once.

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21230 from cloud-fan/step2.
1 parent 5403268 commit 928845a

1 file changed, +5 -10 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala

Lines changed: 5 additions & 10 deletions
@@ -23,17 +23,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.catalyst.rules.Rule

 object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
-  override def apply(
-      plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // PhysicalOperation guarantees that filters are deterministic; no need to check
-    case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
-      // merge the filters
-      val filters = relation.filters match {
-        case Some(existing) =>
-          existing ++ newFilters
-        case _ =>
-          newFilters
-      }
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")

       val projectAttrs = project.map(_.toAttribute)
       val projectSet = AttributeSet(project.flatMap(_.references))
@@ -67,5 +60,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
       } else {
         filtered
       }
+
+    case other => other.mapChildren(apply)
   }
 }
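
The difference between the old `transformUp` traversal and the new `match` plus `mapChildren` recursion can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: `Plan`, `Relation`, `Filter`, `Project`, `Limit`, and the `OpsOverRelation` extractor are hypothetical stand-ins written for this example, not Spark's Catalyst classes, and the toy rule simply counts how often pushdown fires instead of pushing real filters or columns. Because the toy rule collapses the whole chain into the relation, the exact number of repeated matches here need not equal what Spark would do on a real plan; the point is only that bottom-up traversal can fire more than once, while the top-down form fires once per relation.

```scala
object PushdownOnceSketch {
  // Hypothetical miniature of a logical plan tree (not Spark's LogicalPlan).
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(cs: Seq[Plan]): Plan
    def mapChildren(f: Plan => Plan): Plan =
      if (children.isEmpty) this else withChildren(children.map(f))
    // Bottom-up rewrite: rewrite the children first, then try the rule on this node.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rewritten = mapChildren(_.transformUp(rule))
      rule.applyOrElse(rewritten, (p: Plan) => p)
    }
  }

  // `pushdowns` records how many times the pushdown rule touched this relation.
  case class Relation(pushdowns: Int = 0) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(cs: Seq[Plan]): Plan = this
  }
  case class Filter(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = copy(child = cs.head)
  }
  case class Project(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = copy(child = cs.head)
  }
  case class Limit(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = copy(child = cs.head)
  }

  // Simplified stand-in for a PhysicalOperation-style extractor: matches any
  // chain of Project/Filter nodes (possibly empty) sitting on top of a Relation.
  object OpsOverRelation {
    def unapply(plan: Plan): Option[Relation] = plan match {
      case r: Relation => Some(r)
      case Filter(c)   => unapply(c)
      case Project(c)  => unapply(c)
      case _           => None
    }
  }

  // Old shape: the rule is tried at every node on the way up.
  def applyBottomUp(plan: Plan): Plan = plan.transformUp {
    case OpsOverRelation(rel) => rel.copy(pushdowns = rel.pushdowns + 1)
  }

  // New shape: match from the top; recurse into children only when the node
  // itself is not a Project/Filter chain over a relation.
  def applyOnce(plan: Plan): Plan = plan match {
    case OpsOverRelation(rel) => rel.copy(pushdowns = rel.pushdowns + 1)
    case other                => other.mapChildren(applyOnce)
  }

  def main(args: Array[String]): Unit = {
    val query = Limit(Project(Filter(Relation())))
    println(s"bottom-up: ${applyBottomUp(query)}")
    println(s"top-down:  ${applyOnce(query)}")
  }
}
```

In this toy, the bottom-up version increments the counter at the bare relation, again at the `Filter`, and again at the `Project`, whereas the top-down version skips the non-matching `Limit`, matches the whole `Project`/`Filter` chain once, and never descends into it. That single-match property is what makes the new `assert(relation.filters.isEmpty, ...)` in the diff a reasonable invariant.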
