Skip to content

Commit 9a4b82b

Browse files
committed
Adds javadoc and addresses @aarondav's comments
1 parent dfdf3ef commit 9a4b82b

File tree

1 file changed

+24
-15
lines changed

1 file changed

+24
-15
lines changed

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,18 @@ trait SparkHadoopMapRedUtil {
7373
}
7474

7575
object SparkHadoopMapRedUtil extends Logging {
76-
def commitTask(
77-
committer: MapReduceOutputCommitter,
78-
mrTaskContext: MapReduceTaskAttemptContext,
79-
sparkTaskContext: TaskContext): Unit = {
80-
commitTask(
81-
committer,
82-
mrTaskContext,
83-
sparkTaskContext.stageId(),
84-
sparkTaskContext.partitionId(),
85-
sparkTaskContext.attemptNumber())
86-
}
87-
76+
/**
77+
* Commits a task output. Before committing the task output, we need to know whether some other
78+
* task attempt might be racing to commit the same output partition. Therefore, coordinate with
79+
* the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
80+
* details).
81+
*
82+
* Output commit coordinator is only contacted when the following two configurations are both set
83+
* to `true`:
84+
*
85+
* - `spark.speculation`
86+
* - `spark.hadoop.outputCommitCoordination.enabled`
87+
*/
8888
def commitTask(
8989
committer: MapReduceOutputCommitter,
9090
mrTaskContext: MapReduceTaskAttemptContext,
@@ -109,9 +109,6 @@ object SparkHadoopMapRedUtil extends Logging {
109109

110110
// First, check whether the task's output has already been committed by some other attempt
111111
if (committer.needsTaskCommit(mrTaskContext)) {
112-
// The task output needs to be committed, but we don't know whether some other task attempt
113-
// might be racing to commit the same output partition. Therefore, coordinate with the driver
114-
// in order to determine whether this attempt can commit (see SPARK-4879).
115112
val shouldCoordinateWithDriver: Boolean = {
116113
val sparkConf = SparkEnv.get.conf
117114
// We only need to coordinate with the driver if there are multiple concurrent task
@@ -144,4 +141,16 @@ object SparkHadoopMapRedUtil extends Logging {
144141
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
145142
}
146143
}
144+
145+
def commitTask(
146+
committer: MapReduceOutputCommitter,
147+
mrTaskContext: MapReduceTaskAttemptContext,
148+
sparkTaskContext: TaskContext): Unit = {
149+
commitTask(
150+
committer,
151+
mrTaskContext,
152+
sparkTaskContext.stageId(),
153+
sparkTaskContext.partitionId(),
154+
sparkTaskContext.attemptNumber())
155+
}
147156
}

0 commit comments

Comments
 (0)