3636import org .apache .hadoop .mapreduce .lib .output .committer .manifest .files .TaskManifest ;
3737import org .apache .hadoop .thirdparty .com .google .common .base .Preconditions ;
3838import org .apache .hadoop .util .DurationInfo ;
39- import org .apache .hadoop .util .Progressable ;
4039import org .apache .hadoop .util .functional .FunctionRaisingIOE ;
4140import org .apache .hadoop .util .functional .RemoteIterators ;
4241
@@ -69,44 +68,15 @@ public abstract class AbstractJobCommitStage<R, S>
6968 AbstractJobCommitStage .class );
7069
7170 /**
72- * Name of the stage for statistics and logging.
73- */
74- private final String stageStatisticName ;
75-
76- /**
77- * IOStatistics to update.
78- */
79- private final IOStatisticsStore iostatistics ;
80-
81- /**
82- * Job attempt ID: never null.
83- */
84- private final String jobAttemptId ;
85-
86- /**
87- * ID of the task.
88- */
89- private final String taskId ;
90-
91- /**
92- * ID of this specific attempt at a task.
93- */
94- private final String taskAttemptId ;
95-
96- /**
97- * Job attempt dir.
71+ * Configuration of all the stages in the ongoing committer
72+ * operation.
9873 */
99- private final Path jobAttemptDir ;
74+ private final StageConfig stageConfig ;
10075
10176 /**
102- * Task attempt dir.
103- */
104- private final Path taskAttemptDir ;
105-
106- /**
107- * Destination of job.
77+ * Name of the stage for statistics and logging.
10878 */
109- private final Path destinationDir ;
79+ private final String stageStatisticName ;
11080
11181 /**
11282 * Callbacks to update store.
@@ -135,11 +105,6 @@ public abstract class AbstractJobCommitStage<R, S>
135105 */
136106 private final AtomicBoolean executed = new AtomicBoolean (false );
137107
138- /**
139- * Optional progress callback.
140- */
141- private final Progressable progressable ;
142-
143108 /**
144109 * Constructor.
145110 * @param stageConfig stage-independent configuration.
@@ -152,16 +117,12 @@ protected AbstractJobCommitStage(
152117 final String stageStatisticName ,
153118 final boolean requireIOProcessors ,
154119 final boolean requireManifestProcessors ) {
155- this .destinationDir = requireNonNull (stageConfig .getDestinationDir ());
156- this .jobAttemptDir = requireNonNull (stageConfig .getJobAttemptDir ());
157- this .jobAttemptId = requireNonNull (stageConfig .getJobAttemptId ());
158- this .iostatistics = requireNonNull (stageConfig .getIOStatistics ());
159- this .operations = requireNonNull (stageConfig .getOperations ());
160- this .progressable = stageConfig .getProgressable ();
161- this .taskId = stageConfig .getTaskId ();
162- this .taskAttemptId = stageConfig .getTaskAttemptId ();
163- this .taskAttemptDir = stageConfig .getTaskAttemptDir ();
164120 this .stageStatisticName = stageStatisticName ;
121+ this .stageConfig = stageConfig ;
122+ requireNonNull (stageConfig .getDestinationDir ());
123+ requireNonNull (stageConfig .getJobId ());
124+ requireNonNull (stageConfig .getJobAttemptDir ());
125+ this .operations = requireNonNull (stageConfig .getOperations ());
165126 // and the processors of work if required.
166127 this .ioProcessors = bindProcessor (
167128 requireIOProcessors ,
@@ -202,7 +163,7 @@ public final S apply(final R arguments) throws IOException {
202163 progress ();
203164 try (DurationInfo ignored = new DurationInfo (LOG ,
204165 true , "Executing stage %s" , stageStatisticName )) {
205- return trackDuration (iostatistics , stageStatisticName , () ->
166+ return trackDuration (getIOStatistics () , stageStatisticName , () ->
206167 executeStage (arguments ));
207168 } finally {
208169 progress ();
@@ -229,17 +190,22 @@ private void executeOnlyOnce() {
229190 "Stage attempted twice" );
230191 }
231192
193+ /**
194+ * The IOStatistics are shared across all uses of the
195+ * StageConfig.
196+ * @return the (possibly shared) IOStatistics.
197+ */
232198 @ Override
233199 public final IOStatisticsStore getIOStatistics () {
234- return iostatistics ;
200+ return stageConfig . getIOStatistics () ;
235201 }
236202
237203 /**
238204 * Call progress() on any Progressable passed in.
239205 */
240206 protected final void progress () {
241- if (progressable != null ) {
242- progressable .progress ();
207+ if (stageConfig . getProgressable () != null ) {
208+ stageConfig . getProgressable () .progress ();
243209 }
244210 }
245211
@@ -266,7 +232,7 @@ protected final FileStatus getFileStatusOrNull(Path path)
266232 */
267233 protected final FileStatus getFileStatus (Path path ) throws IOException {
268234 LOG .trace ("getFileStatus('{}')" , path );
269- return trackDuration (iostatistics , OP_GET_FILE_STATUS , () ->
235+ return trackDuration (getIOStatistics () , OP_GET_FILE_STATUS , () ->
270236 operations .getFileStatus (path ));
271237 }
272238
@@ -280,7 +246,7 @@ protected final FileStatus getFileStatus(Path path) throws IOException {
280246 protected final boolean delete (Path path , final boolean recursive )
281247 throws IOException {
282248 LOG .trace ("delete('{}, {}')" , path , recursive );
283- return trackDuration (iostatistics , OP_DELETE , () ->
249+ return trackDuration (getIOStatistics () , OP_DELETE , () ->
284250 operations .delete (path , recursive ));
285251 }
286252
@@ -293,7 +259,7 @@ protected final boolean delete(Path path, final boolean recursive)
293259 */
294260 protected final boolean mkdirs (Path path ) throws IOException {
295261 LOG .trace ("mkdirs('{}')" , path );
296- return trackDuration (iostatistics , OP_MKDIRS , () ->
262+ return trackDuration (getIOStatistics () , OP_MKDIRS , () ->
297263 operations .mkdirs (path ));
298264 }
299265
@@ -307,7 +273,7 @@ protected final boolean mkdirs(Path path) throws IOException {
307273 protected final RemoteIterator <FileStatus > listStatusIterator (Path path )
308274 throws IOException {
309275 LOG .trace ("listStatusIterator('{}')" , path );
310- return trackDuration (iostatistics , OP_LIST_STATUS , () ->
276+ return trackDuration (getIOStatistics () , OP_LIST_STATUS , () ->
311277 operations .listStatusIterator (path ));
312278 }
313279
@@ -320,7 +286,7 @@ protected final RemoteIterator<FileStatus> listStatusIterator(Path path)
320286 protected final TaskManifest loadManifest (final FileStatus status )
321287 throws IOException {
322288 LOG .trace ("loadManifest('{}')" , status );
323- return trackDuration (iostatistics , OP_LOAD_MANIFEST , () ->
289+ return trackDuration (getIOStatistics () , OP_LOAD_MANIFEST , () ->
324290 operations .loadTaskManifest (status ));
325291 }
326292
@@ -331,7 +297,7 @@ protected final TaskManifest loadManifest(final FileStatus status)
331297 */
332298 protected final RemoteIterator <FileStatus > listManifests () throws IOException {
333299 return RemoteIterators .filteringRemoteIterator (
334- listStatusIterator (jobAttemptDir ),
300+ listStatusIterator (getJobAttemptDir () ),
335301 st -> st .getPath ().toUri ().toString ().endsWith (MANIFEST_SUFFIX ));
336302 }
337303
@@ -399,7 +365,7 @@ protected final void save(AbstractManifestData manifestData,
399365 final Path tempPath ,
400366 final Path finalPath ) throws IOException {
401367 LOG .trace ("save('{}, {}, {}')" , manifestData , tempPath , finalPath );
402- trackDurationOfInvocation (iostatistics , OP_CREATE , () ->
368+ trackDurationOfInvocation (getIOStatistics () , OP_CREATE , () ->
403369 operations .save (manifestData , tempPath , true ));
404370 rename (tempPath , finalPath );
405371 }
@@ -416,7 +382,7 @@ protected final void rename(final Path source, final Path dest)
416382 throws IOException {
417383 // delete the destination, always.
418384 delete (dest , true );
419- boolean renamed = trackDuration (iostatistics , OP_RENAME_FILE , () ->
385+ boolean renamed = trackDuration (getIOStatistics () , OP_RENAME_FILE , () ->
420386 operations .renameFile (source , dest ));
421387 if (!renamed ) {
422388 final FileStatus sourceStatus = getFileStatusOrNull (source );
@@ -431,17 +397,17 @@ protected final void rename(final Path source, final Path dest)
431397 }
432398
433399 /**
434- * Job attempt ID: never null.
400+ * Job ID: never null.
435401 */
436- protected String getJobAttemptId () {
437- return jobAttemptId ;
402+ protected String getJobId () {
403+ return stageConfig . getJobId () ;
438404 }
439405
440406 /**
441407 * ID of the task.
442408 */
443409 protected String getTaskId () {
444- return taskId ;
410+ return stageConfig . getTaskId () ;
445411 }
446412
447413 /**
@@ -457,7 +423,7 @@ protected String getRequiredTaskId() {
457423 * ID of this specific attempt at a task.
458424 */
459425 protected String getTaskAttemptId () {
460- return taskAttemptId ;
426+ return stageConfig . getTaskAttemptId () ;
461427 }
462428
463429 /**
@@ -474,14 +440,14 @@ protected String getRequiredTaskAttemptId() {
474440 * Job attempt dir.
475441 */
476442 protected Path getJobAttemptDir () {
477- return jobAttemptDir ;
443+ return stageConfig . getJobAttemptDir () ;
478444 }
479445
480446 /**
481447 * Task attempt dir.
482448 */
483449 protected Path getTaskAttemptDir () {
484- return taskAttemptDir ;
450+ return stageConfig . getTaskAttemptDir () ;
485451 }
486452
487453 /**
@@ -498,7 +464,7 @@ protected Path getRequiredTaskAttemptDir() {
498464 * Destination of job.
499465 */
500466 protected Path getDestinationDir () {
501- return destinationDir ;
467+ return stageConfig . getDestinationDir () ;
502468 }
503469
504470 /**
0 commit comments