Skip to content

Commit f09003e

Browse files
Emil Ejbyfeldteejbyfeldt
Emil Ejbyfeldt
authored andcommitted
SPARK-48428: Fix IllegalStateException in NestedColumnAliasing
In apache#35170 SPARK-37855 and apache#32301 SPARK-35194 introduced conditions for ExtractValues that can currently not be handled. The considtion is introduced after `collectRootReferenceAndExtractValue` and just removes these candidates. This is problematic since these expressions might have contained `AttributeReference` that needed to not do an incorrect rewrite. This fixes these family of bugs by moving the conditions into the function `collectRootReferenceAndExtractValue`.
1 parent fb5697d commit f09003e

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ object NestedColumnAliasing {
218218
case _ => false
219219
}
220220

221+
private def canTransform(ev: Expression): Boolean = {
222+
// we can not alias the attr from lambda variable whose expr id is not available
223+
!ev.exists(_.isInstanceOf[NamedLambdaVariable]) && ev.references.size == 1
224+
}
225+
221226
/**
222227
* Returns two types of expressions:
223228
* - Root references that are individually accessed
@@ -226,11 +231,11 @@ object NestedColumnAliasing {
226231
*/
227232
private def collectRootReferenceAndExtractValue(e: Expression): Seq[Expression] = e match {
228233
case _: AttributeReference => Seq(e)
229-
case GetStructField(_: ExtractValue | _: AttributeReference, _, _) => Seq(e)
234+
case GetStructField(_: ExtractValue | _: AttributeReference, _, _) if canTransform(e) => Seq(e)
230235
case GetArrayStructFields(_: MapValues |
231236
_: MapKeys |
232237
_: ExtractValue |
233-
_: AttributeReference, _, _, _, _) => Seq(e)
238+
_: AttributeReference, _, _, _, _) if canTransform(e) => Seq(e)
234239
case es if es.children.nonEmpty => es.children.flatMap(collectRootReferenceAndExtractValue)
235240
case _ => Seq.empty
236241
}
@@ -249,13 +254,8 @@ object NestedColumnAliasing {
249254
val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
250255
exprList.foreach { e =>
251256
extractor(e).foreach {
252-
// we can not alias the attr from lambda variable whose expr id is not available
253-
case ev: ExtractValue if !ev.exists(_.isInstanceOf[NamedLambdaVariable]) =>
254-
if (ev.references.size == 1) {
255-
nestedFieldReferences.append(ev)
256-
}
257+
case ev: ExtractValue => nestedFieldReferences.append(ev)
257258
case ar: AttributeReference => otherRootReferences.append(ar)
258-
case _ => // ignore
259259
}
260260
}
261261
val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,27 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
863863
// The plan is expected to be unchanged.
864864
comparePlans(plan, RemoveNoopOperators.apply(optimized.get))
865865
}
866+
867+
test("SPARK-48428: Do not pushdown when attr is used in expression with mutliple references") {
868+
val query = contact
869+
.limit(5)
870+
.select(
871+
GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0),
872+
$"employer.id")
873+
.analyze
874+
875+
val optimized = Optimize.execute(query)
876+
877+
val expected = contact
878+
.select($"id", $"employer")
879+
.limit(5)
880+
.select(
881+
GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0),
882+
$"employer.id")
883+
.analyze
884+
885+
comparePlans(optimized, expected)
886+
}
866887
}
867888

868889
object NestedColumnAliasingSuite {

0 commit comments

Comments
 (0)