[SPARK-4502][SQL] Parquet nested column pruning #16578
Conversation
val originalQuery =
  testRelation
    .select('a)
    .groupBy('a getField "a1")('a getField "a1" as 'a1, Count('*))
replace it by Count("*")?
@marmbrus I would love to get your feedback on this if you have the time.
Test build #71339 has finished for PR 16578 at commit
Test build #71343 has finished for PR 16578 at commit
Maybe we also want to get feedback from @liancheng?
Does this take over #14957? If so, we might need
I don't know. @xuanyuanking, how do you feel about this?
@mallman Thanks for letting me know. I'll try your patch and check whether it takes over #14957 or not.
 * This is in contrast to the [[GetStructField]] case class extractor which returns the field
 * ordinal instead of the field itself.
 */
private[planning] object GetStructField2 {
Can we have a better name for this?
Or combine it with the GetStructField case class extractor?
What do you mean by combining it with the existing case class extractor?
Oh, never mind. I thought GetStructField was another Scala extractor; actually it is a case class.
But we can have a better name.
Agreed. I think the best name in this context is GetStructField, but that's already taken. I'll keep thinking about a good alternative.
How about GetStructFieldObject? Or GetStructFieldRef?
GetStructFieldObject or GetStructFieldExtractor?
Let's go with GetStructFieldObject.
}
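As a point of reference, a minimal sketch of what such an extractor might look like, assuming the Catalyst APIs current at the time of this PR (not necessarily the PR's exact code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
import org.apache.spark.sql.types.{StructField, StructType}

// Unlike pattern-matching on the GetStructField case class, which exposes
// the field ordinal, this unapply yields the StructField itself.
object GetStructFieldObject {
  def unapply(getField: GetStructField): Option[(Expression, StructField)] =
    Some((
      getField.child,
      // Resolve the ordinal against the child's struct type.
      getField.child.dataType.asInstanceOf[StructType](getField.ordinal)))
}
```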
/**
 * Converts some chain of complex type extractors into a [[StructField]].
It would be better to add a few example inputs and outputs to the comment.
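For instance, a minimal sketch of the kind of input/output example that could go in the comment, built from the contacts schema in the PR description (the exact expression construction here is an assumption):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Input: the Catalyst extractor chain for selecting name.first, where
// name is struct<first: string, last: string>.
val name = AttributeReference("name", StructType(
  StructField("first", StringType) ::
  StructField("last", StringType) :: Nil))()
val nameFirst = GetStructField(name, 0, Some("first"))

// Expected output: the top-level field pruned down to the selected leaf.
val expected = StructField("name", StructType(
  StructField("first", StringType) :: Nil))
```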
// return [[op]]
if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
  val prunedSchema = requestedFields
    .map { case (field, _) => field }
Those two map calls can be done in one map.
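A self-contained illustration of that suggestion (the pair's second element is a hypothetical stand-in for Option[Attribute]):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Stand-in for requestedFields: pairs of a field and an optional attribute.
val requestedFields: Seq[(StructField, Option[String])] = Seq(
  (StructField("a", IntegerType), None),
  (StructField("b", StringType), Some("b")))

// Build the pruned schema in a single map instead of two.
val prunedSchema = StructType(requestedFields.map { case (field, _) => field })
```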
// If the original Parquet relation data fields are different from the
// pruned data fields, continue. Otherwise, return [[op]]
if (parquetDataFields != prunedDataFields) {
  val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames
I think dataSchemaFieldNames is just parquetDataColumnNames above?
if (parquetDataFields != prunedDataFields) {
  val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames
  val newDataSchema =
    StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
Is newDataSchema actually prunedDataSchema, if dataSchemaFieldNames is parquetDataColumnNames?
logDebug("New projects:\n" + newProjects.map(_.treeString).mkString("\n"))

require(prunedDataSchema == prunedParquetRelation.dataSchema,
Could this require ever be violated? I think it can't.
val childField = fieldOpt.map(field => StructField(name, ArrayType(
  StructType(Array(field)), arrayNullable), fieldNullable)).getOrElse(field)
selectField(child, Some(childField))
case GetArrayStructFields(child,
If we have a chain of GetArrayStructFields, it looks like this will produce a wrong result.
For example, if we add a "field5" like the following to the schema in SelectedFieldSuite:
StructField("field5", ArrayType(StructType(
StructField("subfield1", StructType(
StructField("subsubfield1", IntegerType) ::
StructField("subsubfield2", IntegerType) :: Nil)) ::
StructField("subfield2", StructType(
StructField("subsubfield1", IntegerType) ::
StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)), false)
The extractor applied to col2.field5.subfield1.subsubfield1 will get:
StructField(col2,
StructType(
StructField(field5,
ArrayType(
ArrayType(
StructType(
StructField(subfield1,
StructType(
StructField(subsubfield1,IntegerType,true))
,true)
),
true),
true),
false)
),
true)
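For comparison, a sketch of what the correct result presumably should be: a single ArrayType wrapping the pruned struct, rather than the doubled ArrayType above (constructor syntax; nullability flags assumed from the schema definition):

```scala
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}

// col2.field5.subfield1.subsubfield1 pruned correctly: one ArrayType level,
// matching field5's declared type in the schema above.
val expected = StructField("col2", StructType(
  StructField("field5", ArrayType(StructType(
    StructField("subfield1", StructType(
      StructField("subsubfield1", IntegerType) :: Nil)) :: Nil)), nullable = false) :: Nil))
```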
I believe I have a fix for this, but I probably won't be able to post a new commit until early next week—I'm working on a proposal for the Spark Summit RFP.
Cheers.
I've spent some time this week developing a few different solutions to this problem; however, none of them is very easy to understand or verify. I'm going to spend some more time working on a simpler solution before posting something back.
Test build #72836 has finished for PR 16578 at commit
@viirya I've added a commit to address some of your feedback. I will have another commit to address the others, but I'm not sure when I'll have it in. Hopefully by the end of next week.
@viirya A month has gone by since my last update. I've added much more comprehensive coverage to the I hope to spend some more time on this by the end of next week. I would love to push an update by then. After next week I will be away for two weeks. Cheers.
I'm still working actively on this PR (as I have time), but I wanted to share that I will be away and unavailable from tonight, March 24th until Tuesday, April 11th. If you post a comment to this PR during that timeframe I'll respond as soon as I can after I return. Cheers.
Force-pushed from 948b584 to b20da5c.
Rebased to latest master.
Test build #75178 has finished for PR 16578 at commit
Test build #75181 has finished for PR 16578 at commit
Can I do something to help this pull request?
Hi @Gauravshah. Thanks for asking. Right now I need to fix a broken piece of the code, or reimplement it. At the moment this is something I'm dedicating myself to. Once I have that completed, a code review would be very helpful.
@mallman Would it be helpful for me to rebase this off the head? I am interested in helping with this too.
@saulshanabrook I'm sorry this effort has stalled. I actually have a lot of new code for this PR in my own clone, including a lot of new tests, but I haven't convinced myself that the code is actually correct and complete. I actually wrote a lot of new code to satisfy failing test cases that I found. I don't know that there aren't more test cases that would fail that I simply haven't uncovered. I will take some time right now to re-examine my current code-to-date. If I push something tonight, it's likely to be something I have not established full confidence in.
Force-pushed from 30bc7e9 to 9f2f340.
@saulshanabrook I've pushed my latest work (rebased off the latest master). I'm not sure it's quite complete, but feel free to review and comment. FYI, the one piece of code that's given me the most trouble is the If you'd like to submit actual changes to the code in this PR, please submit a PR against our https://github.com/VideoAmp/spark-public/tree/spark-4502-parquet_column_pruning branch, and I will review it there. Thanks for your help.
constraints on complex type extractors
ParquetSchemaPruning.scala when deciding whether to perform schema pruning
ParquetSchemaPruningJoinSuite.scala to match revised pruning behavior
more idiomatic data. Also, add coverage to test for appropriate application of isnotnull, parquet-mr read support compatibility and the absence of schema pruning when the requested schema and the file schema differ only in the order of their columns
leaf fields in a catalyst data type. Use a method which counts the leaf fields directly
AggregateFieldExtractionPushdown and JoinFieldExtractionPushdown optimizations if there are no aliases
spark.sql.nestedSchemaPruning.enabled
SQL config flag that Parquet is currently the only supported data source. Also, set default value of this flag to "false" to bias toward correctness
in ParquetSchemaPruning.scala
Force-pushed from 27737a0 to dd4f2d8.
Test build #89794 has finished for PR 16578 at commit
To ensure the PR and review quality, we normally avoid doing everything in a single huge PR. It would be much better if you can cut it into a few smaller PRs. Both @cloud-fan and I think separating the optimizer rules makes sense. WDYT?
I'll have a go at it. Of course this will rewrite most of the commits, but I assume you don't mind that.
Yeah. That is fine. Will try to review the relevant PRs ASAP. Please ping me. Thanks again!
BTW, I've been and am currently traveling with a busy itinerary. I haven't started work on this and probably won't get to it until Monday at the very earliest.
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

_N.B. This is a restart of PR #16578 which includes a subset of that code. Relevant review comments from that PR should be considered incorporated by reference. Please avoid duplication in review by reviewing that PR first. The summary below is an edited copy of the summary of the previous PR._

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support a finer-grain of schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

### Implementation

There are two main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields, and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more advanced version of this patch incorporating optimizations for rewriting queries involving aggregations and joins has been running on a production Spark cluster at VideoAmp for several years. In that time, one bug was found and fixed early on, and we added a regression test for that bug. We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.x branches on ad-hoc clusters since then.

Closes #21320 from mallman/spark-4502-parquet_column_pruning-foundation.

Lead-authored-by: Michael Allman <msa@allman.ms>
Co-authored-by: Adam Jacques <adam@technowizardry.net>
Co-authored-by: Michael Allman <michael@videoamp.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)
What changes were proposed in this pull request?
One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support a finer-grain of schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

Implementation
There are three main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields, and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

Third, we introduce two new catalyst query transformations, `AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown`, to support schema pruning in aggregation and join query plans. These rules extract field references in aggregations and joins respectively, push down aliases to those references and replace them with references to the pushed-down aliases. They use a new `SelectedField` extractor that transforms a catalyst complex type extractor (the "selected field") into a corresponding `StructField`.

Performance
The performance difference in executing queries with this patch compared to master is related to the depth of the table schema and the query itself. At VideoAmp, one of our biggest tables stores OpenRTB bid requests we receive from our exchange partners. Our bid request table's schema closely follows the OpenRTB bid request object schema. Additionally, when we bid we save our response along with the request in the same table. We store these two objects as two top-level fields in our table. Therefore, all bid request and response data are contained within nested fields.
For the purposes of measuring the performance impact of this patch, we ran some queries on our bid request table with the un-patched and patched master. We measured query execution time and the amount of data read from the underlying parquet files. I'll focus on a couple of benchmarks. (All benchmarks were run on an AWS EC2 cluster with four c3.8xl workers.) The first query I'll highlight is
(Hopefully it's obvious what this query means.) On the un-patched master, this query ran in 2.7 minutes and read 34.3 GB of data. On the patched master, this query ran in 4 seconds and read 987.3 MB of data.
We also ran a reporting-oriented query benchmark. I won't reproduce the query here, but it reads a larger subset of the bid request fields and joins against another table with a deeply nested schema. In addition to a join, we perform several aggregations in this query. On the un-patched master, this query ran in 3.4 minutes and read 34.6 GB of data. On the patched master, this query ran in 59 seconds and read 2.6 GB of data.
Limitation
Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.
How was this patch tested?
Care has been taken to ensure correctness and prevent regressions. This patch introduces over two dozen new unit tests and has been running on a production Spark 1.5 cluster at VideoAmp for about a year. In that time, one bug was found and fixed early on, and we added a regression test for that bug.
We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.0 and 2.1 branches on ad-hoc clusters since then.
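To close, a minimal end-to-end sketch of how this feature would be exercised, assuming the `spark.sql.nestedSchemaPruning.enabled` flag named above (the path and data here are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.nestedSchemaPruning.enabled", "true")
  .getOrCreate()
import spark.implicits._

case class Name(first: String, last: String)
case class Contact(name: Name, address: String)

// Write a tiny contacts table mirroring the example schema above.
Seq(Contact(Name("Jane", "Doe"), "123 Main St")).toDF()
  .write.mode("overwrite").parquet("/tmp/contacts")

// With pruning enabled, the ReadSchema in the physical plan should list
// only name.first rather than the full name struct.
spark.read.parquet("/tmp/contacts")
  .selectExpr("name.first")
  .explain(true)
```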