[SPARK-27111][SS]Fix a race that a continuous query may fail with InterruptedException #24034

@@ -268,13 +268,29 @@ class ContinuousExecution(
         logInfo(s"Query $id ignoring exception from reconfiguring: $t")
         // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
-      epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
-      SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
-      epochUpdateThread.interrupt()
-      epochUpdateThread.join()
-
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+      // The above execution may finish before getting interrupted, for example, a Spark job having
+      // 0 partitions will complete immediately. Then the interrupted status will sneak in here.
+      //
+      // To handle this case, we do two things here:
+      //
+      // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
+      //    the waiting time of `stop`, but it should be minor because the operations here are very
+      //    fast (just sending an RPC message in the same process and stopping a very simple thread).
+      // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
+      //    call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
+      //    termination because `runActivatedStream` will check `state` and exit accordingly.
+      queryExecutionThread.runUninterruptibly {
+        try {
+          epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+        } finally {
+          SparkEnv.get.rpcEnv.stop(epochEndpoint)
+          epochUpdateThread.interrupt()
+          epochUpdateThread.join()
+          // The following line must be the last one because it may fail if SparkContext is stopped
+          sparkSession.sparkContext.cancelJobGroup(runId.toString)
+        }
+      }
+      Thread.interrupted()
     }
   }
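
To see why an interrupt can outlive the work it was meant to cancel, here is a minimal, self-contained JVM sketch (plain threads, no Spark code; all names in it are made up for illustration). A thread whose body has already completed still gets its interrupted status set by a late interrupt(), so the very next blocking call it makes would throw a spurious InterruptedException; Thread.interrupted() reads and clears that status, which is what the patch does at the end of the finally block:

object SneakyInterruptDemo {
  @volatile private var interruptDelivered = false

  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => {
      // The "query" body completes instantly (think: a Spark job with 0 partitions),
      // so the interrupt sent by the stopping thread never lands in a blocking call;
      // it only sets this thread's interrupted status. Busy-wait (rather than block)
      // to keep the demo deterministic.
      while (!interruptDelivered) {}

      // The status has "sneaked" past the finished work. If the cleanup path called
      // a blocking method now (askSync, join, ...), it would throw a spurious
      // InterruptedException even though nothing is wrong.
      println(s"status after work finished: ${Thread.currentThread().isInterrupted}") // true

      // What the patch does last: Thread.interrupted() returns the status and
      // clears it, so it cannot leak into whatever this thread runs next.
      println(s"Thread.interrupted() returned: ${Thread.interrupted()}")              // true
      println(s"status after clearing: ${Thread.currentThread().isInterrupted}")      // false

      // Blocking calls now behave normally again.
      Thread.sleep(10)
      println("subsequent blocking call completed without InterruptedException")
    })
    worker.start()
    worker.interrupt()        // plays the role of ContinuousExecution.stop()
    interruptDelivered = true // published only after the interrupt is delivered
    worker.join()
  }
}

The sketch only shows the status-clearing half of the fix. The other half relies on the query thread being a Spark UninterruptibleThread, whose runUninterruptibly defers any incoming interrupt until the block finishes, so the cleanup RPC and thread shutdown in the patch cannot themselves be broken midway.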
