Skip to content

Commit

Permalink
[SPARK-40963][SQL] Set nullable correctly in project created by `Extr…
Browse files Browse the repository at this point in the history
…actGenerator`

### What changes were proposed in this pull request?

When creating the project list for the new projection In `ExtractGenerator`, take into account whether the generator is outer when setting nullable on generator-related output attributes.

### Why are the changes needed?

This PR fixes an issue that can produce either incorrect results or a `NullPointerException`. It's a bit of an obscure issue in that I am hard-pressed to reproduce without using a subquery that has a inline table.

Example:
```
select c1, explode(c4) as c5 from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(1, 2)),
    (2, array(2, 3)),
    (3, null)
    as data(c1, c2)
  )
);

+---+---+
|c1 |c5 |
+---+---+
|1  |1  |
|1  |2  |
|2  |2  |
|2  |3  |
|3  |0  |
+---+---+
```
In the last row, `c5` is 0, but should be `NULL`.

Another example:
```
select c1, exists(c4, x -> x is null) as c5 from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(1, 2)),
    (2, array(2, 3)),
    (3, array())
    as data(c1, c2)
  )
);

+---+-----+
|c1 |c5   |
+---+-----+
|1  |false|
|1  |false|
|2  |false|
|2  |false|
|3  |false|
+---+-----+
```
In the last row, `false` should be `true`.

In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s nullability is incorrect because the new projection created by `ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a projection list. `generatorOutput` doesn't take into account that `explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.

`UpdateAttributeNullability` will eventually fix the nullable setting for attributes referring to `c3`, but it doesn't fix the `containsNull` setting for `c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` (from the second example).

This example fails with a `NullPointerException`:
```
select c1, inline_outer(c4) from (
  select c1, array(c3) as c4 from (
    select c1, explode_outer(c2) as c3
    from values
    (1, array(named_struct('a', 1, 'b', 2))),
    (2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
    (3, array())
    as data(c1, c2)
  )
);
22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 14)
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

New unit test.

Closes #38440 from bersprockets/SPARK-40963.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
bersprockets authored and HyukjinKwon committed Oct 31, 2022
1 parent 406d0e2 commit 90d3154
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2910,7 +2910,7 @@ class Analyzer(override val catalogManager: CatalogManager)
generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
child)

(Some(g), res._2 ++ g.generatorOutput)
(Some(g), res._2 ++ g.nullableOutput)
case other =>
(res._1, res._2 :+ other)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,17 @@ case class Generate(

override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

def nullableOutput: Seq[Attribute] = {
generatorOutput.map { a =>
a.withNullability(outer || a.nullable)
}
}

def qualifiedGeneratorOutput: Seq[Attribute] = {
val qualifiedOutput = qualifier.map { q =>
qualifier.map { q =>
// prepend the new qualifier to the existed one
generatorOutput.map(a => a.withQualifier(Seq(q)))
}.getOrElse(generatorOutput)
val nullableOutput = qualifiedOutput.map {
// if outer, make all attributes nullable, otherwise keep existing nullability
a => a.withNullability(outer || a.nullable)
}
nullableOutput
nullableOutput.map(a => a.withQualifier(Seq(q)))
}.getOrElse(nullableOutput)
}

def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
testNullStruct
}
}

test("SPARK-40963: generator output has correct nullability") {
// This test does not check nullability directly. Before SPARK-40963,
// the below query got wrong results due to incorrect nullability.
val df = sql(
"""select c1, explode(c4) as c5 from (
| select c1, array(c3) as c4 from (
| select c1, explode_outer(c2) as c3
| from values
| (1, array(1, 2)),
| (2, array(2, 3)),
| (3, null)
| as data(c1, c2)
| )
|)
|""".stripMargin)
checkAnswer(df,
Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
}
}

case class EmptyGenerator() extends Generator with LeafLike[Expression] {
Expand Down

0 comments on commit 90d3154

Please sign in to comment.