HADOOP-16202. enhance-openfile: review feedback
Feedback from dannycjones.

Change-Id: I546f28411c2475e1254b259c7e0734cc868ea9f0
steveloughran committed Apr 19, 2022
1 parent e7b29ef commit 98ebf76
Showing 18 changed files with 82 additions and 46 deletions.

----

@@ -36,14 +36,16 @@

 /**
  * Support for future IO and the FS Builder subclasses.
- * If methods in here are needed for applications, promote
- * to {@link FutureIO} for public use -with the original
- * method relaying to it. This is to ensure that external
- * filesystem implementations can safely use these methods
+ * All methods in this class have been superseded by those in
+ * {@link FutureIO}.
+ * The methods here are retained but all marked as deprecated.
+ * This is to ensure that any external
+ * filesystem implementations can still use these methods
  * without linkage problems surfacing.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
+@Deprecated
 public final class FutureIOSupport {
 
   private FutureIOSupport() {
@@ -60,7 +62,8 @@ private FutureIOSupport() {
    * @throws IOException if something went wrong
    * @throws RuntimeException any nested RTE thrown
    */
-  public static <T> T awaitFuture(final Future<T> future)
+  @Deprecated
+  public static <T> T awaitFuture(final Future<T> future)
       throws InterruptedIOException, IOException, RuntimeException {
     return FutureIO.awaitFuture(future);
   }
@@ -78,6 +81,7 @@ public static <T> T awaitFuture(final Future<T> future)
    * @throws RuntimeException any nested RTE thrown
    * @throws TimeoutException the future timed out.
    */
+  @Deprecated
   public static <T> T awaitFuture(final Future<T> future,
       final long timeout,
       final TimeUnit unit)
@@ -97,6 +101,7 @@ public static <T> T awaitFuture(final Future<T> future,
    *         any non-Runtime-Exception
    * @throws RuntimeException if that is the inner cause.
    */
+  @Deprecated
   public static <T> T raiseInnerCause(final ExecutionException e)
       throws IOException {
     return FutureIO.raiseInnerCause(e);
@@ -113,6 +118,7 @@ public static <T> T raiseInnerCause(final ExecutionException e)
    *         any non-Runtime-Exception
    * @throws RuntimeException if that is the inner cause.
    */
+  @Deprecated
   public static <T> T raiseInnerCause(final CompletionException e)
       throws IOException {
     return FutureIO.raiseInnerCause(e);
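
A minimal migration sketch for code still calling the deprecated class (the class and method names below are illustrative, not part of this commit): the change is simply to import FutureIO and call it directly.

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.functional.FutureIO;

    public class AwaitExample {
      // was: FutureIOSupport.awaitFuture(future)
      static FSDataInputStream open(FileSystem fs, Path path) throws IOException {
        CompletableFuture<FSDataInputStream> future = fs.openFile(path).build();
        // blocks until complete, unwrapping an ExecutionException into the
        // underlying IOException or RuntimeException
        return FutureIO.awaitFuture(future);
      }
    }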

----

@@ -27,13 +27,11 @@
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * A wrapper for an IOException which
- * {@link FutureIOSupport#raiseInnerCause(ExecutionException)} knows to
- * always extract the exception.
+ * A wrapper for an IOException.
  *
  * The constructor signature guarantees the cause will be an IOException,
  * and as it checks for a null-argument, non-null.
- * @deprecated use the {@code UncheckedIOException}.
+ * @deprecated use the {@code UncheckedIOException} directly.
  */
 @Deprecated
 @InterfaceAudience.Private
@@ -51,8 +49,4 @@ public WrappedIOException(final IOException cause) {
     super(Preconditions.checkNotNull(cause));
   }
 
-  @Override
-  public synchronized IOException getCause() {
-    return (IOException) super.getCause();
-  }
 }
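
The replacement pattern, sketched with the JDK's own UncheckedIOException (the wrap/unwrap helpers are illustrative): it offers the same guarantee of a non-null IOException cause, without a Hadoop-private class.

    import java.io.IOException;
    import java.io.UncheckedIOException;

    public class WrapExample {
      @FunctionalInterface
      interface IOCall {
        void run() throws IOException;
      }

      // wrap: carry an IOException across an interface that cannot throw it
      static Runnable wrap(IOCall call) {
        return () -> {
          try {
            call.run();
          } catch (IOException e) {
            throw new UncheckedIOException(e);  // constructor rejects null
          }
        };
      }

      // unwrap: recover the original IOException at the boundary
      static void unwrap(Runnable action) throws IOException {
        try {
          action.run();
        } catch (UncheckedIOException e) {
          throw e.getCause();  // already typed as IOException, no cast needed
        }
      }
    }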

----

@@ -34,7 +34,7 @@

 import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
+import static org.apache.hadoop.util.functional.FutureIO.raiseInnerCause;
 
 /**
  * A bridge from Callable to Supplier; catching exceptions

----

@@ -90,6 +90,8 @@ public static <T> T awaitFuture(final Future<T> future)
    * extracted and rethrown.
    * </p>
    * @param future future to evaluate
+   * @param timeout timeout to wait
+   * @param unit time unit.
    * @param <T> type of the result.
    * @return the result, if all went well.
    * @throws InterruptedIOException future was interrupted
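
A sketch of the timed overload these tags document (the 30 second deadline is an arbitrary choice for illustration):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.functional.FutureIO;

    public class TimedAwait {
      static FSDataInputStream openWithDeadline(FileSystem fs, Path path)
          throws IOException, TimeoutException {
        CompletableFuture<FSDataInputStream> future = fs.openFile(path).build();
        // fail with TimeoutException rather than blocking indefinitely
        return FutureIO.awaitFuture(future, 30, TimeUnit.SECONDS);
      }
    }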

----

@@ -104,7 +104,7 @@ public synchronized void parse(HistoryEventHandler handler)
    * Only used for unit tests.
    */
   @Private
-  public synchronized void parse(EventReader reader, HistoryEventHandler handler)
+  public synchronized void parse(EventReader reader, HistoryEventHandler handler)
       throws IOException {
     int eventCtr = 0;
     HistoryEvent event;

----

@@ -40,6 +40,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

----

@@ -630,6 +630,9 @@ private Constants() {
    * The default value for this FS.
    * Which for S3A, is adaptive.
    * Value: {@value}
+   * @deprecated use the {@link Options.OpenFileOptions} value
+   * in code which only needs to be compiled against newer hadoop
+   * releases.
    */
   public static final String INPUT_FADV_DEFAULT =
       Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
@@ -645,6 +648,9 @@ private Constants() {
   /**
    * Optimized for sequential access.
    * Value: {@value}
+   * @deprecated use the {@link Options.OpenFileOptions} value
+   * in code which only needs to be compiled against newer hadoop
+   * releases.
    */
   public static final String INPUT_FADV_SEQUENTIAL =
       Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
@@ -654,6 +660,9 @@ private Constants() {
    * The performance of sequential IO may be reduced in exchange for
    * more efficient {@code seek()} operations.
    * Value: {@value}
+   * @deprecated use the {@link Options.OpenFileOptions} value
+   * in code which only needs to be compiled against newer hadoop
+   * releases.
    */
   public static final String INPUT_FADV_RANDOM =
       Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
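
For application code, the replacement for these constants is the openFile() builder option; a sketch (the path handling and the choice of "random" are illustrative):

    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;

    public class ReadPolicyExample {
      static CompletableFuture<FSDataInputStream> openForRandomIO(
          FileSystem fs, Path path) {
        return fs.openFile(path)
            // "random" suits seek-heavy columnar formats. Using opt() rather
            // than must() keeps the request portable: filesystems which do
            // not recognize the key simply ignore it.
            .opt(FS_OPTION_OPENFILE_READ_POLICY, "random")
            .build();
      }
    }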

----

@@ -90,6 +90,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Globber;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
@@ -516,7 +517,8 @@ public void initialize(URI name, Configuration originalConf)
     doBucketProbing();
 
     inputPolicy = S3AInputPolicy.getPolicy(
-        conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_DEFAULT),
+        conf.getTrimmed(INPUT_FADVISE,
+            Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
         S3AInputPolicy.Normal);
     LOG.debug("Input fadvise policy = {}", inputPolicy);
     changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
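
The filesystem-wide default remains configurable; INPUT_FADVISE is the key "fs.s3a.experimental.input.fadvise", and per-stream openFile() options override it. A sketch:

    import org.apache.hadoop.conf.Configuration;

    public class FadviseConfig {
      static Configuration withRandomIO(Configuration conf) {
        // default read policy for every S3A stream created with this
        // configuration; unrecognized values fall back to the Normal policy
        conf.set("fs.s3a.experimental.input.fadvise", "random");
        return conf;
      }
    }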

----

@@ -40,8 +40,8 @@
 public enum S3AInputPolicy {
 
   Normal(FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, false, true),
-  Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, false, false),
-  Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, true, false);
+  Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, true, false),
+  Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, false, false);
 
   /** Policy name. */
   private final String policy;

----

@@ -306,11 +306,11 @@ public enum Statistic {
       TYPE_COUNTER),
   STREAM_READ_REMOTE_STREAM_ABORTED(
       StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
-      "Count/duration of aborting a remote stream during stream IO",
+      "Duration of aborting a remote stream during stream IO",
       TYPE_DURATION),
   STREAM_READ_REMOTE_STREAM_CLOSED(
       StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
-      "Count/duration of closing a remote stream during stream IO",
+      "Duration of closing a remote stream during stream IO",
       TYPE_DURATION),
 
   STREAM_READ_OPERATIONS_INCOMPLETE(

----

@@ -46,6 +46,7 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.MultipartUtils;
@@ -426,7 +427,8 @@ public int run(String[] args, PrintStream out)
     String encryption =
         printOption(out, "\tEncryption", Constants.S3_ENCRYPTION_ALGORITHM,
             "none");
-    printOption(out, "\tInput seek policy", INPUT_FADVISE, INPUT_FADV_DEFAULT);
+    printOption(out, "\tInput seek policy", INPUT_FADVISE,
+        Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT);
     printOption(out, "\tChange Detection Source", CHANGE_DETECT_SOURCE,
         CHANGE_DETECT_SOURCE_DEFAULT);
     printOption(out, "\tChange Detection Mode", CHANGE_DETECT_MODE,

----

@@ -44,11 +44,11 @@
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import org.apache.hadoop.util.NativeCodeLoader;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
 import static org.apache.hadoop.fs.s3a.Constants.SSL_CHANNEL_MODE;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
@@ -87,9 +87,9 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
   @Parameterized.Parameters
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
-        {INPUT_FADV_SEQUENTIAL, Default_JSSE},
-        {INPUT_FADV_RANDOM, OpenSSL},
-        {INPUT_FADV_DEFAULT, Default_JSSE_with_GCM},
+        {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, Default_JSSE},
+        {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, OpenSSL},
+        {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, Default_JSSE_with_GCM},
     });
   }
 

----

@@ -73,7 +73,7 @@ public Configuration createConfiguration() {
     // this is so stream draining is always blocking, allowing
     // assertions to be safely made without worrying
     // about any race conditions
-    conf.setInt(ASYNC_DRAIN_THRESHOLD, 128_000);
+    conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
     return conf;
   }
 

----

@@ -40,7 +40,7 @@
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
-import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
@@ -115,13 +115,13 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
         .withFileStatus(st2)
         .build()
         .get(),
-        always(NO_IO),
+        always(NO_HEAD_OR_LIST),
         with(STREAM_READ_OPENED, 0));
 
     // the stream gets opened during read
     long readLen = verifyMetrics(() ->
         readStream(in),
-        always(NO_IO),
+        always(NO_HEAD_OR_LIST),
         with(STREAM_READ_OPENED, 1));
     assertEquals("bytes read from file", fileLength, readLen);
   }
@@ -144,7 +144,7 @@ public void testOpenFileShorterLength() throws Throwable {
         .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
         .build()
         .get(),
-        always(NO_IO),
+        always(NO_HEAD_OR_LIST),
         with(STREAM_READ_OPENED, 0));
 
     // verify that the statistics are in range
@@ -158,24 +158,26 @@
     // now read it
     long r2 = verifyMetrics(() ->
         readStream(in2),
-        always(NO_IO),
+        always(NO_HEAD_OR_LIST),
         with(STREAM_READ_OPENED, 1),
         with(STREAM_READ_BYTES_READ_CLOSE, 0),
         with(STREAM_READ_SEEK_BYTES_SKIPPED, 0));
 
     LOG.info("Statistics of read stream {}", statsString);
 
     assertEquals("bytes read from file", shortLen, r2);
-    // the read has been ranged
+    // no bytes were discarded.
     bytesDiscarded.assertDiffEquals(0);
   }
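
These tests pivot on the fs.option.openfile.length option: declaring the length up front saves the HEAD request otherwise needed to probe the file size. A sketch of the call pattern under test (the helper name is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.functional.FutureIO;

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;

    public class DeclaredLength {
      static FSDataInputStream openKnownLength(FileSystem fs, Path path, long len)
          throws IOException {
        return FutureIO.awaitFuture(
            fs.openFile(path)
                // the declared length replaces the HEAD probe and reads are
                // clipped to it; declare it longer than the real file and
                // readFully() past the end raises EOFException, read() -1
                .must(FS_OPTION_OPENFILE_LENGTH, len)
                .build());
      }
    }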

   @Test
   public void testOpenFileLongerLength() throws Throwable {
-    // do a second read with the length declared as short.
-    // we now expect the bytes read to be shorter.
+    // do a second read with the length declared as longer
+    // than it is.
+    // An EOF will be read on readFully(), -1 on a read()
+
     S3AFileSystem fs = getFileSystem();
-    // now set a length past the actual file length
+    // set a length past the actual file length
     long longLen = fileLength + 10;
     FSDataInputStream in3 = verifyMetrics(() ->
         fs.openFile(testFile)
@@ -184,10 +186,10 @@ public void testOpenFileLongerLength() throws Throwable {
         .must(FS_OPTION_OPENFILE_LENGTH, longLen)
         .build()
         .get(),
-        always(NO_IO));
+        always(NO_HEAD_OR_LIST));
 
-    // shows that irrespective of the declared length, you can read past it,
-    // and an EOFException is raised
+    // assert behaviors of seeking/reading past the file length.
+    // there is no attempt at recovery.
     verifyMetrics(() -> {
       byte[] out = new byte[(int) longLen];
       intercept(EOFException.class,

----

@@ -62,6 +62,12 @@ public final class OperationCost {
   public static final OperationCost NO_IO =
       new OperationCost(0, 0);
 
+  /**
+   * More detailed description of the NO_IO cost.
+   */
+  public static final OperationCost NO_HEAD_OR_LIST =
+      NO_IO;
+
   /** A HEAD operation. */
   public static final OperationCost HEAD_OPERATION = new OperationCost(1, 0);
 

----

@@ -63,8 +63,8 @@
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.DurationInfo;
 
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
-import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
 import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES;
 import static org.apache.hadoop.fs.s3a.select.SelectBinding.expandBackslashChars;
@@ -767,7 +768,8 @@ public void testSelectOddRecordsIgnoreHeaderV1()
     JobConf conf = createJobConf();
     inputOpt(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_NONE);
     inputMust(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_IGNORE);
-    inputMust(conf, INPUT_FADVISE, INPUT_FADV_DEFAULT);
+    inputMust(conf, FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_DEFAULT);
     inputMust(conf, SELECT_ERRORS_INCLUDE_SQL, "true");
     verifySelectionCount(EVEN_ROWS_COUNT,
         SELECT_EVEN_ROWS_NO_HEADER,

----

@@ -37,7 +37,8 @@
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
-import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
 
 /**
  * Contract test for seek operation.

----

1 more changed file not shown.