
Commit 8654e55

Tom Graves authored and GitHub Enterprise committed
Merge pull request apache#139 from abellina/yspark_2_1_1_YSPARK-654_pull_SPARK-18113_can_commit_retries
[SPARK-18113] Use ask to replace askWithRetry in canCommit and make r…
2 parents db0cc99 + 74e2ca7 · commit 8654e55
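Context for the change: askWithRetry can resend an RPC whose reply was lost, but AskPermissionToCommitOutput was not idempotent, so a retried request from the attempt that had already been authorized would be denied and the task would fail. This commit therefore (a) sends the ask exactly once and waits on the returned Future with the configured RPC timeout, and (b) makes the coordinator answer a duplicate request from the authorized attempt with true. Below is a minimal sketch of the single-ask pattern using plain Scala futures as a stand-in for Spark's RpcEndpointRef; askOnce and send are illustrative names, not Spark API.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Illustrative stand-in for endpointRef.ask + ThreadUtils.awaitResult:
// send the request exactly once and block for at most `timeout`.
// A slow or lost reply surfaces as a TimeoutException; the message is
// never resent, so the receiver sees at most one delivery per call.
def askOnce[T](send: () => Future[T], timeout: FiniteDuration): T =
  Await.result(send(), timeout)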

2 files changed: 31 additions, 4 deletions


core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 15 additions & 4 deletions
@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
@@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
-        endpointRef.askWithRetry[Boolean](msg)
+        ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
+          RpcUtils.askRpcTimeout(conf).duration)
       case None =>
         logError(
           "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
@@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
           authorizedCommitters(partition) = attemptNumber
           true
         case existingCommitter =>
-          logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-            s"partition=$partition; existingCommitter = $existingCommitter")
-          false
+          // Coordinator should be idempotent when receiving AskPermissionToCommit.
+          if (existingCommitter == attemptNumber) {
+            logWarning(s"Authorizing duplicate request to commit for " +
+              s"attemptNumber=$attemptNumber to commit for stage=$stage," +
+              s" partition=$partition; existingCommitter = $existingCommitter." +
+              s" This can indicate dropped network traffic.")
+            true
+          } else {
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
+            false
+          }
       }
     case None =>
       logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 16 additions & 0 deletions
@@ -190,6 +190,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(
       !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
   }
+
+  test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
+    val rdd = sc.parallelize(Seq(1), 1)
+    sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
+      0 until rdd.partitions.size)
+  }
 }
 
 /**
@@ -222,6 +228,16 @@ private case class OutputCommitFunctions(tempDirPath: String) {
       if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
   }
 
+  // Receiver should be idempotent for AskPermissionToCommitOutput
+  def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
+    val ctx = TaskContext.get()
+    val canCommit1 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    val canCommit2 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    assert(canCommit1 && canCommit2)
+  }
+
   private def runCommitWithProvidedCommitter(
       ctx: TaskContext,
       iter: Iterator[Int],
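
The new test drives this end to end: a one-partition job calls canCommit twice with the same stage, partition, and attempt number, and asserts that both answers are true. Against the simplified CommitArbiter sketch above, the same behavior looks like this (illustrative only, mirroring what the test asserts):

val arbiter = new CommitArbiter
assert(arbiter.canCommit(partition = 0, attempt = 0))  // first ask: authorized
assert(arbiter.canCommit(partition = 0, attempt = 0))  // duplicate ask: still true
assert(!arbiter.canCommit(partition = 0, attempt = 1)) // competing attempt: denied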
