-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-24589][core] Correctly identify tasks in output commit coordinator. #21577
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
09e5d15
[SPARK-24552][core] Correctly identify tasks in output commit coordinator.
d471b74
Fix sql compilation.
1cde305
Cleanup.
5437d4a
Add stage ID to CommitDeniedException.
a2f4c1b
Fix log message (task attempt -> stage attempt).
37ff307
Revert "Add stage ID to CommitDeniedException."
066dca5
Reuse the stage state when it's immediately re-attempted.
adb0d18
Prettier logs.
535fbd4
Remove unneeded import.
0e91f1b
Another unused import.
5ece2f1
Add another check in new test.
264c533
Add one more test just in case.
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,11 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils} | |
private sealed trait OutputCommitCoordinationMessage extends Serializable | ||
|
||
private case object StopCoordinator extends OutputCommitCoordinationMessage | ||
private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int) | ||
private case class AskPermissionToCommitOutput( | ||
stage: Int, | ||
stageAttempt: Int, | ||
partition: Int, | ||
attemptNumber: Int) | ||
|
||
/** | ||
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" | ||
|
@@ -45,13 +49,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
// Initialized by SparkEnv | ||
var coordinatorRef: Option[RpcEndpointRef] = None | ||
|
||
private type StageId = Int | ||
private type PartitionId = Int | ||
private type TaskAttemptNumber = Int | ||
private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 | ||
// Class used to identify a committer. The task ID for a committer is implicitly defined by | ||
// the partition being processed, but the coordinator needs to keep track of both the stage | ||
// attempt and the task attempt, because in some situations the same task may be running | ||
// concurrently in two different attempts of the same stage. | ||
private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int) | ||
|
||
private case class StageState(numPartitions: Int) { | ||
val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER) | ||
val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() | ||
val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null) | ||
val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]() | ||
} | ||
|
||
/** | ||
|
@@ -64,7 +70,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
* | ||
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance. | ||
*/ | ||
private val stageStates = mutable.Map[StageId, StageState]() | ||
private val stageStates = mutable.Map[Int, StageState]() | ||
|
||
/** | ||
* Returns whether the OutputCommitCoordinator's internal data structures are all empty. | ||
|
@@ -87,10 +93,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
* @return true if this task is authorized to commit, false otherwise | ||
*/ | ||
def canCommit( | ||
stage: StageId, | ||
partition: PartitionId, | ||
attemptNumber: TaskAttemptNumber): Boolean = { | ||
val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber) | ||
stage: Int, | ||
stageAttempt: Int, | ||
partition: Int, | ||
attemptNumber: Int): Boolean = { | ||
val msg = AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber) | ||
coordinatorRef match { | ||
case Some(endpointRef) => | ||
ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg), | ||
|
@@ -103,26 +110,35 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
} | ||
|
||
/** | ||
* Called by the DAGScheduler when a stage starts. | ||
* Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't | ||
* yet been initialized. | ||
* | ||
* @param stage the stage id. | ||
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. | ||
* the maximum possible value of `context.partitionId`). | ||
*/ | ||
private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized { | ||
stageStates(stage) = new StageState(maxPartitionId + 1) | ||
private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized { | ||
stageStates.get(stage) match { | ||
case Some(state) => | ||
require(state.authorizedCommitters.length == maxPartitionId + 1) | ||
logInfo(s"Reusing state from previous attempt of stage $stage.") | ||
|
||
case _ => | ||
stageStates(stage) = new StageState(maxPartitionId + 1) | ||
} | ||
} | ||
|
||
// Called by DAGScheduler | ||
private[scheduler] def stageEnd(stage: StageId): Unit = synchronized { | ||
private[scheduler] def stageEnd(stage: Int): Unit = synchronized { | ||
stageStates.remove(stage) | ||
} | ||
|
||
// Called by DAGScheduler | ||
private[scheduler] def taskCompleted( | ||
stage: StageId, | ||
partition: PartitionId, | ||
attemptNumber: TaskAttemptNumber, | ||
stage: Int, | ||
stageAttempt: Int, | ||
partition: Int, | ||
attemptNumber: Int, | ||
reason: TaskEndReason): Unit = synchronized { | ||
val stageState = stageStates.getOrElse(stage, { | ||
logDebug(s"Ignoring task completion for completed stage") | ||
|
@@ -131,16 +147,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
reason match { | ||
case Success => | ||
// The task output has been committed successfully | ||
case denied: TaskCommitDenied => | ||
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " + | ||
s"attempt: $attemptNumber") | ||
case otherReason => | ||
case _: TaskCommitDenied => | ||
logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " + | ||
s"partition: $partition, attempt: $attemptNumber") | ||
case _ => | ||
// Mark the attempt as failed to blacklist from future commit protocol | ||
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber | ||
if (stageState.authorizedCommitters(partition) == attemptNumber) { | ||
val taskId = TaskIdentifier(stageAttempt, attemptNumber) | ||
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId | ||
if (stageState.authorizedCommitters(partition) == taskId) { | ||
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " + | ||
s"partition=$partition) failed; clearing lock") | ||
stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER | ||
stageState.authorizedCommitters(partition) = null | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: why not use Option[TaskIdentifier] and None here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Less memory usage, at least. Not sure what advantage using |
||
} | ||
} | ||
} | ||
|
@@ -155,47 +172,41 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) | |
|
||
// Marked private[scheduler] instead of private so this can be mocked in tests | ||
private[scheduler] def handleAskPermissionToCommit( | ||
stage: StageId, | ||
partition: PartitionId, | ||
attemptNumber: TaskAttemptNumber): Boolean = synchronized { | ||
stage: Int, | ||
stageAttempt: Int, | ||
partition: Int, | ||
attemptNumber: Int): Boolean = synchronized { | ||
stageStates.get(stage) match { | ||
case Some(state) if attemptFailed(state, partition, attemptNumber) => | ||
logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + | ||
s" partition=$partition as task attempt $attemptNumber has already failed.") | ||
case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) => | ||
logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + | ||
s"task attempt $attemptNumber already marked as failed.") | ||
false | ||
case Some(state) => | ||
state.authorizedCommitters(partition) match { | ||
case NO_AUTHORIZED_COMMITTER => | ||
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " + | ||
s"partition=$partition") | ||
state.authorizedCommitters(partition) = attemptNumber | ||
true | ||
case existingCommitter => | ||
// Coordinator should be idempotent when receiving AskPermissionToCommit. | ||
if (existingCommitter == attemptNumber) { | ||
logWarning(s"Authorizing duplicate request to commit for " + | ||
s"attemptNumber=$attemptNumber to commit for stage=$stage," + | ||
s" partition=$partition; existingCommitter = $existingCommitter." + | ||
s" This can indicate dropped network traffic.") | ||
true | ||
} else { | ||
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " + | ||
s"partition=$partition; existingCommitter = $existingCommitter") | ||
false | ||
} | ||
val existing = state.authorizedCommitters(partition) | ||
if (existing == null) { | ||
logDebug(s"Commit allowed for stage=$stage.$stageAttempt, partition=$partition, " + | ||
s"task attempt $attemptNumber") | ||
state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber) | ||
true | ||
} else { | ||
logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + | ||
s"already committed by $existing") | ||
false | ||
} | ||
case None => | ||
logDebug(s"Stage $stage has completed, so not allowing" + | ||
s" attempt number $attemptNumber of partition $partition to commit") | ||
logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + | ||
"stage already marked as completed.") | ||
false | ||
} | ||
} | ||
|
||
private def attemptFailed( | ||
stageState: StageState, | ||
partition: PartitionId, | ||
attempt: TaskAttemptNumber): Boolean = synchronized { | ||
stageState.failures.get(partition).exists(_.contains(attempt)) | ||
stageAttempt: Int, | ||
partition: Int, | ||
attempt: Int): Boolean = synchronized { | ||
val failInfo = TaskIdentifier(stageAttempt, attempt) | ||
stageState.failures.get(partition).exists(_.contains(failInfo)) | ||
} | ||
} | ||
|
||
|
@@ -215,9 +226,10 @@ private[spark] object OutputCommitCoordinator { | |
} | ||
|
||
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { | ||
case AskPermissionToCommitOutput(stage, partition, attemptNumber) => | ||
case AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber) => | ||
context.reply( | ||
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber)) | ||
outputCommitCoordinator.handleAskPermissionToCommit(stage, stageAttempt, partition, | ||
attemptNumber)) | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we also include stage attempt number in the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Was trying to minimize changes in the first version, for testing.