Commit d8db729

Fix import ordering, add a test in ITestTerasortOnS3A, and other minor fixes
1 parent 7d21300 commit d8db729

File tree: 8 files changed, +77 -41 lines changed

8 files changed

+77
-41
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 4 additions & 3 deletions
@@ -236,6 +236,7 @@
 import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME;
 import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
@@ -3912,14 +3913,14 @@ public FileStatus getFileStatus(final Path f) throws IOException {
       // Some downstream apps might call getFileStatus for a magic path to get the file size.
       // when commit data is stored in memory construct the dummy S3AFileStatus with correct
       // file size fetched from the memory.
-      if (InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().containsKey(path)) {
-        long len = InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().get(path);
+      if (InMemoryMagicCommitTracker.getPathToBytesWritten().containsKey(path)) {
+        long len = InMemoryMagicCommitTracker.getPathToBytesWritten().get(path);
         return new S3AFileStatus(len,
             0L,
             path,
             getDefaultBlockSize(path),
             username,
-            null,
+            MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME,
             null);
       }
     }
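
With in-memory tracking enabled, getFileStatus() on a magic write path now answers from the tracker: the returned S3AFileStatus carries the real byte count from the in-memory map and the new "pending" etag marker instead of null. A hedged client-side sketch of how a reader could recognise such a placeholder follows; the EtagSource cast and the helper class are illustrative assumptions, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;

public class PendingEtagCheck {

  // Sketch only (assumptions): fs is an S3A filesystem with
  // fs.s3a.committer.magic.track.commits.in.memory.enabled set to true, and
  // magicPath is a file still being written under a magic directory.
  static boolean isPendingMagicObject(FileSystem fs, Path magicPath) throws IOException {
    FileStatus status = fs.getFileStatus(magicPath);
    // The length is real (served from memory); the "pending" etag marks the
    // object as not yet committed to S3.
    return status instanceof EtagSource
        && CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME
            .equals(((EtagSource) status).getEtag());
  }
}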

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java

Lines changed: 10 additions & 1 deletion
@@ -58,6 +58,10 @@ private CommitConstants() {
    */
   public static final String PENDINGSET_SUFFIX = ".pendingset";
 
+  /**
+   * Etag name to be returned on non-committed S3 object: {@value}.
+   */
+  public static final String MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME = "pending";
 
   /**
    * Prefix to use for config options: {@value}.
@@ -242,10 +246,15 @@ private CommitConstants() {
    */
   public static final int DEFAULT_COMMITTER_THREADS = 32;
 
-
+  /**
+   * Should Magic committer track all the pending commits in memory?
+   */
   public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
       "fs.s3a.committer.magic.track.commits.in.memory.enabled";
 
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}.
+   */
   public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
       false;
 
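
A minimal sketch of how a client could switch the new option on, using the constant and default added above; the Configuration usage mirrors what the updated ITestTerasortOnS3A does below, and the class name here is just illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;

public class EnableInMemoryTracking {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The option defaults to false (FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT).
    conf.setBoolean(
        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
        true);
    System.out.println(conf.getBoolean(
        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT));
  }
}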

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java

Lines changed: 2 additions & 2 deletions
@@ -20,14 +20,14 @@
 
 import java.util.List;
 
-import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
-import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
+import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
 import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
 import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java

Lines changed: 37 additions & 17 deletions
@@ -45,15 +45,24 @@
  */
 public class InMemoryMagicCommitTracker extends MagicCommitTracker {
 
-  // stores taskAttemptId to commit data mapping
-  private static Map<String, List<SinglePendingCommit>>
-      taskAttemptIdToMpuMetdadataMap = new ConcurrentHashMap<>();
-
-  // stores the path to its length/size mapping
-  private static Map<Path, Long> taskAttemptIdToBytesWritten = new ConcurrentHashMap<>();
-
-  // stores taskAttemptId to path mapping
-  private static Map<String, List<Path>> taskAttemptIdToPath = new ConcurrentHashMap<>();
+  /**
+   * Map to store taskAttemptId, and it's corresponding list of pending commit data.
+   * The entries in the Map gets removed when a task commits or aborts.
+   */
+  private final static Map<String, List<SinglePendingCommit>>
+      TASK_ATTEMPT_ID_TO_MPU_METDADATA = new ConcurrentHashMap<>();
+
+  /**
+   * Map to store path of the file, and it's corresponding size.
+   * The entries in the Map gets removed when a task commits or aborts.
+   */
+  private final static Map<Path, Long> PATH_TO_BYTES_WRITTEN = new ConcurrentHashMap<>();
+
+  /**
+   * Map to store taskAttemptId, and list of paths to files written by it.
+   * The entries in the Map gets removed when a task commits or aborts.
+   */
+  private final static Map<String, List<Path>> TASK_ATTEMPT_ID_TO_PATH = new ConcurrentHashMap<>();
 
   public InMemoryMagicCommitTracker(Path path,
       String bucket,
@@ -92,16 +101,16 @@ public boolean aboutToComplete(String uploadId,
     String taskAttemptId = extractTaskAttemptIdFromPath(getPath());
 
     // store the commit data with taskAttemptId as the key
-    taskAttemptIdToMpuMetdadataMap.computeIfAbsent(taskAttemptId,
+    TASK_ATTEMPT_ID_TO_MPU_METDADATA.computeIfAbsent(taskAttemptId,
         k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);
 
     // store the byteswritten(length) for the corresponding file
-    taskAttemptIdToBytesWritten.put(getPath(), bytesWritten);
+    PATH_TO_BYTES_WRITTEN.put(getPath(), bytesWritten);
 
     // store the mapping between taskAttemptId and path
     // This information is used for removing entries from
     // the map once the taskAttempt is completed/committed.
-    taskAttemptIdToPath.computeIfAbsent(taskAttemptId,
+    TASK_ATTEMPT_ID_TO_PATH.computeIfAbsent(taskAttemptId,
         k -> Collections.synchronizedList(new ArrayList<>())).add(getPath());
 
     LOG.info("commit metadata for {} parts in {}. size: {} byte(s) "
@@ -113,15 +122,26 @@ public boolean aboutToComplete(String uploadId,
     return false;
   }
 
-  public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetdadataMap() {
-    return taskAttemptIdToMpuMetdadataMap;
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "InMemoryMagicCommitTracker{");
+    sb.append(", Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METDADATA.size());
+    sb.append(", Number of files=").append(PATH_TO_BYTES_WRITTEN.size());
+    sb.append('}');
+    return sb.toString();
+  }
+
+
+  public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetdadata() {
+    return TASK_ATTEMPT_ID_TO_MPU_METDADATA;
   }
 
-  public static Map<Path, Long> getTaskAttemptIdToBytesWritten() {
-    return taskAttemptIdToBytesWritten;
+  public static Map<Path, Long> getPathToBytesWritten() {
+    return PATH_TO_BYTES_WRITTEN;
   }
 
   public static Map<String, List<Path>> getTaskAttemptIdToPath() {
-    return taskAttemptIdToPath;
+    return TASK_ATTEMPT_ID_TO_PATH;
   }
 }
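
All three static maps above use the same concurrency pattern: a ConcurrentHashMap keyed by task attempt id (or path), with a synchronized list created lazily per key and the whole entry detached when the task commits or aborts. A small self-contained sketch of that pattern, with hypothetical keys and values:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerTaskAttemptListSketch {
  public static void main(String[] args) {
    Map<String, List<String>> byTaskAttempt = new ConcurrentHashMap<>();
    // computeIfAbsent creates the synchronized list exactly once per key,
    // even if several writer threads race on the same task attempt id.
    byTaskAttempt.computeIfAbsent("attempt_0001",
        k -> Collections.synchronizedList(new ArrayList<>())).add("s3a://bucket/out/part-0000");
    byTaskAttempt.computeIfAbsent("attempt_0001",
        k -> Collections.synchronizedList(new ArrayList<>())).add("s3a://bucket/out/part-0001");
    // At task commit, the whole per-attempt list is detached in a single step.
    List<String> pending = byTaskAttempt.remove("attempt_0001");
    System.out.println(pending);
  }
}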

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java

Lines changed: 7 additions & 9 deletions
@@ -41,7 +41,7 @@
  * uses any datatype in hadoop-mapreduce.
  */
 @InterfaceAudience.Private
-public class MagicCommitTracker extends PutTracker {
+public abstract class MagicCommitTracker extends PutTracker {
   public static final Logger LOG = LoggerFactory.getLogger(
       MagicCommitTracker.class);
 
@@ -103,22 +103,20 @@ public boolean outputImmediatelyVisible() {
 
   /**
    * Complete operation: generate the final commit data, put it.
-   *
-   * @param uploadId Upload ID
-   * @param parts list of parts
+   * @param uploadId Upload ID
+   * @param parts list of parts
    * @param bytesWritten bytes written
    * @param iostatistics nullable IO statistics
    * @return false, indicating that the commit must fail.
-   * @throws IOException any IO problem.
+   * @throws IOException any IO problem.
    * @throws IllegalArgumentException bad argument
    */
   @Override
-  public boolean aboutToComplete(String uploadId,
+  public abstract boolean aboutToComplete(String uploadId,
       List<CompletedPart> parts,
       long bytesWritten,
-      final IOStatistics iostatistics) throws IOException {
-    return false;
-  }
+      IOStatistics iostatistics)
+      throws IOException;
 
   @Override
   public String toString() {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java

Lines changed: 2 additions & 2 deletions
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
 
-import java.util.List;
-
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 
 /**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java

Lines changed: 2 additions & 3 deletions
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -288,7 +287,7 @@ private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContex
     // get all the pending commit metadata associated with the taskAttemptId.
     // This will also remove the entry from the map.
     List<SinglePendingCommit> pendingCommits =
-        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetdadataMap().remove(taskAttemptId);
+        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetdadata().remove(taskAttemptId);
     // get all the path/files associated with the taskAttemptId.
     // This will also remove the entry from the map.
     List<Path> pathsAssociatedWithTaskAttemptId =
@@ -299,7 +298,7 @@ private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContex
     if (pathsAssociatedWithTaskAttemptId != null) {
       for (Path path : pathsAssociatedWithTaskAttemptId) {
         boolean cleared =
-            InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().remove(path) != null;
+            InMemoryMagicCommitTracker.getPathToBytesWritten().remove(path) != null;
         LOG.debug("Removing path: {} from the memory isSuccess: {}", path, cleared);
       }
     } else {

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java

Lines changed: 13 additions & 4 deletions
@@ -44,6 +44,7 @@
 import org.apache.hadoop.examples.terasort.TeraValidate;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
 import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
 import org.apache.hadoop.mapred.JobConf;
@@ -97,6 +98,9 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
   /** Name of the committer for this run. */
   private final String committerName;
 
+  /** Should Magic committer track pending commits in-memory. */
+  private final boolean trackCommitsInMemory;
+
   /** Base path for all the terasort input and output paths. */
   private Path terasortPath;
 
@@ -117,12 +121,14 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
-        {DirectoryStagingCommitter.NAME},
-        {MagicS3GuardCommitter.NAME}});
+        {DirectoryStagingCommitter.NAME, false},
+        {MagicS3GuardCommitter.NAME, false},
+        {MagicS3GuardCommitter.NAME, true}});
   }
 
-  public ITestTerasortOnS3A(final String committerName) {
+  public ITestTerasortOnS3A(final String committerName, final boolean trackCommitsInMemory) {
     this.committerName = committerName;
+    this.trackCommitsInMemory = trackCommitsInMemory;
   }
 
   @Override
@@ -152,6 +158,9 @@ protected void applyCustomConfigOptions(JobConf conf) {
     conf.setBoolean(
         TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
         false);
+    conf.setBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
+        trackCommitsInMemory);
   }
 
   private int getExpectedPartitionCount() {
@@ -173,7 +182,7 @@ protected int getRowCount() {
    */
   private void prepareToTerasort() {
     // small sample size for faster runs
-    terasortPath = new Path("/terasort-" + committerName)
+    terasortPath = new Path("/terasort-" + committerName + "-" + trackCommitsInMemory)
         .makeQualified(getFileSystem());
     sortInput = new Path(terasortPath, "sortin");
     sortOutput = new Path(terasortPath, "sortout");
