-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-4502][SQL] Parquet nested column pruning #16578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
51f7346
c1c6156
72b5319
d400d8c
a9e648a
da2bb09
0050167
106296f
ea1e01a
ab46037
96c00d8
17d00ce
d95cb65
d36d380
90ae3cd
7d2ddf7
8b5661c
750954c
dfc17a1
50ae29a
f4cf8ea
812c70d
336cd40
0cd85b6
dd4f2d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.optimizer | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} | ||
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} | ||
|
||
/** | ||
* Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and | ||
* aggregate expressions into a projection over its children. The original | ||
* [[expressions.GetStructField]] expressions are replaced with references to the pushed down | ||
* aliases. This allows the optimizer to minimize the columns read from a column-oriented file | ||
* format for aggregation queries involving only nested fields. | ||
*/ | ||
object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
  override protected def apply0(plan: LogicalPlan): LogicalPlan =
    plan transformDown {
      case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
        // All expressions this aggregate evaluates: grouping keys first, then aggregates.
        val expressions = groupingExpressions ++ aggregateExpressions
        // Attributes referenced as whole columns by the aggregate itself. If a column is
        // needed in full (e.g. `group by a, a.b`), we must not prune it down to a field.
        val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
        // Attributes produced/used by the child plan.
        val childAttributes = AttributeSet(child.expressions)
        // Step 1: collect the distinct top-level GetStructField chains in this aggregate.
        val fieldExtractors0 =
          expressions
            .flatMap(getFieldExtractors)
            .distinct
        // Step 2: drop extractors rooted at an attribute the aggregate also needs whole;
        // pushing those down would remove a column the aggregate still requires.
        val fieldExtractors1 =
          fieldExtractors0
            .filter(_.collectFirst { case att: Attribute => att }
              .filter(attributes.contains).isEmpty)
        // Step 3: keep only extractors whose root attribute actually comes from the child,
        // so the alias we push down can be resolved by the new projection.
        val fieldExtractors =
          fieldExtractors1
            .filter(_.collectFirst { case att: Attribute => att }
              .filter(childAttributes.contains).nonEmpty)

        if (fieldExtractors.nonEmpty) {
          val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)

          if (aliases.nonEmpty) {
            // Construct the new grouping and aggregate expressions by substituting
            // each GetStructField expression with a reference to its alias
            val newAggregateExpressions =
              aggregateExpressions.map(substituteAttributes)
                .collect { case named: NamedExpression => named }
            val newGroupingExpressions = groupingExpressions.map(substituteAttributes)

            // We need to push down the aliases we've created. We do this with a new projection over
            // this aggregate's child consisting of the aliases and original child's output sans
            // attributes referenced by the aliases

            // None of these attributes are required by this aggregate because we filtered out the
            // GetStructField instances which referred to attributes that were required
            val unnecessaryAttributes = aliases.map(_.child.references).reduce(_ ++ _)
            // The output we require from this aggregate is the child's output minus the unnecessary
            // attributes
            val requiredChildOutput = child.output.filterNot(unnecessaryAttributes.contains)
            val projects = requiredChildOutput ++ aliases
            val newProject = Project(projects, child)

            Aggregate(newGroupingExpressions, newAggregateExpressions, newProject)
          } else {
            // No aliases were constructed; leave the aggregate untouched.
            agg
          }
        } else {
          // Nothing to push down.
          agg
        }
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.optimizer | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField} | ||
import org.apache.spark.sql.catalyst.planning.SelectedField | ||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.internal.SQLConf | ||
|
||
/**
 * Base class for optimizer rules that push [[GetStructField]] extractions below an
 * operator, gated on the nested-schema-pruning configuration flag.
 */
abstract class FieldExtractionPushdown extends Rule[LogicalPlan] {
  final override def apply(plan: LogicalPlan): LogicalPlan =
    if (!SQLConf.get.nestedSchemaPruningEnabled) plan else apply0(plan)

  /** Rule body; invoked only when nested schema pruning is enabled. */
  protected def apply0(plan: LogicalPlan): LogicalPlan

  /**
   * Returns the top-level [[GetStructField]] expressions found in `expr` and its
   * children. Children of a matched [[GetStructField]] are not descended into.
   */
  protected final def getFieldExtractors(expr: Expression): Seq[GetStructField] =
    expr match {
      // Only accept extractors that SelectedField recognizes, i.e. a chain of
      // extractors terminating at an attribute.
      case extractor: GetStructField if SelectedField.unapply(extractor).isDefined =>
        Seq(extractor)
      case other =>
        other.children.flatMap(getFieldExtractors)
    }

  /**
   * Builds an alias for each extractor (named after the extracted struct field) and a
   * substitution function that rewrites each extractor to a reference to its alias.
   */
  protected final def constructAliasesAndSubstitutions(fieldExtractors: Seq[GetStructField]) = {
    val aliases = fieldExtractors.map { extractor =>
      val fieldName = extractor.childSchema(extractor.ordinal).name
      Alias(extractor, fieldName)()
    }

    // Map each original extractor expression to the attribute of its alias.
    val substitutions = aliases.map(alias => alias.child -> alias.toAttribute).toMap

    val substituteAttributes: Expression => Expression = _.transformDown {
      case fieldExtractor: GetStructField =>
        substitutions.getOrElse(fieldExtractor, fieldExtractor)
    }

    (aliases, substituteAttributes)
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.optimizer | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} | ||
import org.apache.spark.sql.catalyst.planning.PhysicalOperation | ||
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} | ||
|
||
/** | ||
* Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please state in this scaladoc that the rule is specialized for pushing down pruned nested columns. |
||
* and its join condition. The original [[expressions.GetStructField]] expressions are replaced | ||
* with references to the pushed down aliases. This allows the optimizer to minimize the columns | ||
* read from a column-oriented file format for joins involving only nested fields. | ||
*/ | ||
object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
  override protected def apply0(plan: LogicalPlan): LogicalPlan =
    plan transformDown {
      // Match a projection (with no filters) over a join with a join condition.
      case op @ PhysicalOperation(projects, Seq(),
          join @ Join(left, right, joinType, Some(joinCondition))) =>
        // Distinct top-level GetStructField chains in the projections and join condition.
        val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct

        if (fieldExtractors.nonEmpty) {
          val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)

          if (aliases.nonEmpty) {
            // Construct the new projections and join condition by substituting each GetStructField
            // expression with a reference to its alias
            val newProjects =
              projects.map(substituteAttributes).collect { case named: NamedExpression => named }
            val newJoinCondition = substituteAttributes(joinCondition)

            // Prune left and right output attributes according to whether they're needed by the
            // new projections or join conditions
            val aliasAttributes = AttributeSet(aliases.map(_.toAttribute))
            val neededAttributes = AttributeSet((newProjects :+ newJoinCondition)
              .flatMap(_.collect { case att: Attribute => att })) -- aliasAttributes
            val leftAtts = left.output.filter(neededAttributes.contains)
            val rightAtts = right.output.filter(neededAttributes.contains)

            // Construct the left and right side aliases by partitioning the aliases according to
            // whether they reference attributes in the left side or the right side
            val (leftAliases, rightAliases) =
              aliases.partition(_.references.intersect(left.outputSet).nonEmpty)

            // Push each alias group into a projection on its own side of the join.
            val newLeft = Project(leftAtts.toSeq ++ leftAliases, left)
            val newRight = Project(rightAtts.toSeq ++ rightAliases, right)

            Project(newProjects, Join(newLeft, newRight, joinType, Some(newJoinCondition)))
          } else {
            // No aliases were constructed; leave the operator untouched.
            op
          }
        } else {
          // Nothing to push down.
          op
        }
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -151,6 +151,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) | |
// The following batch should be executed after batch "Join Reorder" and "LocalRelation". | ||
Batch("Check Cartesian Products", Once, | ||
CheckCartesianProducts) :+ | ||
Batch("Field Extraction Pushdown", fixedPoint, | ||
AggregateFieldExtractionPushdown, | ||
JoinFieldExtractionPushdown) :+ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mallman Could you split these new optimizer rules to two PRs first? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @gatorsmile. Given the scope of your request, can I ask you to provide a reason for it? What you ask would invalidate some of the existing conversation and review of this PR. It would also substantially restrict the practical usability of this patch. I believe I've written this patch with a logical separation of concerns along the lines you've requested. As a compromise, would you consider an incremental review starting with the basic projection/filter functionality and proceeding to the optimizer rules following them? BTW I'm traveling for a few weeks, and I'm spending most of my time away from work. If I'm delayed in responding, that's the reason. I'll still keep up, but at a slower pace. Thanks. |
||
Batch("RewriteSubquery", Once, | ||
RewritePredicateSubquery, | ||
ColumnPruning, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.planning | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} | ||
import org.apache.spark.sql.types.StructField | ||
|
||
/** | ||
* A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. | ||
* This is in contrast to the [[GetStructField]] case class extractor which returns the field | ||
* ordinal instead of the field itself. | ||
*/ | ||
private[planning] object GetStructFieldObject {
  /** Extracts the child expression and the [[StructField]] selected by `getStructField`. */
  def unapply(getStructField: GetStructField): Option[(Expression, StructField)] = {
    // Look the field up by ordinal in the child's struct schema.
    val field = getStructField.childSchema(getStructField.ordinal)
    Some((getStructField.child, field))
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.planning | ||
|
||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.types._ | ||
|
||
/** | ||
* A Scala extractor that projects an expression over a given schema. Data types, | ||
* field indexes and field counts of complex type extractors and attributes | ||
* are adjusted to fit the schema. All other expressions are left as-is. This | ||
* class is motivated by columnar nested schema pruning. | ||
*/ | ||
case class ProjectionOverSchema(schema: StructType) {
  // Top-level field names available in the pruned schema.
  private val fieldNames = schema.fieldNames.toSet

  def unapply(expr: Expression): Option[Expression] = getProjection(expr)

  // Recursively rebuilds `expr` against `schema`, adjusting data types, field ordinals
  // and field counts of extractors/attributes; returns None for unsupported expressions.
  private def getProjection(expr: Expression): Option[Expression] =
    expr match {
      // Re-type the attribute against the (possibly pruned) schema.
      case a @ AttributeReference(name, _, _, _) if (fieldNames.contains(name)) =>
        Some(a.copy(dataType = schema(name).dataType)(a.exprId, a.qualifier))
      case GetArrayItem(child, arrayItemOrdinal) =>
        getProjection(child).map {
          case projection =>
            GetArrayItem(projection, arrayItemOrdinal)
        }
      // NOTE(review): the partial match below assumes the projected child's type is
      // ArrayType(StructType, _); a non-conforming projection would raise a MatchError —
      // presumably guaranteed by construction, but worth confirming.
      case GetArrayStructFields(child, StructField(name, _, _, _), _, numFields, containsNull) =>
        getProjection(child).map(p => (p, p.dataType)).map {
          case (projection, ArrayType(projSchema @ StructType(_), _)) =>
            // Re-resolve the field's index and count within the projected element schema.
            GetArrayStructFields(projection,
              projSchema(name), projSchema.fieldIndex(name), projSchema.size, containsNull)
        }
      case GetMapValue(child, key) =>
        getProjection(child).map {
          case projection =>
            GetMapValue(projection, key)
        }
      // Same caveat as above: assumes the projected child's type is a StructType.
      case GetStructFieldObject(child, StructField(name, _, _, _)) =>
        getProjection(child).map(p => (p, p.dataType)).map {
          case (projection, projSchema @ StructType(_)) =>
            GetStructField(projection, projSchema.fieldIndex(name))
        }
      case _ =>
        None
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please state in the documentation that this is specialized for pushing down pruned nested columns.