
Commit 09e5d15

Authored and committed by Marcelo Vanzin
[SPARK-24552][core] Correctly identify tasks in output commit coordinator.
When an output stage is retried, it's possible that tasks from the previous attempt are still running. In that case, there would be a new task for the same partition in the new attempt, and the coordinator would allow both tasks to commit their output, since it did not keep track of stage attempts. This change adds the stage attempt to the state tracked by the coordinator, so that only one task is allowed to commit the output in the above case. It also removes some code added in SPARK-18113 that allowed duplicate commit requests; with the RPC code used in Spark 2, that situation cannot happen, so there is no need to handle it.
1 parent 495d8cf
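
To make the race described in the commit message concrete, here is a minimal, self-contained Scala sketch of the "first committer wins" bookkeeping keyed by both stage attempt and task attempt. The names (`TaskIdentifier`, `canCommit`) echo the patch, but `ToyCoordinator` is a hypothetical toy model, not the patched Spark class:

```scala
import scala.collection.mutable

// Toy model of the coordinator's per-stage state after this patch: a committer
// is identified by (stageAttempt, taskAttempt), not by task attempt alone.
case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

class ToyCoordinator(numPartitions: Int) {
  // null marks a partition with no authorized committer yet.
  private val authorized = Array.fill[TaskIdentifier](numPartitions)(null)
  private val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()

  def taskFailed(stageAttempt: Int, partition: Int, taskAttempt: Int): Unit = synchronized {
    val id = TaskIdentifier(stageAttempt, taskAttempt)
    failures.getOrElseUpdate(partition, mutable.Set()) += id
    // A failed authorized committer releases the partition lock.
    if (authorized(partition) == id) authorized(partition) = null
  }

  def canCommit(stageAttempt: Int, partition: Int, taskAttempt: Int): Boolean = synchronized {
    val id = TaskIdentifier(stageAttempt, taskAttempt)
    if (failures.get(partition).exists(_.contains(id))) {
      false // a failed attempt may never commit
    } else if (authorized(partition) == null) {
      authorized(partition) = id // first committer wins
      true
    } else {
      false // everyone else is denied, duplicates included
    }
  }
}

object ToyCoordinatorDemo extends App {
  val coord = new ToyCoordinator(numPartitions = 1)
  // Stage attempt 1 authorizes its task for partition 0; the same task running
  // concurrently in stage attempt 2 is denied -- the case that the old,
  // attempt-number-only state could not distinguish.
  assert(coord.canCommit(stageAttempt = 1, partition = 0, taskAttempt = 0))
  assert(!coord.canCommit(stageAttempt = 2, partition = 0, taskAttempt = 0))
  // Once the authorized committer fails, a new attempt may commit.
  coord.taskFailed(stageAttempt = 1, partition = 0, taskAttempt = 0)
  assert(coord.canCommit(stageAttempt = 2, partition = 0, taskAttempt = 0))
}
```

The toy also mirrors one behavioral detail of the patch: a duplicate `canCommit` call from the already-authorized committer is now denied, since the SPARK-18113 idempotency path was removed.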

File tree

4 files changed: +101 -90 lines

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala (4 additions, 4 deletions)

@@ -69,9 +69,9 @@ object SparkHadoopMapRedUtil extends Logging {

     if (shouldCoordinateWithDriver) {
       val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-      val taskAttemptNumber = TaskContext.get().attemptNumber()
-      val stageId = TaskContext.get().stageId()
-      val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
+      val ctx = TaskContext.get()
+      val canCommit = outputCommitCoordinator.canCommit(ctx.stageId(), ctx.stageAttemptNumber(),
+        splitId, ctx.attemptNumber())

       if (canCommit) {
         performCommit()
@@ -81,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
         logInfo(message)
         // We need to abort the task so that the driver can reschedule new attempts, if necessary
         committer.abortTask(mrTaskContext)
-        throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
+        throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
       }
     } else {
       // Speculation is disabled or a user has chosen to manually bypass the commit coordination

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (1 addition, 0 deletions)

@@ -1171,6 +1171,7 @@ class DAGScheduler(

     outputCommitCoordinator.taskCompleted(
       stageId,
+      task.stageAttemptId,
       task.partitionId,
       event.taskInfo.attemptNumber, // this is a task attempt number
       event.reason)

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala (58 additions, 54 deletions)

@@ -27,7 +27,11 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
 private sealed trait OutputCommitCoordinationMessage extends Serializable

 private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
+private case class AskPermissionToCommitOutput(
+    stage: Int,
+    stageAttempt: Int,
+    partition: Int,
+    attemptNumber: Int)

 /**
  * Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -45,13 +49,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None

-  private type StageId = Int
-  private type PartitionId = Int
-  private type TaskAttemptNumber = Int
-  private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+  // Class used to identify a committer. The task ID for a committer is implicitly defined by
+  // the partition being processed, but the coordinator needs to keep track of both the stage
+  // attempt and the task attempt, because in some situations the same task may be running
+  // concurrently in two different attempts of the same stage.
+  private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)
+
   private case class StageState(numPartitions: Int) {
-    val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
-    val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+    val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
+    val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
   }

@@ -64,7 +70,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    *
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
    */
-  private val stageStates = mutable.Map[StageId, StageState]()
+  private val stageStates = mutable.Map[Int, StageState]()

   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -87,10 +93,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * @return true if this task is authorized to commit, false otherwise
    */
   def canCommit(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber): Boolean = {
-    val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int): Boolean = {
+    val msg = AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
         ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
@@ -109,20 +116,21 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
     stageStates(stage) = new StageState(maxPartitionId + 1)
   }

   // Called by DAGScheduler
-  private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
+  private[scheduler] def stageEnd(stage: Int): Unit = synchronized {
     stageStates.remove(stage)
   }

   // Called by DAGScheduler
   private[scheduler] def taskCompleted(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber,
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int,
       reason: TaskEndReason): Unit = synchronized {
     val stageState = stageStates.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
@@ -132,15 +140,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       case Success =>
         // The task output has been committed successfully
       case denied: TaskCommitDenied =>
-        logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
-          s"attempt: $attemptNumber")
+        logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " +
+          s"partition: $partition, attempt: $attemptNumber")
       case otherReason =>
         // Mark the attempt as failed to blacklist from future commit protocol
-        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
-        if (stageState.authorizedCommitters(partition) == attemptNumber) {
+        val taskId = TaskIdentifier(stageAttempt, attemptNumber)
+        stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
+        if (stageState.authorizedCommitters(partition) == taskId) {
           logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
             s"partition=$partition) failed; clearing lock")
-          stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+          stageState.authorizedCommitters(partition) = null
         }
     }
   }
@@ -155,47 +164,41 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)

   // Marked private[scheduler] instead of private so this can be mocked in tests
   private[scheduler] def handleAskPermissionToCommit(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int): Boolean = synchronized {
     stageStates.get(stage) match {
-      case Some(state) if attemptFailed(state, partition, attemptNumber) =>
-        logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," +
-          s" partition=$partition as task attempt $attemptNumber has already failed.")
+      case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
+        logInfo(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " +
+          s"task attempt $attemptNumber already marked as failed.")
         false
       case Some(state) =>
-        state.authorizedCommitters(partition) match {
-          case NO_AUTHORIZED_COMMITTER =>
-            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition")
-            state.authorizedCommitters(partition) = attemptNumber
-            true
-          case existingCommitter =>
-            // Coordinator should be idempotent when receiving AskPermissionToCommit.
-            if (existingCommitter == attemptNumber) {
-              logWarning(s"Authorizing duplicate request to commit for " +
-                s"attemptNumber=$attemptNumber to commit for stage=$stage," +
-                s" partition=$partition; existingCommitter = $existingCommitter." +
-                s" This can indicate dropped network traffic.")
-              true
-            } else {
-              logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-                s"partition=$partition; existingCommitter = $existingCommitter")
-              false
-            }
+        val existing = state.authorizedCommitters(partition)
+        if (existing == null) {
+          logDebug(s"Commit allowed for stage=$stage/$stageAttempt, partition=$partition: " +
+            s"task attempt $attemptNumber")
+          state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
+          true
+        } else {
+          logDebug(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " +
+            s"already committed by $existing")
+          false
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing" +
-          s" attempt number $attemptNumber of partition $partition to commit")
+        logDebug(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " +
+          "stage already marked as completed.")
         false
     }
   }

   private def attemptFailed(
       stageState: StageState,
-      partition: PartitionId,
-      attempt: TaskAttemptNumber): Boolean = synchronized {
-    stageState.failures.get(partition).exists(_.contains(attempt))
+      stageAttempt: Int,
+      partition: Int,
+      attempt: Int): Boolean = synchronized {
+    val failInfo = TaskIdentifier(stageAttempt, attempt)
+    stageState.failures.get(partition).exists(_.contains(failInfo))
   }
 }

@@ -215,9 +218,10 @@ private[spark] object OutputCommitCoordinator {
     }

     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-      case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
+      case AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber) =>
         context.reply(
-          outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
+          outputCommitCoordinator.handleAskPermissionToCommit(stage, stageAttempt, partition,
+            attemptNumber))
     }
   }
 }
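
One detail worth noting: the lookup and lock-release logic above compares `TaskIdentifier` values with `==`. A short, hypothetical snippet (not part of the patch) showing why that works, since Scala case classes use structural equality and `null` serves as the "no committer" sentinel:

```scala
case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

val stored: TaskIdentifier = TaskIdentifier(stageAttempt = 0, taskAttempt = 3)

// Case classes compare by field values, so an identifier rebuilt from the same
// (stageAttempt, taskAttempt) pair matches the one stored in the array.
assert(stored == TaskIdentifier(0, 3))
// A task with the same attempt number but from a different stage attempt does
// not match, which is exactly the distinction SPARK-24552 needs.
assert(stored != TaskIdentifier(1, 3))
// An empty slot is represented by null rather than a -1 sentinel value.
assert(stored != null)
```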

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala (38 additions, 32 deletions)

@@ -153,7 +153,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
   test("Job should not complete if all commits are denied") {
     // Create a mock OutputCommitCoordinator that denies all attempts to commit
     doReturn(false).when(outputCommitCoordinator).handleAskPermissionToCommit(
-      Matchers.any(), Matchers.any(), Matchers.any())
+      Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any())
     val rdd: RDD[Int] = sc.parallelize(Seq(1), 1)
     def resultHandler(x: Int, y: Unit): Unit = {}
     val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd,
@@ -169,45 +169,61 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

   test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
     val stage: Int = 1
+    val stageAttempt: Int = 1
     val partition: Int = 2
     val authorizedCommitter: Int = 3
     val nonAuthorizedCommitter: Int = 100
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)

-    assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter))
     // The non-authorized committer fails
-    outputCommitCoordinator.taskCompleted(
-      stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test"))
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+      attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test"))
     // New tasks should still not be able to commit because the authorized committer has not failed
-    assert(
-      !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 1))
     // The authorized committer now fails, clearing the lock
-    outputCommitCoordinator.taskCompleted(
-      stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+      attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
     // A new task should now be allowed to become the authorized committer
-    assert(
-      outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 2))
     // There can only be one authorized committer
-    assert(
-      !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
-  }
-
-  test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
-    val rdd = sc.parallelize(Seq(1), 1)
-    sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
-      0 until rdd.partitions.size)
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 3))
   }

   test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
     val stage: Int = 1
+    val stageAttempt: Int = 1
     val partition: Int = 1
     val failedAttempt: Int = 0
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
-    outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
+      attemptNumber = failedAttempt,
       reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
-    assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1))
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt + 1))
+  }
+
+  test("SPARK-24552: Differentiate tasks from different stage attempts") {
+    var stage = 1
+    val taskAttempt = 1
+    val partition = 1
+
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
+    assert(!outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
+
+    // Fail the task in the first attempt; the task in the second attempt should succeed.
+    stage += 1
+    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
+      ExecutorLostFailure("0", exitCausedByApp = true, None))
+    assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
   }
 }

@@ -243,16 +259,6 @@ private case class OutputCommitFunctions(tempDirPath: String) {
       if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
   }

-  // Receiver should be idempotent for AskPermissionToCommitOutput
-  def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
-    val ctx = TaskContext.get()
-    val canCommit1 = SparkEnv.get.outputCommitCoordinator
-      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
-    val canCommit2 = SparkEnv.get.outputCommitCoordinator
-      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
-    assert(canCommit1 && canCommit2)
-  }
-
   private def runCommitWithProvidedCommitter(
       ctx: TaskContext,
       iter: Iterator[Int],
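
For anyone verifying the change locally, this suite would typically be run through Spark's sbt wrapper, e.g. `build/sbt "core/testOnly *OutputCommitCoordinatorSuite"` (assuming the standard Spark development setup of that era).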
