HADOOP-16202. Enhance openFile() for better read performance against object stores #2584
Conversation
MR client not compiling; not seeing useful information from Yetus.
I'm thinking we should be more ambitious in read policy than just "fadvise", because we can then use it as a declaration for the input streams to tune all their parameters, e.g. buffer sizing and whether to do async prefetch. Then we could allow stores to support not just seek policies, but declarations of what you are planning to read, e.g. "parquet-bytebuffer" to mean "I'm reading Parquet files through the ByteBuffer positioned read API".
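The idea of a read policy as a preference list (which later shipped as `fs.option.openfile.read.policy`, "a list of the desired read policy, in preferred order") can be sketched as a simple fallback resolver. This is an illustrative mock, not the Hadoop API; the class name, the supported-policy set, and the `adaptive` fallback are assumptions for the sketch:

```java
import java.util.Set;

/**
 * Illustrative sketch only -- NOT the Hadoop API. Models how a filesystem
 * could resolve a comma-separated list of read policies, in preferred
 * order, to the first policy it understands.
 */
public class ReadPolicyResolver {
    private final Set<String> supported;

    public ReadPolicyResolver(Set<String> supported) {
        this.supported = supported;
    }

    public String resolve(String policyList) {
        for (String policy : policyList.split(",")) {
            String trimmed = policy.trim().toLowerCase();
            if (supported.contains(trimmed)) {
                return trimmed;  // first recognized policy wins
            }
            // unknown names (e.g. "parquet-bytebuffer") are skipped, which
            // is what keeps new policy names backwards compatible
        }
        return "adaptive";  // assumed fallback for this sketch
    }

    public static void main(String[] args) {
        ReadPolicyResolver r = new ReadPolicyResolver(
                Set.of("adaptive", "random", "sequential", "whole-file"));
        System.out.println(r.resolve("parquet-bytebuffer, vector, random"));
    }
}
```

Because unrecognized names are skipped rather than rejected, an application can declare something very specific first and a generic policy as a fallback.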
@ThomasMarquardt could you take a look at this?
The weakness there is that with multiple libraries working with Parquet data (Spark, parquet.jar, Iceberg, Impala) it's not enough to declare the format. You'd really need to declare your app and version.
My biggest concern, or point of confusion, is the must() vs opt() distinction. It feels too complex to me. Instead, I think we should stick with opt() and allow filesystems to ignore options they don't understand. I'm thinking along the lines that the app passes options as hints to the filesystem in the hope that the hints might help, but things will still work if the filesystem does not understand the app's hint.
parameters.getMandatoryKeys(),
Collections.emptySet(),
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
With this change, ChecksumFileSystem.openFileWithOptions has the same implementation as the base class, so you can remove this override.
 * @see #opt(String, String)
 */
B opt(@Nonnull String key, long value);
We could use properties (getter and setter) for the standard options. We'd have one for buffer size, one for file length, etc.
I agree, but as well as it already being out there, I want to let applications compile against any version of Hadoop with this API, even if a specific FS option isn't available, alongside allowing for custom FS options. As an example, I have a PoC of a parquet library which uses this and is designed to compile against 3.3.x. (That isn't something I've put up, as I use it to see how this stuff could be added to a library... it highlights what is broken right now. Specifically: S3A openFile().withFileStatus() fails if used via Hive, because Hive wraps the FileStatus in a different type from S3AFileStatus.)
@@ -4655,7 +4656,7 @@ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
    final OpenFileParameters parameters) throws IOException {
  AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
This rejectUnknownMandatoryKeys function would not be necessary if the mandatory keys were strongly typed fields.
I agree, but we need flexibility of linking, the ability of clients to work with any FS implementation, etc.
 * @param deleteSource delete the source?
 * @param overwrite overwrite files at destination?
 * @param conf configuration to use when opening files
 * @return true if the operation succeeded.
What is the benefit of returning true on success and throwing on a failure, as opposed to returning void and throwing on failure?
none. it's just always been that way. and yes, it is wrong.
/**
 * Prefix for all standard filesystem options: {@value}.
 */
public static final String FILESYSTEM_OPTION = "fs.option.";
I think you could remove this and define the constants below with a string literal, or at least make it private.
made it private
try {
  // Always do sequential reads.
  try (FSDataInputStream in = item.openFile(
      FS_OPTION_OPENFILE_READ_POLICY_NORMAL)) {
The comment above says do sequential reads, but then the sequential option isn't passed here, but seems like it should be.
fixed
  .withFileStatus(statusFromListing)
  .build()
  .get();
Here the seek policy of `random` has been specified, |
Now the terminology is "read policy".
relevant for object stores where the probes for existence are expensive, and,
even with an asynchronous open, may be considered needless.
### <a name="openfile(pathhandle)"></a> `FutureDataInputStreamBuilder openFile(PathHandle)` |
Consider combining the docs for openFile(Path) and openFile(PathHandle) to avoid repetition.
If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
as MUST unrecognized standard options.

If set as an `must()` parameter, unsupported "standard" options MUST be ignored.
unrecognized standard options MUST be rejected.
I've read this a few times and it is not clear. I think you mean to say:
An option set via opt() that is not supported by the filesystem must be ignored. On the other hand, an option set via must() that is not supported by the filesystem must result in an exception.
This means that it is not a requirement for the stores to actually read the
the read policy or file length values and use them when opening files.
Now I'm very confused. Sounds like options set via must() can also be ignored, so why do we have opt() and must()?
I think we should keep this simple, and only have opt(). Get rid of must(). If a file system does not support the option, it should be ignored. Nice and simple.
i'm trying to say that the read and length policies must be understood, but may not be used, if the implementation chooses not to.
imagine in future you want to do something more than a hint. Case in point: the SQL API in AWS S3. there's no way an app could issue SQL commands which include transforming the response without knowing whether or not they are processed. hence, how about I clarify this in the docs and recommend use of opt() except in such special cases?
OK, so must() has already shipped, in that case sure, it should stay. I still have a question. If I pass an option via must(), can it be ignored by the implementation? I read your documentation, specifically, "This means that it is not a requirement for the stores to actually read the the read policy or file length values and use them when opening files." To me, this says that options passed via must() can be ignored. If this is true, then what is the difference between must() and opt()? Is it that must() can result in an unsupported exception, but opt() never results in an unsupported exception?
correct. note that createFile() has the same api/rules/common builder codebase. i hope to add some options there for the stores to say "skip all probes for paths existing, being a dir, having a parent..." goal is max performance writes on paths where the writer knows it is building up a dir tree from scratch, or using unique names. in particular: flink/spark checkpoints.
Thomas, to clarify a bit more: yes, an FS can choose to ignore an option, but it must recognise the option and so make a conscious decision "this doesn't matter".
Example: vectored IO will only work if the FS supports seek, which ftp doesn't. Assuming we add an option to say "I want vectored IO", then all filesystems will get this out of the box as a well-known-and-supported feature, but ftp will need some explicit code to say "fail if vectored IO is requested".
rebased to fix compile problems; the final patch is the one with changes since Thomas's last review
S3A: added some extra tuning of the input stream/openFile logic, so the stream no longer gets access to the S3 client, simply callbacks. Testing: in progress.
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@mehakmeet thanks, yes, sounds like it. file a JIRA 😁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Disclaimer: I've mainly been focusing on S3A, so I'm new to the common libs here.
Looks good!
I have a few comments on documentation since I was referring to it a lot. I can absolutely see the benefits of this API for future enhancements like prefetching.
I haven't run any tests. If requester pays has issues with endpoint, feel free to assign the JIRA to me.
@Override
public B opt(@Nonnull final String key, final long value) {
  mandatoryKeys.remove(key);
  optionalKeys.add(key);
  options.setLong(key, value);
  return getThisBuilder();
}
JavaDoc?
/**
* Set optional long parameter for the Builder.
*
* @see #opt(String, String)
*/
@@ -53,6 +52,7 @@ private FutureIOSupport() {
/**
 * Given a future, evaluate it. Raised exceptions are
 * extracted and handled.
 * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
I think we want to reference the awaitFuture with only a future as arg?
Suggested change:
- * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
+ * See {@link FutureIO#awaitFuture(Future)}.
no, because this is the (deprecated, internal) FutureIOSupport class, which forwards to the public FutureIO.awaitFuture(future, timeout, unit) in o.a.h.util.functional.
looking at that method though, I can see it is incompletely javadoc'd, so I've updated it.
@@ -136,56 +129,39 @@ private FutureIOSupport() {
 * @param <U> type of builder
 * @return the builder passed in.
 */
@Deprecated
Why deprecate this method when other methods promoted to FutureIO are happy without a deprecated flag? Should we encourage Hadoop developers to move to FutureIO once promoted from FutureIOSupport?
FutureIOSupport is private fs.impl, tagged Private/Unstable; only FS implementations should be using it. I've just made sure there are no refs in our own code, so it's only needed to ensure that third-party code still links.
.opt(FS_OPTION_OPENFILE_LENGTH,
    srcStatus.getLen()) // file length hint for object stores
When should we use the FS_OPTION_OPENFILE_LENGTH option vs. .withFileStatus(status)?
// this is so stream draining is always blocking, allowing
// assertions to be safely made without worrying
// about any race conditions
conf.setInt(ASYNC_DRAIN_THRESHOLD, 128_000);
Hoping to better understand why the change is needed - what did the race conditions look like?
Only after posting this has it clicked - we just want to make sure any assertions on the stream are completed after the drain? Makes sense. Integer.MAX_VALUE might make it more explicit - I was wondering about the significance of 128_000.
done
 */
@Deprecated
Is this method missing an @Override tag? FileSystem#getDefaultBlockSize() is deprecated.
if (fileStatus == null) {
  // we check here for the passed in status
  // being a directory
  if (fileStatus.isDirectory()) {
    throw new FileNotFoundException(path.toString() + " is a directory");
  }
} else {
This comment is no longer accurate, I think? It belongs with line 4893 (or we can drop the comment).
// we check here for the passed in status
// being a directory
.withAsyncDrainThreshold(
    options.getLong(ASYNC_DRAIN_THRESHOLD,
        defaultReadAhead))
Default should be defaultAsyncDrainThreshold, not defaultReadAhead?
@Test
public void testOpenFileLongerLength() throws Throwable {
  // do a second read with the length declared as short.
  // we now expect the bytes read to be shorter.
Comment needs updating for this test?
reviewed and updated all comments
// the stream gets opened during read
long readLen = verifyMetrics(() ->
    readStream(in),
    always(NO_IO),
I didn't understand here - what do we mean by NO_IO
? We are reading all of the stream, right?
Feedback from dannycjones. Change-Id: I546f28411c2475e1254b259c7e0734cc868ea9f0
Change-Id: If30684e9b4d39e9d1ba9cfdf50963b655c20144f
Change-Id: I64ac45369e4a6e1e9cd651b01acd380d258782fb
LGTM +1
Ran AWS tests as well; didn't see any new failures.
This defines standard options and values for the openFile() builder API for opening a file:

* fs.option.openfile.read.policy: a list of the desired read policies, in preferred order. Standard values are adaptive, default, random, sequential, vector, whole-file.
* fs.option.openfile.length: how long the file is.
* fs.option.openfile.split.start: start of a task's split.
* fs.option.openfile.split.end: end of a task's split.

These can be used by filesystem connectors to optimize their reading of the source file, including but not limited to:

* skipping existence/length probes when opening a file
* choosing a policy for prefetching/caching data

The hadoop shell commands which read files all declare "whole-file" and "sequential", as appropriate.

Contributed by Steve Loughran. Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
These changes ensure that sequential files are opened with the right read policy, and split start/end is passed in. As well as offering opportunities for filesystem clients to choose fetch/cache/seek policies, the settings ensure that processing text files on an s3 bucket where the default policy is "random" will still be processed efficiently. This commit depends on the associated hadoop-common patch, which must be committed first. Contributed by Steve Loughran. Change-Id: Ic6713fd752441cf42ebe8739d05c2293a5db9f94
S3A input stream support for the few fs.option.openfile settings. As well as supporting the read policy option and values, if the file length is declared in fs.option.openfile.length then no HEAD request will be issued when opening a file. This can cut a few tens of milliseconds off the operation. The patch adds a new openfile parameter/FS configuration option fs.s3a.input.async.drain.threshold (default: 16000). It declares the number of bytes remaining in the http input stream above which any operation to read and discard the rest of the stream, "draining", is executed asynchronously. This asynchronous draining offers some performance benefit on seek-heavy file IO. Contributed by Steve Loughran. Change-Id: I9b0626bbe635e9fd97ac0f463f5e7167e0111e39
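As a configuration sketch, the option described in this commit message could be set cluster-wide like any other S3A setting; the value below is the default stated above, and the surrounding XML is the usual Hadoop `core-site.xml` property layout:

```xml
<property>
  <name>fs.s3a.input.async.drain.threshold</name>
  <!-- bytes remaining in the HTTP stream above which "draining"
       (read-and-discard of the rest of the stream) goes async -->
  <value>16000</value>
</property>
```

Raising the threshold keeps more drains synchronous; lowering it hands more of them to the async executor, which helps seek-heavy workloads.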
Stops the abfs connector warning if openFile().withFileStatus() is invoked with a FileStatus that is not an abfs VersionedFileStatus. Contributed by Steve Loughran. Change-Id: I85076b365eb30aaef2ed35139fa8714efd4d048e
merged
This PR:

* defines the standard openFile() options for read policy, file length, and split start/end.
* adds the S3A option fs.s3a.input.async.drain.threshold.
* updates each shell command which reads a whole file to set its read policy to be whole-file.

As a result of this, on the s3a client you can open files without needing
a head request, or even a filestatus, just the length.
And if, in a cluster, you set the default read policy to be random,
shell and distcp read performance doesn't collapse.
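The "open without a HEAD request" fast path can be sketched as a small decision function. This is illustrative only, not the real S3A internals: the class and method names are invented for the sketch, while the option key is the one defined by this PR.

```java
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch of the S3A open-file fast path; NOT the real internals. */
public class OpenFileFastPath {
    public static final String LENGTH_OPTION = "fs.option.openfile.length";

    /**
     * Determine the file length without a HEAD request, if possible.
     * An empty result means a HEAD request is still required.
     */
    public static Optional<Long> knownLength(Map<String, String> options,
                                             Long statusLength) {
        if (statusLength != null) {
            return Optional.of(statusLength);             // withFileStatus() path
        }
        String declared = options.get(LENGTH_OPTION);
        if (declared != null) {
            return Optional.of(Long.parseLong(declared)); // length-hint path
        }
        return Optional.empty();                          // must issue a HEAD
    }
}
```

Either a passed-in FileStatus or the length option is enough to skip the round trip, which is where the "few tens of milliseconds" saving comes from.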