[SPARK-4502][SQL] Parquet nested column pruning - foundation #21320
Conversation
@gatorsmile I believe this is the PR you requested for review.

Test build #90582 has finished for PR 21320 at commit
```scala
.internal()
.doc("Prune nested fields from a logical relation's output which are unnecessary in " +
  "satisfying a query. This optimization allows columnar file format readers to avoid " +
  "reading unnecessary nested column data. Currently Parquet is the only data source that " +
```
How about ORC?
cc @dongjoon-hyun Do you know whether it is also doable in the latest ORC version?
Thank you for pinging me, @gatorsmile . Let me check it.
ORC should be able to support this capability as well, but this PR does not address that.
```diff
@@ -162,7 +162,9 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
     val generated = otherPreds.map { c =>
       val nullChecks = c.references.map { r =>
-        val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
+        val idx = notNullPreds.indexWhere { n =>
+          n.asInstanceOf[IsNotNull].child.references.contains(r)
```
Is this change related?
Yes. IIRC a community member identified a bug and @viirya contributed a fix and unit test. See VideoAmp@8b5661c.
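For context, a minimal, hypothetical sketch (using catalyst internals) of why the old check stops matching once not-null constraints are inferred for nested fields, as in the `scanNullIntolerantField` change quoted next: the child of an `IsNotNull` can now be an `ExtractValue` such as `name.first`, which is not semantically equal to the attribute `name` but does reference it.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField, IsNotNull}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val name = AttributeReference("name", StructType(Seq(StructField("first", StringType))))()
// An inferred not-null constraint over a nested field, not a bare attribute:
val notNull = IsNotNull(GetStructField(name, 0, Some("first")))

notNull.child.semanticEquals(name)      // false: `name.first` is not `name`
notNull.child.references.contains(name) // true: `name.first` references `name`
```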
```diff
  */
-private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
+private def scanNullIntolerantField(expr: Expression): Seq[Expression] = expr match {
+  case ev: ExtractValue => Seq(ev)
```
For this improvement, can we do it in a separate PR? The corresponding unit test cases are needed in `InferFiltersFromConstraintsSuite` instead of `ParquetSchemaPruningSuite`.
I agree that adding more direct and independent test coverage for this change is a good idea. However, omitting this change would weaken the capabilities of this PR. It would also imply the removal of the failing test case in `ParquetSchemaPruningSuite`, which would in turn imply two follow-on PRs: the first to add this specific change plus the right test coverage, and the second to restore the test case removed from `ParquetSchemaPruningSuite`.

Let me suggest an alternative. As this change is a valuable enhancement for this PR, let me try adding an appropriate test case in `InferFiltersFromConstraintsSuite` as part of this PR. That will eliminate the requirement for two more follow-on PRs.
Because of this change, we also need to change the code in `basicPhysicalOperators.scala`. I do not think this is the right solution. More importantly, the changes in `basicPhysicalOperators.scala` might break other code paths. We need a separate PR for these changes. Please remove the changes made in `basicPhysicalOperators.scala` and `QueryPlanConstraints.scala`.
```scala
 * An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when
 * debugging a physical query plan.
 */
private[sql] trait ColumnarFileFormat {
```
Can we do this in a separate PR? No need to block this PR due to the discussion about this implementation.
I'm okay with adding this as a separate PR as requested, but I wouldn't want to ship this PR in a release without being able to see the physical column count associated with a query in a query plan. That is invaluable functionality in validating that physical column pruning is occurring as expected.
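For readers following along, a minimal sketch of the mix-in under discussion, inferred from the `columnCountForSchema` call site quoted below; treat the exact signature as an assumption rather than the PR's definitive code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

private[sql] trait ColumnarFileFormat {
  /** Number of physical columns read when scanning data with the given schema. */
  def columnCountForSchema(sparkSession: SparkSession, schema: StructType): Int
}
```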
```scala
.getActiveSession
.map { sparkSession =>
  val columnCount = columnar.columnCountForSchema(sparkSession, requiredSchema)
  withOptPartitionCount + ("ColumnCount" -> columnCount.toString)
```
This needs to be in a separate PR as I suggested above. BTW, we could easily lose this metadata if this change does not have a test case.
Replied above.
```diff
@@ -879,6 +879,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("select function over nested data") {
```
Without this PR, this test can still pass, right?
Could you submit a separate PR for these test coverage improvements? We really welcome test coverage improvement PRs.
I did some forensics to understand the origin of and reason for this test. It was part of the first commit of my original PR almost two years ago. Even then, this test passed without the rest of the commit. So I can't say why I added this test, except perhaps that I felt it was useful code coverage.
In any case, if you don't think it's a valuable contribution in this PR, I'd rather just remove it entirely.
```scala
assertResult(None)(unapplySelect("col2"))
}

test("col2.field2, col2.field2[0] as foo") {
```
Could you change the test case names to reflect the goal of each test? That way reviewers and future coders/readers can easily find out whether these tests cover all the data types and scenarios.
I'll work on making these test names more readable.
```scala
 * When [[expr]] is an [[Attribute]], construct a field around it and return the
 * attribute as the second component of the returned tuple.
 */
private def getFields(expr: Expression): Seq[(StructField, Option[Attribute])] = {
```
Define a case class and return a sequence of that class.
Will do.
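For illustration, a hedged sketch of the suggested refactor; the name `RootField` and its members are hypothetical, not necessarily what the PR settled on.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.StructField

// Replaces Seq[(StructField, Option[Attribute])] with a named type.
case class RootField(field: StructField, derivedFromAtt: Option[Attribute])

// The helper's signature would then become:
//   private def getFields(expr: Expression): Seq[RootField]
```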
```scala
}

val nonDataPartitionColumnNames =
  partitionSchema.map(_.name).filterNot(dataSchemaFieldNames.contains).toSet
```
Do we have any test case covering this?
You mean a test case in which the table under test has partition columns as part of its data schema? I don't think so; however, I will check and work on adding any missing test coverage. LMK if this is not what you're asking about.
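For reference, a sketch of the usual scenario that produces partition columns absent from the data schema (the path and names here are hypothetical): writing with `partitionBy` stores the partition column only in directory names, never in the parquet files' own schema.

```scala
// `p` lives only in the partition directory names, not in the data files,
// so it is exactly the kind of name `nonDataPartitionColumnNames` captures.
spark.range(10)
  .selectExpr("id", "id % 2 as p")
  .write
  .partitionBy("p")
  .parquet("/tmp/contacts_by_p")
```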
Force-pushed from 9e301b3 to de06e81.
@gatorsmile I've addressed many of your points in today's commits. Can you please take a look at what I've done so far? I'm still working on the PRs you requested.

Test build #91441 has finished for PR 21320 at commit

Test build #91442 has finished for PR 21320 at commit

Test build #91443 has finished for PR 21320 at commit
```scala
  .reduceLeft(_ merge _)
val dataSchemaFieldNames = dataSchema.fieldNames.toSet
val prunedDataSchema =
  StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
```
This check also filters out fields that have the same name as a schema field but in a different case.
Hi @mallman, thanks for this PR. It has a huge impact on performance when querying nested parquet schemas. I had used the original PR #16578 and found an issue: it does not work well when the query has column names in a different case.

Hi @jainaks. I can see why your query would not work. In the example you provide, if you refer to the column as
Force-pushed from 0351094 to 8ead76e.
Test build #91678 has finished for PR 21320 at commit

Test build #91679 has finished for PR 21320 at commit

Test build #91684 has finished for PR 21320 at commit
@mallman It does work fine with "name.First".

@mallman Sorry for the delay. Super busy during the Spark summit. Will continue the code review in the next few days.

Hi @mallman,

Hi @jainaks. Thanks for your report. Do you have the same problem running your test with this PR?

@gatorsmile The last couple of build test failures appear to be entirely unrelated to this PR. The error message in the one failed test reads
@jainaks What is the value of the Spark SQL configuration setting `spark.sql.caseSensitive`?
@mallman "spark.sql.caseSensitive" value is not being set and is left to its default value as "false". We are querying the parquet file from a dataframe.
@mallman Yes, the issue with window functions is reproducible even with this PR.
```diff
 .booleanConf
-.createWithDefault(true)
+.createWithDefault(false)
```
How about enabling it by default? There should be enough time to find any unexpected problems before 2.4.0.
Additionally, nested column pruning would then be exercised by all the other automated tests.
+1
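For anyone trying the feature while it defaults to off, it can be flipped per session. The key below is the one this conf carries in the released version; treat it as an assumption when building directly from this branch:

```scala
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
```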
Can you attach a (small) parquet file I can use to test this scenario?
Force-pushed from 7f67ec0 to a255bcb.
Test build #92070 has finished for PR 21320 at commit

@gatorsmile The last build was killed by SIGKILL. Can you start a new build, please?

Retest this please.

Test build #92191 has finished for PR 21320 at commit
```diff
@@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
 class ParquetSchemaPruningSuite
   extends QueryTest
   with ParquetTest
+  with SchemaPruningTest
```
Why is this removed?
On my local machine, all of them can still pass. Am I wrong?
nvm. This commit just adds it back
LGTM, as I explained above: #21320 (comment). Thanks for your patience and great work! @mallman Sorry it took two years to merge the code.

Seems fine to me too, for similar reasons.
Test build #95185 has finished for PR 21320 at commit
Thanks! Merged to master. BTW, we can keep thinking about whether there are other, better solutions for nested column pruning. Also cc @dongjoon-hyun, if you are interested in the work of supporting ORC nested column pruning.
@mallman, wanted to say huge thanks for your work! This is a great step forward.

Sure, @gatorsmile!
Thanks everyone for your contributions, support and patience. It's been a journey and a half, and I'm excited for the future.

I will open a follow-on PR to address the current known failure scenario (see the ignored test) in this patch, and we can discuss if/how we can get it into 2.4 as well. I know there are many early adopters of this patch and #16578. Bug reports will continue to be very helpful.

Beyond this patch, there are many possibilities for widening the scope of schema pruning. As part of our review process, we've pared the scope of this capability down to just projection. IMHO, the first limitation we should address post-2.4 is supporting pruning with query filters on nested fields ("where" clauses). Joins, aggregations and window queries would be powerful enhancements as well, bringing the scope of schema pruning to analytic queries.

I believe all of the additional features VideoAmp has implemented for schema pruning are independent of the underlying column store. Future enhancements should be automagically inherited by any column store that implements functionality analogous to

Thanks again.
@mallman Glad to see this got merged in. Thanks for all of your work pushing it through. I'm looking forward to the next phase. Please let me know if I can help again. I did notice that window functions don't get pushed down fully, and I briefly started looking into that, but I don't want to duplicate any work you might be planning.
## What changes were proposed in this pull request?

Introduced by #21320 and #11744

```
$ sbt
> ++2.12.6
> project sql
> compile
...
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:41: match may not be exhaustive.
[error] It would fail on the following inputs: (_, ArrayType(_, _)), (_, _)
[error] [warn]     getProjection(a.child).map(p => (p, p.dataType)).map {
[error] [warn]
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:52: match may not be exhaustive.
[error] It would fail on the following input: (_, _)
[error] [warn]     getProjection(child).map(p => (p, p.dataType)).map {
[error] [warn]
...
```

And

```
$ sbt
> ++2.12.6
> project hive
> testOnly *ParquetMetastoreSuite
...
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:22: object tools is not a member of package scala
[error] import scala.tools.nsc.Properties
[error]        ^
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:146: not found: value Properties
[error]     val version = Properties.versionNumberString match {
[error]                   ^
[error] two errors found
...
```

## How was this patch tested?

Existing tests.

Closes #22260 from sadhen/fix_exhaustive_match.

Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
Thanks @mallman for the schema pruning work, which will be a big win in our pattern of accessing our data. I'm testing this new feature and found that pruning is lost when the query filters on a nested column. For example,

```scala
val q1 = sql("select name.first from contacts")
val q2 = sql("select name.first from contacts where name.first = 'David'")
q1.explain(true)
q2.explain(true)
```

The physical plan of `q1` reads only the pruned `name.first` schema, but the physical plan of `q2` scans the entire `name` struct.

I understand that predicate push-down on nested columns is not implemented yet. Thanks. cc @beettlle
A PR fixing the issue I mentioned above is provided by Liang-Chi Hsieh. Thank you for the quick and clean solution.
…ecessary root fields

## What changes were proposed in this pull request?

Schema pruning doesn't work if a nested column is used in a where clause. For example,

```
sql("select name.first from contacts where name.first = 'David'")

== Physical Plan ==
*(1) Project [name#19.first AS first#40]
+- *(1) Filter (isnotnull(name#19) && (name#19.first = David))
   +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>>
```

In the above query plan, the scan node reads the entire schema of the `name` column. This issue is reported by: #21320 (comment)

The cause is that we infer a root field from the expression `IsNotNull(name)`. However, for such an expression we don't really use the nested fields of this root field, so we can ignore the unnecessary nested fields.

## How was this patch tested?

Unit tests.

Closes #22357 from viirya/SPARK-25363.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
(cherry picked from commit 3030b82)
Signed-off-by: DB Tsai <d_tsai@apple.com>
Hi @Gauravshah. That branch has diverged substantially from what's in master. Right now I'm preparing a PR to address a problem with the current implementation in master, but I'm on holiday for a while. Still, I am hopeful we will see schema pruning for joins and aggregations in 3.0.

Backported to 2.3.2 just in case somebody needs it: https://github.com/Gauravshah/spark/tree/branch-2.3_SPARK-4502 Thanks @mallman
cc @viirya https://issues.apache.org/jira/browse/SPARK-25879

If we select a nested field and a top-level field, the schema pruning will fail. Here is the reproducible test:

```scala
testSchemaPruning("select a single complex field and a top level field") {
  val query = sql("select * from contacts")
    .select("name.middle", "address")
  query.explain(true)
  query.printSchema()
  query.show()
  checkScan(query, "struct<name:struct<middle:string>,address:string>")
}
```

and the exception is
Hi @dbtsai. I believe the problem you're seeing here is resolved by #22880 (https://issues.apache.org/jira/browse/SPARK-25407). It was a known problem at the time this PR was merged, but was pushed back to a future commit. Coincidentally, I just posted #22880 today. The test case you provide is very similar to the test case introduced and exercised in that PR. I manually ran your test case on that branch locally, and the test passed. Would you like to try that branch and comment? Cheers.
One question on non-deterministic expressions. For example, let's consider a non-deterministic UDF.

As it is today, there will be no schema pruning due to the way

To me, it seems valid to apply schema pruning in this case. What do you think?
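A sketch of the kind of query being asked about, assuming the `contacts` table from this PR's test fixtures; only `name.first` is ever read, yet the non-deterministic projection currently disqualifies the plan from pruning:

```scala
import org.apache.spark.sql.functions.{col, udf}

// A non-deterministic UDF applied to a single nested field.
val tag = udf((s: String) => s + scala.util.Random.nextInt()).asNondeterministic()
spark.table("contacts").select(tag(col("name.first"))).explain(true)
```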
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

N.B. This is a restart of PR #16578 which includes a subset of that code. Relevant review comments from that PR should be considered incorporated by reference. Please avoid duplication in review by reviewing that PR first. The summary below is an edited copy of the summary of the previous PR.

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```sql
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```sql
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support a finer grain of schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

### Implementation

There are two main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.
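For instance (a hypothetical illustration, assuming a `friends` column typed `array<struct<first:string,last:string>>` as in the PR's test data), pruning reaches through arrays of structs:

```scala
// Only the `first` sub-field of each struct in the array is read.
spark.table("contacts").selectExpr("friends.first").explain(true)
```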
## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more advanced version of this patch incorporating optimizations for rewriting queries involving aggregations and joins has been running on a production Spark cluster at VideoAmp for several years. In that time, one bug was found and fixed early on, and we added a regression test for that bug.

We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.x branches on ad-hoc clusters since then.