
HADOOP-16202. Enhance/Stabilize openFile() #2168


Conversation

steveloughran
Contributor

Roll-up of the previous PR #2046

Change-Id: Ib0aec173afcd8aae33f52da3f99ac813bd38c32f

@steveloughran steveloughran force-pushed the s3/HADOOP-16202-enhance-openfile branch 2 times, most recently from 6121f74 to 67fb88b Compare August 6, 2020 10:35
@steveloughran steveloughran force-pushed the s3/HADOOP-16202-enhance-openfile branch from 67fb88b to 7617109 Compare September 3, 2020 18:30
@apache deleted 5 comments from hadoop-yetus Sep 7, 2020
@steveloughran
Contributor Author

./hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/OpenFileOptions.java:31:@InterfaceAudience.Public:1: Utility classes should not have a public or default constructor. [HideUtilityClassConstructor]
./hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java:22:import java.util.stream.Collectors;:8: Unused import - java.util.stream.Collectors. [UnusedImports]
./hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java:23:import java.util.stream.Stream;:8: Unused import - java.util.stream.Stream. [UnusedImports]
./hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileOperation.java:52:  public class S3AOpenFileOperation extends AbstractStoreOperation {: 'class def modifier' has incorrect indentation level 2, expected level should be 0. [Indentation]
./hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileOperation.java:106:    /** SQL string if this is a SQL select file*/: First sentence should end with a period. [JavadocStyle]
./hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileOperation.java:280:  /**: First sentence should end with a period. [JavadocStyle]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileHelper.java:215:  /**: First sentence should end with a period. [JavadocStyle]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:169:          byte[] out = new byte[(int) longLen];: 'block' child has incorrect indentation level 10, expected level should be 6. [Indentation]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:170:          intercept(EOFException.class,: 'block' child has incorrect indentation level 10, expected level should be 6. [Indentation]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:172:          in3.seek(longLen - 1);: 'block' child has incorrect indentation level 10, expected level should be 6. [Indentation]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:173:          assertEquals("read past real EOF on " + in3,: 'block' child has incorrect indentation level 10, expected level should be 6. [Indentation]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:175:          in3.close();: 'block' child has incorrect indentation level 10, expected level should be 6. [Indentation]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:176:          return null;: 'block' child has incorrect indentation level 10, expected level should be 6. [Indentation]
./hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:177:        },: 'block rcurly' has incorrect indentation level 8, expected level should be 4. [Indentation]

@steveloughran
Contributor Author

tests

[ERROR] Errors: 
[ERROR]   TestLocalFSContractOpen>AbstractContractOpenTest.testOpenFileNullStatus:358 ? IllegalArgument
[ERROR]   TestRawlocalContractOpen>AbstractContractOpenTest.testOpenFileNullStatus:358 ? IllegalArgument
[INFO] 

@steveloughran steveloughran changed the title HADOOP-16202. Enhance openFile() HADOOP-16202. Enhance/Stabilize openFile() Sep 12, 2020
@steveloughran
Contributor Author

  • plan to switch copyToLocal & distcp to openFile with the sequential read policy. This ensures that clusters where the default policy has been set to random will not see a performance collapse here (see the sketch below). It will be left to the ORC/Parquet projects to consider when/how to move to this API too.
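
A sketch of what that switch could look like. The constant names (FS_OPTION_OPENFILE_FADVISE, FS_OPTION_OPENFILE_FADVISE_SEQUENTIAL) follow this PR's draft and should be treated as assumptions, not final API:

// Open a file for a bulk download (copyToLocal/distcp style), pinning the
// sequential read policy so a cluster-wide "random" default can't cripple
// the one-big-GET read pattern. Checked exceptions from get() are elided.
CompletableFuture<FSDataInputStream> future = fs.openFile(srcStatus.getPath())
    .opt(FS_OPTION_OPENFILE_FADVISE, FS_OPTION_OPENFILE_FADVISE_SEQUENTIAL)
    .withFileStatus(srcStatus)   // skip the existence-check HEAD too
    .build();
try (FSDataInputStream in = future.get()) {
  // ... copy bytes to the local destination ...
}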

@steveloughran
Contributor Author

I'm wondering if, for distcp, I should leave off the file length.

Why so: if someone updates a file between distcp being scheduled and it being executed, the length will be different. I know people aren't "meant" to do that, but inevitably they already do.

@steveloughran steveloughran force-pushed the s3/HADOOP-16202-enhance-openfile branch from d7a98f1 to d536a50 Compare October 21, 2020 18:13

@steveloughran
Contributor Author

javadoc

[WARNING] /home/jenkins/jenkins-home/workspace/hadoop-multibranch_PR-2168/src/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AOpenFileOperation.java:48: warning: empty <p> tag

@steveloughran steveloughran force-pushed the s3/HADOOP-16202-enhance-openfile branch from 767a63d to 1d85a7c Compare November 17, 2020 21:33

@steveloughran
Contributor Author

Legit failure of hadoop.util.TestJsonSerialization.

As well as fixing it, maybe the serializer should map MismatchedInputException to EOFException for better handling up the stack (a sketch follows the stack trace below).

Error Message
`file:/tmp/Keyval3692109174763841309.json': Failed to read JSON file com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream); line: 1, column: 0]: No content to map due to end-of-input
 at [Source: (org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream); line: 1, column: 0]
Stacktrace
org.apache.hadoop.fs.PathIOException: 
`file:/tmp/Keyval3692109174763841309.json': Failed to read JSON file com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream); line: 1, column: 0]: No content to map due to end-of-input
 at [Source: (org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream); line: 1, column: 0]
	at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:275)
	at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:246)
	at org.apache.hadoop.util.TestJsonSerialization.lambda$testFileSystemEmptyPath$2(TestJsonSerialization.java:180)
	at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:498)
	at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:384)
	at org.apache.hadoop.util.TestJsonSerialization.testFileSystemEmptyPath(TestJsonSerialization.java:179)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream); line: 1, column: 0]
	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4360)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4205)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3251)
	at org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:163)
	at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:273)
	... 18 more
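
A minimal sketch of the suggested mapping; this is a hypothetical helper, not part of the patch (the follow-up comment below settles on a different fix):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;

// Rethrow Jackson's "no content to map" failure as the well-known
// EOFException so callers up the stack can handle it like any other
// truncated-input condition.
static <T> T readValue(ObjectMapper mapper, InputStream in, Class<T> clazz)
    throws IOException {
  try {
    return mapper.readValue(in, clazz);
  } catch (MismatchedInputException e) {
    throw (EOFException) new EOFException(e.getMessage()).initCause(e);
  }
}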

@steveloughran
Contributor Author

Looking at the Jackson code, MismatchedInputException covers a lot of cases, and I don't want to do anything "clever" about choosing specific IOEs based on the exception text.

Modifying the test to look for a PathIOException rather than an EOFException.

@steveloughran steveloughran force-pushed the s3/HADOOP-16202-enhance-openfile branch from 1d85a7c to 7b56861 Compare November 24, 2020 13:59
@steveloughran
Contributor Author

The latest patch follows on from HADOOP-17318 and passes in the status when opening .pending and .pendingset files.

For the magic committer this saves one HEAD per file in task commit and one HEAD per task in job commit.

For max speedup/scale we'd need to also adopt incremental listing. Later.
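
For illustration, a minimal sketch of the status passing with the builder API this PR stabilizes (exception handling simplified):

// Reuse the FileStatus from a directory listing when opening a .pendingset
// file, so S3A can skip the HEAD probe it would otherwise issue.
static FSDataInputStream openWithStatus(FileSystem fs, FileStatus status)
    throws Exception {
  return fs.openFile(status.getPath())   // FutureDataInputStreamBuilder
      .withFileStatus(status)            // length etc. come from the listing
      .build()                           // CompletableFuture<FSDataInputStream>
      .get();                            // block until the open completes
}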


Roll-up of the previous PR

Change-Id: Ib0aec173afcd8aae33f52da3f99ac813bd38c32f
Change-Id: I91de02318af2bb042c7d4dfa65d418c363edee50
Name and document standard options; move to visible class definition.

Document better in filesystem spec, including
examples of API and MapReduce use

Change-Id: Ia593a67babcad63c13dda65b62287683a678a266
- move all options into a class nested under o.a.h.fs.Options;
- use in FsShell commands which download files (copy, head, tail)
- distcp download
- FileUtil.copy

Includes YARN-10444 changes in yarn localization.

This ensures that if the default/cluster settings
for a file system are optimized for random IO, artifact downloads
are still read at the maximum speed possible (one big GET to the EOF).

In the changed IOUtils and YARN code, also tweaked slightly:
* use incremental listing for dir listing, better for paged loads
* pass around a FileStatus more

Change-Id: Iedbb7b53cdaef2b82a0876e1db379c9a428f9339
Change-Id: I08045aaf13919f3923a7411ef64096f5e3171e69
* fs.option.buffer.size sets buffer size
* this is picked up in the default FS/FC file openings
* fadvise declared to take a CSV list of seek options, read in order
  until one is recognized by the store. This is to allow for custom
  policies and evolution over time.
* review of FileContext.open and switch to openFile where sequential access
  is likely to make a significant difference (i.e. large files)
* JsonSerialization to ignore available(), as that is just buffer capacity.
* JsonSerialization exposes a reader taking an FSDataInputStream. That's to
  let me switch the committers to openFile() with a length: the committers
  should set the example here. It should significantly benefit the Magic committer
  by eliminating one HEAD per .pending file in task commit and one HEAD per
  .pendingset during job commit... overall 50% of the HEAD/GET I/O to S3 in these operations.

Docs improved.

Other than moving the magic committer to that new read (to be followup),
this is done.

Change-Id: I013d338f2b043f7940d7c17fcb2a7777cc7c1c18
JsonSerialization now takes an optional FileStatus.

Exposed a protected constructor on PathIOException to help there; avoids
playing initCause games.

Test failures in ITestS3AInputStreamPerformance switched me to passing
down the (changeable) seek policy in the FS; the file openings there
all switch to the openFile() API with the standard names, so an ITest
verifies that they are being picked up correctly.

Change-Id: Ief8c5d40c3e1ca829afcabb27e9e04aecad8bb48
Passing in the FileStatus to openFile() will have tangible benefits in
task and job commit for the magic committer, while being harmless
for the staging committers (unless they stage to S3).

In task commit: eliminate one HEAD request per file created in that task attempt
In job commit: eliminate one HEAD request per task

Change-Id: I20d705067d3d3749c05a854b306888dbf72c83a5
Change-Id: I1e454d3244ffab8c248584edfb1f539ebac596c7
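
A minimal sketch of the CSV fallback scan described in the fadvise note above; the method and the "normal" default are illustrative assumptions:

import java.util.Locale;
import java.util.Set;

// Scan a comma-separated list of seek/fadvise policies in declaration
// order and pick the first one this store recognizes; fall back to a
// default when nothing matches. This lets clients name custom policies
// without breaking stores that don't know them.
static String chooseSeekPolicy(String csv, Set<String> supported) {
  for (String candidate : csv.split(",")) {
    String policy = candidate.trim().toLowerCase(Locale.ROOT);
    if (supported.contains(policy)) {
      return policy;
    }
  }
  return "normal";
}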
@steveloughran steveloughran force-pushed the s3/HADOOP-16202-enhance-openfile branch from fc74d55 to 0966a28 Compare December 2, 2020 10:27

I had the great idea of adding explicit options to declare split.start and split.end, where the split end can be
used to define the file length. So I added them and wired up the relevant bits of the MR code.

But: we can't actually rely on them, because some readers may read past the end of the split to get to
the end of their record. It has to be viewed as a hint "we will read this far", not a rule
"we will read this far and no further". This is reflected in the docs and tests for s3a.

This actually complicates the story for opening files efficiently in MR jobs/Hive, as the
split size is passed down but not the final file length.
If we want to eliminate those HEAD requests, we really need the length.

Change-Id: I44e05d45881155ebcd9407470b17f1c0d89c8952
@steveloughran
Contributor Author

The latest patch rounds things off. This thing is ready to go in.

  • We now have the option to specify the start and end of splits; the input formats in the MR client do this.
  • Everywhere in the code where we explicitly download sequential datasets, we now request sequential IO. (Actually, I've just realised hadoop fs -head <path> should request random IO as well as declare split lengths... we don't want a full GET.)

It's important that FS implementations don't rely on the split length to set the maximum file length, because splits are allowed to overrun to ensure a whole record/block is read. Apps which pass split info down to worker processes (Hive &c) need to pass in the file size too if they want to save the HEAD request. The split end could still be used by the input streams:

  1. For sequential IO: end of content length = min(split-end, file-length) for that initial request.
  2. For random IO: assume it is the initial EOF.

Because openFile() declares that FNFEs can be delayed until reads, we could also see if we could do an async HEAD request while processing that first GET/HEAD, so we'd have the final file length without blocking. That would make the streams more complex; at least now we have the option.
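
A sketch of the calculation in point 1, assuming a negative value marks an unknown file length:

// End of the initial GET range for sequential IO: bounded by the split
// end, and by the file length when the caller supplied one.
static long initialRangeEnd(long splitEnd, long fileLength) {
  return fileLength >= 0 ? Math.min(splitEnd, fileLength) : splitEnd;
}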

@ThomasMarquardt
Contributor

Hindsight is 20-20, but it might still be worthwhile to break this up into multiple PRs: 1) the hadoop-common changes with the default implementation, 2) the consumers (avro, distcp, mapreduce, yarn, etc), and 3) updates to the filesystem implementations (AWS, etc). That should help the reviewers to focus.

@ThomasMarquardt (Contributor) left a comment

This should really help improve read performance! Thanks for doing it! After we have implementations for the various filesystems, it would be very cool to run benchmarks and measure the gain for random read scenarios, and in some cases sequential reads: there won't be any probing to determine whether access is random or sequential.

I think we should have another iteration which addresses some of the feedback, and preferably split this into multiple PRs (at minimum postpone the AWS filesystem implementation for a subsequent PR so we can focus on the hadoop-common changes).

Comment on lines +17 to +18
Create a builder to open a file, supporting options
both standard and filesystem specific. The return
Contributor

"supporting both standard and filesystem specific options."

Contributor

Can you also explain how standard and filesystem specific options are passed into the builder, or indicate that this is covered below?

Contributor Author

The API lets you declare options that MUST be understood by the FS, vs those which can be ignored. All the new standard options fit into the "must" category as recognised options; stores just don't have to act on the values.

Future<FSDataInputStream> f = openFile(path)
    .must("fs.opt.seek-policy-option-name", "random, adaptive")
    // ignored on other stores; had this been a must() option,
    // opening the file on other stores MUST fail
    .opt("fs.azure.readahead", 312L)
    .build();

FSDataInputStream is = f.get();


When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance,
the builder parameters are verified and
`openFileWithOptions(Path, OpenFileParameters)` invoked.
Contributor

Might be helpful to point out that openFileWithOptions is a method on FileSystem.

This is needed to support passthrough/mounted filesystems.


### <a name="openfile(path)"></a> `FSDataInputStreamBuilder openFile(Path path)`
Contributor

This returns a FutureDataInputStreamBuilder. Also below, you should refer to the FutureDataInputStreamBuilder interface and not the private implementation (FsDataInputStreamBuilder).

Contributor Author

Oops, will do. I won't rename the fsdatainputstreambuilder.md file though: it's already shipped and I don't want to break links across versions. Will change all the text references.

stream of the contents of opened file, or raises an exception.

The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)`
ultimately invokes `open(Path, int)`.
Contributor

I'm not familiar with the API and have to cross-reference with the source code. I think this will be easier to read and understand if you include class names with the method names.

### Implementors notes

The base implementation of `openFileWithOptions()` actually executes
the `open(path)` operation synchronously, yet still returns the result
Contributor

I think it calls open(path, int).

*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static final class OpenFileOptions {
Contributor

It may be time to break this file out into multiple files, each with one class.

Contributor Author

Did have that initially, then pulled it back in. No real preference.

Comment on lines +602 to +603
* Set of standard options which openFile implementations
* MUST recognize, even if they ignore the actual values.
Contributor

See my comment in the builder about possibly making these properties (with getter and setter).

@@ -46,7 +46,7 @@
* <code>
* .opt("foofs:option.a", true)
* .opt("foofs:option.b", "value")
* .opt("barfs:cache", true)
* .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
Contributor

Why isn't etag also a standard option? The less fs-specific stuff the better, right?

Contributor Author

That's just an example of a non-standard one. No actual s3a support for this, as it complicates life too much.

-    FSDataInputStream in = item.fs.open(item.path);
-    try {
+    try (FSDataInputStream in = item.openFile(
+        FS_OPTION_OPENFILE_FADVISE_NORMAL)) {
Contributor

Isn't this the same as calling open(Path) as the code was doing previously? Shouldn't it be sequential?


-    FSDataInputStream in = item.fs.open(item.path);
-    try {
+    // Always do sequential reads.
Contributor

The comment says do sequential reads, but then the sequential option isn't passed below.

@steveloughran
Contributor Author

@ThomasMarquardt, thanks for the feedback; will respond/update the PR.

I've been doing it as one PR as it lets me

  • do interface and impl together, plus contract tests
  • rebase on a regular basis on a patch that's had little attention from reviewers
  • test it downstream in a PoC fork of parquet just to see what works/doesn't.

FWIW, passing in the file status/length makes a big difference in code which goes from listing a dir to opening every file in it (see the sketch below). Other than that, changing the seek policy is the big one.
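
A sketch of that list-then-open pattern; process() is a hypothetical consumer, and checked exception handling is elided:

// Each file's FileStatus from the listing is handed straight to
// openFile(), so no per-file HEAD request is needed to open it.
RemoteIterator<FileStatus> files = fs.listStatusIterator(dir);
while (files.hasNext()) {
  FileStatus st = files.next();
  try (FSDataInputStream in =
      fs.openFile(st.getPath()).withFileStatus(st).build().get()) {
    process(in);  // hypothetical consumer
  }
}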

Feel free to not review those s3a bits, except to look and see if there are things you'd want/want to avoid. open() has got a lot more complicated than it used to be.

@steveloughran
Contributor Author

going to close this PR and open a new one as I've rebased to trunk; will merge everything back into a single patch

Labels
DistCp, enhancement, fs/s3 (changes related to hadoop-aws; submitter must declare test endpoint), fs