Skip to content

Commit 9c6909a

Browse files
committed
MR-7431 test dev
Change-Id: If8d91942cbb78f03887bace5c5fb5d2d77919ec0
1 parent 2b2a853 commit 9c6909a

File tree

7 files changed

+251
-22
lines changed

7 files changed

+251
-22
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@
4646
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
4747
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
4848
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
49+
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
4950
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
5051
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER;
5152
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS;
5253
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS_DEFAULT;
5354
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_MANIFEST_PROCESSORS;
5455
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_MANIFEST_PROCESSORS_DEFAULT;
55-
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.TEMPORARY;
5656
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.buildJobUUID;
5757
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.createIOStatisticsStore;
5858

@@ -164,6 +164,47 @@ public boolean needsTaskCommit(final TaskAttemptContext context)
164164
return true;
165165
}
166166

167+
/**
168+
* Failure during job commit is not recoverable.
169+
*
170+
* @param jobContext
171+
* Context of the job whose output is being written.
172+
* @return false, always
173+
* @throws IOException never
174+
*/
175+
@Override
176+
public boolean isCommitJobRepeatable(final JobContext jobContext)
177+
throws IOException {
178+
return false;
179+
}
180+
181+
/**
182+
* Declare that task recovery is not supported.
183+
* It would be, if someone added the code *and tests*.
184+
* @param jobContext
185+
* Context of the job whose output is being written.
186+
* @return false, always
187+
* @throws IOException never
188+
*/
189+
@Override
190+
public boolean isRecoverySupported(final JobContext jobContext)
191+
throws IOException {
192+
return false;
193+
}
194+
195+
/**
196+
* Reject any attempt to recover a task: this call always fails.
197+
* @param taskContext Context of the task whose output is being recovered
198+
* @throws IOException always
199+
*/
200+
@Override
201+
public void recoverTask(final TaskAttemptContext taskContext)
202+
throws IOException {
203+
LOG.warn("Rejecting recoverTask({}) call", taskContext.getTaskAttemptID());
204+
throw new IOException("Cannot recover task "
205+
+ taskContext.getTaskAttemptID());
206+
}
207+
167208
/**
168209
* Commit the task.
169210
* This is where the task attempt tree list takes place.
@@ -416,9 +457,9 @@ static final class ManifestCommitterConfig {
416457
SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
417458
DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
418459
jobAttemptId = buildJobUUID(conf, context.getJobID());
419-
final Path tempSubDir = new Path(outputPath, TEMPORARY);
460+
final Path tempSubDir = new Path(outputPath, PENDING_DIR_NAME);
420461
jobAttemptPath = new Path(tempSubDir, jobAttemptId);
421-
final Path jobAttemptSubDir = new Path(jobAttemptPath, TEMPORARY);
462+
final Path jobAttemptSubDir = new Path(jobAttemptPath, PENDING_DIR_NAME);
422463

423464
// if constructed with a task attempt, build the task ID and path.
424465
if (context instanceof TaskAttemptContext) {

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
*/
2727
final class ManifestCommitterConstants {
2828

29+
public static final int INITIAL_APP_ATTEMPT_ID = 0;
30+
2931
private ManifestCommitterConstants() {
3032
}
3133

@@ -147,12 +149,6 @@ private ManifestCommitterConstants() {
147149
*/
148150
public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
149151

150-
/**
151-
* Name of temp dir under the output dir for all the tasks and manifests.
152-
* Value: {@value}.
153-
*/
154-
public static final String TEMPORARY = PENDING_DIR_NAME;
155-
156152
/**
157153
* The UUID for jobs: {@value}.
158154
* This was historically created in Spark 1.x's SQL queries, but "went away".

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterSupport.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
2020

2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.Path;
2223
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
2324
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
2425
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder;
26+
import org.apache.hadoop.mapreduce.JobContext;
2527
import org.apache.hadoop.mapreduce.JobID;
28+
import org.apache.hadoop.mapreduce.MRJobConfig;
2629

2730
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
31+
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
2832
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.COUNTER_STATISTICS;
2933
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DURATION_STATISTICS;
34+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.INITIAL_APP_ATTEMPT_ID;
3035
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SPARK_WRITE_UUID;
3136

3237
/**
@@ -77,9 +82,40 @@ static void maybeAddIOStatistics(IOStatisticsAggregator ios, Object o) {
7782
static String buildJobUUID(Configuration conf, JobID jobId) {
7883
String jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
7984
if (jobUUID.isEmpty()) {
80-
jobUUID = jobId.toString();
85+
jobUUID = jobId.toString() + ;
8186
}
8287
return jobUUID;
8388
}
8489

90+
/**
91+
* Get the location of pending job attempts.
92+
* @param out the base output directory.
93+
* @return the location of pending job attempts.
94+
*/
95+
static Path getPendingJobAttemptsPath(Path out) {
96+
return new Path(out, PENDING_DIR_NAME);
97+
}
98+
99+
/**
100+
* Get the Application Attempt Id for this job.
101+
* @param context the context to look in
102+
* @return the Application Attempt Id for a given job.
103+
*/
104+
static int getAppAttemptId(JobContext context) {
105+
return getAppAttemptId(context.getConfiguration());
106+
}
107+
108+
/**
109+
* Get the Application Attempt Id for this job
110+
* by looking for {@link MRJobConfig#APPLICATION_ATTEMPT_ID}
111+
* in the configuration, falling back to 0 if unset.
112+
* For spark it will always be 0, for MR it will be set in the AM
113+
* to the {@code ApplicationAttemptId} the AM is launched with.
114+
* @param conf job configuration.
115+
* @return the Application Attempt Id for the job.
116+
*/
117+
static int getAppAttemptId(Configuration conf) {
118+
return conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
119+
INITIAL_APP_ATTEMPT_ID);
120+
}
85121
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
20+
21+
import java.util.concurrent.atomic.LongAdder;
22+
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.hadoop.fs.statistics.IOStatistics;
25+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
26+
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
27+
import org.apache.hadoop.mapreduce.JobID;
28+
import org.apache.hadoop.util.Progressable;
29+
30+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.COUNTER_STATISTICS;
31+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.createIOStatisticsStore;
32+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.randomJobId;
33+
34+
/**
35+
* This class represents a binding to a job in the target dir with TA, JA
36+
* and associated paths.
37+
* It is self-contained so that it can be used in any test suite.
38+
*/
39+
class CommitterTestBinding implements
40+
IOStatisticsSource {
41+
42+
public static final String PROGRESS_EVENTS = "progress_events";
43+
44+
/**
45+
* IOStatistics to update.
46+
*/
47+
private IOStatisticsStore iostatistics;
48+
49+
/**
50+
* Job attempt ID.
51+
*/
52+
private String jobAttemptId;
53+
54+
/**
55+
* ID of the task.
56+
*/
57+
private String taskId;
58+
59+
/**
60+
* ID of this specific attempt at a task.
61+
*/
62+
private String taskAttemptId;
63+
64+
/**
65+
* Destination of job.
66+
*/
67+
private Path destinationDir;
68+
69+
/**
70+
* Job attempt dir.
71+
*/
72+
private Path jobAttemptDir;
73+
74+
/**
75+
* Task attempt dir.
76+
*/
77+
private Path taskAttemptDir;
78+
79+
private LongAdder progressCounter;
80+
81+
private final JobID jobId;
82+
83+
CommitterTestBinding() {
84+
iostatistics = createIOStatisticsStore()
85+
.withCounters(PROGRESS_EVENTS)
86+
.build();
87+
88+
89+
// this is the job ID, with no attempt info.
90+
jobId = JobID.forName(randomJobId());
91+
jobAttemptId = jobId.toString() + "_0";
92+
}
93+
94+
/**
95+
* Create a stage config.
96+
* All stats go to the local IOStatisticsStore;
97+
* there's a progress callback also set to increment
98+
* the counter {@link #PROGRESS_EVENTS}
99+
* @return a stage config
100+
*/
101+
StageConfig createStageConfig() {
102+
StageConfig stageConfig = new StageConfig();
103+
stageConfig
104+
.withDestinationDir(destinationDir)
105+
.withIOstatistics(createIOStatisticsStore())
106+
.withJobAttemptDir(jobAttemptPath)
107+
.withJobAttemptId(jobAttemptId)
108+
.withTaskAttemptDir(taskAttemptDir)
109+
.withTaskAttemptId(taskAttemptId)
110+
.withTaskId(taskId)
111+
.withProgressable(new ProgressCallback());
112+
return stageConfig;
113+
}
114+
115+
@Override
116+
public IOStatisticsStore getIOStatistics() {
117+
return iostatistics;
118+
}
119+
120+
/**
121+
* Whenever this progress callback is invoked, the progress_events
122+
* counter is incremented. This allows for tests to verify that
123+
* callbacks have occurred by asserting on the event counter.
124+
*/
125+
private final class ProgressCallback implements Progressable {
126+
127+
@Override
128+
public void progress() {
129+
iostatistics.incrementCounter(PROGRESS_EVENTS, 1);
130+
}
131+
}
132+
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterTestSupport.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,8 @@ public class ManifestCommitterTestSupport {
4848
* Create a random Job ID using the fork ID as part of the number if
4949
* set in the current process.
5050
* @return fork ID string in a format parseable by Jobs
51-
* @throws Exception failure
5251
*/
53-
public static String randomJobId() throws Exception {
52+
public static String randomJobId() {
5453
String testUniqueForkId = System.getProperty("test.unique.fork.id", "0001");
5554
int l = testUniqueForkId.length();
5655
String trailingDigits = testUniqueForkId.substring(l - 4, l);
@@ -60,7 +59,7 @@ public static String randomJobId() throws Exception {
6059
(long) (Math.random() * 1000),
6160
digitValue);
6261
} catch (NumberFormatException e) {
63-
throw new Exception("Failed to parse " + trailingDigits, e);
62+
throw new RuntimeException("Failed to parse " + trailingDigits, e);
6463
}
6564
}
6665

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestStageIO.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,7 @@ public class TestStageIO extends AbstractManifestCommitterTest {
3030
@Test
3131
public void testCreateStage() throws Throwable {
3232
StageConfig stageConfig = new StageConfig();
33-
/* stageConfig
34-
.withDestinationDir(destinationDir)
35-
.withIOstatistics(createIOStatisticsStore())
36-
.withJobAttemptDir(jobAttemptPath)
37-
.withJobAttemptId(jobAttemptId)
38-
.withTaskAttemptDir(taskAttemptDir)
39-
.withTaskAttemptId(taskAttemptId)
40-
.withTaskId(taskId)
41-
.withProgressable(progressable);*/
33+
4234
}
4335

4436
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version="1.0"?>
2+
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
3+
<!--
4+
~ Licensed to the Apache Software Foundation (ASF) under one
5+
~ or more contributor license agreements. See the NOTICE file
6+
~ distributed with this work for additional information
7+
~ regarding copyright ownership. The ASF licenses this file
8+
~ to you under the Apache License, Version 2.0 (the
9+
~ "License"); you may not use this file except in compliance
10+
~ with the License. You may obtain a copy of the License at
11+
~
12+
~ http://www.apache.org/licenses/LICENSE-2.0
13+
~
14+
~ Unless required by applicable law or agreed to in writing, software
15+
~ distributed under the License is distributed on an "AS IS" BASIS,
16+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
~ See the License for the specific language governing permissions and
18+
~ limitations under the License.
19+
-->
20+
21+
<!-- Values used when running unit tests. Specify any values in here that
22+
should override the default values. -->
23+
24+
<configuration>
25+
26+
<property>
27+
<name>hadoop.tmp.dir</name>
28+
<value>target/build/test</value>
29+
<description>A base for other temporary directories.</description>
30+
<final>false</final>
31+
</property>
32+
33+
</configuration>

0 commit comments

Comments
 (0)