Skip to content

Commit d35af54

Browse files
Emil Ejbyfeldtcloud-fan
Emil Ejbyfeldt
authored andcommitted
[SPARK-48428][SQL] Fix IllegalStateException in NestedColumnAliasing
### What changes were proposed in this pull request? In #35170 SPARK-37855 and #32301 SPARK-35194 introduced conditions for ExtractValues that can currently not be handled. The considtion is introduced after `collectRootReferenceAndExtractValue` and just removes these candidates. This is problematic since these expressions might have contained `AttributeReference` that needed to not do an incorrect aliasing. This fixes this family of bugs by moving the conditions into the function `collectRootReferenceAndExtractValue`. ### Why are the changes needed? The current code leads to `IllegalStateException` runtime failures. ### Does this PR introduce _any_ user-facing change? Yes, fixes a bug. ### How was this patch tested? Existing and new unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46756 from eejbyfeldt/SPARK-48428. Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit b11608c) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 6cee8e1 commit d35af54

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ object NestedColumnAliasing {
217217
case _ => false
218218
}
219219

220+
private def canAlias(ev: Expression): Boolean = {
221+
// we can not alias the attr from lambda variable whose expr id is not available
222+
!ev.exists(_.isInstanceOf[NamedLambdaVariable]) && ev.references.size == 1
223+
}
224+
220225
/**
221226
* Returns two types of expressions:
222227
* - Root references that are individually accessed
@@ -225,11 +230,11 @@ object NestedColumnAliasing {
225230
*/
226231
private def collectRootReferenceAndExtractValue(e: Expression): Seq[Expression] = e match {
227232
case _: AttributeReference => Seq(e)
228-
case GetStructField(_: ExtractValue | _: AttributeReference, _, _) => Seq(e)
233+
case GetStructField(_: ExtractValue | _: AttributeReference, _, _) if canAlias(e) => Seq(e)
229234
case GetArrayStructFields(_: MapValues |
230235
_: MapKeys |
231236
_: ExtractValue |
232-
_: AttributeReference, _, _, _, _) => Seq(e)
237+
_: AttributeReference, _, _, _, _) if canAlias(e) => Seq(e)
233238
case es if es.children.nonEmpty => es.children.flatMap(collectRootReferenceAndExtractValue)
234239
case _ => Seq.empty
235240
}
@@ -248,13 +253,8 @@ object NestedColumnAliasing {
248253
val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
249254
exprList.foreach { e =>
250255
extractor(e).foreach {
251-
// we can not alias the attr from lambda variable whose expr id is not available
252-
case ev: ExtractValue if !ev.exists(_.isInstanceOf[NamedLambdaVariable]) =>
253-
if (ev.references.size == 1) {
254-
nestedFieldReferences.append(ev)
255-
}
256+
case ev: ExtractValue => nestedFieldReferences.append(ev)
256257
case ar: AttributeReference => otherRootReferences.append(ar)
257-
case _ => // ignore
258258
}
259259
}
260260
val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,27 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
861861
// The plan is expected to be unchanged.
862862
comparePlans(plan, RemoveNoopOperators.apply(optimized.get))
863863
}
864+
865+
test("SPARK-48428: Do not pushdown when attr is used in expression with mutliple references") {
866+
val query = contact
867+
.limit(5)
868+
.select(
869+
GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0),
870+
$"employer.id")
871+
.analyze
872+
873+
val optimized = Optimize.execute(query)
874+
875+
val expected = contact
876+
.select($"id", $"employer")
877+
.limit(5)
878+
.select(
879+
GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0),
880+
$"employer.id")
881+
.analyze
882+
883+
comparePlans(optimized, expected)
884+
}
864885
}
865886

866887
object NestedColumnAliasingSuite {

0 commit comments

Comments
 (0)