HADOOP-16202. Enhance/Stabilize openFile() #2168
Conversation
I'm wondering if on distcp I should leave off the file length. Why so: if someone updates a file between distcp being scheduled and it being executed, the length will be different. I know people aren't "meant" to do that, but inevitably people already do.
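To make the trade-off concrete, here is a minimal sketch of the length-less alternative being weighed; the option key and the helper name openForCopy are illustrative, not from the patch:

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static FSDataInputStream openForCopy(FileSystem fs, Path source)
        throws IOException {
      try {
        return fs.openFile(source)
            // sequential policy: one big GET to the EOF
            .must("fs.option.fadvise", "sequential")
            // deliberately no length/FileStatus: the store resolves the
            // current length itself, tolerating updates between the copy
            // being scheduled and executed
            .build()
            .get();
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException("failed to open " + source, e);
      }
    }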
Legit test failure. As well as fixing it: maybe, for the serializer, map MismatchedInputException to EOFException for better handling up the stack.

Looking at the Jackson code, MismatchedInputException covers a lot of cases, and I don't want to do anything "clever" about choosing specific IOEs based on the exception text. Modifying the test to look for a PathIOException rather than EOFException.
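For illustration, a hedged sketch of the wrap-don't-map approach described here; load is a hypothetical helper, not the actual JsonSerialization method:

    import java.io.IOException;
    import java.io.InputStream;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.exc.MismatchedInputException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathIOException;

    static <T> T load(ObjectMapper mapper, Path path, InputStream in,
        Class<T> clazz) throws IOException {
      try {
        return mapper.readValue(in, clazz);
      } catch (MismatchedInputException e) {
        // too broad an exception to map to EOFException with confidence;
        // wrap it with the path so callers get a meaningful error
        throw new PathIOException(path.toString(), e);
      }
    }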
The latest patch follows on from HADOOP-17318 and passes in the status when opening .pending and .pendingset files. For the magic committer this saves one HEAD per file in task commit and one HEAD per task in job commit. For maximum speedup/scale we'd need to also adopt incremental listing. Later.
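A minimal sketch of the pattern, assuming the withFileStatus() builder option; readManifest is an illustrative name:

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;

    static void readManifest(FileSystem fs, FileStatus status)
        throws IOException {
      // the status comes from the listing the committer already did,
      // so the store can skip its own HEAD/getFileStatus probe
      try (FSDataInputStream in = fs.openFile(status.getPath())
          .withFileStatus(status)
          .build()
          .get()) {
        // ... parse the .pending/.pendingset JSON from the stream ...
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException("failed to open " + status.getPath(), e);
      }
    }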
Roll-up of the previous PR #2046

Change-Id: Ib0aec173afcd8aae33f52da3f99ac813bd38c32f
Change-Id: I91de02318af2bb042c7d4dfa65d418c363edee50
Name and document standard options; move to visible class definition. Document better in filesystem spec, including examples of API and MapReduce use.

Change-Id: Ia593a67babcad63c13dda65b62287683a678a266
- move all options into a class nested under o.a.h.fs.Options
- use in FsShell commands which download files (copy, head, tail)
- distcp download
- FileUtil.copy

Includes YARN-10444 changes in YARN localization. This ensures that if the default/cluster settings for a filesystem are optimized for random IO, artifact downloads are still read at the maximum speed possible (one big GET to the EOF).

In the changed IOUtils and YARN code, also tweaked slightly:
* use incremental listing for dir listing, better for paged loads
* pass around a FileStatus more

Change-Id: Iedbb7b53cdaef2b82a0876e1db379c9a428f9339
Change-Id: I08045aaf13919f3923a7411ef64096f5e3171e69
* fs.option.buffer.size sets buffer size
* this is picked up in the default FS/FC file openings
* fadvise declared to take a CSV list of seek options, read in order until one is recognized by the store. This is to allow for custom policies and evolution over time.
* review of FileContext.open and switch to openFile where sequential access is likely to make a significant difference (i.e. large files)
* JsonSerialization to ignore available(), as that is just buffer capacity.
* JsonSerialization exposes a reader taking an FSDataInputStream. That's to let me switch the committers to openFile() with length -the committers should set the example here. It should significantly benefit the magic committer by eliminating one HEAD per .pending file in task commit and one HEAD per .pendingset during job commit... overall 50% of the HEAD/GET I/O to S3 in these operations.

Docs improved. Other than moving the magic committer to that new read (to be a followup), this is done.

Change-Id: I013d338f2b043f7940d7c17fcb2a7777cc7c1c18
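As a sketch of how the CSV fadvise list composes with the buffer-size option (key names are placeholders for the constants defined in the patch):

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static FSDataInputStream openForRandomIO(FileSystem fs, Path path)
        throws IOException {
      try {
        return fs.openFile(path)
            // CSV list, read in order until the store recognizes one:
            // custom policies degrade gracefully on other filesystems
            .must("fs.option.fadvise", "hbase-random, random")
            .opt("fs.option.buffer.size", 65536)
            .build()
            .get();
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException("failed to open " + path, e);
      }
    }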
JsonSerialization takes an optional FileStatus now. Exposed a protected constructor on PathIOException to help there; avoids playing initCause games.

Test failures in ITestS3AInputStreamPerformance switched me to passing down the (changeable) seek policy in the FS; the file opening there all switches to the openFile() API with the standard names, so an ITest verifies that they are being picked up correctly.

Change-Id: Ief8c5d40c3e1ca829afcabb27e9e04aecad8bb48
Passing in the FileStatus to openFile will have tangible benefits in task and job commit for the magic committer, while being harmless for staging committers (unless they stage to S3).

In task commit: eliminate one HEAD request per file created in that task attempt.
In job commit: eliminate one HEAD request per task.

Change-Id: I20d705067d3d3749c05a854b306888dbf72c83a5
Change-Id: I1e454d3244ffab8c248584edfb1f539ebac596c7
I had the great idea of adding explicit options to declare the split.start and split.end, where split.end can be used to define the file length. So I added them and wired up the relevant bits of the MR code.

But: we can't actually rely on them, because some readers may read past the end of the split to get to the end of their record. It has to be viewed as a hint "we will read this far", not a rule "we will read this far and no further". This is reflected in the docs and tests for S3A.

This actually complicates the story for opening files efficiently in MR jobs/Hive, as the split size is passed down but not the final file length. If we want to eliminate those HEAD requests, we really need the length.

Change-Id: I44e05d45881155ebcd9407470b17f1c0d89c8952
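A sketch of how a record reader might pass the hints down (key names are placeholders; split.end is advisory, as described above):

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static FSDataInputStream openSplit(FileSystem fs, Path path,
        long splitStart, long splitEnd) throws IOException {
      try {
        return fs.openFile(path)
            .opt("fs.option.openfile.split.start", Long.toString(splitStart))
            // a hint of how far we expect to read, not an upper bound:
            // the reader may overrun to finish its final record
            .opt("fs.option.openfile.split.end", Long.toString(splitEnd))
            .must("fs.option.fadvise", "sequential")
            .build()
            .get();
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException("failed to open " + path, e);
      }
    }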
The latest patch rounds things off. This thing is ready to go in.
It's important that FS implementations don't rely on the split length to set the max file length, because splits are allowed to overrun to ensure a whole record/block is read. Apps which pass split info down to worker processes (Hive &c) need to pass in the file size too if they want to save the HEAD request. It could still be used by the input streams if they can think of a way.
Because openFile() declares that FNFEs can be delayed until reads, we could also see if we could do an async HEAD request while processing that first GET/HEAD, so as to have the final file length without blocking. That would make streams more complex; at least now we have the option.
Hindsight is 20-20, but it might still be worthwhile to break this up into multiple PRs: 1) the hadoop-common changes with the default implementation, 2) the consumers (avro, distcp, mapreduce, yarn, etc), and 3) updates to the filesystem implementations (AWS, etc). That should help the reviewers to focus.
This should really help improve read performance! Thanks for doing it! After we have implementations for the various filesystems, it would be very cool to run benchmarks and measure the gain for random-read scenarios, and in some cases sequential reads, as there won't be any probing to determine whether access is random or sequential.
I think we should have another iteration which addresses some of the feedback, and preferably split this into multiple PRs (at minimum postpone the AWS filesystem implementation for a subsequent PR so we can focus on the hadoop-common changes).
Create a builder to open a file, supporting options
both standard and filesystem specific. The return
"supporting both standard and filesystem specific options."
Can you also explain how standard and filesystem specific options are passed into the builder, or indicate that this is covered below?
The API lets you declare options that MUST be understood by the FS, vs those which can be ignored. All the new standard options fit into the "must" category as recognised options -they just don't have any sig
    Future<FSDataInputStream> f = openFile(path)
        .must("fs.opt.seek-policy-option-name", "random, adaptive")
        // ignored on other stores; if it were a must() option,
        // other stores would be required to fail the open
        .opt("fs.azure.readahead", 312L)
        .build();
    FSDataInputStream is = f.get();
When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance,
the builder parameters are verified and
`openFileWithOptions(Path, OpenFileParameters)` invoked.
Might be helpful to point out that openFileWithOptions is a method on FileSystem.
This is needed to support passthrough/mounted filesystems.
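For instance, a filter filesystem forwards the whole parameter set rather than dropping options at its layer; a rough sketch, assuming the signature quoted above:

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FilterFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.impl.OpenFileParameters;

    public class InspectingFileSystem extends FilterFileSystem {
      @Override
      protected CompletableFuture<FSDataInputStream> openFileWithOptions(
          Path path, OpenFileParameters parameters) throws IOException {
        // a subclass may inspect or augment the parameters here before
        // FilterFileSystem passes them through to the inner filesystem
        return super.openFileWithOptions(path, parameters);
      }
    }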
### <a name="openfile(path)"></a> `FSDataInputStreamBuilder openFile(Path path)` |
This returns a FutureDataInputStreamBuilder. Also below, you should refer to the FutureDataInputStreamBuilder interface and not the private implementation (FsDataInputStreamBuilder).
oops. Will do. I won't rename the fsdatainputstreambuilder.md file though -it's already shipped and I don't want to break links across versions. Will change all text references
stream of the contents of opened file, or raises an exception.
The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)`
ultimately invokes `open(Path, int)`.
I'm not familiar with the API and have to cross-reference with the source code. I think this will be easier to read and understand if you include class names with the method names.
### Implementors notes

The base implementation of `openFileWithOptions()` actually executes
the `open(path)` operation synchronously, yet still returns the result
I think it calls open(path, int).
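The pattern being described, reduced to a self-contained sketch (names are illustrative): evaluate in the calling thread, return an already-completed future.

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    public final class SyncFutureDemo {

      @FunctionalInterface
      interface IOSupplier<T> {
        T get() throws IOException;
      }

      // run the operation synchronously; package result or failure as a future
      static <T> CompletableFuture<T> evalInCallingThread(IOSupplier<T> op) {
        CompletableFuture<T> result = new CompletableFuture<>();
        try {
          result.complete(op.get());
        } catch (IOException e) {
          result.completeExceptionally(e);
        }
        return result;
      }

      public static void main(String[] args) {
        // stands in for open(path, bufferSize) inside openFileWithOptions()
        CompletableFuture<String> f = evalInCallingThread(() -> "stream");
        System.out.println(f.join());
      }
    }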
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static final class OpenFileOptions {
It may be time to break this file out into multiple files, each with one class.
did have that initially, then pulled back in. No real preference
 * Set of standard options which openFile implementations
 * MUST recognize, even if they ignore the actual values.
See my comment in the builder about possibly making these properties (with getter and setter).
@@ -46,7 +46,7 @@
 * <code>
 * .opt("foofs:option.a", true)
 * .opt("foofs:option.b", "value")
-* .opt("barfs:cache", true)
+* .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
Why isn't etag also a standard option? The less fs specific stuff the better, right?
That's just an example of a non-standard one. No actual s3a support for this as it complicates life too much
-    FSDataInputStream in = item.fs.open(item.path);
-    try {
+    try (FSDataInputStream in = item.openFile(
+        FS_OPTION_OPENFILE_FADVISE_NORMAL)) {
Isn't this the same as calling open(Path) as the code was doing previously? Shouldn't it be sequential?
    FSDataInputStream in = item.fs.open(item.path);
    try {
      // Always do sequential reads.
The comment says do sequential reads, but then the sequential option isn't passed below.
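What the reviewer is asking for would look roughly like this (option key as a placeholder for the patch's constant):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.impl.FutureIOSupport;
    import org.apache.hadoop.io.IOUtils;

    static void printToStdout(FileSystem fs, Path path) throws IOException {
      try (FSDataInputStream in = FutureIOSupport.awaitFuture(
          fs.openFile(path)
              // actually request the sequential policy the comment promises
              .opt("fs.option.fadvise", "sequential")
              .build())) {
        IOUtils.copyBytes(in, System.out, 4096, false);
      }
    }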
@ThomasMarquardt -thanks for the feedback; will respond/update the PR. I've been doing it as one PR as it lets me
FWIW, passing in the file status/length makes a big difference in code which goes from listing a dir to opening every file in it. Other than that, changing the seek policy is the big one. Feel free to not review those s3a bits, except to look and see if there are things you'd want/want to avoid. open() has got a lot more complicated than it used to be.
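The list-then-open pattern referred to, as a sketch (withFileStatus as in the earlier snippets):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.fs.impl.FutureIOSupport;

    static void processDirectory(FileSystem fs, Path dir) throws IOException {
      RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, false);
      while (it.hasNext()) {
        LocatedFileStatus status = it.next();
        // the length (and any etag) travels with the status: no HEAD per file
        try (FSDataInputStream in = FutureIOSupport.awaitFuture(
            fs.openFile(status.getPath())
                .withFileStatus(status)
                .build())) {
          // ... process the stream ...
        }
      }
    }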
Going to close this PR and open a new one as I've rebased to trunk; will merge everything back into a single patch.