
[SPARK-4502][SQL] Parquet nested column pruning #16578


Closed

Changes from all commits (25 commits)
51f7346 [SPARK-4502][SQL] Parquet nested column pruning (Jun 24, 2016)
c1c6156 Replace '* with "*" for better github syntax highlighting (Jan 13, 2017)
72b5319 Execute some refactorings in ParquetSchemaPruning.scala to simplify it (Feb 13, 2017)
d400d8c Fix SelectedField extractor to correctly handle the case of a nested (Feb 13, 2017)
a9e648a Add a ColumnCount metadata property to the FileSourceScanExec to count (Feb 16, 2017)
da2bb09 WIP (Mar 10, 2017)
0050167 WIP (Apr 26, 2017)
106296f Refine and (attempt to) clarify SelectedField.scala (Jul 21, 2017)
ea1e01a "Fix" scalastyle errors in SelectedFieldSuite.scala (Jul 21, 2017)
ab46037 Adjust some code in ParquetSchemaPruning.scala to conform to external (mallman, Sep 20, 2017)
96c00d8 (Updates from PR review feedback) (mallman, Sep 21, 2017)
17d00ce Document ColumnarFileFormat and make it private[sql] (mallman, Sep 21, 2017)
d95cb65 Update documentation of `QueryPlanConstraints.scala` to cover case of (mallman, Sep 21, 2017)
d36d380 Compare the parquet file schema and pruned parquet schema in (mallman, Sep 22, 2017)
90ae3cd Update order of fields in expected schema in some test cases in (mallman, Sep 22, 2017)
7d2ddf7 Replace nonsensical example data in ParquetSchemaPruningSuite.scala with (mallman, Sep 22, 2017)
8b5661c Fix Filter null check. (viirya, Sep 28, 2017)
750954c Alter some code docs to change terminology from "proper field" to (mallman, Oct 2, 2017)
dfc17a1 Forgo the use of `ParquetSchemaConverter` when counting the number of (mallman, Oct 2, 2017)
50ae29a Enhance documentation and skip application of the (mallman, Nov 3, 2017)
f4cf8ea Don't print the schema before running the tests in the SelectedFieldS… (mallman, Nov 3, 2017)
812c70d Put the schema pruning optimizer rules behind a boolean SQLConf flag, (mallman, Nov 3, 2017)
336cd40 Remark in the documentation of the spark.sql.nestedSchemaPruning.enabled (mallman, Nov 15, 2017)
0cd85b6 Don't set catalogTable to None when copying an existing logical relation (mallman, Nov 20, 2017)
dd4f2d8 Add a SchemaPruningTest trait for running tests with schema pruning (mallman, Nov 21, 2017)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}

/**
* Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and
* aggregate expressions into a projection over its children. The original
* [[expressions.GetStructField]] expressions are replaced with references to the pushed down
* aliases. This allows the optimizer to minimize the columns read from a column-oriented file
* format for aggregation queries involving only nested fields.
*/

Review comment (Member): Please describe that this rule is specialized for pushing down pruned nested columns.
object AggregateFieldExtractionPushdown extends FieldExtractionPushdown {
override protected def apply0(plan: LogicalPlan): LogicalPlan =
plan transformDown {
case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
val expressions = groupingExpressions ++ aggregateExpressions
val attributes = AttributeSet(expressions.collect { case att: Attribute => att })
val childAttributes = AttributeSet(child.expressions)
val fieldExtractors0 =
expressions
.flatMap(getFieldExtractors)
.distinct
val fieldExtractors1 =
fieldExtractors0
.filter(_.collectFirst { case att: Attribute => att }
.filter(attributes.contains).isEmpty)
Review comment (Member): Why do we need to trim the extractors which contain attributes referred to from `groupingExpressions ++ aggregateExpressions`?

Reply (Contributor Author):

> Why do we need to trim the extractors which contain attributes referred to from `groupingExpressions ++ aggregateExpressions`?
Consider this query:

`select a, a.b, count(1) from r1 group by a, a.b`

The grouping fields are `a` and `a.b`. `a` is an `Attribute`; `a.b` is a `GetStructField`. Since we need all of `a` to answer this query, it doesn't make sense to attempt to push down `a.b`. At the same time, `fieldExtractors0` includes all `GetStructField` instances, including `a.b`. The code you refer to above filters out the `a.b` `GetStructField` because our query requires all of `a`.

If we do not filter out `a.b`, then the child (projection) of the new `Aggregate` will not contain `a` in its output, and the query planner will barf.

The logic for the creation of the new child projection is here: https://github.com/apache/spark/blob/00ab80c9b78c45c1a8f8c202c5bab04a62cda2ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala#L63-L70

This case is tested by https://github.com/apache/spark/blob/00ab80c9b78c45c1a8f8c202c5bab04a62cda2ef/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdownSuite.scala#L60-L76

Review comment (Member): If the query is:

`select a.b, count(1) from r1 group by a.b`

then `fieldExtractors0` gets the `GetStructField` for `a.b`. Won't `fieldExtractors1` filter it out, because the attribute `a` is contained in the attribute set of all expressions? But we don't need all the fields of `a` now.

Reply (Contributor Author): The attribute `a` is not in `expressions`, so it is not in `attributes`. When we construct `attributes`, we simply collect expressions that are themselves instances of `Attribute`; we don't do any recursion into their children.

Your query is tested by the "basic aggregate field extraction pushdown" test in AggregateFieldExtractionPushdownSuite. It's a little difficult to see because I'm using the Catalyst DataFrame DSL, but that seems to be the convention in these tests.
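To make the non-recursion point concrete, here is a self-contained sketch using toy stand-ins for the Catalyst classes (not the real `org.apache.spark.sql.catalyst` API): `Seq.collect` matches only elements of the sequence that are themselves `Attribute`s, so the `a` buried inside `a.b` is never collected.

```scala
// Toy model -- the names mirror Catalyst, but this is a simplified
// illustration, not Spark code.
sealed trait Expr { def children: Seq[Expr] }
case class Attribute(name: String) extends Expr {
  val children: Seq[Expr] = Nil
}
case class GetStructField(child: Expr, field: String) extends Expr {
  val children: Seq[Expr] = Seq(child)
}

object CollectDemo extends App {
  val a = Attribute("a")
  val aDotB = GetStructField(a, "b")

  // "group by a.b": the only grouping expression is the extractor itself.
  val expressions: Seq[Expr] = Seq(aDotB)

  // Seq.collect is not recursive: it tests each element of the sequence,
  // and aDotB is not an Attribute, so nothing is collected.
  val attributes = expressions.collect { case att: Attribute => att }
  println(attributes) // List()

  // A recursive collect over the trees WOULD find `a`:
  def collectRecursive(e: Expr): Seq[Attribute] = e match {
    case att: Attribute => Seq(att)
    case other => other.children.flatMap(collectRecursive)
  }
  println(expressions.flatMap(collectRecursive)) // List(Attribute(a))
}
```

So for `select a.b, count(1) from r1 group by a.b`, `attributes` is empty and `fieldExtractors1` keeps `a.b`.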

val fieldExtractors =
fieldExtractors1
.filter(_.collectFirst { case att: Attribute => att }
.filter(childAttributes.contains).nonEmpty)

if (fieldExtractors.nonEmpty) {
val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
Review comment (Member): We can return the original plan if `aliases` is empty.


if (aliases.nonEmpty) {
// Construct the new grouping and aggregate expressions by substituting
// each GetStructField expression with a reference to its alias
val newAggregateExpressions =
aggregateExpressions.map(substituteAttributes)
.collect { case named: NamedExpression => named }
val newGroupingExpressions = groupingExpressions.map(substituteAttributes)

// We need to push down the aliases we've created. We do this with a new projection over
// this aggregate's child consisting of the aliases and original child's output sans
// attributes referenced by the aliases

// None of these attributes are required by this aggregate because we filtered out the
// GetStructField instances which referred to attributes that were required
val unnecessaryAttributes = aliases.map(_.child.references).reduce(_ ++ _)
// The output we require from this aggregate is the child's output minus the unnecessary
// attributes
val requiredChildOutput = child.output.filterNot(unnecessaryAttributes.contains)
val projects = requiredChildOutput ++ aliases
val newProject = Project(projects, child)

Aggregate(newGroupingExpressions, newAggregateExpressions, newProject)
} else {
agg
}
} else {
agg
}
}
}
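For intuition, here is a hand-written before/after sketch of the rewrite for the second query discussed above. The plan shapes and expression IDs are illustrative approximations, not actual optimizer output:

```
-- select a.b, count(1) from r1 group by a.b

Before AggregateFieldExtractionPushdown:
  Aggregate [a#0.b], [a#0.b AS b#2, count(1) AS cnt#3L]
  +- Relation r1 [a#0]

After (the GetStructField is aliased, pushed into a projection over the
child, and the aggregate references only the alias):
  Aggregate [b#4], [b#4 AS b#2, count(1) AS cnt#3L]
  +- Project [a#0.b AS b#4]
     +- Relation r1 [a#0]
```

Note that `a#0` drops out of the projection entirely, which is what lets a columnar scan below read only the nested field `b`.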
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField}
import org.apache.spark.sql.catalyst.planning.SelectedField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

abstract class FieldExtractionPushdown extends Rule[LogicalPlan] {
final override def apply(plan: LogicalPlan): LogicalPlan =
if (SQLConf.get.nestedSchemaPruningEnabled) {
apply0(plan)
} else {
plan
}

protected def apply0(plan: LogicalPlan): LogicalPlan

// Gets the top-level GetStructField expressions from the given expression
// and its children. This does not return children of a GetStructField.
protected final def getFieldExtractors(expr: Expression): Seq[GetStructField] =
expr match {
// Check that getField matches SelectedField(_) to ensure that getField defines a chain of
// extractors down to an attribute.
case getField: GetStructField if SelectedField.unapply(getField).isDefined =>
getField :: Nil
case _ =>
expr.children.flatMap(getFieldExtractors)
}

// Constructs aliases and a substitution function for the given sequence of
// GetStructField expressions.
protected final def constructAliasesAndSubstitutions(fieldExtractors: Seq[GetStructField]) = {
val aliases =
fieldExtractors.map(extractor =>
Alias(extractor, extractor.childSchema(extractor.ordinal).name)())

val attributes = aliases.map(alias => (alias.child, alias.toAttribute)).toMap

val substituteAttributes: Expression => Expression = _.transformDown {
case expr: GetStructField => attributes.getOrElse(expr, expr)
}

(aliases, substituteAttributes)
}
}
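To illustrate how a concrete rule is meant to use these helpers, here is a hypothetical subclass (a sketch only — the PR itself defines only the aggregate and join variants; `FilterFieldExtractionPushdown` is invented for illustration):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

// Hypothetical, not part of this PR: push GetStructField extractors found
// in a Filter condition into a projection below the filter.
object FilterFieldExtractionPushdown extends FieldExtractionPushdown {
  override protected def apply0(plan: LogicalPlan): LogicalPlan =
    plan transformDown {
      case filter @ Filter(condition, child) =>
        val fieldExtractors = getFieldExtractors(condition).distinct
        if (fieldExtractors.isEmpty) {
          filter
        } else {
          val (aliases, substituteAttributes) =
            constructAliasesAndSubstitutions(fieldExtractors)
          // Provide the aliases below the filter, rewrite the condition to
          // reference them, and restore the original output on top.
          val newChild = Project(child.output ++ aliases, child)
          Project(child.output, Filter(substituteAttributes(condition), newChild))
        }
    }
}
```

Note that the `apply`/`apply0` split in the abstract class means the `spark.sql.nestedSchemaPruning.enabled` config check is enforced once for every subclass.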
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}

/**
* Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join
* and its join condition. The original [[expressions.GetStructField]] expressions are replaced
* with references to the pushed down aliases. This allows the optimizer to minimize the columns
* read from a column-oriented file format for joins involving only nested fields.
*/

Review comment (Member): Please describe that this rule is specialized for pushing down pruned nested columns.
object JoinFieldExtractionPushdown extends FieldExtractionPushdown {
override protected def apply0(plan: LogicalPlan): LogicalPlan =
plan transformDown {
case op @ PhysicalOperation(projects, Seq(),
join @ Join(left, right, joinType, Some(joinCondition))) =>
val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct

if (fieldExtractors.nonEmpty) {
val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors)
Review comment (Member): We can return the original plan if `aliases` is empty.


if (aliases.nonEmpty) {
// Construct the new projections and join condition by substituting each GetStructField
// expression with a reference to its alias
val newProjects =
projects.map(substituteAttributes).collect { case named: NamedExpression => named }
val newJoinCondition = substituteAttributes(joinCondition)

// Prune left and right output attributes according to whether they're needed by the
// new projections or join conditions
val aliasAttributes = AttributeSet(aliases.map(_.toAttribute))
val neededAttributes = AttributeSet((newProjects :+ newJoinCondition)
.flatMap(_.collect { case att: Attribute => att })) -- aliasAttributes
val leftAtts = left.output.filter(neededAttributes.contains)
val rightAtts = right.output.filter(neededAttributes.contains)

// Construct the left and right side aliases by partitioning the aliases according to
// whether they reference attributes in the left side or the right side
val (leftAliases, rightAliases) =
aliases.partition(_.references.intersect(left.outputSet).nonEmpty)

val newLeft = Project(leftAtts.toSeq ++ leftAliases, left)
val newRight = Project(rightAtts.toSeq ++ rightAliases, right)

Project(newProjects, Join(newLeft, newRight, joinType, Some(newJoinCondition)))
} else {
op
}
} else {
op
}
}
}
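Similarly, a hedged sketch of the join rewrite. The query, plan shapes and expression IDs are illustrative assumptions, not actual optimizer output:

```
-- select l.payload.x from l join r on l.payload.key = r.payload.key

Before JoinFieldExtractionPushdown:
  Project [payload#0.x AS x#5]
  +- Join Inner, (payload#0.key = payload#1.key)
     :- Relation l [payload#0]
     +- Relation r [payload#1]

After (each extractor is aliased and pushed into a projection on the side
that produces it; the join condition references only the aliases):
  Project [x#2 AS x#5]
  +- Join Inner, (key#3 = key#4)
     :- Project [payload#0.x AS x#2, payload#0.key AS key#3]
     :  +- Relation l [payload#0]
     +- Project [payload#1.key AS key#4]
        +- Relation r [payload#1]
```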
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -151,6 +151,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
// The following batch should be executed after batch "Join Reorder" and "LocalRelation".
Batch("Check Cartesian Products", Once,
CheckCartesianProducts) :+
Batch("Field Extraction Pushdown", fixedPoint,
AggregateFieldExtractionPushdown,
JoinFieldExtractionPushdown) :+
Review comment (Member): @mallman Could you split these new optimizer rules into two PRs first?

Reply (Contributor Author): Hi @gatorsmile.

Given the scope of your request, can I ask you to provide a reason for it? What you ask would invalidate some of the existing conversation and review of this PR. It would also substantially restrict the practical usability of this patch.

I believe I've written this patch with a logical separation of concerns along the lines you've requested. As a compromise, would you consider an incremental review starting with the basic projection/filter functionality and proceeding to the optimizer rules following them?

BTW I'm traveling for a few weeks, and I'm spending most of my time away from work. If I'm delayed in responding, that's the reason. I'll still keep up, but at a slower pace.

Thanks.

Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
ColumnPruning,
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructFieldObject.scala
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.planning

import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
import org.apache.spark.sql.types.StructField

/**
* A Scala extractor that extracts the child expression and struct field from a [[GetStructField]].
* This is in contrast to the [[GetStructField]] case class extractor which returns the field
* ordinal instead of the field itself.
*/
private[planning] object GetStructFieldObject {
def unapply(getStructField: GetStructField): Option[(Expression, StructField)] =
Some((
getStructField.child,
getStructField.childSchema(getStructField.ordinal)))
}
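A brief sketch of what this extractor buys you in a pattern match (hypothetical code; since the object is `private[planning]`, the sketch lives in the same package):

```scala
package org.apache.spark.sql.catalyst.planning

import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
import org.apache.spark.sql.types.StructField

object GetStructFieldDemo {
  // The case-class extractor exposes the field only by its ordinal:
  def describeByOrdinal(expr: Expression): String = expr match {
    case GetStructField(child, ordinal, _) => s"field #$ordinal of $child"
    case _ => "not a struct field access"
  }

  // GetStructFieldObject hands back the StructField itself, so the field's
  // name and data type can be matched on directly:
  def describeByField(expr: Expression): String = expr match {
    case GetStructFieldObject(child, StructField(name, dataType, _, _)) =>
      s"field `$name` of type $dataType from $child"
    case _ => "not a struct field access"
  }
}
```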
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.planning

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

/**
* A Scala extractor that projects an expression over a given schema. Data types,
* field indexes and field counts of complex type extractors and attributes
* are adjusted to fit the schema. All other expressions are left as-is. This
* class is motivated by columnar nested schema pruning.
*/
case class ProjectionOverSchema(schema: StructType) {
private val fieldNames = schema.fieldNames.toSet

def unapply(expr: Expression): Option[Expression] = getProjection(expr)

private def getProjection(expr: Expression): Option[Expression] =
expr match {
case a @ AttributeReference(name, _, _, _) if (fieldNames.contains(name)) =>
Some(a.copy(dataType = schema(name).dataType)(a.exprId, a.qualifier))
case GetArrayItem(child, arrayItemOrdinal) =>
getProjection(child).map {
case projection =>
GetArrayItem(projection, arrayItemOrdinal)
}
case GetArrayStructFields(child, StructField(name, _, _, _), _, numFields, containsNull) =>
getProjection(child).map(p => (p, p.dataType)).map {
case (projection, ArrayType(projSchema @ StructType(_), _)) =>
GetArrayStructFields(projection,
projSchema(name), projSchema.fieldIndex(name), projSchema.size, containsNull)
}
case GetMapValue(child, key) =>
getProjection(child).map {
case projection =>
GetMapValue(projection, key)
}
case GetStructFieldObject(child, StructField(name, _, _, _)) =>
getProjection(child).map(p => (p, p.dataType)).map {
case (projection, projSchema @ StructType(_)) =>
GetStructField(projection, projSchema.fieldIndex(name))
}
case _ =>
None
}
}
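A hedged sketch of intended usage: a schema-pruning rule holding a pruned data schema (the schema below is an assumption for illustration) can rewrite its projection expressions over that schema like this:

```scala
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.planning.ProjectionOverSchema
import org.apache.spark.sql.types.{StringType, StructType}

object ProjectionOverSchemaDemo {
  // Assume the data source schema has been pruned down to just name.first.
  val prunedSchema: StructType = new StructType()
    .add("name", new StructType().add("first", StringType))

  val projectionOverSchema = ProjectionOverSchema(prunedSchema)

  // Rewrite projection expressions so that attribute data types, field
  // ordinals and field counts agree with the pruned schema.
  def rewriteProjects(projects: Seq[NamedExpression]): Seq[NamedExpression] =
    projects
      .map(_.transformDown { case projectionOverSchema(expr) => expr })
      .map { case expr: NamedExpression => expr }
}
```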