Skip to content

Commit 3a3bde0

Browse files
committed
Subquery should not cause NPE when eliminating subexpression.
1 parent e650f8f commit 3a3bde0

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ class EquivalentExpressions {
7272
val skip = expr.isInstanceOf[LeafExpression] ||
7373
// `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the
7474
// loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning.
75-
expr.find(_.isInstanceOf[LambdaVariable]).isDefined
75+
expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
76+
// `PlanExpression` wraps a query plan. Comparing the query plans of `PlanExpression`s on
77+
// an executor can cause unexpected errors.
78+
expr.isInstanceOf[PlanExpression[_]]
7679

7780
// There are some special expressions that we should not recurse into all of its children.
7881
// 1. CodegenFallback: its children will not be used to generate code (call eval() instead)

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
2424

2525
import org.apache.spark.{AccumulatorSuite, SparkException}
2626
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
27+
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
2728
import org.apache.spark.sql.catalyst.util.StringUtils
2829
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
2930
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -3149,6 +3150,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
31493150
checkAnswer(sql("select * from t1 where d > '1999-13'"), Row(result))
31503151
checkAnswer(sql("select to_timestamp('2000-01-01 01:10:00') > '1'"), Row(true))
31513152
}
3153+
sql("DROP VIEW t1")
31523154
}
31533155

31543156
test("SPARK-28156: self-join should not miss cached view") {
@@ -3192,6 +3194,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
31923194
checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100"))))
31933195
}
31943196
}
3197+
3198+
test("SPARK-29239: Subquery should not cause NPE when eliminating subexpression") {
3199+
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
3200+
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false",
3201+
SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY",
3202+
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) {
3203+
withTempView("t1", "t2") {
3204+
sql("create temporary view t1 as select * from values ('val1a', 10L) as t1(t1a, t1b)")
3205+
sql("create temporary view t2 as select * from values ('val3a', 110L) as t2(t2a, t2b)")
3206+
val df = sql("SELECT min, min from (SELECT (SELECT min(t2b) FROM t2) min " +
3207+
"FROM t1 WHERE t1a = 'val1c')")
3208+
assert(df.collect().size == 0)
3209+
}
3210+
}
3211+
}
31953212
}
31963213

31973214
case class Foo(bar: Option[String])

0 commit comments

Comments
 (0)