Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Provides read access to the underlying file one block at a time.
* Improves read performance by prefetching and locally caching blocks.
Expand Down Expand Up @@ -204,7 +206,7 @@ public synchronized void close() {
// Cancel any prefetches in progress.
this.cancelPrefetches();

Io.closeIgnoringIoException(this.cache);
cleanupWithLogger(LOG, this.cache);

this.ops.end(op);
LOG.info(this.ops.getSummary(false));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,8 @@ private FSDataInputStream executeOpen(
new S3PrefetchingInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan)));
createInputStreamCallbacks(auditSpan),
inputStreamStats));
} else {
return new FSDataInputStream(
new S3AInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

/**
* Provides an {@code InputStream} that allows reading from an S3 file.
Expand All @@ -53,6 +54,7 @@ public class S3CachingInputStream extends S3InputStream {
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -61,8 +63,9 @@ public class S3CachingInputStream extends S3InputStream {
public S3CachingInputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {
super(context, s3Attributes, client);
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics);

this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
int bufferPoolSize = this.numBlocksToPrefetch + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@

package org.apache.hadoop.fs.s3a.read;

import java.io.Closeable;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.common.Io;
import org.apache.hadoop.fs.common.Validate;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AInputStream;
Expand All @@ -40,30 +39,56 @@
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Encapsulates low level interactions with S3 object on AWS.
*/
public class S3File implements Closeable {
public class S3File {
private static final Logger LOG = LoggerFactory.getLogger(S3File.class);

// Read-specific operation context.
/**
* Read-specific operation context.
*/
private final S3AReadOpContext context;

// S3 object attributes.
/**
* S3 object attributes.
*/
private final S3ObjectAttributes s3Attributes;

// Callbacks used for interacting with the underlying S3 client.
/**
* Callbacks used for interacting with the underlying S3 client.
*/
private final S3AInputStream.InputStreamCallbacks client;

// Used for reporting input stream access statistics.
/**
* Used for reporting input stream access statistics.
*/
private final S3AInputStreamStatistics streamStatistics;

// Enforces change tracking related policies.
/**
* Enforces change tracking related policies.
*/
private final ChangeTracker changeTracker;

// Maps a stream returned by openForRead() to the associated S3 object.
// That allows us to close the object when closing the stream.
/**
* Maps a stream returned by openForRead() to the associated S3 object.
* That allows us to close the object when closing the stream.
*/
private Map<InputStream, S3Object> s3Objects;

/**
* URI of the object being read.
*/
private final String uri;

/**
* Size, in bytes, of the buffer created when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;

/**
* Initializes a new instance of the {@code S3File} class.
*
Expand Down Expand Up @@ -97,7 +122,8 @@ public S3File(
this.client = client;
this.streamStatistics = streamStatistics;
this.changeTracker = changeTracker;
this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
this.s3Objects = new IdentityHashMap<>();
this.uri = this.getPath();
}

/**
Expand Down Expand Up @@ -169,7 +195,6 @@ public InputStream openForRead(long offset, int size) throws IOException {
.withRange(offset, offset + size - 1);
this.changeTracker.maybeApplyConstraint(request);

String uri = this.getPath();
String operation = String.format(
"%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset);
DurationTracker tracker = streamStatistics.initiateGetRequest();
Expand All @@ -193,18 +218,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
return stream;
}

/**
 * Closes every stream that is still tracked in {@code s3Objects}, i.e. the
 * streams returned by openForRead() that have not been closed yet.
 */
@Override
public synchronized void close() {
  // Work from a snapshot of the keys: close(InputStream) removes entries
  // from s3Objects, so the live key set must not be iterated directly.
  final InputStream[] openStreams = this.s3Objects.keySet().toArray(new InputStream[0]);
  for (int i = 0; i < openStreams.length; i++) {
    this.close(openStreams[i]);
  }
}

void close(InputStream inputStream) {
void close(InputStream inputStream, int numRemainingBytes) {
S3Object obj;
synchronized (this.s3Objects) {
obj = this.s3Objects.get(inputStream);
Expand All @@ -214,7 +228,91 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}

Io.closeIgnoringIoException(inputStream);
Io.closeIgnoringIoException(obj);
if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
// don't bother with async io.
drain(false, "close() operation", numRemainingBytes, obj, inputStream);
} else {
LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
// schedule an async drain/abort with references to the fields so they
// can be reused
client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
}
}

/**
 * Drains and closes a stream while recording how long that takes.
 * The work itself is done by
 * {@code drainOrAbortHttpStream()}; this method wraps that call in the
 * duration tracker returned by
 * {@code streamStatistics.initiateInnerStreamClose(shouldAbort)}.
 * It can be invoked inline or from a task handed to an executor.
 *
 * @param shouldAbort force an abort; used if explicitly requested.
 * @param reason reason for the stream being closed; used in messages.
 * @param remaining number of bytes remaining in the stream.
 * @param requestObject S3 object associated with the stream.
 * @param inputStream stream to drain and close.
 * @return true if the stream was aborted rather than closed; if the
 * tracked invocation raises an IOException, the value of
 * {@code shouldAbort} is returned unchanged.
 */
private boolean drain(
    final boolean shouldAbort,
    final String reason,
    final long remaining,
    final S3Object requestObject,
    final InputStream inputStream) {

  boolean aborted;
  try {
    aborted = invokeTrackingDuration(
        streamStatistics.initiateInnerStreamClose(shouldAbort),
        () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
  } catch (IOException ignored) {
    // Only caught because invokeTrackingDuration() declares IOException
    // in its signature; fall back to the caller's requested outcome.
    aborted = shouldAbort;
  }
  return aborted;
}

/**
* Drain or abort the inner stream, then close it and its S3 object.
* <p>
* When no abort was requested and bytes remain, the stream is read to its
* end in chunks of DRAIN_BUFFER_SIZE bytes and the data is discarded.
* Any exception raised while draining is logged at debug level and
* escalates the operation to an abort (shouldAbort becomes true).
* Both the stream and the request object are then handed to
* cleanupWithLogger(), and the outcome is reported to the stream statistics.
* <p>
* NOTE(review): no explicit abort call on the stream is visible in this
* method; "abort" only changes the flag that is reported to the statistics,
* written to the log and returned. Confirm this is intended.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages.
* @param remaining remaining bytes; draining is skipped when this is not positive.
* @param requestObject http request object.
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {

// Drain only when an abort was not requested and there is data left to read.
if (!shouldAbort && remaining > 0) {
try {
// Count of bytes actually read; used only for the debug message below.
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inputStream.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e);
shouldAbort = true;
}
}
// Close the stream whether or not it was drained.
cleanupWithLogger(LOG, inputStream);
// NOTE(review): the next lines, up to the second cleanupWithLogger() call,
// are reviewer-comment text captured with the diff, not Java source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, you can pass any number of Closeables in; not worth updating the PR for, but useful to know

// Close the S3 object associated with the stream.
cleanupWithLogger(LOG, requestObject);
// Report the final outcome (aborted or closed) and the bytes that remained.
streamStatistics.streamClose(shouldAbort, remaining);

LOG.debug("Stream {} {}: {}; remaining={}", uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

/**
* Provides an {@code InputStream} that allows reading from an S3 file.
Expand All @@ -48,6 +49,7 @@ public class S3InMemoryInputStream extends S3InputStream {
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -56,8 +58,9 @@ public class S3InMemoryInputStream extends S3InputStream {
public S3InMemoryInputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {
super(context, s3Attributes, client);
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics);
int fileSize = (int) s3Attributes.getLen();
this.buffer = ByteBuffer.allocate(fileSize);
LOG.debug("Created in-memory input stream for {} (size = {})", this.getName(), fileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static java.util.Objects.requireNonNull;

/**
* Provides an {@link InputStream} that allows reading from an S3 file.
*/
Expand Down Expand Up @@ -96,6 +98,7 @@ public abstract class S3InputStream
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -104,16 +107,13 @@ public abstract class S3InputStream
public S3InputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {

Validate.checkNotNull(context, "context");
Validate.checkNotNull(s3Attributes, "s3Attributes");
Validate.checkNotNull(client, "client");
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {

this.context = context;
this.s3Attributes = s3Attributes;
this.client = client;
this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
this.context = requireNonNull(context);
this.s3Attributes = requireNonNull(s3Attributes);
this.client = requireNonNull(client);
this.streamStatistics = requireNonNull(streamStatistics);
this.ioStatistics = streamStatistics.getIOStatistics();
this.name = S3File.getPath(s3Attributes);
this.changeTracker = new ChangeTracker(
Expand Down
Loading