-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-4502][SQL] Parquet nested column pruning - foundation #21320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6e41081
0d0e8a0
44e78cb
9488cb5
f3735b1
2120ab5
8d53bbd
e213471
9e6ef5f
e6ea9c2
2d02ab3
cfffc95
2779351
9329f77
ec313c1
42aff39
71f4c7b
0e5594b
1573ae8
09dd655
61c7937
97b3a51
2711746
e6baf68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1381,8 +1381,18 @@ object SQLConf { | |
"issues. Turn on this config to insert a local sort before actually doing repartition " + | ||
"to generate consistent repartition results. The performance of repartition() may go " + | ||
"down since we insert extra local sort before it.") | ||
.booleanConf | ||
.createWithDefault(true) | ||
|
||
val NESTED_SCHEMA_PRUNING_ENABLED = | ||
buildConf("spark.sql.nestedSchemaPruning.enabled") | ||
.internal() | ||
.doc("Prune nested fields from a logical relation's output which are unnecessary in " + | ||
"satisfying a query. This optimization allows columnar file format readers to avoid " + | ||
"reading unnecessary nested column data. Currently Parquet is the only data source that " + | ||
"implements this optimization.") | ||
.booleanConf | ||
.createWithDefault(true) | ||
.createWithDefault(false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about enabling it as default? there should be enough time to find any unexpected problems with 2.4.0 additionally nested column pruning would be enabled during all other automatic tests There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm against enabling this feature by default with a known failing test case. For example, https://github.com/apache/spark/pull/21320/files#diff-0c6c7481232e9637b91c179f1005426aR71. |
||
|
||
val TOP_K_SORT_FALLBACK_THRESHOLD = | ||
buildConf("spark.sql.execution.topKSortFallbackThreshold") | ||
|
@@ -1863,6 +1873,8 @@ class SQLConf extends Serializable with Logging { | |
def partitionOverwriteMode: PartitionOverwriteMode.Value = | ||
PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE)) | ||
|
||
def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED) | ||
|
||
def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING) | ||
|
||
def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst | ||
|
||
import org.scalatest.BeforeAndAfterAll | ||
|
||
import org.apache.spark.sql.catalyst.plans.PlanTest | ||
import org.apache.spark.sql.internal.SQLConf.NESTED_SCHEMA_PRUNING_ENABLED | ||
|
||
/**
 * A PlanTest mixin guaranteeing that every test in the suite runs with nested schema
 * pruning turned on. Remove this trait once the default value of
 * SQLConf.NESTED_SCHEMA_PRUNING_ENABLED is set to true.
 */
private[sql] trait SchemaPruningTest extends PlanTest with BeforeAndAfterAll {
  // Holds the flag value observed before the suite started, so it can be restored afterwards.
  private var savedPruningFlag = false

  override protected def beforeAll(): Unit = {
    // Remember the caller's setting, then force pruning on for the whole suite.
    savedPruningFlag = conf.nestedSchemaPruningEnabled
    conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, true)
    super.beforeAll()
  }

  override protected def afterAll(): Unit = {
    // Restore the original setting even if the superclass teardown throws.
    try super.afterAll()
    finally conf.setConf(NESTED_SCHEMA_PRUNING_ENABLED, savedPruningFlag)
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} | ||
import org.apache.spark.sql.types.StructField | ||
|
||
/**
 * A Scala extractor that pulls the child expression and the struct field out of a
 * [[GetStructField]]. Unlike the [[GetStructField]] case class extractor, which yields
 * the field's ordinal, this one yields the [[StructField]] itself.
 */
private[execution] object GetStructFieldObject {
  def unapply(getStructField: GetStructField): Option[(Expression, StructField)] = {
    // Resolve the ordinal against the child's schema to recover the actual field.
    val field = getStructField.childSchema(getStructField.ordinal)
    Some((getStructField.child, field))
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution | ||
|
||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.types._ | ||
|
||
/**
 * A Scala extractor that projects an expression over a given schema. Data types,
 * field indexes and field counts of complex type extractors and attributes
 * are adjusted to fit the schema. All other expressions are left as-is. This
 * class is motivated by columnar nested schema pruning.
 */
private[execution] case class ProjectionOverSchema(schema: StructType) {
  // Top-level field names of the (pruned) schema; an attribute is rewritten only
  // if its name appears here.
  private val fieldNames = schema.fieldNames.toSet

  def unapply(expr: Expression): Option[Expression] = getProjection(expr)

  // Rewrites an attribute or a chain of complex-type extractors so that data types,
  // field ordinals and field counts refer to `schema` instead of the relation's
  // original schema. Returns None for any expression shape it does not handle.
  private def getProjection(expr: Expression): Option[Expression] =
    expr match {
      case a: AttributeReference if fieldNames.contains(a.name) =>
        // Same attribute identity (exprId, qualifier preserved) with the pruned data type.
        Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
      case GetArrayItem(child, arrayItemOrdinal) =>
        getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal) }
      case a: GetArrayStructFields =>
        // NOTE(review): the inner match is deliberately partial — it assumes the projected
        // child's type is array-of-struct; any other type would raise a MatchError here.
        getProjection(a.child).map(p => (p, p.dataType)).map {
          case (projection, ArrayType(projSchema @ StructType(_), _)) =>
            // Re-resolve the field by name: after pruning, both its ordinal and the
            // struct's field count may differ from the original.
            GetArrayStructFields(projection,
              projSchema(a.field.name),
              projSchema.fieldIndex(a.field.name),
              projSchema.size,
              a.containsNull)
        }
      case GetMapValue(child, key) =>
        getProjection(child).map { projection => GetMapValue(projection, key) }
      case GetStructFieldObject(child, field: StructField) =>
        // NOTE(review): like the array case above, this assumes the projected child is a
        // struct; a non-struct projection would raise a MatchError.
        getProjection(child).map(p => (p, p.dataType)).map {
          case (projection, projSchema: StructType) =>
            // Look the field up by name since its ordinal may have shifted after pruning.
            GetStructField(projection, projSchema.fieldIndex(field.name))
        }
      case _ =>
        None
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution | ||
|
||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.types._ | ||
|
||
/**
 * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst
 * complex type extractor. For example, consider a relation with the following schema:
 *
 * {{{
 * root
 *  |-- name: struct (nullable = true)
 *  |    |-- first: string (nullable = true)
 *  |    |-- last: string (nullable = true)
 * }}}
 *
 * Further, suppose we take the select expression `name.first`. This will parse into an
 * `Alias(child, "first")`. Ignoring the alias, `child` matches the following pattern:
 *
 * {{{
 * GetStructFieldObject(
 *   AttributeReference("name", StructType(_), _, _),
 *   StructField("first", StringType, _, _))
 * }}}
 *
 * [[SelectedField]] converts that expression into
 *
 * {{{
 * StructField("name", StructType(Array(StructField("first", StringType))))
 * }}}
 *
 * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the
 * same name as its child (or "parent" going right to left in the select expression) and a data
 * type appropriate to the complex type extractor. In our example, the name of the child expression
 * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string
 * field named "first".
 *
 * @param expr the top-level complex type extractor
 */
private[execution] object SelectedField {
  def unapply(expr: Expression): Option[StructField] = {
    // If this expression is an alias, work on its child instead
    val unaliased = expr match {
      case Alias(child, _) => child
      case expr => expr
    }
    selectField(unaliased, None)
  }

  // Walks an extractor chain from the outermost extractor down to the root attribute,
  // accumulating in `fieldOpt` the single-field schema selected so far. Each step wraps the
  // accumulated field in a StructField named after the current extractor's child, with a data
  // type built by `wrapStructType`. Returns None if `expr` is not a recognized extractor chain
  // (e.g. a bare attribute with fieldOpt == None).
  private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = {
    expr match {
      // No children. Returns a StructField with the attribute name or None if fieldOpt is None.
      case AttributeReference(name, dataType, nullable, metadata) =>
        fieldOpt.map(field =>
          StructField(name, wrapStructType(dataType, field), nullable, metadata))
      // Handles case "expr0.field[n]", where "expr0" is of struct type and "expr0.field" is of
      // array type.
      case GetArrayItem(x @ GetStructFieldObject(child, field @ StructField(name,
          dataType, nullable, metadata)), _) =>
        // With no accumulated field, fall back to the extracted field itself.
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field), nullable, metadata)).getOrElse(field)
        selectField(child, Some(childField))
      // Handles case "expr0.field[n]", where "expr0.field" is of array type.
      case GetArrayItem(child, _) =>
        selectField(child, fieldOpt)
      // Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type.
      case GetArrayStructFields(child: GetArrayStructFields,
          field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      // Handles case "expr0.field", where "expr0" is of array type.
      case GetArrayStructFields(child,
          field @ StructField(name, dataType, nullable, metadata), _, _, _) =>
        val childField =
          fieldOpt.map(field => StructField(name,
            wrapStructType(dataType, field),
            nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      // Handles case "expr0.field[key]", where "expr0" is of struct type and "expr0.field" is of
      // map type.
      case GetMapValue(x @ GetStructFieldObject(child, field @ StructField(name,
          dataType,
          nullable, metadata)), _) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      // Handles case "expr0.field[key]", where "expr0.field" is of map type.
      case GetMapValue(child, _) =>
        selectField(child, fieldOpt)
      // Handles case "expr0.field", where expr0 is of struct type.
      case GetStructFieldObject(child,
          field @ StructField(name, dataType, nullable, metadata)) =>
        val childField = fieldOpt.map(field => StructField(name,
          wrapStructType(dataType, field),
          nullable, metadata)).orElse(Some(field))
        selectField(child, childField)
      case _ =>
        None
    }
  }

  // Constructs a composition of complex types with a StructType(Array(field)) at its core. Returns
  // a StructType for a StructType, an ArrayType for an ArrayType and a MapType for a MapType.
  // NOTE(review): this match is non-exhaustive — an atomic data type (e.g. StringType) would
  // raise a MatchError. Callers appear to only pass complex types; confirm before reuse.
  private def wrapStructType(dataType: DataType, field: StructField): DataType = {
    dataType match {
      case _: StructType =>
        StructType(Array(field))
      case ArrayType(elementType, containsNull) =>
        ArrayType(wrapStructType(elementType, field), containsNull)
      case MapType(keyType, valueType, valueContainsNull) =>
        MapType(keyType, wrapStructType(valueType, field), valueContainsNull)
    }
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about ORC?
cc @dongjoon-hyun Do you know whether it is also doable in the latest ORC version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pinging me, @gatorsmile . Let me check it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ORC should be able to support this capability as well, but this PR does not address that.