
[SPARK-4502][SQL] Parquet nested column pruning - foundation #21320


Conversation


@mallman mallman commented May 14, 2018

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

N.B. This is a restart of PR #16578 which includes a subset of that code. Relevant review comments from that PR should be considered incorporated by reference. Please avoid duplication in review by reviewing that PR first. The summary below is an edited copy of the summary of the previous PR.

What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, contacts, backed by parquet with the following Spark SQL schema:

root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string

Parquet stores this table's data in three physical columns: name.first, name.last and address. To answer the query

select address from contacts

Spark will read only from the address column of parquet data. However, to answer the query

select name.first from contacts

Spark will read name.first and name.last from parquet.

This PR modifies Spark SQL to support finer-grained schema pruning. With this patch, Spark reads only the name.first column to answer the previous query.
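For readers who want to try this out, here is a minimal, self-contained sketch (not part of the patch) showing how to observe the pruned read schema in a physical plan. It assumes the configuration key spark.sql.optimizer.nestedSchemaPruning.enabled added for this feature (check your Spark version's SQLConf for the exact name); the case classes, path and data are illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical classes mirroring the contacts schema above.
case class Name(first: String, last: String)
case class Contact(name: Name, address: String)

object SchemaPruningDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Write a tiny contacts table, then select a single nested field.
    Seq(Contact(Name("Jane", "Doe"), "123 Main St"))
      .toDF()
      .write.mode("overwrite").parquet("/tmp/contacts")

    val pruned = spark.read.parquet("/tmp/contacts").selectExpr("name.first")
    // With pruning in effect, the FileScan node should report
    // ReadSchema: struct<name:struct<first:string>> instead of the full name struct.
    pruned.explain()
    spark.stop()
  }
}
```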

Implementation

There are two main components of this patch. First, there is a ParquetSchemaPruning optimizer rule for gathering the required schema fields of a PhysicalOperation over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. ParquetSchemaPruning uses a new ProjectionOverSchema extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the ParquetRowConverter has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. ParquetReadSupport has been patched to address a compatibility mismatch between Spark's built in vectorized reader and the parquet-mr library's reader.
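To make the rule's job concrete, the following simplified sketch builds a pruned StructType from the nested field paths a query needs. It is illustrative only; the actual ParquetSchemaPruning rule works over catalyst expressions via ProjectionOverSchema rather than dotted path strings.

```scala
import org.apache.spark.sql.types._

object PrunedSchemaSketch {
  // Keep only the fields reachable from the requested paths, recursing into structs.
  def prune(schema: StructType, paths: Seq[Seq[String]]): StructType = {
    val byHead = paths.filter(_.nonEmpty).groupBy(_.head)
    StructType(schema.fields.toSeq.flatMap { field =>
      byHead.get(field.name).map { matching =>
        val tails = matching.map(_.tail).filter(_.nonEmpty)
        field.dataType match {
          case struct: StructType if tails.nonEmpty =>
            field.copy(dataType = prune(struct, tails))
          case _ => field // leaf requested, or whole field requested: keep as-is
        }
      }
    })
  }

  def main(args: Array[String]): Unit = {
    val contacts = StructType(Seq(
      StructField("name", StructType(Seq(
        StructField("first", StringType),
        StructField("last", StringType)))),
      StructField("address", StringType)))

    // "select name.first from contacts" needs only the name.first leaf.
    println(prune(contacts, Seq(Seq("name", "first"))).catalogString)
    // struct<name:struct<first:string>>
  }
}
```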

Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more advanced version of this patch incorporating optimizations for rewriting queries involving aggregations and joins has been running on a production Spark cluster at VideoAmp for several years. In that time, one bug was found and fixed early on, and we added a regression test for that bug.

We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.x branches on ad-hoc clusters since then.


mallman commented May 14, 2018

@gatorsmile I believe this is the PR you requested for review.


SparkQA commented May 14, 2018

Test build #90582 has finished for PR 21320 at commit 9e301b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ProjectionOverSchema(schema: StructType)

.internal()
.doc("Prune nested fields from a logical relation's output which are unnecessary in " +
"satisfying a query. This optimization allows columnar file format readers to avoid " +
"reading unnecessary nested column data. Currently Parquet is the only data source that " +
Member

How about ORC?

cc @dongjoon-hyun Do you know whether it is also doable in the latest ORC version?

Member

Thank you for pinging me, @gatorsmile . Let me check it.

Contributor Author

ORC should be able to support this capability as well, but this PR does not address that.

@@ -162,7 +162,9 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
     val generated = otherPreds.map { c =>
       val nullChecks = c.references.map { r =>
-        val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
+        val idx = notNullPreds.indexWhere { n =>
+          n.asInstanceOf[IsNotNull].child.references.contains(r)
Member

Is this change related?

Contributor Author

Yes. IIRC a community member identified a bug and @viirya contributed a fix and unit test. See VideoAmp@8b5661c.

*/
-  private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
+  private def scanNullIntolerantField(expr: Expression): Seq[Expression] = expr match {
+    case ev: ExtractValue => Seq(ev)
Member

For this improvement, can we do it in a separate PR? The corresponding unit test cases are needed in InferFiltersFromConstraintsSuite instead of ParquetSchemaPruningSuite.

Contributor Author

@mallman mallman May 24, 2018

I agree adding more direct and independent test coverage for this change is a good idea. However, omitting this change would weaken the capabilities of this PR. It would also imply removing the failing test case in ParquetSchemaPruningSuite, which would in turn imply two follow-on PRs: the first to add this specific change plus the right test coverage, and the second to restore the test case removed from ParquetSchemaPruningSuite.

Let me suggest an alternative. As this change is a valuable enhancement for this PR, let me try adding an appropriate test case in InferFiltersFromConstraintsSuite as part of this PR. That will eliminate the requirement for two more follow-on PRs.

Member

Because of this change, we also need to change the code in basicPhysicalOperators.scala. I do not think this is the right solution. More importantly, the changes in basicPhysicalOperators.scala might break other things. We need a separate PR for these changes. Please remove the changes made in basicPhysicalOperators.scala and QueryPlanConstraints.scala.

* An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when
* debugging a physical query plan.
*/
private[sql] trait ColumnarFileFormat {
Member

Can we do this in a separate PR? No need to block this PR due to the discussion about this implementation.

Contributor Author

I'm okay with adding this as a separate PR as requested, but I wouldn't want to ship this PR in a release without being able to see the physical column count associated with a query in a query plan. That is invaluable functionality in validating that physical column pruning is occurring as expected.
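For context, the shape of such a mix-in can be inferred from how it is used in the scan metadata snippet below; the signature and the naive leaf-counting example here are illustrative sketches, not the code in this PR.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

/** Optional mix-in for columnar FileFormats exposing physical-column metadata (sketch). */
trait ColumnarFileFormat {
  /** Number of physical (leaf) columns this format would read for the given schema. */
  def columnCountForSchema(sparkSession: SparkSession, schema: StructType): Int
}

/** Illustrative implementation that naively counts struct leaves. */
object LeafCountingFormat extends ColumnarFileFormat {
  override def columnCountForSchema(sparkSession: SparkSession, schema: StructType): Int = {
    def leaves(dt: DataType): Int = dt match {
      case s: StructType => s.fields.map(f => leaves(f.dataType)).sum
      case _ => 1
    }
    leaves(schema)
  }
}
```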

.getActiveSession
.map { sparkSession =>
val columnCount = columnar.columnCountForSchema(sparkSession, requiredSchema)
withOptPartitionCount + ("ColumnCount" -> columnCount.toString)
Member

This needs to be in a separate PR as I suggested above. BTW, we could easily lose this metadata if this change does not have a test case.

Contributor Author

Replied above.

@@ -879,6 +879,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("select function over nested data") {
Member

Without this PR, this test can still pass, right?

Could you submit a separate PR for these test coverage improvements? We really welcome test coverage improvement PRs.

Contributor Author

I did some forensics to understand the origin and reason for this test. It was part of the first commit of my original PR almost two years ago. Even then, this test passed without the rest of the commit. So I can't say why I added this test except perhaps that I felt it was useful code coverage.

In any case, if you don't think it's a valuable contribution in this PR I'd rather just remove it entirely.

assertResult(None)(unapplySelect("col2"))
}

test("col2.field2, col2.field2[0] as foo") {
Member

Could you change the test case names based on the goal of these tests? That way, reviewers and future coders/readers can easily tell whether these tests cover all the data types and scenarios.

Contributor Author

I'll work on making these test names more readable.

* When [[expr]] is an [[Attribute]], construct a field around it and return the
* attribute as the second component of the returned tuple.
*/
private def getFields(expr: Expression): Seq[(StructField, Option[Attribute])] = {
Member

Define a case class and return a sequence of that class.

Contributor Author

Will do.

}

val nonDataPartitionColumnNames =
partitionSchema.map(_.name).filterNot(dataSchemaFieldNames.contains).toSet
Member

Do we have any test case for covering this?

Contributor Author

You mean a test case in which the table under test has partition columns as part of its data schema? I don't think so; however, I will check and work on adding any missing test coverage. LMK if this is not what you're asking about.
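To illustrate the distinction under discussion, here is a small, self-contained sketch separating partition-only columns from partition columns that also appear in the data schema, mirroring the logic quoted in the diff above; the schemas themselves are hypothetical.

```scala
import org.apache.spark.sql.types._

object PartitionColumnSketch {
  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(
      StructField("name", StructType(Seq(StructField("first", StringType)))),
      StructField("address", StringType),
      StructField("p", IntegerType)))   // "p" also exists as a data column
    val partitionSchema = StructType(Seq(
      StructField("p", IntegerType),
      StructField("dt", StringType)))   // "dt" exists only as a partition column

    val dataSchemaFieldNames = dataSchema.fieldNames.toSet
    // Partition columns that must not be pushed into the pruned *data* schema.
    val nonDataPartitionColumnNames =
      partitionSchema.map(_.name).filterNot(dataSchemaFieldNames.contains).toSet

    println(nonDataPartitionColumnNames) // Set(dt)
  }
}
```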

@mallman mallman force-pushed the spark-4502-parquet_column_pruning-foundation branch from 9e301b3 to de06e81 Compare June 4, 2018 10:00

mallman commented Jun 4, 2018

@gatorsmile I've addressed many of your points in today's commits. Can you please take a look at what I've done so far? I'm still working on the PRs you requested.


SparkQA commented Jun 4, 2018

Test build #91441 has finished for PR 21320 at commit de06e81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 4, 2018

Test build #91442 has finished for PR 21320 at commit a8e080f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 4, 2018

Test build #91443 has finished for PR 21320 at commit 0351094.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int,
  • case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int)

.reduceLeft(_ merge _)
val dataSchemaFieldNames = dataSchema.fieldNames.toSet
val prunedDataSchema =
StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
@jainaks jainaks Jun 8, 2018

This check also filters out fields that have the same name as a schema field but in a different case.
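As a hedged sketch of one way to make that membership check respect case-insensitive analysis (the actual fix is not shown in this thread, so the approach and names below are illustrative, not the patch itself):

```scala
import org.apache.spark.sql.types.StructType

object CaseAwarePruning {
  // Keep pruned fields whose names match a data-schema field under the session's
  // case-sensitivity setting, instead of an exact Set lookup.
  def prunedDataSchema(
      prunedSchema: StructType,
      dataSchema: StructType,
      caseSensitive: Boolean): StructType = {
    val sameName: (String, String) => Boolean =
      if (caseSensitive) _ == _ else _.equalsIgnoreCase(_)
    StructType(prunedSchema.filter(f => dataSchema.fieldNames.exists(sameName(_, f.name))))
  }
}
```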


jainaks commented Jun 8, 2018

Hi @mallman, thanks for this PR. It has a huge impact on performance when querying nested parquet schemas. I used the original PR #16578 and found an issue: it does not work well when the query refers to column names in a different case.
For example, the schema is:
root
 |-- name: struct
 |    |-- First: string
 |    |-- Last: string
 |-- address: string
and if I run a join query referring to the column as "NAME.first", it throws an exception:
ERROR: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: NAME#137322
If you want, I can share the exact schema and query for debugging.
I have fixed this in my local repo and have it working fine.
I have commented on the exact code line which causes this issue.
Please let me know if you want me to share the fix.


mallman commented Jun 12, 2018

Hi @jainaks. I can see why your query would not work. In the example you provide, if you refer to the column as name.First, does your query succeed?

@mallman mallman force-pushed the spark-4502-parquet_column_pruning-foundation branch from 0351094 to 8ead76e Compare June 12, 2018 00:10

SparkQA commented Jun 12, 2018

Test build #91678 has finished for PR 21320 at commit 8ead76e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int,
  • case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int)


SparkQA commented Jun 12, 2018

Test build #91679 has finished for PR 21320 at commit 89febc8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 12, 2018

Test build #91684 has finished for PR 21320 at commit 7f67ec0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


jainaks commented Jun 12, 2018

@mallman It does work fine with "name.First".

@gatorsmile
Member

@mallman Sorry for the delay. Super busy during the Spark summit. Will continue the code review in the next few days.


jainaks commented Jun 12, 2018

Hi @mallman,
I found another major issue after applying this fix.
Schema:
a: struct (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- c1: string (nullable = true)
 |    |-- c2: string (nullable = true)
 |    |-- c3: string (nullable = true)
 |    |-- c4: string (nullable = true)
 |    |-- c5: boolean (nullable = true)
id: struct (nullable = true)
 |-- i1: struct (nullable = true)
 |    |-- i2: string (nullable = true)
timestamp: bigint
Query:
select a.b.c3 as c3,
first(a.b.c3) over (partition by id.i1.i2 order by timestamp rows between current row and unbounded following) as first_c3
from temp;
The column "first_c3" gets the value of column "c2".
It works well if I just set the parquet schema pruning flag to false.
It may sound odd at first glance, and it did to me too, but this is what I am getting.
PS: I am running all my tests using PR #16578.


mallman commented Jun 12, 2018

Hi @jainaks. Thanks for your report. Do you have the same problem running your test with this PR?


mallman commented Jun 12, 2018

@gatorsmile The last couple of build test failures appear to be entirely unrelated to this PR. The error message in the one failed test reads org.scalatest.exceptions.TestFailedException: Unable to download Spark 2.2.0. Please run a new test build when you get back to the review if this is still the last problem with this PR's test build.


mallman commented Jun 12, 2018

@mallman It does work fine with "name.First".

@jainaks What is the value of the Spark SQL configuration setting spark.sql.caseSensitive when you run this query? Also, are you querying the parquet file as part of a Hive metastore table or from a dataframe loaded with the DataFrameReader.parquet method?


jainaks commented Jun 13, 2018

@jainaks What is the value of the Spark SQL configuration setting spark.sql.caseSensitive when you run this query? Also, are you querying the parquet file as part of a Hive metastore table or from a dataframe loaded with the DataFrameReader.parquet method?

@mallman "spark.sql.caseSensitive" value is not being set and is left to its default value as "false". We are querying the parquet file from a dataframe.

Hi @jainaks. Thanks for your report. Do you have the same problem running your test with this PR?

@mallman Yes, the issue with window functions is reproducible even with this PR.

   .booleanConf
-  .createWithDefault(true)
+  .createWithDefault(false)
@DaimonPl DaimonPl Jun 15, 2018

How about enabling it by default? There should be enough time to find any unexpected problems before 2.4.0.

Additionally, nested column pruning would then be exercised during all other automated tests.

Contributor

+1


mallman commented Jun 19, 2018

@mallman Yes, the issue with window functions is reproducible even with this PR.

Can you attach a (small) parquet file I can use to test this scenario?

@mallman mallman force-pushed the spark-4502-parquet_column_pruning-foundation branch from 7f67ec0 to a255bcb Compare June 19, 2018 05:40

SparkQA commented Jun 19, 2018

Test build #92070 has finished for PR 21320 at commit a255bcb.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.


mallman commented Jun 19, 2018

@gatorsmile The last build was killed by SIGKILL. Can you start a new build, please?

@dongjoon-hyun
Member

Retest this please.


SparkQA commented Jun 22, 2018

Test build #92191 has finished for PR 21320 at commit a255bcb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
class ParquetSchemaPruningSuite
extends QueryTest
with ParquetTest
with SchemaPruningTest
Member

Why is this removed?

Member

In my local environment, all of them still pass. Am I wrong?

Member

nvm. This commit just adds it back

@gatorsmile
Member

LGTM, as I explained above. #21320 (comment)

Thanks for your patience and great work, @mallman! Sorry it took two years to merge the code.

@HyukjinKwon
Member

Seems fine to me too, for similar reasons.


SparkQA commented Aug 24, 2018

Test build #95185 has finished for PR 21320 at commit e6baf68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merged to master.

BTW, we can keep thinking whether there are other better solutions for nested column pruning.

Also cc @dongjoon-hyun If you are interested in the work for supporting ORC nested column pruning.

@asfgit asfgit closed this in f2d3542 Aug 24, 2018
@IgorBerman

@mallman, wanted to say a huge thanks for your work! This is a great step forward.

@dongjoon-hyun
Member

Sure, @gatorsmile !


mallman commented Aug 24, 2018

Thanks everyone for your contributions, support and patience. It's been a journey and a half, and I'm excited for the future. I will open a follow-on PR to address the current known failure scenario (see ignored test) in this patch, and we can discuss if/how we can get it into 2.4 as well.

I know there are many early adopters of this patch and #16578. Bug reports will continue to be very helpful.

Beyond this patch, there are many possibilities for widening the scope of schema pruning. As part of our review process, we've pared the scope of this capability to just projection. IMHO, the first limitation we should address post 2.4 is supporting pruning with query filters of nested fields ("where" clauses). Joins, aggregations and window queries would be powerful enhancements as well, bringing the scope of schema pruning to analytic queries.

I believe all of the additional features VideoAmp has implemented for schema pruning are independent of the underlying column store. Future enhancements should be automagically inherited by any column store that implements functionality analogous to ParquetSchemaPruning.scala. This should widen not just the audience that can be reached, but the developer community that can contribute and review.

Thanks again.

@mallman mallman deleted the spark-4502-parquet_column_pruning-foundation branch August 24, 2018 17:31
@ajacques
Contributor

@mallman Glad to see this got merged in. Thanks for all of your work pushing through. I'm looking forward to the next phase. Please let me know if I can help again. I did notice that window functions don't get pushed down fully and briefly started looking into that, but I don't want to duplicate any work you might be planning.

asfgit pushed a commit that referenced this pull request Aug 30, 2018
## What changes were proposed in this pull request?
Introduced by #21320 and #11744

```
$ sbt
> ++2.12.6
> project sql
> compile
...
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:41: match may not be exhaustive.
[error] It would fail on the following inputs: (_, ArrayType(_, _)), (_, _)
[error] [warn]         getProjection(a.child).map(p => (p, p.dataType)).map {
[error] [warn]
[error] [warn] spark/sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala:52: match may not be exhaustive.
[error] It would fail on the following input: (_, _)
[error] [warn]         getProjection(child).map(p => (p, p.dataType)).map {
[error] [warn]
...
```

And

```
$ sbt
> ++2.12.6
> project hive
> testOnly *ParquetMetastoreSuite
...
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:22: object tools is not a member of package scala
[error] import scala.tools.nsc.Properties
[error]              ^
[error] /Users/rendong/wdi/spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala:146: not found: value Properties
[error]     val version = Properties.versionNumberString match {
[error]                   ^
[error] two errors found
...
```

## How was this patch tested?
Existing tests.

Closes #22260 from sadhen/fix_exhaustive_match.

Authored-by: 忍冬 <rendong@wacai.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
fjh100456 pushed a commit to fjh100456/spark that referenced this pull request Aug 31, 2018

dbtsai commented Sep 7, 2018

Thanks @mallman for the schema pruning work, which will be a big win for our data access patterns.

I'm testing this new feature and found that a where clause on the selected nested column can break the schema pruning, resulting in lower performance (though not a regression compared to 2.3).

For example,

    val q1 = sql("select name.first from contacts")
    val q2 = sql("select name.first from contacts where name.first = 'David'")

    q1.explain(true)
    q2.explain(true)

The physical plan of q1 is correct and what we expect with this feature:

== Physical Plan ==
*(1) Project [name#19.first AS first#36]
+- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [], 
    PushedFilters: [], ReadSchema: struct<name:struct<first:string>>

But the physical plan of q2 has a pushed filter on name, resulting in reading the entire name column:

== Physical Plan ==
*(1) Project [name#19.first AS first#40]
+- *(1) Filter (isnotnull(name#19) && (name#19.first = David))
   +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [], 
    PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>>

I understand that predicate push-down on nested columns is not implemented yet, but with schema pruning and a where clause, we should be able to read only the selected nested columns and the columns referenced in the where clause, if I understand it correctly.

Thanks.

cc @beettlle


dbtsai commented Sep 7, 2018

A PR fixing the issue I mentioned above is provided by Liang-Chi Hsieh. Thank you for the quick and clean solution.

asfgit pushed a commit that referenced this pull request Sep 12, 2018
…ecessary root fields

## What changes were proposed in this pull request?

Schema pruning doesn't work if nested column is used in where clause.

For example,
```
sql("select name.first from contacts where name.first = 'David'")

== Physical Plan ==
*(1) Project [name#19.first AS first#40]
+- *(1) Filter (isnotnull(name#19) && (name#19.first = David))
   +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [],
    PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>>
```

In above query plan, the scan node reads the entire schema of `name` column.

This issue is reported by:
#21320 (comment)

The cause is that we infer a root field from expression `IsNotNull(name)`. However, for such expression, we don't really use the nested fields of this root field, so we can ignore the unnecessary nested fields.

## How was this patch tested?

Unit tests.

Closes #22357 from viirya/SPARK-25363.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
(cherry picked from commit 3030b82)
Signed-off-by: DB Tsai <d_tsai@apple.com>
asfgit pushed a commit that referenced this pull request Sep 12, 2018
fjh100456 pushed a commit to fjh100456/spark that referenced this pull request Sep 13, 2018
@Gauravshah
Contributor

@mallman any way I can help pull in the rest of the changes from your original PR (#16578) for the next release?


mallman commented Sep 26, 2018

Hi @Gauravshah. That branch has diverged substantially from what’s in master. Right now I’m preparing a PR to address a problem with the current implementation in master, but I’m on holiday for a while.

Still, I am hopeful we will see schema pruning for joins and aggregations in 3.0.

Gauravshah added a commit to Gauravshah/spark that referenced this pull request Oct 30, 2018
@Gauravshah
Contributor

Backported to 2.3.2 just in case somebody needs it: https://github.com/Gauravshah/spark/tree/branch-2.3_SPARK-4502. Thanks @mallman


dbtsai commented Oct 30, 2018

cc @viirya

https://issues.apache.org/jira/browse/SPARK-25879

If we select a nested field and a top level field, the schema pruning will fail. Here is the reproducible test,

  testSchemaPruning("select a single complex field and a top level field") {
    val query = sql("select * from contacts")
      .select("name.middle", "address")
    query.explain(true)
    query.printSchema()
    query.show()
    checkScan(query, "struct<name:struct<middle:string>,address:string>")
  }

and the exception is

23:16:05.864 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 3.0 (TID 6)
org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:193)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:674)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:850)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:850)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:325)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:289)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:419)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:425)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/private/var/folders/pr/4q3b9vkx36lbygjr5jhfmjcw0000gn/T/spark-a4fff68d-d51a-4c79-aa18-54cfd7f81a75/contacts/p=2/part-00000-8a4d9396-7be3-4fed-a55a-5580684ebda6-c000.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
	... 19 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:657)
	at java.util.ArrayList.get(ArrayList.java:433)
	at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
	at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99)
	at org.apache.parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:97)
	at org.apache.parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:92)
	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:278)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
	... 24 more


mallman commented Oct 30, 2018

https://issues.apache.org/jira/browse/SPARK-25879

If we select a nested field and a top level field, the schema pruning will fail. Here is the reproducible test,
...

Hi @dbtsai.

I believe the problem you're seeing here is resolved by #22880 (https://issues.apache.org/jira/browse/SPARK-25407). It was a known problem at the time this PR was merged, but was pushed back to a future commit. Coincidentally, I just posted #22880 today.

The test case you provide is very similar to the test case introduced and exercised in that PR. I manually ran your test case on that branch locally, and the test passed. Would you like to try that branch and comment?

Cheers.

@aokolnychyi
Contributor

@mallman @dbtsai @gatorsmile

One question on non-deterministic expressions. For example, let's consider a non-deterministic UDF.

val nonDeterministicUdf = udf((first: String) => first + " " + Math.random()).asNondeterministic()
val query = data.select(col("id"), nonDeterministicUdf(col("name.first")))

As it is today, there will be no schema pruning due to the way collectProjectsAndFilters is defined in PhysicalOperation.

== Analyzed Logical Plan ==
id: int, UDF(name.first): string
Project [id#222, UDF(name#223.first) AS UDF(name.first)#246]
+- Project [id#222, name#223, address#224, pets#225, friends#226, relatives#227, employer#228, p#229]
   +- SubqueryAlias `contacts`
      +- Relation[id#222,name#223,address#224,pets#225,friends#226,relatives#227,employer#228,p#229] parquet

== Optimized Logical Plan ==
Project [id#222, UDF(name#223.first) AS UDF(name.first)#246]
+- Relation[id#222,name#223,address#224,pets#225,friends#226,relatives#227,employer#228,p#229] parquet

== Physical Plan ==
*(1) Project [id#222, UDF(name#223.first) AS UDF(name.first)#246]
+- *(1) FileScan parquet [id#222,name#223,address#224,pets#225,friends#226,relatives#227,employer#228,p#229] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/f3/6jyczfzd15ndvh49zq0d_sg80000gn/T/spark-6b69e4e9-c6..., PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:struct<first:string,middle:string,last:string>,address:string,pets:int,friends...

To me, it seems valid to apply schema pruning in this case. What do you think?

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023

Closes apache#21320 from mallman/spark-4502-parquet_column_pruning-foundation.

Lead-authored-by: Michael Allman <msa@allman.ms>
Co-authored-by: Adam Jacques <adam@technowizardry.net>
Co-authored-by: Michael Allman <michael@videoamp.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>

Ref: LIHADOOP-48531
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023

Ref: LIHADOOP-48531