diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java index b462a2b63a626..f47e5f4fbfbd6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java @@ -36,14 +36,16 @@ /** * Support for future IO and the FS Builder subclasses. - * If methods in here are needed for applications, promote - * to {@link FutureIO} for public use -with the original - * method relaying to it. This is to ensure that external - * filesystem implementations can safely use these methods + * All methods in this class have been superseded by those in + * {@link FutureIO}. + * The methods here are retained but all marked as deprecated. + * This is to ensure that any external + * filesystem implementations can still use these methods * without linkage problems surfacing. */ @InterfaceAudience.Private @InterfaceStability.Unstable +@Deprecated public final class FutureIOSupport { private FutureIOSupport() { @@ -60,7 +62,8 @@ private FutureIOSupport() { * @throws IOException if something went wrong * @throws RuntimeException any nested RTE thrown */ - public static <T> T awaitFuture(final Future<T> future) + @Deprecated + public static <T> T awaitFuture(final Future<T> future) throws InterruptedIOException, IOException, RuntimeException { return FutureIO.awaitFuture(future); } @@ -78,6 +81,7 @@ public static <T> T awaitFuture(final Future<T> future) * @throws RuntimeException any nested RTE thrown * @throws TimeoutException the future timed out. */ + @Deprecated public static <T> T awaitFuture(final Future<T> future, final long timeout, final TimeUnit unit) @@ -97,6 +101,7 @@ public static <T> T awaitFuture(final Future<T> future, * any non-Runtime-Exception * @throws RuntimeException if that is the inner cause. */ + @Deprecated public static <T> T raiseInnerCause(final ExecutionException e) throws IOException { return FutureIO.raiseInnerCause(e); @@ -113,6 +118,7 @@ public static <T> T raiseInnerCause(final ExecutionException e) * any non-Runtime-Exception * @throws RuntimeException if that is the inner cause. */ + @Deprecated public static <T> T raiseInnerCause(final CompletionException e) throws IOException { return FutureIO.raiseInnerCause(e); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java index d2c999683c6c6..2fbcd33beac6c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java @@ -27,13 +27,11 @@ import org.apache.hadoop.classification.InterfaceStability; /** - * A wrapper for an IOException which - * {@link FutureIOSupport#raiseInnerCause(ExecutionException)} knows to - * always extract the exception. + * A wrapper for an IOException. * * The constructor signature guarantees the cause will be an IOException, * and as it checks for a null-argument, non-null. - * @deprecated use the {@code UncheckedIOException}. + * @deprecated use the {@code UncheckedIOException} directly.
*/ @Deprecated @InterfaceAudience.Private @@ -51,8 +49,4 @@ public WrappedIOException(final IOException cause) { super(Preconditions.checkNotNull(cause)); } - @Override - public synchronized IOException getCause() { - return (IOException) super.getCause(); - } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java index e2cdc0fd41472..32e299b4d45b1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java @@ -34,7 +34,7 @@ import org.apache.hadoop.util.DurationInfo; -import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause; +import static org.apache.hadoop.util.functional.FutureIO.raiseInnerCause; /** * A bridge from Callable to Supplier; catching exceptions diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java index 2d9fd9729cdfe..c3fda19d8d73b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java @@ -90,6 +90,8 @@ public static <T> T awaitFuture(final Future<T> future) * extracted and rethrown. *
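For downstream code that still compiles against the deprecated `FutureIOSupport` entry points, the migration is a one-line change per call site, mirroring the static-import swap made in `CommonCallableSupplier` above. A minimal sketch, with class and method names invented purely for illustration:

```java
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.util.functional.FutureIO;

public final class AwaitFutureMigration {

  private AwaitFutureMigration() {
  }

  /** Was FutureIOSupport.awaitFuture(future); now the same call on FutureIO. */
  public static <T> T await(Future<T> future) throws IOException {
    return FutureIO.awaitFuture(future);
  }

  /** Timed variant; a TimeoutException is still raised if the wait expires. */
  public static <T> T await(Future<T> future, long timeout, TimeUnit unit)
      throws IOException, TimeoutException {
    return FutureIO.awaitFuture(future, timeout, unit);
  }
}
```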

* @param future future to evaluate + * @param timeout timeout to wait + * @param unit time unit. * @param type of the result. * @return the result, if all went well. * @throws InterruptedIOException future was interrupted diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index c94eceabfa33d..57c58ba35a1d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -104,7 +104,7 @@ public synchronized void parse(HistoryEventHandler handler) * Only used for unit tests. */ @Private - public synchronized void parse(EventReader reader, HistoryEventHandler handler) + public synchronized void parse(EventReader reader, HistoryEventHandler handler) throws IOException { int eventCtr = 0; HistoryEvent event; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java index 07264eae62e39..6969f61836fbc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java @@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.functional.FutureIO; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 2a3650edbf91d..e5369b848830a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -630,6 +630,9 @@ private Constants() { * The default value for this FS. * Which for S3A, is adaptive. * Value: {@value} + * @deprecated use the {@link Options.OpenFileOptions} value + * in code which only needs to be compiled against newer hadoop + * releases. */ public static final String INPUT_FADV_DEFAULT = Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT; @@ -645,6 +648,9 @@ private Constants() { /** * Optimized for sequential access. * Value: {@value} + * @deprecated use the {@link Options.OpenFileOptions} value + * in code which only needs to be compiled against newer hadoop + * releases. */ public static final String INPUT_FADV_SEQUENTIAL = Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; @@ -654,6 +660,9 @@ private Constants() { * The performance of sequential IO may be reduced in exchange for * more efficient {@code seek()} operations. 
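The `@deprecated` tags added to the fadvise constants steer callers towards the standard openfile options instead. A sketch of what that looks like on the application side, assuming only the public `openFile()` builder and the `Options.OpenFileOptions` constants already imported elsewhere in this patch (the helper class itself is invented for the example):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;

public final class ReadPolicyExample {

  private ReadPolicyExample() {
  }

  /** Open a file and ask for the random-IO read policy on this stream only. */
  public static FSDataInputStream openForRandomIO(Path path, Configuration conf)
      throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    return FutureIO.awaitFuture(
        fs.openFile(path)
            // opt(): a filesystem which does not recognize the key ignores it
            .opt(FS_OPTION_OPENFILE_READ_POLICY,
                FS_OPTION_OPENFILE_READ_POLICY_RANDOM)
            .build());
  }
}
```

The old `INPUT_FADV_*` constants stay around for code which still has to compile against older Hadoop releases, which is exactly what the deprecation text above says.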
* Value: {@value} + * @deprecated use the {@link Options.OpenFileOptions} value + * in code which only needs to be compiled against newer hadoop + * releases. */ public static final String INPUT_FADV_RANDOM = Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 9f20d2a49c9f3..15e240f901865 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -90,6 +90,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Globber; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; @@ -516,7 +517,8 @@ public void initialize(URI name, Configuration originalConf) doBucketProbing(); inputPolicy = S3AInputPolicy.getPolicy( - conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_DEFAULT), + conf.getTrimmed(INPUT_FADVISE, + Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT), S3AInputPolicy.Normal); LOG.debug("Input fadvise policy = {}", inputPolicy); changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java index 9bf297913cc93..b90d0f2a61605 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java @@ -40,8 +40,8 @@ public enum S3AInputPolicy { Normal(FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, false, true), - Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, false, false), - Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, true, false); + Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, true, false), + Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, false, false); /** Policy name. 
*/ private final String policy; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 7c91d6cdebaec..86cb18076cc6c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -306,11 +306,11 @@ public enum Statistic { TYPE_COUNTER), STREAM_READ_REMOTE_STREAM_ABORTED( StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED, - "Count/duration of aborting a remote stream during stream IO", + "Duration of aborting a remote stream during stream IO", TYPE_DURATION), STREAM_READ_REMOTE_STREAM_CLOSED( StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED, - "Count/duration of closing a remote stream during stream IO", + "Duration of closing a remote stream during stream IO", TYPE_DURATION), STREAM_READ_OPERATIONS_INCOMPLETE( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index f309f967e1343..1d52b0a34ea70 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.MultipartUtils; @@ -426,7 +427,8 @@ public int run(String[] args, PrintStream out) String encryption = printOption(out, "\tEncryption", Constants.S3_ENCRYPTION_ALGORITHM, "none"); - printOption(out, "\tInput seek policy", INPUT_FADVISE, INPUT_FADV_DEFAULT); + printOption(out, "\tInput seek policy", INPUT_FADVISE, + Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT); printOption(out, "\tChange Detection Source", CHANGE_DETECT_SOURCE, CHANGE_DETECT_SOURCE_DEFAULT); printOption(out, "\tChange Detection Mode", CHANGE_DETECT_MODE, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java index 45811385d606a..dd41583de3fe4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java @@ -44,11 +44,11 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.NativeCodeLoader; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; import static org.apache.hadoop.util.Preconditions.checkNotNull; -import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_DEFAULT; import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; -import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM; -import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL; import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; import static 
org.apache.hadoop.fs.s3a.Constants.SSL_CHANNEL_MODE; import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE; @@ -87,9 +87,9 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest { @Parameterized.Parameters public static Collection params() { return Arrays.asList(new Object[][]{ - {INPUT_FADV_SEQUENTIAL, Default_JSSE}, - {INPUT_FADV_RANDOM, OpenSSL}, - {INPUT_FADV_DEFAULT, Default_JSSE_with_GCM}, + {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, Default_JSSE}, + {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, OpenSSL}, + {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, Default_JSSE_with_GCM}, }); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java index 2c888c09fa2f0..a80a24881fa74 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java @@ -73,7 +73,7 @@ public Configuration createConfiguration() { // this is so stream draining is always blocking, allowing // assertions to be safely made without worrying // about any race conditions - conf.setInt(ASYNC_DRAIN_THRESHOLD, 128_000); + conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE); return conf; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 1a2543cb51a7a..b0ee531112b50 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -40,7 +40,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; -import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO; +import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; @@ -115,13 +115,13 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable { .withFileStatus(st2) .build() .get(), - always(NO_IO), + always(NO_HEAD_OR_LIST), with(STREAM_READ_OPENED, 0)); // the stream gets opened during read long readLen = verifyMetrics(() -> readStream(in), - always(NO_IO), + always(NO_HEAD_OR_LIST), with(STREAM_READ_OPENED, 1)); assertEquals("bytes read from file", fileLength, readLen); } @@ -144,7 +144,7 @@ public void testOpenFileShorterLength() throws Throwable { .opt(FS_OPTION_OPENFILE_LENGTH, shortLen) .build() .get(), - always(NO_IO), + always(NO_HEAD_OR_LIST), with(STREAM_READ_OPENED, 0)); // verify that the statistics are in range @@ -158,7 +158,7 @@ public void testOpenFileShorterLength() throws Throwable { // now read it long r2 = verifyMetrics(() -> readStream(in2), - always(NO_IO), + always(NO_HEAD_OR_LIST), with(STREAM_READ_OPENED, 1), with(STREAM_READ_BYTES_READ_CLOSE, 0), with(STREAM_READ_SEEK_BYTES_SKIPPED, 0)); @@ -166,16 +166,18 @@ public void testOpenFileShorterLength() throws Throwable { 
LOG.info("Statistics of read stream {}", statsString); assertEquals("bytes read from file", shortLen, r2); - // the read has been ranged + // no bytes were discarded. bytesDiscarded.assertDiffEquals(0); } @Test public void testOpenFileLongerLength() throws Throwable { - // do a second read with the length declared as short. - // we now expect the bytes read to be shorter. + // do a second read with the length declared as longer + // than it is. + // An EOF will be read on readFully(), -1 on a read() + S3AFileSystem fs = getFileSystem(); - // now set a length past the actual file length + // set a length past the actual file length long longLen = fileLength + 10; FSDataInputStream in3 = verifyMetrics(() -> fs.openFile(testFile) @@ -184,10 +186,10 @@ public void testOpenFileLongerLength() throws Throwable { .must(FS_OPTION_OPENFILE_LENGTH, longLen) .build() .get(), - always(NO_IO)); + always(NO_HEAD_OR_LIST)); - // shows that irrespective of the declared length, you can read past it, - // and an EOFException is raised + // assert behaviors of seeking/reading past the file length. + // there is no attempt at recovery. verifyMetrics(() -> { byte[] out = new byte[(int) longLen]; intercept(EOFException.class, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java index 7ae60a8c7d85e..03bc10f86cd25 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java @@ -62,6 +62,12 @@ public final class OperationCost { public static final OperationCost NO_IO = new OperationCost(0, 0); + /** + * More detailed description of the NO_IO cost. + */ + public static final OperationCost NO_HEAD_OR_LIST = + NO_IO; + /** A HEAD operation. 
*/ public static final OperationCost HEAD_OPERATION = new OperationCost(1, 0); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java index c7d1322832224..97fbaae0ae4d3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java @@ -63,8 +63,8 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.util.DurationInfo; -import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; -import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT; import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES; import static org.apache.hadoop.fs.s3a.select.SelectBinding.expandBackslashChars; @@ -767,7 +767,8 @@ public void testSelectOddRecordsIgnoreHeaderV1() JobConf conf = createJobConf(); inputOpt(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_NONE); inputMust(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_IGNORE); - inputMust(conf, INPUT_FADVISE, INPUT_FADV_DEFAULT); + inputMust(conf, FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_DEFAULT); inputMust(conf, SELECT_ERRORS_INCLUDE_SQL, "true"); verifySelectionCount(EVEN_ROWS_COUNT, SELECT_EVEN_ROWS_NO_HEADER, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index 3222a15cd527d..2dd6b93cc0e8c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -37,7 +37,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; -import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + /** * Contract test for seek operation. 
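The `inputMust(conf, FS_OPTION_OPENFILE_READ_POLICY, ...)` call above and the `FutureIO.propagateOptions()` call in the `StreamInputFormat` change below are two halves of the same mechanism: a job sets prefixed keys in its configuration, and record readers copy them onto the `openFile()` builder. A sketch of the job-side half; it assumes the `MRJobConfig` prefix constants are plain string prefixes ending in the key separator, as `propagateOptions` expects, so check `MRJobConfig` for the exact values before relying on this:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;

public final class JobInputReadPolicy {

  private JobInputReadPolicy() {
  }

  /**
   * Ask record readers which open their input through openFile()
   * to use the sequential read policy for the job's input files.
   */
  public static void requestSequentialInput(Configuration jobConf) {
    // optional-option prefix; MRJobConfig.INPUT_FILE_MANDATORY_PREFIX exists
    // for options which the filesystem must understand.
    jobConf.set(
        MRJobConfig.INPUT_FILE_OPTION_PREFIX + FS_OPTION_OPENFILE_READ_POLICY,
        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
  }
}
```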
diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java index a77c13762ca3b..f44488c7c0202 100644 --- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java +++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java @@ -24,13 +24,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.streaming.StreamUtil; +import org.apache.hadoop.util.functional.FutureIO; /** * An input format that selects a RecordReader based on a JobConf property. This @@ -58,8 +62,14 @@ public RecordReader createRecordReader(InputSplit genericSplit, context.progress(); // Open the file and seek to the start of the split - FileSystem fs = split.getPath().getFileSystem(conf); - FSDataInputStream in = fs.open(split.getPath()); + Path path = split.getPath(); + FileSystem fs = path.getFileSystem(conf); + // open the file + final FutureDataInputStreamBuilder builder = fs.openFile(path); + FutureIO.propagateOptions(builder, conf, + MRJobConfig.INPUT_FILE_OPTION_PREFIX, + MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); + FSDataInputStream in = FutureIO.awaitFuture(builder.build()); // Factory dispatch based on available params.. Class readerClass;