Skip to content

Commit b8b59d6

Browse files
viirya authored and cloud-fan committed
[SPARK-29239][SPARK-29221][SQL] Subquery should not cause NPE when eliminating subexpression
### What changes were proposed in this pull request? This patch proposes to skip PlanExpression when doing subexpression elimination on executors. ### Why are the changes needed? Subexpression elimination can cause an NPE when it is applied to an execution subquery expression such as ScalarSubquery on executors. This is because PlanExpression wraps a query plan, and comparing query plans on an executor while eliminating subexpressions can cause unexpected errors, such as an NPE when accessing transient fields. The NPE looks like: ``` [info] - SPARK-29239: Subquery should not cause NPE when eliminating subexpression *** FAILED *** (175 milliseconds) [info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1395.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1395.0 (TID 3447, 10.0.0.196, executor driver): java.lang.NullPointerException [info] at org.apache.spark.sql.execution.LocalTableScanExec.stringArgs(LocalTableScanExec.scala:62) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.argString(TreeNode.scala:506) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.simpleString(TreeNode.scala:534) [info] at org.apache.spark.sql.catalyst.plans.QueryPlan.simpleString(QueryPlan.scala:179) [info] at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:181) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:647) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:675) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:675) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:569) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:559) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:551) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.toString(TreeNode.scala:548) [info] at 
org.apache.spark.sql.catalyst.errors.package$TreeNodeException.<init>(package.scala:36) [info] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:436) [info] at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:425) [info] at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102) [info] at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:63) [info] at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:132) [info] at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:261) ``` ### Does this PR introduce any user-facing change? No ### How was this patch tested? Added unit test. Closes #25925 from viirya/SPARK-29239. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 6a4235a commit b8b59d6

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
1919

2020
import scala.collection.mutable
2121

22+
import org.apache.spark.TaskContext
2223
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
2324
import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable
2425

@@ -72,7 +73,10 @@ class EquivalentExpressions {
7273
val skip = expr.isInstanceOf[LeafExpression] ||
7374
// `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the
7475
// loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning.
75-
expr.find(_.isInstanceOf[LambdaVariable]).isDefined
76+
expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
77+
// `PlanExpression` wraps a query plan. Comparing query plans of `PlanExpression` on an executor
78+
// can cause errors such as an NPE.
79+
(expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null)
7680

7781
// There are some special expressions that we should not recurse into all of their children.
7882
// 1. CodegenFallback: its children will not be used to generate code (call eval() instead)

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
2424

2525
import org.apache.spark.{AccumulatorSuite, SparkException}
2626
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
27+
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
2728
import org.apache.spark.sql.catalyst.util.StringUtils
2829
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
2930
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -3173,6 +3174,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
31733174
checkAnswer(sql("select * from t1 where d > '1999-13'"), Row(result))
31743175
checkAnswer(sql("select to_timestamp('2000-01-01 01:10:00') > '1'"), Row(true))
31753176
}
3177+
sql("DROP VIEW t1")
31763178
}
31773179

31783180
test("SPARK-28156: self-join should not miss cached view") {
@@ -3216,6 +3218,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
32163218
checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100"))))
32173219
}
32183220
}
3221+
3222+
test("SPARK-29239: Subquery should not cause NPE when eliminating subexpression") {
3223+
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
3224+
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false",
3225+
SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY",
3226+
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) {
3227+
withTempView("t1", "t2") {
3228+
sql("create temporary view t1 as select * from values ('val1a', 10L) as t1(t1a, t1b)")
3229+
sql("create temporary view t2 as select * from values ('val3a', 110L) as t2(t2a, t2b)")
3230+
val df = sql("SELECT min, min from (SELECT (SELECT min(t2b) FROM t2) min " +
3231+
"FROM t1 WHERE t1a = 'val1c')")
3232+
assert(df.collect().size == 0)
3233+
}
3234+
}
3235+
}
32193236
}
32203237

32213238
case class Foo(bar: Option[String])

0 commit comments

Comments
 (0)