Skip to content

Commit 7b56861

Browse files
committed
HADOOP-16202. fix test failure; S3A committers to use withFileStatus
Passing the FileStatus in to openFile() will have tangible benefits in task and job commit for the magic committer, while being harmless for staging committers (unless they stage to S3). In task commit: eliminate one HEAD request per file created in that task attempt. In job commit: eliminate one HEAD request per task. Change-Id: I20d705067d3d3749c05a854b306888dbf72c83a5
1 parent c3829f1 commit 7b56861

File tree

5 files changed

+48
-11
lines changed

5 files changed

+48
-11
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.util;
2020

21+
import javax.annotation.Nullable;
2122
import java.io.EOFException;
2223
import java.io.File;
2324
import java.io.FileNotFoundException;
@@ -258,7 +259,7 @@ public T load(FileSystem fs, Path path) throws IOException {
258259
* @throws EOFException file status references an empty file
259260
* @throws IOException IO problems
260261
*/
261-
public T load(FileSystem fs, Path path, FileStatus status)
262+
public T load(FileSystem fs, Path path, @Nullable FileStatus status)
262263
throws IOException {
263264

264265
if (status != null && status.getLen() == 0) {

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void testFileSystemEmptyPath() throws Throwable {
176176
Path tempPath = new Path(tempFile.toURI());
177177
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
178178
try {
179-
LambdaTestUtils.intercept(EOFException.class,
179+
LambdaTestUtils.intercept(PathIOException.class,
180180
() -> serDeser.load(fs, tempPath));
181181
fs.delete(tempPath, false);
182182
LambdaTestUtils.intercept(FileNotFoundException.class,

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public List<LocatedFileStatus> locateAllSinglePendingCommits(
241241
List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1);
242242
for (LocatedFileStatus status : statusList) {
243243
try {
244-
commits.add(SinglePendingCommit.load(fs, status.getPath()));
244+
commits.add(SinglePendingCommit.load(fs, status.getPath(), status));
245245
} catch (IOException e) {
246246
LOG.warn("Failed to load commit file {}", status.getPath(), e);
247247
failures.add(Pair.of(status, e));
@@ -324,10 +324,12 @@ public MaybeIOE abortAllSinglePendingCommits(Path pendingDir,
324324
LOG.debug("No files to abort under {}", pendingDir);
325325
}
326326
while (pendingFiles.hasNext()) {
327-
Path pendingFile = pendingFiles.next().getPath();
327+
LocatedFileStatus status = pendingFiles.next();
328+
Path pendingFile = status.getPath();
328329
if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) {
329330
try {
330-
abortSingleCommit(SinglePendingCommit.load(fs, pendingFile));
331+
abortSingleCommit(SinglePendingCommit.load(fs, pendingFile,
332+
status));
331333
} catch (FileNotFoundException e) {
332334
LOG.debug("listed file already deleted: {}", pendingFile);
333335
} catch (IOException | IllegalArgumentException e) {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.fs.s3a.commit.files;
2020

21+
import javax.annotation.Nullable;
2122
import java.io.IOException;
2223
import java.io.ObjectInputStream;
2324
import java.util.ArrayList;
@@ -108,10 +109,7 @@ public static JsonSerialization<PendingSet> serializer() {
108109
*/
109110
public static PendingSet load(FileSystem fs, Path path)
110111
throws IOException {
111-
LOG.debug("Reading pending commits in file {}", path);
112-
PendingSet instance = serializer().load(fs, path);
113-
instance.validate();
114-
return instance;
112+
return load(fs, path, null);
115113
}
116114

117115
/**
@@ -124,7 +122,25 @@ public static PendingSet load(FileSystem fs, Path path)
124122
*/
125123
public static PendingSet load(FileSystem fs, FileStatus status)
126124
throws IOException {
127-
return load(fs, status.getPath());
125+
return load(fs, status.getPath(), status);
126+
}
127+
128+
/**
129+
* Load an instance from a file, then validate it.
130+
* @param fs filesystem
131+
* @param path path
132+
* @param status status of file to load or null
133+
* @return the loaded instance
134+
* @throws IOException IO failure
135+
* @throws ValidationFailure if the data is invalid
136+
*/
137+
public static PendingSet load(FileSystem fs, Path path,
138+
@Nullable FileStatus status)
139+
throws IOException {
140+
LOG.debug("Reading pending commits in file {}", path);
141+
PendingSet instance = serializer().load(fs, path, status);
142+
instance.validate();
143+
return instance;
128144
}
129145

130146
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.fs.s3a.commit.files;
2020

21+
import javax.annotation.Nullable;
2122
import java.io.IOException;
2223
import java.io.ObjectInputStream;
2324
import java.io.Serializable;
@@ -36,6 +37,7 @@
3637
import org.apache.commons.lang3.StringUtils;
3738
import org.apache.hadoop.classification.InterfaceAudience;
3839
import org.apache.hadoop.classification.InterfaceStability;
40+
import org.apache.hadoop.fs.FileStatus;
3941
import org.apache.hadoop.fs.FileSystem;
4042
import org.apache.hadoop.fs.Path;
4143
import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
@@ -137,7 +139,23 @@ public static JsonSerialization<SinglePendingCommit> serializer() {
137139
*/
138140
public static SinglePendingCommit load(FileSystem fs, Path path)
139141
throws IOException {
140-
SinglePendingCommit instance = serializer().load(fs, path);
142+
return load(fs, path, null);
143+
}
144+
145+
/**
146+
* Load an instance from a file, then validate it.
147+
* @param fs filesystem
148+
* @param path path
149+
* @param status status of file to load or null
150+
* @return the loaded instance
151+
* @throws IOException IO failure
152+
* @throws ValidationFailure if the data is invalid
153+
*/
154+
public static SinglePendingCommit load(FileSystem fs,
155+
Path path,
156+
@Nullable FileStatus status)
157+
throws IOException {
158+
SinglePendingCommit instance = serializer().load(fs, path, status);
141159
instance.filename = path.toString();
142160
instance.validate();
143161
return instance;

0 commit comments

Comments
 (0)