Skip to content

Commit 5433d56

Browse files
committed
HADOOP-16202. Enhanced openFile(): hadoop-common changes. (apache#2584/1)
This defines standard option and values for the openFile() builder API for opening a file: fs.option.openfile.read.policy A list of the desired read policy, in preferred order. standard values are adaptive, default, random, sequential, vector, whole-file fs.option.openfile.length How long the file is. fs.option.openfile.split.start start of a task's split fs.option.openfile.split.end end of a task's split These can be used by filesystem connectors to optimize their reading of the source file, including but not limited to * skipping existence/length probes when opening a file * choosing a policy for prefetching/caching data The hadoop shell commands which read files all declare "whole-file" and "sequential", as appropriate. Contributed by Steve Loughran. Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
1 parent 97735de commit 5433d56

36 files changed

+1321
-261
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
2727

28+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
29+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
30+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
31+
2832
/** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
2933
@InterfaceAudience.Public
3034
@InterfaceStability.Stable
@@ -42,7 +46,12 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
4246
public AvroFSInput(final FileContext fc, final Path p) throws IOException {
4347
FileStatus status = fc.getFileStatus(p);
4448
this.len = status.getLen();
45-
this.stream = fc.open(p);
49+
this.stream = awaitFuture(fc.openFile(p)
50+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
51+
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
52+
.withFileStatus(status)
53+
.build());
4655
}
4756

4857
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.io.InputStream;
2525
import java.nio.channels.ClosedChannelException;
2626
import java.util.Arrays;
27-
import java.util.Collections;
2827
import java.util.EnumSet;
2928
import java.util.List;
3029
import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,7 @@
4544
import org.apache.hadoop.util.LambdaUtils;
4645
import org.apache.hadoop.util.Progressable;
4746

47+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
4848
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
4949
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
5050

@@ -889,7 +889,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
889889
final OpenFileParameters parameters) throws IOException {
890890
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
891891
parameters.getMandatoryKeys(),
892-
Collections.emptySet(),
892+
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
893893
"for " + path);
894894
return LambdaUtils.eval(
895895
new CompletableFuture<>(),

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
6161
*/
6262
B opt(@Nonnull String key, float value);
6363

64+
/**
65+
* Set optional long parameter for the Builder.
66+
*
67+
* @see #opt(String, String)
68+
*/
69+
B opt(@Nonnull String key, long value);
70+
6471
/**
6572
* Set optional double parameter for the Builder.
6673
*
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
104111
*/
105112
B must(@Nonnull String key, float value);
106113

114+
/**
115+
* Set mandatory long option.
116+
*
117+
* @see #must(String, String)
118+
*/
119+
B must(@Nonnull String key, long value);
120+
107121
/**
108122
* Set mandatory double option.
109123
*

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,12 @@
7171
import org.slf4j.Logger;
7272
import org.slf4j.LoggerFactory;
7373

74+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
75+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
76+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
77+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
7478
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
79+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
7580

7681
/**
7782
* The FileContext class provides an interface for users of the Hadoop
@@ -2204,7 +2209,12 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
22042209
EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
22052210
CreateFlag.CREATE, CreateFlag.OVERWRITE) :
22062211
EnumSet.of(CreateFlag.CREATE);
2207-
InputStream in = open(qSrc);
2212+
InputStream in = awaitFuture(openFile(qSrc)
2213+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
2214+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
2215+
.opt(FS_OPTION_OPENFILE_LENGTH,
2216+
fs.getLen()) // file length hint for object stores
2217+
.build());
22082218
try (OutputStream out = create(qDst, createFlag)) {
22092219
IOUtils.copyBytes(in, out, conf, true);
22102220
} finally {
@@ -2936,9 +2946,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
29362946
final Path absF = fixRelativePart(getPath());
29372947
OpenFileParameters parameters = new OpenFileParameters()
29382948
.withMandatoryKeys(getMandatoryKeys())
2949+
.withOptionalKeys(getOptionalKeys())
29392950
.withOptions(getOptions())
2940-
.withBufferSize(getBufferSize())
2941-
.withStatus(getStatus());
2951+
.withStatus(getStatus())
2952+
.withBufferSize(
2953+
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
29422954
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
29432955
@Override
29442956
public CompletableFuture<FSDataInputStream> next(

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@
9191
import org.slf4j.Logger;
9292
import org.slf4j.LoggerFactory;
9393

94-
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
94+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
95+
import static org.apache.hadoop.util.Preconditions.checkArgument;
9596
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
9697
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
9798

@@ -4626,7 +4627,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
46264627
final OpenFileParameters parameters) throws IOException {
46274628
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
46284629
parameters.getMandatoryKeys(),
4629-
Collections.emptySet(),
4630+
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
46304631
"for " + path);
46314632
return LambdaUtils.eval(
46324633
new CompletableFuture<>(), () ->
@@ -4654,7 +4655,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
46544655
final OpenFileParameters parameters) throws IOException {
46554656
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
46564657
parameters.getMandatoryKeys(),
4657-
Collections.emptySet(), "");
4658+
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
46584659
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
46594660
try {
46604661
result.complete(open(pathHandle, parameters.getBufferSize()));
@@ -4761,9 +4762,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
47614762
Optional<Path> optionalPath = getOptionalPath();
47624763
OpenFileParameters parameters = new OpenFileParameters()
47634764
.withMandatoryKeys(getMandatoryKeys())
4765+
.withOptionalKeys(getOptionalKeys())
47644766
.withOptions(getOptions())
4765-
.withBufferSize(getBufferSize())
4766-
.withStatus(super.getStatus()); // explicit to avoid IDE warnings
4767+
.withStatus(super.getStatus())
4768+
.withBufferSize(
4769+
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
47674770
if(optionalPath.isPresent()) {
47684771
return getFS().openFileWithOptions(optionalPath.get(),
47694772
parameters);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@
7676
import org.slf4j.Logger;
7777
import org.slf4j.LoggerFactory;
7878

79+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
80+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
81+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
82+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
83+
7984
/**
8085
* A collection of file-processing util methods
8186
*/
@@ -395,7 +400,32 @@ public static boolean copy(FileSystem srcFS, Path src,
395400
return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
396401
}
397402

398-
/** Copy files between FileSystems. */
403+
/**
404+
* Copy a file/directory tree within/between filesystems.
405+
* <p></p>
406+
* returns true if the operation succeeded. When deleteSource is true,
407+
* this means "after the copy, delete(source) returned true"
408+
* If the destination is a directory, and mkdirs (dest) fails,
409+
* the operation will return false rather than raise any exception.
410+
* <p></p>
411+
* The overwrite flag is about overwriting files; it has no effect on
412+
* handling an attempt to copy a file atop a directory (expect an IOException),
413+
* or a directory over a path which contains a file (mkdir will fail, so
414+
* "false").
415+
* <p></p>
416+
* The operation is recursive, and the deleteSource operation takes place
417+
* as each subdirectory is copied. Therefore, if an operation fails partway
418+
* through, the source tree may be partially deleted.
419+
* @param srcFS source filesystem
420+
* @param srcStatus status of source
421+
* @param dstFS destination filesystem
422+
* @param dst path of destination
423+
* @param deleteSource delete the source?
424+
* @param overwrite overwrite files at destination?
425+
* @param conf configuration to use when opening files
426+
* @return true if the operation succeeded.
427+
* @throws IOException failure
428+
*/
399429
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
400430
FileSystem dstFS, Path dst,
401431
boolean deleteSource,
@@ -408,22 +438,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
408438
if (!dstFS.mkdirs(dst)) {
409439
return false;
410440
}
411-
FileStatus contents[] = srcFS.listStatus(src);
412-
for (int i = 0; i < contents.length; i++) {
413-
copy(srcFS, contents[i], dstFS,
414-
new Path(dst, contents[i].getPath().getName()),
415-
deleteSource, overwrite, conf);
441+
RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
442+
while (contents.hasNext()) {
443+
FileStatus next = contents.next();
444+
copy(srcFS, next, dstFS,
445+
new Path(dst, next.getPath().getName()),
446+
deleteSource, overwrite, conf);
416447
}
417448
} else {
418-
InputStream in=null;
449+
InputStream in = null;
419450
OutputStream out = null;
420451
try {
421-
in = srcFS.open(src);
452+
in = awaitFuture(srcFS.openFile(src)
453+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
454+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
455+
.opt(FS_OPTION_OPENFILE_LENGTH,
456+
srcStatus.getLen()) // file length hint for object stores
457+
.build());
422458
out = dstFS.create(dst, overwrite);
423459
IOUtils.copyBytes(in, out, conf, true);
424460
} catch (IOException e) {
425-
IOUtils.closeStream(out);
426-
IOUtils.closeStream(in);
461+
IOUtils.cleanupWithLogger(LOG, in, out);
427462
throw e;
428463
}
429464
}
@@ -502,7 +537,11 @@ private static boolean copy(FileSystem srcFS, FileStatus srcStatus,
502537
deleteSource, conf);
503538
}
504539
} else {
505-
InputStream in = srcFS.open(src);
540+
InputStream in = awaitFuture(srcFS.openFile(src)
541+
.withFileStatus(srcStatus)
542+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
543+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
544+
.build());
506545
IOUtils.copyBytes(in, Files.newOutputStream(dst.toPath()), conf);
507546
}
508547
if (deleteSource) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.fs;
1919

20+
import javax.annotation.Nullable;
2021
import java.io.IOException;
2122
import java.util.concurrent.CompletableFuture;
2223

@@ -34,7 +35,7 @@
3435
* options accordingly, for example:
3536
*
3637
* If the option is not related to the file system, the option will be ignored.
37-
* If the option is must, but not supported by the file system, a
38+
* If the option is must, but not supported/known by the file system, an
3839
* {@link IllegalArgumentException} will be thrown.
3940
*
4041
*/
@@ -51,10 +52,11 @@ CompletableFuture<FSDataInputStream> build()
5152
/**
5253
* A FileStatus may be provided to the open request.
5354
* It is up to the implementation whether to use this or not.
54-
* @param status status.
55+
* @param status status: may be null
5556
* @return the builder.
5657
*/
57-
default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
58+
default FutureDataInputStreamBuilder withFileStatus(
59+
@Nullable FileStatus status) {
5860
return this;
5961
}
6062

0 commit comments

Comments
 (0)