Commit d50f6e6

viirya authored and cloud-fan committed
[SPARK-25903][CORE] TimerTask should be synchronized on ContextBarrierState
### What changes were proposed in this pull request?

BarrierCoordinator sets up a TimerTask for each round of global sync. Currently the task's run method is synchronized on the TimerTask itself. To be synchronized with handleRequest, it must instead be synchronized on the ContextBarrierState object.

### Why are the changes needed?

ContextBarrierState.handleRequest and TimerTask.run both access the internal state of a ContextBarrierState object. If the TimerTask is not synchronized on the same ContextBarrierState object, handleRequest can still accept new requests and modify the requesters field while the timer task is running, which makes the behavior inconsistent.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Tested locally.

Closes #25897 from viirya/SPARK-25903.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 0c40b94 commit d50f6e6

1 file changed, +3 -3 lines changed

core/src/main/scala/org/apache/spark/BarrierCoordinator.scala

Lines changed: 3 additions & 3 deletions
@@ -107,9 +107,9 @@ private[spark] class BarrierCoordinator(
     private var timerTask: TimerTask = null
 
     // Init a TimerTask for a barrier() call.
-    private def initTimerTask(): Unit = {
+    private def initTimerTask(state: ContextBarrierState): Unit = {
       timerTask = new TimerTask {
-        override def run(): Unit = synchronized {
+        override def run(): Unit = state.synchronized {
           // Timeout current barrier() call, fail all the sync requests.
           requesters.foreach(_.sendFailure(new SparkException("The coordinator didn't get all " +
             s"barrier sync requests for barrier epoch $barrierEpoch from $barrierId within " +
@@ -148,7 +148,7 @@ private[spark] class BarrierCoordinator(
       // If this is the first sync message received for a barrier() call, start timer to ensure
       // we may timeout for the sync.
       if (requesters.isEmpty) {
-        initTimerTask()
+        initTimerTask(this)
         timer.schedule(timerTask, timeoutInSecs * 1000)
       }
       // Add the requester to array of RPCCallContexts pending for reply.
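
The one-word change in `run()` matters because a bare `synchronized` inside an anonymous class locks the anonymous instance, not the enclosing object. The following is a minimal standalone sketch of that behavior, not Spark code: `LockTargetDemo`, `State`, and `observe` are hypothetical names introduced only for illustration, and `State` merely stands in for `ContextBarrierState`. It uses `Thread.holdsLock` to report which monitor is actually held inside `run()`.

```scala
import java.util.TimerTask

object LockTargetDemo {
  // Hypothetical stand-in for ContextBarrierState (illustration only).
  final class State

  // Runs a TimerTask body once and returns
  // (holds the TimerTask's monitor, holds the State's monitor).
  def observe(lockOnState: Boolean): (Boolean, Boolean) = {
    val state = new State
    var seen = (false, false)
    val task: TimerTask =
      if (lockOnState) new TimerTask {
        // Pattern after the change: lock the shared state object.
        override def run(): Unit = state.synchronized {
          seen = (Thread.holdsLock(this), Thread.holdsLock(state))
        }
      } else new TimerTask {
        // Pattern before the change: bare `synchronized` means this.synchronized,
        // and `this` here is the anonymous TimerTask.
        override def run(): Unit = synchronized {
          seen = (Thread.holdsLock(this), Thread.holdsLock(state))
        }
      }
    task.run() // called directly; no Timer thread is needed for the demonstration
    seen
  }

  def main(args: Array[String]): Unit = {
    println(observe(lockOnState = false)) // (true,false)
    println(observe(lockOnState = true))  // (false,true)
  }
}
```

In the first case the task holds only its own monitor, so a method that synchronizes on the state object (as `handleRequest` is described as doing in the commit message) would not be excluded while the task runs. In the second case both code paths contend for the same monitor.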
