Skip to content

Commit

Permalink
[SPARK-40940] Remove Multi-stateful operator checkers for streaming q…
Browse files Browse the repository at this point in the history
…ueries

### What changes were proposed in this pull request?

As a followup to [SPARK-40925], [github PR](apache#38405), Remove corresponding checks in UnsupportedOperationChecker so that customers don't have to explicitly add new conf withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") to use the new multi-stateful operators. In other words we are enabling multi-stateful operators by default.

As a side effect, the API of `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan, OutputMode)` is also changed to `checkStreamingQueryGlobalWatermarkLimit(LogicalPlan)`

New tests are added to `MultiStatefulOperatorsSuite.scala`, but I could also add equivalent ones to `UnsupportedOperationsSuite.scala` if requested.

### Why are the changes needed?

To enable new multiple-stateful operators by default. Right now users need to set SQL conf `unsupportedOperationCheck` to false explicitly, which also disables many other useful checks.

### Does this PR introduce _any_ user-facing change?

No. All current running queries won't be impacted. But new queries could use chained stateful operators.

### How was this patch tested?

Unit Tests.

Closes apache#38503 from WweiL/SPARK-40940-multi-state-checkers.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
WweiL authored and beliefer committed Dec 18, 2022
1 parent a6d4319 commit 12fe244
Show file tree
Hide file tree
Showing 3 changed files with 705 additions and 498 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, CurrentDate, CurrentTimestampLike, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

/**
* Analyzes the presence of unsupported operations in a logical plan.
Expand All @@ -42,40 +43,97 @@ object UnsupportedOperationChecker extends Logging {
}

/**
* Checks for possible correctness issue in chained stateful operators. The behavior is
* controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
* print a warning message.
* Checks if the expression has a event time column
* @param exp the expression to be checked
* @return true if it is a event time column.
*/
def checkStreamingQueryGlobalWatermarkLimit(
plan: LogicalPlan,
outputMode: OutputMode): Unit = {
def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
case s: Aggregate
if s.isStreaming && outputMode == InternalOutputModes.Append => true
case Join(left, right, joinType, _, _)
if left.isStreaming && right.isStreaming && joinType != Inner => true
case f: FlatMapGroupsWithState
if f.isStreaming && f.outputMode == OutputMode.Append() => true
case _ => false
private def hasEventTimeCol(exp: Expression): Boolean = exp.exists {
case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
case _ => false
}

/**
* Checks if the expression contains a range comparison, in which
* either side of the comparison is an event-time column. This is used for checking
* stream-stream time interval join.
* @param e the expression to be checked
* @return true if there is a time-interval join.
*/
private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = {
def hasEventTimeColBinaryComp(neq: Expression): Boolean = {
val exp = neq.asInstanceOf[BinaryComparison]
hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
}

def isStatefulOperation(p: LogicalPlan): Boolean = p match {
case s: Aggregate if s.isStreaming => true
case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
case f: FlatMapGroupsWithState if f.isStreaming => true
case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
case d: Deduplicate if d.isStreaming => true
e.exists {
case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
hasEventTimeColBinaryComp(neq)
case _ => false
}
}

val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
/**
* This method, combined with isStatefulOperation, determines all disallowed
* behaviors in multiple stateful operators.
* Concretely, All conditions defined below cannot be followed by any streaming stateful
* operator as defined in isStatefulOperation.
* @param p logical plan to be checked
* @param outputMode query output mode
* @return true if it is not allowed when followed by any streaming stateful
* operator as defined in isStatefulOperation.
*/
private def ifCannotBeFollowedByStatefulOperation(
p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
left.isStreaming && right.isStreaming &&
otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get)
// FlatMapGroupsWithState configured with event time
case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _)
if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
case p @ FlatMapGroupsInPandasWithState(_, _, _, _, _, timeout, _)
if p.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true
// Since the Distinct node will be replaced to Aggregate in the optimizer rule
// [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
// assuming it as Aggregate.
case d @ Distinct(_: LogicalPlan) if d.isStreaming
&& outputMode != InternalOutputModes.Append => true
case _ => false
}

/**
* This method is only used with ifCannotBeFollowedByStatefulOperation.
* Here we list up stateful operators but there is an exception for Deduplicate:
* it is only counted here when it has an event time column.
* @param p the logical plan to be checked
* @return true if there is a streaming stateful operation
*/
private def isStatefulOperation(p: LogicalPlan): Boolean = p match {
case s: Aggregate if s.isStreaming => true
// Since the Distinct node will be replaced to Aggregate in the optimizer rule
// [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
// assuming it as Aggregate.
case d @ Distinct(_: LogicalPlan) if d.isStreaming => true
case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true
case f: FlatMapGroupsWithState if f.isStreaming => true
case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true
case _ => false
}

/**
* Checks for possible correctness issue in chained stateful operators. The behavior is
* controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
* print a warning message.
*/
def checkStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = {
val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled
try {
plan.foreach { subPlan =>
if (isStatefulOperation(subPlan)) {
subPlan.find { p =>
(p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p)
(p ne subPlan) && ifCannotBeFollowedByStatefulOperation(p, outputMode)
}.foreach { _ =>
val errorMsg = "Detected pattern of possible 'correctness' issue " +
"due to global watermark. " +
Expand Down Expand Up @@ -154,15 +212,7 @@ object UnsupportedOperationChecker extends Logging {
"DataFrames/Datasets")(plan)
}

// Disallow multiple streaming aggregations
val aggregates = collectStreamingAggregates(plan)

if (aggregates.size > 1) {
throwError(
"Multiple streaming aggregations are not supported with " +
"streaming DataFrames/Datasets")(plan)
}

// Disallow some output mode
outputMode match {
case InternalOutputModes.Append if aggregates.nonEmpty =>
Expand Down Expand Up @@ -266,12 +316,8 @@ object UnsupportedOperationChecker extends Logging {
" DataFrame/Dataset")
}
if (m.isMapGroupsWithState) { // check mapGroupsWithState
// allowed only in update query output mode and without aggregation
if (aggsInQuery.nonEmpty) {
throwError(
"mapGroupsWithState is not supported with aggregation " +
"on a streaming DataFrame/Dataset")
} else if (outputMode != InternalOutputModes.Update) {
// allowed only in update query output mode
if (outputMode != InternalOutputModes.Update) {
throwError(
"mapGroupsWithState is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
Expand All @@ -294,16 +340,11 @@ object UnsupportedOperationChecker extends Logging {
case _ =>
}
} else {
// flatMapGroupsWithState with aggregation: update operation mode not allowed, and
// *groupsWithState after aggregation not allowed
// flatMapGroupsWithState with aggregation: update operation mode not allowed
if (m.outputMode == InternalOutputModes.Update) {
throwError(
"flatMapGroupsWithState in update mode is not supported with " +
"aggregation on a streaming DataFrame/Dataset")
} else if (collectStreamingAggregates(m).nonEmpty) {
throwError(
"flatMapGroupsWithState in append mode is not supported after " +
"aggregation on a streaming DataFrame/Dataset")
}
}
}
Expand Down Expand Up @@ -373,10 +414,6 @@ object UnsupportedOperationChecker extends Logging {
}
}

case d: Deduplicate if collectStreamingAggregates(d).nonEmpty =>
throwError("dropDuplicates is not supported after aggregation on a " +
"streaming DataFrame/Dataset")

case j @ Join(left, right, joinType, condition, _) =>
if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) {
throwError("Join between two streaming DataFrames/Datasets is not supported" +
Expand Down
Loading

0 comments on commit 12fe244

Please sign in to comment.