[SPARK-23301][SQL] data source column pruning should work for arbitrary expressions #20476
Changes from all commits
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, Expression, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
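The changed import adds AttributeSet, which the rewritten pruning method in the next hunk takes in place of Seq[Attribute]. Membership in an AttributeSet is decided by expression ID rather than by name, which is why `relation.output.filter(requiredByParent.contains)` below can return the relation's own attributes with their original case even when the query spelled them differently. A minimal sketch of that behavior, runnable against spark-catalyst (the column name `i` and the object name are invented for illustration; these are internal Spark APIs, not a stable interface):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.IntegerType

object CaseRetentionSketch extends App {
  // The attribute as defined by the data source relation.
  val i = AttributeReference("i", IntegerType)()

  // The same attribute (same exprId) as a case-insensitive query might spell it.
  val upperI = i.withName("I")

  // What the parent plan requires, expressed as an AttributeSet.
  val requiredByParent = AttributeSet(upperI :: Nil)

  // Filtering the relation's own output keeps the original-cased attribute,
  // because AttributeSet membership compares exprIds, not names.
  val requiredColumns = Seq(i).filter(requiredByParent.contains)
  assert(requiredColumns.map(_.name) == Seq("i"))
}
```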
@@ -81,35 +81,34 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper

     // TODO: add more push down rules.

-    // TODO: nested fields pruning
-    def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: Seq[Attribute]): Unit = {
-      plan match {
-        case Project(projectList, child) =>
-          val required = projectList.filter(requiredByParent.contains).flatMap(_.references)

Review comment (on the removed line above): This line is wrong and I fixed it; see https://github.com/apache/spark/pull/20476/files#diff-b7f3810e65a2bb1585de9609ea491469R93

-          pushDownRequiredColumns(child, required)
-
-        case Filter(condition, child) =>
-          val required = requiredByParent ++ condition.references
-          pushDownRequiredColumns(child, required)
-
-        case DataSourceV2Relation(fullOutput, reader) => reader match {
-          case r: SupportsPushDownRequiredColumns =>
-            // Match original case of attributes.
-            val attrMap = AttributeMap(fullOutput.zip(fullOutput))
-            val requiredColumns = requiredByParent.map(attrMap)
-            r.pruneColumns(requiredColumns.toStructType)
-          case _ =>
-        }
-
-        // TODO: there may be more operators can be used to calculate required columns, we can add
-        // more and more in the future.
-        case _ => plan.children.foreach(child => pushDownRequiredColumns(child, child.output))
-      }
-    }
-
-    pushDownRequiredColumns(filterPushed, filterPushed.output)
+    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
     // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
     RemoveRedundantProject(filterPushed)
   }

+  // TODO: nested fields pruning
+  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+    plan match {
+      case Project(projectList, child) =>
+        val required = projectList.flatMap(_.references)
+        pushDownRequiredColumns(child, AttributeSet(required))
+
+      case Filter(condition, child) =>
+        val required = requiredByParent ++ condition.references
+        pushDownRequiredColumns(child, required)
+
+      case relation: DataSourceV2Relation => relation.reader match {
+        case reader: SupportsPushDownRequiredColumns =>
+          val requiredColumns = relation.output.filter(requiredByParent.contains)

Review comment (on the added line above): a cleaner way to retain the original case of attributes.

+          reader.pruneColumns(requiredColumns.toStructType)
+
+        case _ =>
+      }
+
+      // TODO: there may be more operators that can be used to calculate the required columns. We
+      // can add more and more in the future.
+      case _ => plan.children.foreach(child => pushDownRequiredColumns(child, child.outputSet))
+    }
+  }
+
   /**
Review comment: make it a private method instead of an inline method.
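To see why the removed Project handling was wrong for arbitrary expressions, here is a hedged catalyst-level sketch (column and object names invented; these are internal Spark APIs, so it is an illustration rather than a supported interface). With `i + 1 AS x` in the project list, the parent only requires the Project's output attribute `x`, so the old `projectList.filter(requiredByParent.contains)` drops the alias and loses the reference to `i`; the new `projectList.flatMap(_.references)` keeps it:

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Attribute, AttributeReference, AttributeSet, Literal, NamedExpression}
import org.apache.spark.sql.types.IntegerType

object ColumnPruningSketch extends App {
  // A project list containing an arbitrary expression: i + 1 AS x.
  val i = AttributeReference("i", IntegerType)()
  val x = Alias(Add(i, Literal(1)), "x")()
  val projectList: Seq[NamedExpression] = Seq(x)

  // What the parent of the Project requires: the Project's output attribute `x`.
  val requiredByParent: Seq[Attribute] = Seq(x.toAttribute)

  // Old rule: the Alias itself is not in requiredByParent, so it is filtered out
  // and the reference to `i` is lost, meaning the reader could be asked to prune `i` away.
  val oldRequired = projectList.filter(requiredByParent.contains).flatMap(_.references)
  assert(oldRequired.isEmpty)

  // New rule: collect the references of every project item, so `i` survives.
  val newRequired = AttributeSet(projectList.flatMap(_.references))
  assert(newRequired.contains(i))
}
```

In the new method this set is handed down recursively and, at the DataSourceV2Relation case, intersected with the relation's output before calling SupportsPushDownRequiredColumns.pruneColumns, so the reader is asked for exactly the columns the query still needs.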