@@ -24,9 +24,12 @@ import org.apache.spark.sql.types._
24
24
25
25
/**
26
26
* This aims to handle a nested column aliasing pattern inside the [[ColumnPruning ]] optimizer rule.
27
- * If a project or its child references to nested fields, and not all the fields
28
- * in a nested attribute are used, we can substitute them by alias attributes; then a project
29
- * of the nested fields as aliases on the children of the child will be created.
27
+ * If:
28
+ * - A [[Project ]] or its child references nested fields
29
+ * - Not all of the fields in a nested attribute are used
30
+ * Then:
31
+ * - Substitute the nested field references with alias attributes
32
+ * - Add grandchild [[Project ]]s transforming the nested fields to aliases
30
33
*
31
34
* Example 1: Project
32
35
* ------------------
@@ -76,7 +79,7 @@ import org.apache.spark.sql.types._
76
79
*/
77
80
object NestedColumnAliasing {
78
81
79
- def unapply (plan : LogicalPlan ): Option [Map [ Attribute , Seq [ ExtractValue ]] ] = plan match {
82
+ def unapply (plan : LogicalPlan ): Option [LogicalPlan ] = plan match {
80
83
/**
81
84
* This pattern is needed to support [[Filter ]] plan cases like
82
85
* [[Project ]]->[[Filter ]]->listed plan in [[canProjectPushThrough ]] (e.g., [[Window ]]).
@@ -85,25 +88,40 @@ object NestedColumnAliasing {
85
88
*/
86
89
case Project (projectList, Filter (condition, child)) if
87
90
SQLConf .get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
88
- getAttributeToExtractValues (
89
- projectList ++ Seq (condition) ++ child.expressions, child.producedAttributes.toSeq)
91
+ rewritePlanIfSubsetFieldsUsed (
92
+ plan, projectList ++ Seq (condition) ++ child.expressions, child.producedAttributes.toSeq)
90
93
91
94
case Project (projectList, child) if
92
95
SQLConf .get.nestedSchemaPruningEnabled && canProjectPushThrough(child) =>
93
- getAttributeToExtractValues (
94
- projectList ++ child.expressions, child.producedAttributes.toSeq)
96
+ rewritePlanIfSubsetFieldsUsed (
97
+ plan, projectList ++ child.expressions, child.producedAttributes.toSeq)
95
98
96
99
case p if SQLConf .get.nestedSchemaPruningEnabled && canPruneOn(p) =>
97
- getAttributeToExtractValues (
98
- p.expressions, p.producedAttributes.toSeq)
100
+ rewritePlanIfSubsetFieldsUsed (
101
+ plan, p.expressions, p.producedAttributes.toSeq)
99
102
100
103
case _ => None
101
104
}
102
105
106
+ /**
107
+ * Rewrites a plan with aliases if only a subset of the nested fields are used.
+ *
+ * The alias map is computed by calling `getAttributeToExtractValues(exprList, exclusiveAttrs)`.
+ * When that map is empty there is nothing to alias and no rewritten plan is produced;
+ * otherwise the map is handed to `rewritePlanWithAliases` together with `plan`.
+ *
+ * @param plan the plan passed to `rewritePlanWithAliases` when the alias map is non-empty
+ * @param exprList the expressions forwarded to `getAttributeToExtractValues`
+ * @param exclusiveAttrs the attributes forwarded to `getAttributeToExtractValues`; per that
+ *                       method's documentation, nested field accessors of these attributes
+ *                       are not considered for aliasing
+ * @return `None` if the computed alias map is empty, otherwise `Some` wrapping the result of
+ *         `rewritePlanWithAliases`
108
+ */
109
+ def rewritePlanIfSubsetFieldsUsed (
110
+ plan : LogicalPlan ,
111
+ exprList : Seq [Expression ],
112
+ exclusiveAttrs : Seq [Attribute ]): Option [LogicalPlan ] = {
113
+ val attrToExtractValues = getAttributeToExtractValues(exprList, exclusiveAttrs)
114
+ if (attrToExtractValues.isEmpty) {
115
+ None
116
+ } else {
117
+ Some (rewritePlanWithAliases(plan, attrToExtractValues))
118
+ }
119
+ }
120
+
103
121
/**
104
122
* Replace nested columns to prune unused nested columns later.
105
123
*/
106
- def replacePlanWithAliases (
124
+ def rewritePlanWithAliases (
107
125
plan : LogicalPlan ,
108
126
attributeToExtractValues : Map [Attribute , Seq [ExtractValue ]]): LogicalPlan = {
109
127
// Each expression can contain multiple nested fields.
@@ -218,20 +236,19 @@ object NestedColumnAliasing {
218
236
}
219
237
220
238
/**
221
- * Creates a map from root [[Attribute ]]s to non-redundant nested [[ExtractValue ]]s in the
222
- * case that only a subset of the nested fields are used.
239
+ * Creates a map from root [[Attribute ]]s to non-redundant nested [[ExtractValue ]]s.
223
240
* Nested field accessors of `exclusiveAttrs` are not considered in nested fields aliasing.
224
241
*/
225
242
def getAttributeToExtractValues (
226
243
exprList : Seq [Expression ],
227
- exclusiveAttrs : Seq [Attribute ]): Option [ Map [Attribute , Seq [ExtractValue ] ]] = {
244
+ exclusiveAttrs : Seq [Attribute ]): Map [Attribute , Seq [ExtractValue ]] = {
228
245
229
246
val nestedFieldReferences = exprList.flatMap(collectExtractValue)
230
247
val otherRootReferences = exprList.flatMap(collectAttributeReference)
231
248
val exclusiveAttrSet = AttributeSet (exclusiveAttrs ++ otherRootReferences)
232
249
233
250
// Remove cosmetic variations when we group extractors by their references
234
- val attributeToExtractValues = nestedFieldReferences
251
+ nestedFieldReferences
235
252
.filter(! _.references.subsetOf(exclusiveAttrSet))
236
253
.groupBy(_.references.head.canonicalized.asInstanceOf [Attribute ])
237
254
.flatMap { case (attr : Attribute , nestedFields : Seq [ExtractValue ]) =>
@@ -258,12 +275,6 @@ object NestedColumnAliasing {
258
275
None
259
276
}
260
277
}
261
-
262
- if (attributeToExtractValues.isEmpty) {
263
- None
264
- } else {
265
- Some (attributeToExtractValues)
266
- }
267
278
}
268
279
269
280
/**
@@ -281,11 +292,10 @@ object NestedColumnAliasing {
281
292
}
282
293
283
294
/**
284
- * This prunes unnecessary nested columns from [[Generate ]] and optional [[Project ]] on top
285
- * of it.
295
+ * This prunes unnecessary nested columns from [[Generate]], or [[Project]] -> [[Generate]].
286
296
*/
287
297
object GeneratorNestedColumnAliasing {
288
- def unapply (plan : LogicalPlan ): Option [Map [ Attribute , Seq [ ExtractValue ]] ] = plan match {
298
+ def unapply (plan : LogicalPlan ): Option [LogicalPlan ] = plan match {
289
299
// If either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is enabled, we
290
300
// need to prune nested columns through Project and under Generate. The difference is
291
301
// when `nestedSchemaPruningEnabled` is on, nested columns will be pruned further at
@@ -294,17 +304,17 @@ object GeneratorNestedColumnAliasing {
294
304
SQLConf .get.nestedSchemaPruningEnabled) && canPruneGenerator(g.generator) =>
295
305
// On top of `Generate`, a `Project` that might have nested column accessors.
296
306
// We try to get alias maps for both project list and generator's children expressions.
297
- NestedColumnAliasing .getAttributeToExtractValues (
298
- projectList ++ g.generator.children, g.qualifiedGeneratorOutput)
307
+ NestedColumnAliasing .rewritePlanIfSubsetFieldsUsed (
308
+ plan, projectList ++ g.generator.children, g.qualifiedGeneratorOutput)
299
309
300
310
case g : Generate if SQLConf .get.nestedSchemaPruningEnabled &&
301
311
canPruneGenerator(g.generator) =>
302
312
// If any child output is required by higher projection, we cannot prune on it even if we
303
313
// only use part of nested column of it. A required child output means it is referred
304
314
// as a whole or partially by higher projection, pruning it here will cause unresolved
305
315
// query plan.
306
- NestedColumnAliasing .getAttributeToExtractValues (
307
- g.generator.children, g.requiredChildOutput)
316
+ NestedColumnAliasing .rewritePlanIfSubsetFieldsUsed (
317
+ plan, g.generator.children, g.requiredChildOutput)
308
318
309
319
case _ =>
310
320
None
0 commit comments