[SPARK-4502][SQL] Parquet nested column pruning #16578


Closed

Conversation

@mallman (Contributor) commented Jan 13, 2017

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, contacts, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```
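As written by Spark, this schema corresponds roughly to the following Parquet message type (a sketch; the exact optional/required modifiers depend on the writer's nullability settings):

```
message spark_schema {
  optional group name {
    optional binary first (UTF8);
    optional binary last (UTF8);
  }
  optional binary address (UTF8);
}
```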

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support finer-grained schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.
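One quick way to observe the requested read schema is through the physical plan. A minimal sketch, assuming a `SparkSession` named `spark` and a contacts dataset at a hypothetical path:

```scala
// With nested schema pruning, the FileScan's ReadSchema should contain
// only name.first rather than the entire name struct.
val contacts = spark.read.parquet("/tmp/contacts") // hypothetical path
contacts.selectExpr("name.first").explain()
// Expect something like: ReadSchema: struct<name:struct<first:string>>
```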

### Implementation

There are three main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.
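The overall shape of such a rule, as a hypothetical sketch (`PhysicalOperation`, `Rule` and `LogicalPlan` are existing catalyst APIs; the object name and body here are illustrative, not the PR's exact code):

```scala
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object ParquetSchemaPruningSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case op @ PhysicalOperation(projects, filters, relation) =>
      // 1. Collect the nested fields actually referenced by projects and filters.
      // 2. Build a pruned StructType containing only those fields.
      // 3. Rebuild the relation with the pruned schema and rewrite the
      //    projections and filters against it (e.g. via ProjectionOverSchema).
      op // placeholder: this sketch returns the operation unchanged
  }
}
```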

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

Third, we introduce two new catalyst query transformations, `AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown`, to support schema pruning in aggregation and join query plans. These rules extract field references in aggregations and joins respectively, push down aliases to those references and replace them with references to the pushed-down aliases. They use a new `SelectedField` extractor that transforms a catalyst complex type extractor (the "selected field") into a corresponding `StructField`.
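To illustrate the intent of the aggregation rule with public APIs only, here is a hedged example (reusing the hypothetical `contacts` dataset from above; the pushed-down alias is generated internally and is not part of this API):

```scala
import org.apache.spark.sql.functions.count

// Grouping by a nested field would otherwise keep the whole name struct
// alive in the scan. Pushing the extraction down as a projection
// (name.first AS first) below the Aggregate leaves only a plain attribute
// reference, so the scan needs just name.first.
val counts = contacts
  .groupBy(contacts("name.first").as("first"))
  .agg(count("*"))
counts.explain() // with the rules applied: ReadSchema: struct<name:struct<first:string>>
```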

### Performance

The performance difference in executing queries with this patch compared to master depends on the depth of the table schema and the query itself. At VideoAmp, one of our biggest tables stores OpenRTB bid requests we receive from our exchange partners. Our bid request table's schema closely follows the OpenRTB bid request object schema. Additionally, when we bid, we save our response along with the request in the same table. We store these two objects as two top-level fields in our table. Therefore, all bid request and response data are contained within nested fields.

For the purposes of measuring the performance impact of this patch, we ran some queries on our bid request table with the un-patched and patched master. We measured query execution time and the amount of data read from the underlying parquet files. I'll focus on a couple of benchmarks. (All benchmarks were run on an AWS EC2 cluster with four c3.8xl workers.) The first query I'll highlight is

```SQL
select count(request.device.ip) from event.bid_request where ds=20161128 and h=0
```

(Hopefully it's obvious what this query means.) On the un-patched master, this query ran in 2.7 minutes and read 34.3 GB of data. On the patched master, this query ran in 4 seconds and read 987.3 MB of data.

We also ran a reporting-oriented query benchmark. I won't reproduce the query here, but it reads a larger subset of the bid request fields and joins against another table with a deeply nested schema. In addition to a join, we perform several aggregations in this query. On the un-patched master, this query ran in 3.4 minutes and read 34.6 GB of data. On the patched master, this query ran in 59 seconds and read 2.6 GB of data.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. This patch introduces over two dozen new unit tests and has been running on a production Spark 1.5 cluster at VideoAmp for about a year. In that time, one bug was found and fixed early on, and we added a regression test for that bug.

We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.0 and 2.1 branches on ad-hoc clusters since then.

```scala
val originalQuery =
  testRelation
    .select('a)
    .groupBy('a getField "a1")('a getField "a1" as 'a1, Count('*))
```
Member: replace it with `Count("*")`?

@mallman (Contributor, Author) commented Jan 13, 2017

cc @rxin @ericl @cloud-fan

@marmbrus I would love to get your feedback on this if you have the time.

@SparkQA commented Jan 13, 2017

Test build #71339 has finished for PR 16578 at commit 8925372.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class FieldExtractionPushdown extends Rule[LogicalPlan]
  • case class ProjectionOverSchema(schema: StructType)

@SparkQA commented Jan 13, 2017

Test build #71343 has finished for PR 16578 at commit efc4d76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin (Contributor) commented Jan 15, 2017

Maybe we also want to get feedback from @liancheng?

@HyukjinKwon (Member) commented:

Does this take over #14957? If so, we might need `Closes #14957` in the PR description for the merge script to close that one, or let the author know this takes over that.

@mallman (Contributor, Author) commented Jan 16, 2017

> Does this take over #14957? If so, we might need `Closes #14957` in the PR description for the merge script to close that one, or let the author know this takes over that.

I don't know. @xuanyuanking, how do you feel about this?

@xuanyuanking (Member) commented:

@mallman Thanks for letting me know. I'll try your patch and check whether it takes over #14957.
I also think we should get feedback from @liancheng; from our last discussion, he may do some work based on the old PR.

```scala
 * This is in contrast to the [[GetStructField]] case class extractor which returns the field
 * ordinal instead of the field itself.
 */
private[planning] object GetStructField2 {
```
Member: Can we have a better name for this?

Member: Or combine it with the GetStructField case class extractor?

Contributor Author: What do you mean by combining it with the existing case class extractor?

Member: Oh, never mind. I thought GetStructField was another Scala extractor; actually it is a case class.

Member: But we can have a better name.

Contributor Author: Agreed. I think the best name in this context is GetStructField, but that's already taken. I'll keep thinking about a good alternative.

Contributor Author: How about GetStructFieldObject? Or GetStructFieldRef?

Member: GetStructFieldObject or GetStructFieldExtractor?

Contributor Author: Let's go with GetStructFieldObject.

```scala
}

/**
 * Converts some chain of complex type extractors into a [[StructField]].
```
Member: It would be better to add a few example inputs and outputs in the comment.
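A hypothetical input/output pair of the kind being requested (the types here are illustrative, not the PR's actual doc comment):

```scala
import org.apache.spark.sql.types._

// Input: the extractor expression name.first over a column
//   name: struct<first: string, last: string>
// Output: the top-level field pruned down to the selected leaf
val expected = StructField("name", StructType(
  StructField("first", StringType) :: Nil))
```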

```scala
// return [[op]]
if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
  val prunedSchema = requestedFields
    .map { case (field, _) => field }
```
Member: These two maps can be combined into one.

```scala
// If the original Parquet relation data fields are different from the
// pruned data fields, continue. Otherwise, return [[op]]
if (parquetDataFields != prunedDataFields) {
  val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames
```
Member: I think dataSchemaFieldNames is just parquetDataColumnNames above?

```scala
if (parquetDataFields != prunedDataFields) {
  val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames
  val newDataSchema =
    StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
```
Member: Is newDataSchema actually prunedDataSchema, if dataSchemaFieldNames is parquetDataColumnNames?


```scala
logDebug("New projects:\n" + newProjects.map(_.treeString).mkString("\n"))

require(prunedDataSchema == prunedParquetRelation.dataSchema,
```
Member: Could this require ever be broken? I don't think it can be.

```scala
val childField = fieldOpt.map(field => StructField(name, ArrayType(
  StructType(Array(field)), arrayNullable), fieldNullable)).getOrElse(field)
selectField(child, Some(childField))
case GetArrayStructFields(child,
```
Member: If we have a chain of GetArrayStructFields, it looks like this will produce a wrong result.

Member: For example, if we add a "field5" like the following to the schema in SelectedFieldSuite:

StructField("field5", ArrayType(StructType(
  StructField("subfield1", StructType(
    StructField("subsubfield1", IntegerType) ::
    StructField("subsubfield2", IntegerType) :: Nil)) ::
  StructField("subfield2", StructType(
    StructField("subsubfield1", IntegerType) ::
    StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)), false)

An extractor like `col2.field5.subfield1.subsubfield1` will get:

```
StructField(col2,
  StructType(
    StructField(field5,
      ArrayType(
        ArrayType(
          StructType(
            StructField(subfield1,
              StructType(
                StructField(subsubfield1,IntegerType,true))
            ,true)
          ),
        true),
      true),
    false)
  ),
true)
```
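For reference, a sketch of the output one would expect instead, with the ArrayType not duplicated (my reading of the schema above, written as valid Scala):

```scala
import org.apache.spark.sql.types._

val expected = StructField("col2", StructType(
  StructField("field5", ArrayType(StructType(
    StructField("subfield1", StructType(
      StructField("subsubfield1", IntegerType) :: Nil)) :: Nil)), nullable = false) :: Nil))
```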

Contributor Author: I believe I have a fix for this, but I probably won't be able to post a new commit until early next week—I'm working on a proposal for the Spark Summit RFP.

Cheers.

Contributor Author: I've spent some time this week developing a few different solutions to this problem; however, none of them is very easy to understand or verify. I'm going to spend some more time working on a simpler solution before posting something back.

@SparkQA commented Feb 14, 2017

Test build #72836 has finished for PR 16578 at commit 948b584.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman (Contributor, Author) commented Feb 14, 2017

@viirya I've added a commit to address some of your feedback. I will have another commit to address the others, but I'm not sure when I'll have it in. Hopefully by the end of next week.

@mallman (Contributor, Author) commented Mar 16, 2017

@viirya A month has gone by since my last update. I've added much more comprehensive coverage to the SelectedFieldSuite; however, I haven't yet fixed the SelectedField extractor to pass all of the tests. All of the failures are related to handling path expressions including GetArrayStructFields extractors. There are many complicated cases, and they are proving quite a challenge to resolve comprehensively.

I hope to spend some more time on this by the end of next week. I would love to push an update by then. After next week I will be away for two weeks.

Cheers.

@mallman (Contributor, Author) commented Mar 24, 2017

I'm still working actively on this PR (as I have time), but I wanted to share that I will be away and unavailable from tonight, March 24th until Tuesday, April 11th. If you post a comment to this PR during that timeframe I'll respond as soon as I can after I return.

Cheers.

@mallman force-pushed the spark-4502-parquet_column_pruning branch from 948b584 to b20da5c (March 24, 2017 21:00)
@mallman (Contributor, Author) commented Mar 24, 2017

Rebased to latest master.

@SparkQA commented Mar 24, 2017

Test build #75178 has finished for PR 16578 at commit b20da5c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 24, 2017

Test build #75181 has finished for PR 16578 at commit 30bc7e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Gauravshah (Contributor) commented:
Can I do something to help this pull request?

@mallman (Contributor, Author) commented Apr 12, 2017

> Can I do something to help this pull request?

Hi @Gauravshah. Thanks for asking. Right now I need to fix a broken piece of the code, or reimplement it. At the moment this is something I'm dedicating myself to. Once I have that completed, a code review would be very helpful.

@saulshanabrook commented:
@mallman Would it be helpful for me to rebase this off the head? I am interested in helping with this too.

@mallman (Contributor, Author) commented May 24, 2017

@saulshanabrook I'm sorry this effort has stalled. I have a lot of new code for this PR in my own clone, including a lot of new tests, but I haven't convinced myself that the code is correct and complete. I wrote much of that new code to satisfy failing test cases I found, and I can't rule out that there are more failing cases I simply haven't uncovered.

I will take some time right now to re-examine my current code-to-date. If I push something tonight, it's likely to be something I have not established full confidence in.

@mallman force-pushed the spark-4502-parquet_column_pruning branch from 30bc7e9 to 9f2f340 (May 25, 2017 02:21)
@mallman (Contributor, Author) commented May 25, 2017

@saulshanabrook I've pushed my latest work (rebased off the latest master). I'm not sure it's quite complete, but feel free to review and comment. FYI, the one piece of code that's given me the most trouble is the SelectedField extractor. It's passing all of the test cases I've thrown at it, but I can't quite convince myself that it's complete/correct. If you have some ideas for how to simplify it to make it easier to understand I welcome them.

If you'd like to submit actual changes to the code in this PR, please submit a PR against our https://github.com/VideoAmp/spark-public/tree/spark-4502-parquet_column_pruning branch, and I will review it there.

Thanks for your help.

Michael Allman and others added 18 commits April 25, 2018 01:10 (truncated commit summaries):

  • …ParquetSchemaPruning.scala when deciding whether to perform schema pruning
  • …ParquetSchemaPruningJoinSuite.scala to match revised pruning behavior
  • …more idiomatic data. Also, add coverage to test for appropriate application of isnotnull, parquet-mr read support compatibility and the absence of schema pruning when the requested schema and the file schema differ only in the order of their columns
  • …leaf fields in a catalyst data type. Use a method which counts the leaf fields directly
  • …AggregateFieldExtractionPushdown and JoinFieldExtractionPushdown optimizations if there are no aliases
  • …SQL config flag that Parquet is currently the only supported data source. Also, set default value of this flag to "false" to bias toward correctness
@mallman force-pushed the spark-4502-parquet_column_pruning branch from 27737a0 to dd4f2d8 (April 24, 2018 17:34)
@SparkQA commented Apr 24, 2018

Test build #89794 has finished for PR 16578 at commit dd4f2d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class AggregateFieldExtractionPushdownSuite extends SchemaPruningTest
  • class JoinFieldExtractionPushdownSuite extends SchemaPruningTest

@gatorsmile (Member) commented:

To ensure the PR and review quality, we normally avoid doing everything in a single huge PR. It would be much better if you can cut it to a few smaller PRs. Both @cloud-fan and I think separating the optimizer rules makes sense. WDYT?

@mallman (Contributor, Author) commented May 5, 2018

> To ensure the PR and review quality, we normally avoid doing everything in a single huge PR. It would be much better if you can cut it to a few smaller PRs.

I'll have a go at it. Of course this will rewrite most of the commits, but I assume you don't mind that.

@gatorsmile (Member) commented:

Yeah. That is fine. Will try to review the relevant PRs ASAP. Please ping me. Thanks again!

@mallman (Contributor, Author) commented May 11, 2018 via email

@mallman (Contributor, Author) commented May 14, 2018

I'm closing this PR in favor of #21320. That PR deals with simple projection and filter queries only. I will submit subsequent PRs for aggregation and join queries following the acceptance of #21320.

@mallman closed this May 14, 2018
asfgit pushed a commit that referenced this pull request Aug 24, 2018
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

_N.B. This is a restart of PR #16578 which includes a subset of that code. Relevant review comments from that PR should be considered incorporated by reference. Please avoid duplication in review by reviewing that PR first. The summary below is an edited copy of the summary of the previous PR._

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support finer-grained schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

### Implementation

There are two main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more advanced version of this patch incorporating optimizations for rewriting queries involving aggregations and joins has been running on a production Spark cluster at VideoAmp for several years. In that time, one bug was found and fixed early on, and we added a regression test for that bug.

We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.x branches on ad-hoc clusters since then.

Closes #21320 from mallman/spark-4502-parquet_column_pruning-foundation.

Lead-authored-by: Michael Allman <msa@allman.ms>
Co-authored-by: Adam Jacques <adam@technowizardry.net>
Co-authored-by: Michael Allman <michael@videoamp.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
Ref: LIHADOOP-48531