Skip to content

Commit 7ae5eeb

Browse files
committed
fix concurrent promise issue
1 parent db71de8 commit 7ae5eeb

File tree

2 files changed

+16
-15
lines changed

2 files changed

+16
-15
lines changed

core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,7 @@ private[spark] class JobWaiter[T](
5151
* all the tasks belonging to this job, it will fail this job with a SparkException.
5252
*/
5353
def cancel(reason: Option[String] = None, quiet: Boolean = false): Unit = {
54-
if (quiet) {
55-
dagScheduler.cancelJob(jobId, reason, quiet)
56-
} else {
57-
dagScheduler.cancelJob(jobId, None, quiet)
58-
}
54+
dagScheduler.cancelJob(jobId, reason, quiet)
5955
}
6056

6157
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,13 @@ trait ShuffleExchangeLike extends Exchange {
7878
private[sql] // Exposed for testing
7979
val futureAction = new AtomicReference[Option[FutureAction[MapOutputStatistics]]](None)
8080

81+
@volatile
8182
@transient
82-
private val isCancelled: AtomicReference[Boolean] = new AtomicReference(false)
83+
private var isCancelled: Boolean = false
84+
85+
@volatile
86+
@transient
87+
private var quietly: Boolean = false
8388

8489
@transient
8590
private lazy val triggerFuture: java.util.concurrent.Future[Any] = {
@@ -90,7 +95,7 @@ trait ShuffleExchangeLike extends Exchange {
9095
executeQuery(null)
9196
// Submit shuffle job if not cancelled.
9297
this.synchronized {
93-
if (isCancelled.get) {
98+
if (isCancelled && !quietly) {
9499
promise.tryFailure(new SparkException("Shuffle cancelled."))
95100
} else {
96101
val shuffleJob = RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
@@ -125,15 +130,15 @@ trait ShuffleExchangeLike extends Exchange {
125130
* Cancels the shuffle job with an optional reason.
126131
*/
127132
final def cancelShuffleJob(reason: Option[String], quiet: Boolean): Unit = this.synchronized {
128-
if (!isCancelled.get) {
129-
if (quiet) {
130-
// tryFailure before isCancelled.set(true) to avoid concurrent issue with triggerFuture
131-
promise.tryFailure(new SparkAQEStageCancelException())
133+
this.synchronized {
134+
if (!isCancelled) {
135+
isCancelled = true
136+
if (quiet) {
137+
quietly = quiet
138+
promise.tryFailure(new SparkAQEStageCancelException)
139+
}
140+
futureAction.get().foreach(_.cancel(reason, quiet))
132141
}
133-
isCancelled.set(true)
134-
// this may take no effect, if the job has already been submitted
135-
// but futureAction is not set yet, it's a best effort to cancel it
136-
futureAction.get().foreach(_.cancel(reason, quiet))
137142
}
138143
}
139144

0 commit comments

Comments
 (0)