-
Notifications
You must be signed in to change notification settings
Fork 9.2k
HADOOP-18231. Fixes failing tests & drain stream async. #4386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
steveloughran
merged 13 commits into
apache:feature-HADOOP-18028-s3a-prefetch
from
ahmarsuhail:HADOOP-18231-prefetching-stream-test
Jul 15, 2022
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
3e5113f
adds in new test for prefetching input stream
ahmarsuhail 13d181f
creates streamStats before opening stream
ahmarsuhail 74813fe
updates numBlocks calculation method
ahmarsuhail d96babe
fixes ITestS3AOpenCost.testOpenFileLongerLength
ahmarsuhail fbe57d4
drains stream async
ahmarsuhail 52e6f96
fixes failing unit test
ahmarsuhail 0807cbd
updates as per review comments
ahmarsuhail ebb1b1d
use requireNonNull
ahmarsuhail a5f32e7
removes extra new line
ahmarsuhail 115c4e0
fixes checkstyle errors
ahmarsuhail eca1cba
fix typo
ahmarsuhail 31febbf
removes unused import
ahmarsuhail 448a81c
adds in new line
ahmarsuhail File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
45 changes: 0 additions & 45 deletions
45
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,18 +19,17 @@ | |
|
|
||
| package org.apache.hadoop.fs.s3a.read; | ||
|
|
||
| import java.io.Closeable; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.util.ArrayList; | ||
| import java.util.IdentityHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import com.amazonaws.services.s3.model.GetObjectRequest; | ||
| import com.amazonaws.services.s3.model.S3Object; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import org.apache.hadoop.fs.common.Io; | ||
| import org.apache.hadoop.fs.common.Validate; | ||
| import org.apache.hadoop.fs.s3a.Invoker; | ||
| import org.apache.hadoop.fs.s3a.S3AInputStream; | ||
|
|
@@ -40,30 +39,56 @@ | |
| import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; | ||
| import org.apache.hadoop.fs.statistics.DurationTracker; | ||
|
|
||
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; | ||
| import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; | ||
|
|
||
| /** | ||
| * Encapsulates low level interactions with S3 object on AWS. | ||
| */ | ||
| public class S3File implements Closeable { | ||
| public class S3File { | ||
| private static final Logger LOG = LoggerFactory.getLogger(S3File.class); | ||
|
|
||
| // Read-specific operation context. | ||
| /** | ||
| * Read-specific operation context. | ||
| */ | ||
| private final S3AReadOpContext context; | ||
|
|
||
| // S3 object attributes. | ||
| /** | ||
| * S3 object attributes. | ||
| */ | ||
| private final S3ObjectAttributes s3Attributes; | ||
|
|
||
| // Callbacks used for interacting with the underlying S3 client. | ||
| /** | ||
| * Callbacks used for interacting with the underlying S3 client. | ||
| */ | ||
| private final S3AInputStream.InputStreamCallbacks client; | ||
|
|
||
| // Used for reporting input stream access statistics. | ||
| /** | ||
| * Used for reporting input stream access statistics. | ||
| */ | ||
| private final S3AInputStreamStatistics streamStatistics; | ||
|
|
||
| // Enforces change tracking related policies. | ||
| /** | ||
| * Enforces change tracking related policies. | ||
| */ | ||
| private final ChangeTracker changeTracker; | ||
|
|
||
| // Maps a stream returned by openForRead() to the associated S3 object. | ||
| // That allows us to close the object when closing the stream. | ||
| /** | ||
| * Maps a stream returned by openForRead() to the associated S3 object. | ||
| * That allows us to close the object when closing the stream. | ||
| */ | ||
| private Map<InputStream, S3Object> s3Objects; | ||
|
|
||
| /** | ||
| * uri of the object being read. | ||
| */ | ||
| private final String uri; | ||
|
|
||
| /** | ||
| * size of a buffer to create when draining the stream. | ||
| */ | ||
| private static final int DRAIN_BUFFER_SIZE = 16384; | ||
|
|
||
| /** | ||
| * Initializes a new instance of the {@code S3File} class. | ||
| * | ||
|
|
@@ -97,7 +122,8 @@ public S3File( | |
| this.client = client; | ||
| this.streamStatistics = streamStatistics; | ||
| this.changeTracker = changeTracker; | ||
| this.s3Objects = new IdentityHashMap<InputStream, S3Object>(); | ||
| this.s3Objects = new IdentityHashMap<>(); | ||
| this.uri = this.getPath(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -169,7 +195,6 @@ public InputStream openForRead(long offset, int size) throws IOException { | |
| .withRange(offset, offset + size - 1); | ||
| this.changeTracker.maybeApplyConstraint(request); | ||
|
|
||
| String uri = this.getPath(); | ||
| String operation = String.format( | ||
| "%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset); | ||
| DurationTracker tracker = streamStatistics.initiateGetRequest(); | ||
|
|
@@ -193,18 +218,7 @@ public InputStream openForRead(long offset, int size) throws IOException { | |
| return stream; | ||
| } | ||
|
|
||
| /** | ||
| * Closes this stream and releases all acquired resources. | ||
| */ | ||
| @Override | ||
| public synchronized void close() { | ||
| List<InputStream> streams = new ArrayList<InputStream>(this.s3Objects.keySet()); | ||
| for (InputStream stream : streams) { | ||
| this.close(stream); | ||
| } | ||
| } | ||
|
|
||
| void close(InputStream inputStream) { | ||
| void close(InputStream inputStream, int numRemainingBytes) { | ||
| S3Object obj; | ||
| synchronized (this.s3Objects) { | ||
| obj = this.s3Objects.get(inputStream); | ||
|
|
@@ -214,7 +228,91 @@ void close(InputStream inputStream) { | |
| this.s3Objects.remove(inputStream); | ||
| } | ||
|
|
||
| Io.closeIgnoringIoException(inputStream); | ||
| Io.closeIgnoringIoException(obj); | ||
| if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) { | ||
| // don't bother with async io. | ||
| drain(false, "close() operation", numRemainingBytes, obj, inputStream); | ||
| } else { | ||
| LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes); | ||
| // schedule an async drain/abort with references to the fields so they | ||
| // can be reused | ||
| client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * drain the stream. This method is intended to be | ||
| * used directly or asynchronously, and measures the | ||
| * duration of the operation in the stream statistics. | ||
| * | ||
| * @param shouldAbort force an abort; used if explicitly requested. | ||
| * @param reason reason for stream being closed; used in messages | ||
| * @param remaining remaining bytes | ||
| * @param requestObject http request object; | ||
| * @param inputStream stream to close. | ||
| * @return was the stream aborted? | ||
| */ | ||
| private boolean drain( | ||
| final boolean shouldAbort, | ||
| final String reason, | ||
| final long remaining, | ||
| final S3Object requestObject, | ||
| final InputStream inputStream) { | ||
|
|
||
| try { | ||
| return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort), | ||
| () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream)); | ||
| } catch (IOException e) { | ||
| // this is only here because invokeTrackingDuration() has it in its | ||
| // signature | ||
| return shouldAbort; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Drain or abort the inner stream. | ||
| * Exceptions are swallowed. | ||
| * If a close() is attempted and fails, the operation escalates to | ||
| * an abort. | ||
| * | ||
| * @param shouldAbort force an abort; used if explicitly requested. | ||
| * @param reason reason for stream being closed; used in messages | ||
| * @param remaining remaining bytes | ||
| * @param requestObject http request object | ||
| * @param inputStream stream to close. | ||
| * @return was the stream aborted? | ||
| */ | ||
| private boolean drainOrAbortHttpStream( | ||
| boolean shouldAbort, | ||
| final String reason, | ||
| final long remaining, | ||
| final S3Object requestObject, | ||
| final InputStream inputStream) { | ||
|
|
||
| if (!shouldAbort && remaining > 0) { | ||
| try { | ||
| long drained = 0; | ||
| byte[] buffer = new byte[DRAIN_BUFFER_SIZE]; | ||
| while (true) { | ||
| final int count = inputStream.read(buffer); | ||
| if (count < 0) { | ||
| // no more data is left | ||
| break; | ||
| } | ||
| drained += count; | ||
| } | ||
| LOG.debug("Drained stream of {} bytes", drained); | ||
| } catch (Exception e) { | ||
| // exception escalates to an abort | ||
| LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e); | ||
| shouldAbort = true; | ||
| } | ||
| } | ||
| cleanupWithLogger(LOG, inputStream); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FYI, you can pass any number of Closeables in; not worth updating the PR for, but useful to know |
||
| cleanupWithLogger(LOG, requestObject); | ||
| streamStatistics.streamClose(shouldAbort, remaining); | ||
|
|
||
| LOG.debug("Stream {} {}: {}; remaining={}", uri, (shouldAbort ? "aborted" : "closed"), reason, | ||
| remaining); | ||
| return shouldAbort; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.