-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-34638][SQL] Single field nested column prune on generator output #31966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
5221be3
Nested column prune on generator output for one field.
viirya 4758f96
Add e2e test cases.
viirya 7908788
Fix wrong transform logic.
viirya fe286df
Address comments.
viirya 72e3e13
Merge remote-tracking branch 'upstream/master' into SPARK-34638
viirya df5c44d
Add case-insensitive test.
viirya 6c9d839
Add comment.
viirya ad3d191
Address review comments.
viirya 8d4309a
Deal with special cases.
viirya a719409
Address some comments.
viirya File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -231,6 +231,27 @@ object NestedColumnAliasing { | |
* of it. | ||
*/ | ||
object GeneratorNestedColumnAliasing { | ||
// Partitions `attrToAliases` based on whether the attribute is in Generator's output. | ||
private def aliasesOnGeneratorOutput( | ||
attrToAliases: Map[ExprId, Seq[Alias]], | ||
generatorOutput: Seq[Attribute]) = { | ||
val generatorOutputExprId = generatorOutput.map(_.exprId) | ||
attrToAliases.partition { k => | ||
generatorOutputExprId.contains(k._1) | ||
} | ||
} | ||
|
||
// Partitions `nestedFieldToAlias` based on whether the attribute of nested field extractor | ||
// is in Generator's output. | ||
private def nestedFieldOnGeneratorOutput( | ||
nestedFieldToAlias: Map[ExtractValue, Alias], | ||
generatorOutput: Seq[Attribute]) = { | ||
val generatorOutputSet = AttributeSet(generatorOutput) | ||
nestedFieldToAlias.partition { pair => | ||
pair._1.references.subsetOf(generatorOutputSet) | ||
} | ||
} | ||
|
||
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match { | ||
// Either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is enabled, we | ||
// need to prune nested columns through Project and under Generate. The difference is | ||
|
@@ -241,12 +262,81 @@ object GeneratorNestedColumnAliasing { | |
// On top on `Generate`, a `Project` that might have nested column accessors. | ||
// We try to get alias maps for both project list and generator's children expressions. | ||
val exprsToPrune = projectList ++ g.generator.children | ||
NestedColumnAliasing.getAliasSubMap(exprsToPrune, g.qualifiedGeneratorOutput).map { | ||
NestedColumnAliasing.getAliasSubMap(exprsToPrune).map { | ||
case (nestedFieldToAlias, attrToAliases) => | ||
val (nestedFieldsOnGenerator, nestedFieldsNotOnGenerator) = | ||
nestedFieldOnGeneratorOutput(nestedFieldToAlias, g.qualifiedGeneratorOutput) | ||
val (attrToAliasesOnGenerator, attrToAliasesNotOnGenerator) = | ||
aliasesOnGeneratorOutput(attrToAliases, g.qualifiedGeneratorOutput) | ||
|
||
// Push nested column accessors through `Generator`. | ||
// Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. | ||
val newChild = | ||
NestedColumnAliasing.replaceWithAliases(g, nestedFieldToAlias, attrToAliases) | ||
Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild) | ||
val newChild = NestedColumnAliasing.replaceWithAliases(g, | ||
nestedFieldsNotOnGenerator, attrToAliasesNotOnGenerator) | ||
val pushedThrough = Project(NestedColumnAliasing | ||
.getNewProjectList(projectList, nestedFieldsNotOnGenerator), newChild) | ||
|
||
// If the generator output is `ArrayType`, we cannot push through the extractor. | ||
// It is because we don't allow field extractor on two-level array, | ||
// i.e., attr.field when attr is a ArrayType(ArrayType(...)). | ||
// Similarily, we also cannot push through if the child of generator is `MapType`. | ||
g.generator.children.head.dataType match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Let me play more with this PR for a while. It seems I need more tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! |
||
case _: MapType => return Some(pushedThrough) | ||
case ArrayType(_: ArrayType, _) => return Some(pushedThrough) | ||
case _ => | ||
} | ||
|
||
// Pruning on `Generator`'s output. We only process single field case. | ||
// For multiple field case, we cannot directly move field extractor into | ||
// the generator expression. A workaround is to re-construct array of struct | ||
// from multiple fields. But it will be more complicated and may not worth. | ||
// TODO(SPARK-34956): support multiple fields. | ||
if (nestedFieldsOnGenerator.size > 1 || nestedFieldsOnGenerator.isEmpty) { | ||
pushedThrough | ||
} else { | ||
// Only one nested column accessor. | ||
// E.g., df.select(explode($"items").as("item")).select($"item.a") | ||
pushedThrough match { | ||
case p @ Project(_, newG: Generate) => | ||
// Replace the child expression of `ExplodeBase` generator with | ||
// nested column accessor. | ||
// E.g., df.select(explode($"items").as("item")).select($"item.a") => | ||
// df.select(explode($"items.a").as("item.a")) | ||
val rewrittenG = newG.transformExpressions { | ||
case e: ExplodeBase => | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val extractor = nestedFieldsOnGenerator.head._1.transformUp { | ||
case _: Attribute => | ||
e.child | ||
case g: GetStructField => | ||
ExtractValue(g.child, Literal(g.extractFieldName), SQLConf.get.resolver) | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
e.withNewChildren(Seq(extractor)) | ||
} | ||
|
||
// As we change the child of the generator, its output data type must be updated. | ||
val updatedGeneratorOutput = rewrittenG.generatorOutput | ||
.zip(rewrittenG.generator.elementSchema.toAttributes) | ||
.map { case (oldAttr, newAttr) => | ||
newAttr.withExprId(oldAttr.exprId).withName(oldAttr.name) | ||
} | ||
assert(updatedGeneratorOutput.length == rewrittenG.generatorOutput.length, | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"Updated generator output must have the same length " + | ||
"with original generator output.") | ||
val updatedGenerate = rewrittenG.copy(generatorOutput = updatedGeneratorOutput) | ||
|
||
// Replace nested column accessor with generator output. | ||
p.withNewChildren(Seq(updatedGenerate)).transformExpressions { | ||
case f: ExtractValue if nestedFieldsOnGenerator.contains(f) => | ||
updatedGenerate.output | ||
.find(a => attrToAliasesOnGenerator.contains(a.exprId)) | ||
.getOrElse(f) | ||
} | ||
|
||
case other => | ||
// We should not reach here. | ||
throw new IllegalStateException(s"Unreasonable plan after optimization: $other") | ||
} | ||
} | ||
} | ||
|
||
case g: Generate if SQLConf.get.nestedSchemaPruningEnabled && | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.