Skip to content

Commit 1b4dba9

Browse files
committed
HADOOP-16202. Enhanced openFile(): hadoop-common changes. (#2584/1)
This defines standard options and values for the openFile() builder API for opening a file: fs.option.openfile.read.policy A list of the desired read policies, in preferred order. Standard values are adaptive, default, random, sequential, vector, whole-file fs.option.openfile.length The length of the file. fs.option.openfile.split.start start of a task's split fs.option.openfile.split.end end of a task's split These can be used by filesystem connectors to optimize their reading of the source file, including but not limited to * skipping existence/length probes when opening a file * choosing a policy for prefetching/caching data The hadoop shell commands which read files all declare "whole-file" and "sequential", as appropriate. Contributed by Steve Loughran. Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
1 parent 17d64ba commit 1b4dba9

36 files changed

+1320
-260
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
2727

28+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
29+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
30+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
31+
2832
/** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
2933
@InterfaceAudience.Public
3034
@InterfaceStability.Stable
@@ -42,7 +46,12 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
4246
/**
 * Open the file at path {@code p} through the given FileContext.
 * Declares a sequential read policy and passes the already-fetched
 * FileStatus down to the filesystem so object-store connectors can
 * skip their own existence/length probes.
 * @param fc file context to open the file through
 * @param p path of the file to open
 * @throws IOException failure to stat or open the file
 */
public AvroFSInput(final FileContext fc, final Path p) throws IOException {
  FileStatus status = fc.getFileStatus(p);
  this.len = status.getLen();
  this.stream = awaitFuture(fc.openFile(p)
      .opt(FS_OPTION_OPENFILE_READ_POLICY,
          FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
      .withFileStatus(status)
      .build());
  // NOTE(review): removed a stray trailing "fc.open(p);" call -- it opened a
  // second input stream whose reference was immediately discarded, leaking
  // the stream (and any connection it held) on every construction.
}
4756

4857
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.io.InputStream;
2525
import java.nio.channels.ClosedChannelException;
2626
import java.util.Arrays;
27-
import java.util.Collections;
2827
import java.util.EnumSet;
2928
import java.util.List;
3029
import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,7 @@
4544
import org.apache.hadoop.util.LambdaUtils;
4645
import org.apache.hadoop.util.Progressable;
4746

47+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
4848
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
4949
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
5050

@@ -889,7 +889,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
889889
final OpenFileParameters parameters) throws IOException {
890890
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
891891
parameters.getMandatoryKeys(),
892-
Collections.emptySet(),
892+
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
893893
"for " + path);
894894
return LambdaUtils.eval(
895895
new CompletableFuture<>(),

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
6161
*/
6262
B opt(@Nonnull String key, float value);
6363

64+
/**
65+
* Set optional long parameter for the Builder.
66+
*
67+
* @see #opt(String, String)
68+
*/
69+
B opt(@Nonnull String key, long value);
70+
6471
/**
6572
* Set optional double parameter for the Builder.
6673
*
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
104111
*/
105112
B must(@Nonnull String key, float value);
106113

114+
/**
115+
* Set mandatory long option.
116+
*
117+
* @see #must(String, String)
118+
*/
119+
B must(@Nonnull String key, long value);
120+
107121
/**
108122
* Set mandatory double option.
109123
*

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@
7070
import org.slf4j.Logger;
7171
import org.slf4j.LoggerFactory;
7272

73+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
74+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
75+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
76+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
7377
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
78+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
7479

7580
/**
7681
* The FileContext class provides an interface for users of the Hadoop
@@ -2198,7 +2203,12 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
21982203
EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
21992204
CreateFlag.CREATE, CreateFlag.OVERWRITE) :
22002205
EnumSet.of(CreateFlag.CREATE);
2201-
InputStream in = open(qSrc);
2206+
InputStream in = awaitFuture(openFile(qSrc)
2207+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
2208+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
2209+
.opt(FS_OPTION_OPENFILE_LENGTH,
2210+
fs.getLen()) // file length hint for object stores
2211+
.build());
22022212
try (OutputStream out = create(qDst, createFlag)) {
22032213
IOUtils.copyBytes(in, out, conf, true);
22042214
} finally {
@@ -2930,9 +2940,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
29302940
final Path absF = fixRelativePart(getPath());
29312941
OpenFileParameters parameters = new OpenFileParameters()
29322942
.withMandatoryKeys(getMandatoryKeys())
2943+
.withOptionalKeys(getOptionalKeys())
29332944
.withOptions(getOptions())
2934-
.withBufferSize(getBufferSize())
2935-
.withStatus(getStatus());
2945+
.withStatus(getStatus())
2946+
.withBufferSize(
2947+
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
29362948
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
29372949
@Override
29382950
public CompletableFuture<FSDataInputStream> next(

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.slf4j.Logger;
8989
import org.slf4j.LoggerFactory;
9090

91+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
9192
import static org.apache.hadoop.util.Preconditions.checkArgument;
9293
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
9394
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -4616,7 +4617,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
46164617
final OpenFileParameters parameters) throws IOException {
46174618
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
46184619
parameters.getMandatoryKeys(),
4619-
Collections.emptySet(),
4620+
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
46204621
"for " + path);
46214622
return LambdaUtils.eval(
46224623
new CompletableFuture<>(), () ->
@@ -4644,7 +4645,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
46444645
final OpenFileParameters parameters) throws IOException {
46454646
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
46464647
parameters.getMandatoryKeys(),
4647-
Collections.emptySet(), "");
4648+
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
46484649
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
46494650
try {
46504651
result.complete(open(pathHandle, parameters.getBufferSize()));
@@ -4751,9 +4752,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
47514752
Optional<Path> optionalPath = getOptionalPath();
47524753
OpenFileParameters parameters = new OpenFileParameters()
47534754
.withMandatoryKeys(getMandatoryKeys())
4755+
.withOptionalKeys(getOptionalKeys())
47544756
.withOptions(getOptions())
4755-
.withBufferSize(getBufferSize())
4756-
.withStatus(super.getStatus()); // explicit to avoid IDE warnings
4757+
.withStatus(super.getStatus())
4758+
.withBufferSize(
4759+
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
47574760
if(optionalPath.isPresent()) {
47584761
return getFS().openFileWithOptions(optionalPath.get(),
47594762
parameters);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@
7777
import org.slf4j.Logger;
7878
import org.slf4j.LoggerFactory;
7979

80+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
81+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
82+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
83+
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
84+
8085
/**
8186
* A collection of file-processing util methods
8287
*/
@@ -396,7 +401,32 @@ public static boolean copy(FileSystem srcFS, Path src,
396401
return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
397402
}
398403

399-
/** Copy files between FileSystems. */
404+
/**
405+
* Copy a file/directory tree within/between filesystems.
406+
* <p></p>
407+
* returns true if the operation succeeded. When deleteSource is true,
408+
* this means "after the copy, delete(source) returned true"
409+
* If the destination is a directory, and mkdirs (dest) fails,
410+
* the operation will return false rather than raise any exception.
411+
* <p></p>
412+
* The overwrite flag is about overwriting files; it has no effect about
413+
* handling an attempt to copy a file atop a directory (expect an IOException),
414+
* or a directory over a path which contains a file (mkdir will fail, so
415+
* "false").
416+
* <p></p>
417+
* The operation is recursive, and the deleteSource operation takes place
418+
* as each subdirectory is copied. Therefore, if an operation fails partway
419+
* through, the source tree may be partially deleted.
420+
* @param srcFS source filesystem
421+
* @param srcStatus status of source
422+
* @param dstFS destination filesystem
423+
* @param dst path of the destination
424+
* @param deleteSource delete the source?
425+
* @param overwrite overwrite files at destination?
426+
* @param conf configuration to use when opening files
427+
* @return true if the operation succeeded.
428+
* @throws IOException failure
429+
*/
400430
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
401431
FileSystem dstFS, Path dst,
402432
boolean deleteSource,
@@ -409,22 +439,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
409439
if (!dstFS.mkdirs(dst)) {
410440
return false;
411441
}
412-
FileStatus contents[] = srcFS.listStatus(src);
413-
for (int i = 0; i < contents.length; i++) {
414-
copy(srcFS, contents[i], dstFS,
415-
new Path(dst, contents[i].getPath().getName()),
416-
deleteSource, overwrite, conf);
442+
RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
443+
while (contents.hasNext()) {
444+
FileStatus next = contents.next();
445+
copy(srcFS, next, dstFS,
446+
new Path(dst, next.getPath().getName()),
447+
deleteSource, overwrite, conf);
417448
}
418449
} else {
419-
InputStream in=null;
450+
InputStream in = null;
420451
OutputStream out = null;
421452
try {
422-
in = srcFS.open(src);
453+
in = awaitFuture(srcFS.openFile(src)
454+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
455+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
456+
.opt(FS_OPTION_OPENFILE_LENGTH,
457+
srcStatus.getLen()) // file length hint for object stores
458+
.build());
423459
out = dstFS.create(dst, overwrite);
424460
IOUtils.copyBytes(in, out, conf, true);
425461
} catch (IOException e) {
426-
IOUtils.closeStream(out);
427-
IOUtils.closeStream(in);
462+
IOUtils.cleanupWithLogger(LOG, in, out);
428463
throw e;
429464
}
430465
}
@@ -503,7 +538,11 @@ private static boolean copy(FileSystem srcFS, FileStatus srcStatus,
503538
deleteSource, conf);
504539
}
505540
} else {
506-
InputStream in = srcFS.open(src);
541+
InputStream in = awaitFuture(srcFS.openFile(src)
542+
.withFileStatus(srcStatus)
543+
.opt(FS_OPTION_OPENFILE_READ_POLICY,
544+
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
545+
.build());
507546
IOUtils.copyBytes(in, Files.newOutputStream(dst.toPath()), conf);
508547
}
509548
if (deleteSource) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.fs;
1919

20+
import javax.annotation.Nullable;
2021
import java.io.IOException;
2122
import java.util.concurrent.CompletableFuture;
2223

@@ -34,7 +35,7 @@
3435
* options accordingly, for example:
3536
*
3637
* If the option is not related to the file system, the option will be ignored.
37-
* If the option is must, but not supported by the file system, a
38+
* If the option is must, but not supported/known by the file system, an
3839
* {@link IllegalArgumentException} will be thrown.
3940
*
4041
*/
@@ -51,10 +52,11 @@ CompletableFuture<FSDataInputStream> build()
5152
/**
5253
* A FileStatus may be provided to the open request.
5354
* It is up to the implementation whether to use this or not.
54-
* @param status status.
55+
* @param status status: may be null
5556
* @return the builder.
5657
*/
57-
default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
58+
default FutureDataInputStreamBuilder withFileStatus(
59+
@Nullable FileStatus status) {
5860
return this;
5961
}
6062

0 commit comments

Comments
 (0)