Skip to content

Commit 330f726

Browse files
jinxingcmonkey
jinxing
authored andcommitted
[SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent.
## What changes were proposed in this pull request? Method canCommit sends AskPermissionToCommitOutput using askWithRetry. If timeout, it will send again. Thus AskPermissionToCommitOutput can be received multi times. Method canCommit should return the same value when called by the same attempt multi times. In implementation before this fix, method handleAskPermissionToCommit just check if there is committer already registered, which is not enough. When worker retries AskPermissionToCommitOutput it will get CommitDeniedException, then the task will fail with reason TaskCommitDenied, which is not regarded as a task failure(SPARK-11178), so TaskScheduler will schedule this task infinitely. In this fix, use `ask` to replace `askWithRetry` in `canCommit` and make receiver idempotent. ## How was this patch tested? Added a new unit test to OutputCommitCoordinatorSuite. Author: jinxing <jinxing@meituan.com> Closes apache#16503 from jinxing64/SPARK-18113.
1 parent 5277429 commit 330f726

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.collection.mutable
2222
import org.apache.spark._
2323
import org.apache.spark.internal.Logging
2424
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
25+
import org.apache.spark.util.{RpcUtils, ThreadUtils}
2526

2627
private sealed trait OutputCommitCoordinationMessage extends Serializable
2728

@@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
8889
val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
8990
coordinatorRef match {
9091
case Some(endpointRef) =>
91-
endpointRef.askWithRetry[Boolean](msg)
92+
ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
93+
RpcUtils.askRpcTimeout(conf).duration)
9294
case None =>
9395
logError(
9496
"canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
@@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
165167
authorizedCommitters(partition) = attemptNumber
166168
true
167169
case existingCommitter =>
168-
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
169-
s"partition=$partition; existingCommitter = $existingCommitter")
170-
false
170+
// Coordinator should be idempotent when receiving AskPermissionToCommit.
171+
if (existingCommitter == attemptNumber) {
172+
logWarning(s"Authorizing duplicate request to commit for " +
173+
s"attemptNumber=$attemptNumber to commit for stage=$stage," +
174+
s" partition=$partition; existingCommitter = $existingCommitter." +
175+
s" This can indicate dropped network traffic.")
176+
true
177+
} else {
178+
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
179+
s"partition=$partition; existingCommitter = $existingCommitter")
180+
false
181+
}
171182
}
172183
case None =>
173184
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
189189
assert(
190190
!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
191191
}
192+
193+
test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
194+
val rdd = sc.parallelize(Seq(1), 1)
195+
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
196+
0 until rdd.partitions.size)
197+
}
192198
}
193199

194200
/**
@@ -221,6 +227,16 @@ private case class OutputCommitFunctions(tempDirPath: String) {
221227
if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
222228
}
223229

230+
// Receiver should be idempotent for AskPermissionToCommitOutput
231+
def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
232+
val ctx = TaskContext.get()
233+
val canCommit1 = SparkEnv.get.outputCommitCoordinator
234+
.canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
235+
val canCommit2 = SparkEnv.get.outputCommitCoordinator
236+
.canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
237+
assert(canCommit1 && canCommit2)
238+
}
239+
224240
private def runCommitWithProvidedCommitter(
225241
ctx: TaskContext,
226242
iter: Iterator[Int],

0 commit comments

Comments
 (0)