diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3a3997ff9c722..ad40f924ef89d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2826,7 +2826,7 @@ class Analyzer(override val catalogManager: CatalogManager)
                   generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
                   child)
 
-                (Some(g), res._2 ++ g.generatorOutput)
+                (Some(g), res._2 ++ g.nullableOutput)
               case other =>
                 (res._1, res._2 :+ other)
             }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b52ce468390e9..bdc6e48d08a48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -143,16 +143,21 @@ case class Generate(
 
   override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
 
+  /**
+   * The generator output with nullability adjusted for `outer`: if outer, all attributes are
+   * made nullable, otherwise each attribute keeps its existing nullability.
+   */
+  def nullableOutput: Seq[Attribute] = {
+    generatorOutput.map { a =>
+      a.withNullability(outer || a.nullable)
+    }
+  }
+
   def qualifiedGeneratorOutput: Seq[Attribute] = {
-    val qualifiedOutput = qualifier.map { q =>
+    qualifier.map { q =>
       // prepend the new qualifier to the existed one
-      generatorOutput.map(a => a.withQualifier(Seq(q)))
-    }.getOrElse(generatorOutput)
-    val nullableOutput = qualifiedOutput.map {
-      // if outer, make all attributes nullable, otherwise keep existing nullability
-      a => a.withNullability(outer || a.nullable)
-    }
-    nullableOutput
+      nullableOutput.map(a => a.withQualifier(Seq(q)))
+    }.getOrElse(nullableOutput)
   }
 
   def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 08280c08cd2e6..49cdc8024107f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -425,6 +425,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
       testNullStruct
     }
   }
+
+  test("SPARK-40963: generator output has correct nullability") {
+    // This test does not check nullability directly. Before SPARK-40963,
+    // the below query got wrong results due to incorrect nullability.
+    val df = sql(
+      """select c1, explode(c4) as c5 from (
+        | select c1, array(c3) as c4 from (
+        | select c1, explode_outer(c2) as c3
+        | from values
+        | (1, array(1, 2)),
+        | (2, array(2, 3)),
+        | (3, null)
+        | as data(c1, c2)
+        | )
+        |)
+        |""".stripMargin)
+    checkAnswer(df,
+      Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
+  }
 }
 
 case class EmptyGenerator() extends Generator with LeafLike[Expression] {