-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-34638][SQL] Single field nested column prune on generator output #31966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Kubernetes integration test starting |
Kubernetes integration test status failure |
This comment has been minimized.
This comment has been minimized.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Outdated
Show resolved
Hide resolved
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #136549 has finished for PR 31966 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #136553 has finished for PR 31966 at commit
|
...alyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pinging me, @viirya . I agree that we can handle multiple field case
later.
For now I only add single field support. Multiple field case would be more complicated. We can consider to add it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This behaivour (the col name that is internally auto-generated is visible to a user) is expected one?
scala> sql("select __auto_generated_subquery_name.col.a from (select explode(items) from t)").show()
+---+
| a|
+---+
| 1|
| 2|
+---+
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Show resolved
Hide resolved
@@ -241,12 +262,69 @@ object GeneratorNestedColumnAliasing { | |||
// On top on `Generate`, a `Project` that might have nested column accessors. | |||
// We try to get alias maps for both project list and generator's children expressions. | |||
val exprsToPrune = projectList ++ g.generator.children | |||
NestedColumnAliasing.getAliasSubMap(exprsToPrune, g.qualifiedGeneratorOutput).map { | |||
NestedColumnAliasing.getAliasSubMap(exprsToPrune).map { | |||
case (nestedFieldToAlias, attrToAliases) => | |||
// Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this comment into L275-276?
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Outdated
Show resolved
Hide resolved
I can see we use it in test:
|
Addressed above comments except for #31966 (comment). For case-sensitive test case, I need to figure out a suspicious bug in nested column pruning first. |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #136907 has finished for PR 31966 at commit
|
@maropu @dongjoon-hyun Thanks for your review and patience. I added test-sensitivity test. Once you have time, please take another look. Thanks. |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #137170 has finished for PR 31966 at commit
|
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
Show resolved
Hide resolved
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #137407 has finished for PR 31966 at commit
|
Looks fine otherwise. I think it's better to have @dongjoon-hyun check this before merging. |
Thanks @maropu! |
|
||
// As we change the child of the generator, its output data type must be updated. | ||
val updatedGeneratorOutput = rewrittenG.generatorOutput | ||
.zip(rewrittenG.generator.elementSchema.toAttributes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indentation? It seems that two-space is enough in this case.
newAttr.withExprId(oldAttr.exprId).withName(oldAttr.name) | ||
} | ||
assert(updatedGeneratorOutput.length == rewrittenG.generatorOutput.length, | ||
"Updated generator output must have same length as original generator output.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe, same length as
-> the same length with
?
Hi, @viirya . While testing this PR, I found the following regression. BEFORE (3.1.1) scala> sql("select * from values array(array(named_struct('a', 1, 'b', 3), named_struct('a', 2, 'b', 4))) T(items)").write.parquet("/tmp/nested_array")
scala> spark.read.parquet("/tmp/nested_array").createOrReplaceTempView("t")
scala> sql("select d.a from (select explode(c) d from (select explode(items) c from t))").show()
+---+
| a|
+---+
| 1|
| 2|
+---+ BEFORE(master) scala> spark.read.parquet("/tmp/nested_array").createOrReplaceTempView("t")
scala> sql("select d.a from (select explode(c) d from (select explode(items) c from t))").show()
+---+
| a|
+---+
| 1|
| 2|
+---+ AFTER (This PR) scala> spark.read.parquet("/tmp/nested_array").createOrReplaceTempView("t")
scala> sql("select d.a from (select explode(c) d from (select explode(items) c from t))").show()
java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
at org.apache.spark.sql.catalyst.expressions.SelectedField$.selectField(SelectedField.scala:81)
at org.apache.spark.sql.catalyst.expressions.SelectedField$.unapply(SelectedField.scala:62)
at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.getRootFields(SchemaPruning.scala:124)
at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$anonfun$identifyRootFields$1(SchemaPruning.scala:81) Could you double-check this and add some test coverage? |
Thanks @dongjoon-hyun. Let me take a look. |
@dongjoon-hyun Sorry for late. I addressed the cases and added test for it. |
Kubernetes integration test starting |
Kubernetes integration test status failure |
No problem. Thank you for updates, @viirya . |
// It is because we don't allow field extractor on two-level array, | ||
// i.e., attr.field when attr is a ArrayType(ArrayType(...)). | ||
// Similarily, we also cannot push through if the child of generator is `MapType`. | ||
g.generator.children.head.dataType match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Let me play more with this PR for a while. It seems I need more tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
Test build #137898 has finished for PR 31966 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dongjoon-hyun @maropu |
Merging to master. |
What changes were proposed in this pull request?
This patch proposes an improvement on nested column pruning if the pruning target is generator's output. Previously we disallow such case. This patch allows to prune on it if there is only one single nested column is accessed after
Generate
.E.g.,
df.select(explode($"items").as('item)).select($"item.itemId")
. As we only needitemId
fromitem
, we can prune other fields out and only keepitemId
.In this patch, we only address explode-like generators. We will address other generators in followups.
Why are the changes needed?
This helps to extend the availability of nested column pruning.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test