Skip to content

Commit

Permalink
HADOOP-16202. Enhanced openFile(): hadoop-common changes. (#2584/1)
Browse files Browse the repository at this point in the history
This defines standard option and values for the
openFile() builder API for opening a file:

fs.option.openfile.read.policy
 A list of the desired read policies, in order of preference.
 Standard values are:
 adaptive, default, random, sequential, vector, whole-file

fs.option.openfile.length
 How long the file is.

fs.option.openfile.split.start
 start of a task's split

fs.option.openfile.split.end
 end of a task's split

These can be used by filesystem connectors to optimize their
reading of the source file, including but not limited to
* skipping existence/length probes when opening a file
* choosing a policy for prefetching/caching data

The hadoop shell commands which read files all declare "whole-file"
and "sequential", as appropriate.

Contributed by Steve Loughran.

Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
  • Loading branch information
steveloughran committed Apr 27, 2022
1 parent 77eea7a commit 75950e4
Show file tree
Hide file tree
Showing 36 changed files with 1,321 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
@InterfaceAudience.Public
@InterfaceStability.Stable
Expand All @@ -42,7 +46,12 @@ public AvroFSInput(final FSDataInputStream in, final long len) {
/**
 * Construct given a {@link FileContext} and a {@link Path}.
 * <p>
 * The file status is retrieved once; its length is recorded and the same
 * status is handed to the {@code openFile()} builder together with the
 * sequential read policy option.
 * <p>
 * The file is opened exactly once: any additional {@code fc.open(p)} call
 * would create a second stream which is never closed.
 *
 * @param fc file context used to look up and open the file
 * @param p path of the file to read
 * @throws IOException if the status lookup or the open fails
 */
public AvroFSInput(final FileContext fc, final Path p) throws IOException {
  FileStatus status = fc.getFileStatus(p);
  this.len = status.getLen();
  // Single open through the builder API; the result is the only stream
  // this object holds, so nothing is left unclosed.
  this.stream = awaitFuture(fc.openFile(p)
      .opt(FS_OPTION_OPENFILE_READ_POLICY,
          FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
      .withFileStatus(status)
      .build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -45,6 +44,7 @@
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;

Expand Down Expand Up @@ -889,7 +889,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(),
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
*/
B opt(@Nonnull String key, float value);

/**
* Set optional long parameter for the Builder.
*
* @see #opt(String, String)
*/
B opt(@Nonnull String key, long value);

/**
* Set optional double parameter for the Builder.
*
Expand Down Expand Up @@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
*/
B must(@Nonnull String key, float value);

/**
* Set mandatory long option.
*
* @see #must(String, String)
*/
B must(@Nonnull String key, long value);

/**
* Set mandatory double option.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
* The FileContext class provides an interface for users of the Hadoop
Expand Down Expand Up @@ -2204,7 +2209,12 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
CreateFlag.CREATE, CreateFlag.OVERWRITE) :
EnumSet.of(CreateFlag.CREATE);
InputStream in = open(qSrc);
InputStream in = awaitFuture(openFile(qSrc)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.opt(FS_OPTION_OPENFILE_LENGTH,
fs.getLen()) // file length hint for object stores
.build());
try (OutputStream out = create(qDst, createFlag)) {
IOUtils.copyBytes(in, out, conf, true);
} finally {
Expand Down Expand Up @@ -2936,9 +2946,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
final Path absF = fixRelativePart(getPath());
OpenFileParameters parameters = new OpenFileParameters()
.withMandatoryKeys(getMandatoryKeys())
.withOptionalKeys(getOptionalKeys())
.withOptions(getOptions())
.withBufferSize(getBufferSize())
.withStatus(getStatus());
.withStatus(getStatus())
.withBufferSize(
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
@Override
public CompletableFuture<FSDataInputStream> next(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;

Expand Down Expand Up @@ -4626,7 +4627,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(),
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
Expand Down Expand Up @@ -4654,7 +4655,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(), "");
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
try {
result.complete(open(pathHandle, parameters.getBufferSize()));
Expand Down Expand Up @@ -4761,9 +4762,11 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
Optional<Path> optionalPath = getOptionalPath();
OpenFileParameters parameters = new OpenFileParameters()
.withMandatoryKeys(getMandatoryKeys())
.withOptionalKeys(getOptionalKeys())
.withOptions(getOptions())
.withBufferSize(getBufferSize())
.withStatus(super.getStatus()); // explicit to avoid IDE warnings
.withStatus(super.getStatus())
.withBufferSize(
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
if(optionalPath.isPresent()) {
return getFS().openFileWithOptions(optionalPath.get(),
parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
* A collection of file-processing util methods
*/
Expand Down Expand Up @@ -395,7 +400,32 @@ public static boolean copy(FileSystem srcFS, Path src,
return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
}

/** Copy files between FileSystems. */
/**
* Copy a file/directory tree within/between filesystems.
* <p></p>
* Returns true if the operation succeeded. When deleteSource is true,
* this means "after the copy, delete(source) returned true".
* If the destination is a directory, and mkdirs(dest) fails,
* the operation will return false rather than raise any exception.
* <p></p>
* The overwrite flag is about overwriting files; it has no effect on the
* handling of an attempt to copy a file atop a directory (expect an
* IOException), or a directory over a path which contains a file (mkdir
* will fail, so "false").
* <p></p>
* The operation is recursive, and the deleteSource operation takes place
* as each subdirectory is copied. Therefore, if an operation fails partway
* through, the source tree may be partially deleted.
* @param srcFS source filesystem
* @param srcStatus status of source
* @param dstFS destination filesystem
* @param dst path of destination
* @param deleteSource delete the source?
* @param overwrite overwrite files at destination?
* @param conf configuration to use when opening files
* @return true if the operation succeeded.
* @throws IOException failure
*/
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
FileSystem dstFS, Path dst,
boolean deleteSource,
Expand All @@ -408,22 +438,27 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
if (!dstFS.mkdirs(dst)) {
return false;
}
FileStatus contents[] = srcFS.listStatus(src);
for (int i = 0; i < contents.length; i++) {
copy(srcFS, contents[i], dstFS,
new Path(dst, contents[i].getPath().getName()),
deleteSource, overwrite, conf);
RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
while (contents.hasNext()) {
FileStatus next = contents.next();
copy(srcFS, next, dstFS,
new Path(dst, next.getPath().getName()),
deleteSource, overwrite, conf);
}
} else {
InputStream in=null;
InputStream in = null;
OutputStream out = null;
try {
in = srcFS.open(src);
in = awaitFuture(srcFS.openFile(src)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.opt(FS_OPTION_OPENFILE_LENGTH,
srcStatus.getLen()) // file length hint for object stores
.build());
out = dstFS.create(dst, overwrite);
IOUtils.copyBytes(in, out, conf, true);
} catch (IOException e) {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.cleanupWithLogger(LOG, in, out);
throw e;
}
}
Expand Down Expand Up @@ -502,7 +537,11 @@ private static boolean copy(FileSystem srcFS, FileStatus srcStatus,
deleteSource, conf);
}
} else {
InputStream in = srcFS.open(src);
InputStream in = awaitFuture(srcFS.openFile(src)
.withFileStatus(srcStatus)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.build());
IOUtils.copyBytes(in, Files.newOutputStream(dst.toPath()), conf);
}
if (deleteSource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

Expand All @@ -34,7 +35,7 @@
* options accordingly, for example:
*
* If the option is not related to the file system, the option will be ignored.
* If the option is must, but not supported by the file system, a
* If the option is mandatory (set via must()), but not supported/known by the file system, an
* {@link IllegalArgumentException} will be thrown.
*
*/
Expand All @@ -51,10 +52,11 @@ CompletableFuture<FSDataInputStream> build()
/**
* A FileStatus may be provided to the open request.
* It is up to the implementation whether to use this or not.
* @param status status.
* @param status status: may be null
* @return the builder.
*/
default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
default FutureDataInputStreamBuilder withFileStatus(
@Nullable FileStatus status) {
return this;
}

Expand Down
Loading

0 comments on commit 75950e4

Please sign in to comment.