
Commit a58b8a8

ulysses-you authored and viirya committed
[SPARK-37855][SQL][3.2] IllegalStateException when transforming an array inside a nested struct
This is a backport of #35170 for branch-3.2.

### What changes were proposed in this pull request?

Skip aliasing an `ExtractValue` whose children contain a `NamedLambdaVariable`.

### Why are the changes needed?

Since #32773, `NamedLambdaVariable` can produce references, which causes the `NestedColumnAliasing` rule to alias `ExtractValue` expressions that contain a `NamedLambdaVariable`. This fails because a `NamedLambdaVariable` cannot be matched to an actual attribute.

In more detail: during `NestedColumnAliasing#replaceWithAliases`, the references of each nested field are used to match the output attributes of the grandchildren. However, `NamedLambdaVariable` is created by the analyzer as a virtual attribute and is not resolved from the output of the children, so using the references of a `NamedLambdaVariable` to match the grandchildren's output yields no attribute.

### Does this PR introduce _any_ user-facing change?

Yes, this is a bug fix.

### How was this patch tested?

Added a new test.

Closes #35175 from ulysses-you/SPARK-37855-branch-3.2.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent 31dfbde commit a58b8a8
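
For context, here is a condensed sketch of the query shape that hit the `IllegalStateException` before this fix. The DataFrame `df`, its column and field names, and the alias names are illustrative stand-ins; the full version is the `DataFrameSuite` test in the diff below.

```scala
// Condensed, hypothetical repro of the failing pattern: transform() over an array of
// structs, extracting fields of a nested array of structs via the lambda variables.
// `df` is assumed to have an array-of-struct column "outer_array" whose struct holds a
// nested struct "inner_array_struct" containing another array of structs "inner_array".
import org.apache.spark.sql.functions.{col, struct, transform}

val res = df.limit(2).withColumn("extracted", transform(
  col("outer_array"),
  outer => struct(
    outer.getField("id").alias("outer_id"),
    transform(
      outer.getField("inner_array_struct").getField("inner_array"),
      inner => struct(inner.getField("value").alias("inner_value"))
    ).alias("inner_values")
  )
))

// Before this fix, running the query could fail with an IllegalStateException while
// NestedColumnAliasing tried to alias extract-value expressions rooted at the lambda
// variables `outer` and `inner`; with the fix, collect() succeeds.
res.collect()
```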

2 files changed: 57 additions & 1 deletion


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala

Lines changed: 3 additions & 1 deletion
@@ -245,11 +245,13 @@ object NestedColumnAliasing {
     val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
     exprList.foreach { e =>
       collectRootReferenceAndExtractValue(e).foreach {
-        case ev: ExtractValue =>
+        // we can not alias the attr from lambda variable whose expr id is not available
+        case ev: ExtractValue if ev.find(_.isInstanceOf[NamedLambdaVariable]).isEmpty =>
           if (ev.references.size == 1) {
             nestedFieldReferences.append(ev)
           }
         case ar: AttributeReference => otherRootReferences.append(ar)
+        case _ => // ignore
       }
     }
     val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 54 additions & 0 deletions
@@ -2943,6 +2943,60 @@ class DataFrameSuite extends QueryTest
       .withSequenceColumn("default_index").collect().map(_.getLong(0))
     assert(ids.toSet === Range(0, 10).toSet)
   }
+
+  test("SPARK-37855: IllegalStateException when transforming an array inside a nested struct") {
+    def makeInput(): DataFrame = {
+      val innerElement1 = Row(3, 3.12)
+      val innerElement2 = Row(4, 2.1)
+      val innerElement3 = Row(1, 985.2)
+      val innerElement4 = Row(10, 757548.0)
+      val innerElement5 = Row(1223, 0.665)
+
+      val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
+      val outerElement2 = Row(2, Row(List(innerElement3)))
+      val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
+
+      val data = Seq(
+        Row("row1", List(outerElement1)),
+        Row("row2", List(outerElement2, outerElement3))
+      )
+
+      val schema = new StructType()
+        .add("name", StringType)
+        .add("outer_array", ArrayType(new StructType()
+          .add("id", IntegerType)
+          .add("inner_array_struct", new StructType()
+            .add("inner_array", ArrayType(new StructType()
+              .add("id", IntegerType)
+              .add("value", DoubleType)
+            ))
+          )
+        ))
+
+      spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    }
+
+    val df = makeInput().limit(2)
+
+    val res = df.withColumn("extracted", transform(
+      col("outer_array"),
+      c1 => {
+        struct(
+          c1.getField("id").alias("outer_id"),
+          transform(
+            c1.getField("inner_array_struct").getField("inner_array"),
+            c2 => {
+              struct(
+                c2.getField("value").alias("inner_value")
+              )
+            }
+          )
+        )
+      }
+    ))
+
+    assert(res.collect.length == 2)
+  }
 }

 case class GroupByKey(a: Int, b: Int)
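
As a purely illustrative follow-up (not part of the patch), the same `res` DataFrame built in the test above can be probed interactively, for example in a spark-shell, to confirm the query now plans and executes.

```scala
// Hedged verification sketch, assuming the `res` DataFrame from the test above.
// Before the fix, executing this query failed with an IllegalStateException;
// with the fix it returns the two input rows.
assert(res.collect().length == 2)

// Optionally inspect the optimized plan to see how NestedColumnAliasing handled
// the nested fields that do not involve lambda variables.
println(res.queryExecution.optimizedPlan.treeString)
```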
