Skip to content

Commit b1b490b

Browse files
committed
GH-3078: Use Hadoop FileSystem.openFile() to open files
* Open files with FileSystem.openFile(), passing in file status * And read policy of "parquet, vector, random, adaptive"
1 parent 78a3b76 commit b1b490b

File tree

2 files changed

+76
-1
lines changed

2 files changed

+76
-1
lines changed

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@
1919

2020
package org.apache.parquet.hadoop.util;
2121

22+
import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;
23+
2224
import java.io.IOException;
25+
import java.io.InterruptedIOException;
26+
import java.util.concurrent.CompletableFuture;
2327
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.fs.FSDataInputStream;
2429
import org.apache.hadoop.fs.FileStatus;
2530
import org.apache.hadoop.fs.FileSystem;
2631
import org.apache.hadoop.fs.Path;
@@ -29,6 +34,24 @@
2934

3035
public class HadoopInputFile implements InputFile {
3136

37+
/**
38+
* openFile() option name for setting the read policy: {@value}.
39+
*/
40+
private static final String OPENFILE_READ_POLICY_KEY = "fs.option.openfile.read.policy";
41+
42+
/**
43+
* Read policy when opening parquet files: {@value}.
44+
* <p>Policy-aware stores pick the first policy they recognize in the list.
45+
* Everything recognizes "random";
46+
* "vector" came in with 3.4.0, while "parquet" came with Hadoop 3.4.1.
47+
* "parquet" means "this is a Parquet file, so be clever about footers, prefetch,
48+
* and expect vector and/or random IO".
49+
* <p>In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" for the
50+
* S3A connector, but as the ABFS and GCS connectors do footer caching, they
51+
* may use it as a hint to say "fetch the footer and keep it in memory".
52+
*/
53+
private static final String PARQUET_READ_POLICY = "parquet, vector, random, adaptive";
54+
3255
private final FileSystem fs;
3356
private final FileStatus stat;
3457
private final Configuration conf;
@@ -70,9 +93,38 @@ public long getLength() {
7093
return stat.getLen();
7194
}
7295

96+
/**
97+
* Open the file.
98+
* <p>Uses {@code FileSystem.openFile()} so that
99+
* the existing FileStatus can be passed down: saves a HEAD request on cloud storage.
100+
* and ignored everywhere else.
101+
*
102+
* @return the input stream.
103+
*
104+
* @throws InterruptedIOException future was interrupted
105+
* @throws IOException if something went wrong
106+
* @throws RuntimeException any nested RTE thrown
107+
*/
73108
@Override
74109
public SeekableInputStream newStream() throws IOException {
75-
return HadoopStreams.wrap(fs.open(stat.getPath()));
110+
FSDataInputStream stream;
111+
try {
112+
// this method is async so that implementations may do async HEAD head
113+
// requests. Not done in S3A/ABFS when a file status passed down (as is done here)
114+
final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
115+
.withFileStatus(stat)
116+
.opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
117+
.build();
118+
stream = awaitFuture(future);
119+
} catch (RuntimeException e) {
120+
// S3A < 3.3.5 would raise illegal path exception if the openFile path didn't
121+
// equal the path in the FileStatus; Hive virtual FS could create this condition.
122+
// As the path to open is derived from stat.getPath(), this condition seems
123+
// near-impossible to create -but is handled here for due diligence.
124+
stream = fs.open(stat.getPath());
125+
}
126+
127+
return HadoopStreams.wrap(stream);
76128
}
77129

78130
@Override

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,29 @@ public static <T> T awaitFuture(final Future<T> future, final long timeout, fina
7070
}
7171
}
7272

73+
/**
74+
* Given a future, evaluate it.
75+
* <p>
76+
* Any exception generated in the future is
77+
* extracted and rethrown.
78+
* </p>
79+
* @param future future to evaluate
80+
* @param <T> type of the result.
81+
* @return the result, if all went well.
82+
* @throws InterruptedIOException future was interrupted
83+
* @throws IOException if something went wrong
84+
* @throws RuntimeException any nested RTE thrown
85+
*/
86+
public static <T> T awaitFuture(final Future<T> future)
87+
throws InterruptedIOException, IOException, RuntimeException {
88+
try {
89+
return future.get();
90+
} catch (InterruptedException e) {
91+
throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e);
92+
} catch (ExecutionException e) {
93+
throw unwrapInnerException(e);
94+
}
95+
}
7396
/**
7497
* From the inner cause of an execution exception, extract the inner cause
7598
* to an IOException, raising Errors immediately.

0 commit comments

Comments
 (0)