[SPARK-40292][SQL] Fix column names in "arrays_zip" function when arrays are referenced from nested structs

### What changes were proposed in this pull request?

This PR fixes an issue in the `arrays_zip` function where a field index was used as a column name in the resulting schema, a regression from Spark 3.1. With this change, the original behaviour is restored: the corresponding struct field name is used instead of the field index.

Example:
```sql
with q as (
  select
    named_struct(
      'my_array', array(1, 2, 3),
      'my_array2', array(4, 5, 6)
    ) as my_struct
)
select
  arrays_zip(my_struct.my_array, my_struct.my_array2)
from
  q
```

would return the schema:
```
root
 |-- arrays_zip(my_struct.my_array, my_struct.my_array2): array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: integer (nullable = true)
 |    |    |-- 1: integer (nullable = true)
```

which is inaccurate. The PR adds handling of the `GetStructField` expression so that the struct field names are returned instead:
```
root
 |-- arrays_zip(my_struct.my_array, my_struct.my_array2): array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- my_array: integer (nullable = true)
 |    |    |-- my_array2: integer (nullable = true)
```
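For intuition, the zipping behaviour itself can be sketched outside of Spark. The following is a minimal emulation in plain Python (Python rather than Scala purely so it runs standalone; the function name `arrays_zip` and the dict-per-element encoding are illustrative stand-ins, not Spark API): each output element is keyed by the source field names, as in the corrected schema above, and shorter arrays are padded with `None` the way Spark pads with NULL.

```python
from itertools import zip_longest

def arrays_zip(named_arrays):
    """Zip several named arrays into a list of dicts, one per index.

    Mirrors the SQL example: field names come from the source columns
    (e.g. my_array / my_array2), and shorter arrays are padded with
    None, analogous to Spark padding with NULL.
    """
    names = list(named_arrays)
    return [
        dict(zip(names, row))
        for row in zip_longest(*named_arrays.values(), fillvalue=None)
    ]

# Each element is a struct-like dict keyed by the original field names.
result = arrays_zip({"my_array": [1, 2, 3], "my_array2": [4, 5, 6]})
print(result)
```

The pre-fix behaviour would correspond to keying each element by `"0"`, `"1"`, ... instead of the field names.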

### Why are the changes needed?

The change fixes a regression from Spark 3.1: `arrays_zip` returned field indices as column names instead of struct field names, producing an inaccurate schema.
### Does this PR introduce _any_ user-facing change?

Yes. The `arrays_zip` function now returns struct field names, as in Spark 3.1, instead of field indices.
Some users might have worked around this issue, so this patch would affect them by restoring the original behaviour.

### How was this patch tested?

Existing unit tests. I also added a test case that reproduces the problem.

Closes apache#37833 from sadikovi/SPARK-40292.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
sadikovi authored and MaxGekk committed Sep 12, 2022
1 parent 78d492c commit 443eea9
Showing 2 changed files with 20 additions and 0 deletions.
```diff
@@ -270,6 +270,7 @@ case class ArraysZip(children: Seq[Expression], names: Seq[Expression])
       case (u: UnresolvedAttribute, _) => Literal(u.nameParts.last)
       case (e: NamedExpression, _) if e.resolved => Literal(e.name)
       case (e: NamedExpression, _) => NamePlaceholder
+      case (e: GetStructField, _) => Literal(e.extractFieldName)
       case (_, idx) => Literal(idx.toString)
     })
   }
```
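The precedence in that pattern match can be paraphrased outside of Spark. Below is a rough Python sketch (Python rather than Scala purely so it runs standalone; `zip_field_name` and the dict-based "expression" encoding are invented stand-ins for Catalyst expression classes): before this patch, a struct-field reference fell through to the positional-index fallback, which is exactly what produced the `0`/`1` column names in the example schema above.

```python
def zip_field_name(expr, idx):
    """Pick a field name for position `idx` of arrays_zip's output struct.

    A rough analogue of the Scala match in the diff; the dict encoding
    is hypothetical, for illustration only.
    """
    kind = expr.get("kind")
    if kind == "unresolved_attribute":
        # e.g. my_struct.my_array -> "my_array" (last name part)
        return expr["name_parts"][-1]
    if kind == "named":
        # resolved named expressions contribute their name; unresolved
        # ones get a placeholder until analysis completes
        return expr["name"] if expr.get("resolved") else "<placeholder>"
    if kind == "get_struct_field":
        # the case this PR adds: keep the struct field's own name
        return expr["field_name"]
    # fallback (the pre-fix behaviour for struct field refs): the index
    return str(idx)
```

The fix is a single new branch: struct-field references now resolve to their field name before the fallback is reached.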
```diff
@@ -763,6 +763,25 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     }
   }

+  test("SPARK-40292: arrays_zip should retain field names in nested structs") {
+    val df = spark.sql("""
+      select
+        named_struct(
+          'arr_1', array(named_struct('a', 1, 'b', 2)),
+          'arr_2', array(named_struct('p', 1, 'q', 2)),
+          'field', named_struct(
+            'arr_3', array(named_struct('x', 1, 'y', 2))
+          )
+        ) as obj
+    """)
+
+    val res = df.selectExpr("arrays_zip(obj.arr_1, obj.arr_2, obj.field.arr_3) as arr")
+
+    val fieldNames = res.schema.head.dataType.asInstanceOf[ArrayType]
+      .elementType.asInstanceOf[StructType].fieldNames
+    assert(fieldNames.toSeq === Seq("arr_1", "arr_2", "arr_3"))
+  }
+
   def testSizeOfMap(sizeOfNull: Any): Unit = {
     val df = Seq(
       (Map[Int, Int](1 -> 1, 2 -> 2), "x"),
```
