Skip to content

Commit e42ea5c

Browse files
zhipengmao-db and cloud-fan
authored and committed
[SPARK-48871] Fix INVALID_NON_DETERMINISTIC_EXPRESSIONS validation in…
… CheckAnalysis ### What changes were proposed in this pull request? This PR adds a trait that logical plans can extend to implement a method that decides whether the operator may contain non-deterministic expressions, and checks this method in checkAnalysis. ### Why are the changes needed? I encountered the `INVALID_NON_DETERMINISTIC_EXPRESSIONS` exception when attempting to use a non-deterministic UDF in my query. The non-deterministic expression can be safely allowed for my custom LogicalPlan, but it is rejected in the checkAnalysis phase. The CheckAnalysis rule is too strict, so reasonable use cases of non-deterministic expressions are also rejected. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The test case `"SPARK-48871: SupportsNonDeterministicExpression allows non-deterministic expressions"` is added. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47304 from zhipengmao-db/zhipengmao-db/SPARK-48871-check-analysis. Lead-authored-by: zhipeng.mao <zhipeng.mao@databricks.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent c57f87f commit e42ea5c

File tree

3 files changed

+45
-0
lines changed

3 files changed

+45
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,17 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
143143
errorClass, missingCol, orderedCandidates, a.origin)
144144
}
145145

146+
/**
147+
* Checks whether the operator allows non-deterministic expressions.
*
* A plan that extends SupportsNonDeterministicExpression answers through its own
* `allowNonDeterministicExpression` value; every other plan is reported as not allowing them.
*
* @param plan the logical plan operator to inspect
* @return the plan's `allowNonDeterministicExpression` value if it extends the trait,
*         false otherwise
148+
*/
149+
private def operatorAllowsNonDeterministicExpressions(plan: LogicalPlan): Boolean = {
150+
plan match {
151+
case p: SupportsNonDeterministicExpression =>
152+
p.allowNonDeterministicExpression
153+
case _ => false
154+
}
155+
}
156+
146157
def checkAnalysis(plan: LogicalPlan): Unit = {
147158
// We should inline all CTE relations to restore the original plan shape, as the analysis check
148159
// may need to match certain plan shapes. For dangling CTE relations, they will still be kept
@@ -718,6 +729,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
718729
"dataType" -> toSQLType(mapCol.dataType)))
719730

720731
case o if o.expressions.exists(!_.deterministic) &&
732+
!operatorAllowsNonDeterministicExpressions(o) &&
721733
!o.isInstanceOf[Project] &&
722734
// non-deterministic expressions inside CollectMetrics have been
723735
// already validated inside checkMetric function

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,6 +1998,16 @@ case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan)
19981998
*/
19991999
trait SupportsSubquery extends LogicalPlan
20002000

2001+
/**
2002+
* Trait that a logical plan can extend to declare whether it allows non-deterministic
2003+
* expressions, so that such expressions can pass the CheckAnalysis rule.
2004+
*/
2005+
trait SupportsNonDeterministicExpression extends LogicalPlan {
2006+
2007+
/** Returns true if this plan allows non-deterministic expressions, false otherwise. */
2008+
def allowNonDeterministicExpression: Boolean
2009+
}
2010+
20012011
/**
20022012
* Collect arbitrary (named) metrics from a dataset. As soon as the query reaches a completion
20032013
* point (batch query completes or streaming query epoch completes) an event is emitted on the

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ case class TestFunctionWithTypeCheckFailure(
9090

9191
case class UnresolvedTestPlan() extends UnresolvedLeafNode
9292

93+
/**
* Test-only leaf operator that mixes in SupportsNonDeterministicExpression.
*
* The `allowNonDeterministicExpression` constructor argument supplies the trait's flag, and
* the operator exposes no output attributes.
*
* NOTE(review): `actions` presumably reaches the analyzer check through the plan's
* `expressions` - confirm against QueryPlan.
*/
case class SupportsNonDeterministicExpressionTestOperator(
94+
actions: Seq[Expression],
95+
allowNonDeterministicExpression: Boolean)
96+
extends LeafNode with SupportsNonDeterministicExpression {
97+
override def output: Seq[Attribute] = Seq()
98+
}
99+
93100
class AnalysisErrorSuite extends AnalysisTest with DataTypeErrorsBase {
94101
import TestRelations._
95102

@@ -1364,4 +1371,20 @@ class AnalysisErrorSuite extends AnalysisTest with DataTypeErrorsBase {
13641371
messageParameters = Map(
13651372
"expr" -> "\"_w0\"",
13661373
"exprType" -> "\"MAP<STRING, STRING>\""))
1374+
1375+
test("SPARK-48871: SupportsNonDeterministicExpression allows non-deterministic expressions") {
1376+
// Both plans below receive the same expression list; only the flag differs.
val nonDeterministicExpressions = Seq(new Rand())
1377+
// Flag set to true: analysis is expected to succeed.
val tolerantPlan =
1378+
SupportsNonDeterministicExpressionTestOperator(
1379+
nonDeterministicExpressions, allowNonDeterministicExpression = true)
1380+
assertAnalysisSuccess(tolerantPlan)
1381+
1382+
// Flag set to false: analysis is expected to fail, reporting
// INVALID_NON_DETERMINISTIC_EXPRESSIONS.
val intolerantPlan =
1383+
SupportsNonDeterministicExpressionTestOperator(
1384+
nonDeterministicExpressions, allowNonDeterministicExpression = false)
1385+
assertAnalysisError(
1386+
intolerantPlan,
1387+
"INVALID_NON_DETERMINISTIC_EXPRESSIONS" :: Nil
1388+
)
1389+
}
13671390
}

0 commit comments

Comments
 (0)