HADOOP-16379: S3AInputStream#unbuffer should merge input stream stats into fs-wide stats #983

S3AInputStream.java

@@ -823,9 +823,17 @@ public static long validateReadahead(@Nullable Long readahead) {
}
}

/**
* Closes the underlying S3 stream and merges this stream's
* {@link #streamStatistics} into the filesystem-wide statistics.
*/
@Override
public synchronized void unbuffer() {
closeStream("unbuffer()", contentRangeFinish, false);
try {
closeStream("unbuffer()", contentRangeFinish, false);
} finally {
streamStatistics.merge(false);
}
}

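For context on why the merge now happens in unbuffer() rather than only in close(): a long-lived reader may keep its FSDataInputStream open for hours and call unbuffer() between bursts of reads, so deferring the merge to close() leaves the fs-wide statistics stale for the whole lifetime of the handle. The sketch below shows that calling pattern; the bucket, object path, offsets and class name are invented for illustration and are not part of this change.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnbufferUsageSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical bucket and object, used for illustration only.
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"),
        new Configuration());
    Path path = new Path("s3a://example-bucket/data/sample.bin");

    byte[] buffer = new byte[8192];
    try (FSDataInputStream in = fs.open(path)) {
      in.readFully(0, buffer);     // first burst of reads
      in.unbuffer();               // drops the S3 object stream; with this patch
                                   // the stream statistics are merged here too

      // ... long idle period: no S3 connection is held open ...

      in.readFully(8192, buffer);  // the object stream is reopened on demand
      in.unbuffer();               // merged again; only the delta since the
                                   // previous merge is added
    }                              // close() merges whatever is still unmerged
  }
}

Before this change, all of the stream's counters reached the S3AFileSystem instrumentation only when the stream was finally closed.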
S3AInstrumentation.java

@@ -647,6 +647,7 @@ public final class InputStreamStatistics implements AutoCloseable {
public long inputPolicy;
/** This is atomic so that it can be passed as a reference. */
private final AtomicLong versionMismatches = new AtomicLong(0);
private InputStreamStatistics mergedStats;

private InputStreamStatistics() {
}
@@ -759,7 +760,7 @@ public void readOperationCompleted(int requested, int actual) {
*/
@Override
public void close() {
mergeInputStreamStatistics(this);
merge(true);
}

/**
@@ -816,6 +817,88 @@ public String toString() {
sb.append('}');
return sb.toString();
}

/**
* Merge the statistics into the filesystem's instrumentation instance.
* Takes a diff between the current version of the stats and the
* version of the stats when merge was last called, and merges the diff
* into the instrumentation instance. Used to periodically merge the
* stats into the fs-wide stats. <b>Behavior is undefined if called on a
* closed instance.</b>
*
* @param isClosed whether the stream is being closed; if true, no snapshot
* of the merged state is retained for future diffs.
*/
void merge(boolean isClosed) {
if (mergedStats != null) {
mergeInputStreamStatistics(diff(mergedStats));
} else {
mergeInputStreamStatistics(this);
}
// When the stream is closed there is no need to keep a snapshot for future diffs
if (!isClosed) {
mergedStats = copy();
}
}

/**
* Returns a diff between this {@link InputStreamStatistics} instance and
* the given {@link InputStreamStatistics} instance.
*/
private InputStreamStatistics diff(InputStreamStatistics inputStats) {
InputStreamStatistics diff = new InputStreamStatistics();
diff.openOperations = openOperations - inputStats.openOperations;
diff.closeOperations = closeOperations - inputStats.closeOperations;
diff.closed = closed - inputStats.closed;
diff.aborted = aborted - inputStats.aborted;
diff.seekOperations = seekOperations - inputStats.seekOperations;
diff.readExceptions = readExceptions - inputStats.readExceptions;
diff.forwardSeekOperations =
forwardSeekOperations - inputStats.forwardSeekOperations;
diff.backwardSeekOperations =
backwardSeekOperations - inputStats.backwardSeekOperations;
diff.bytesRead = bytesRead - inputStats.bytesRead;
diff.bytesSkippedOnSeek =
bytesSkippedOnSeek - inputStats.bytesSkippedOnSeek;
diff.bytesBackwardsOnSeek =
bytesBackwardsOnSeek - inputStats.bytesBackwardsOnSeek;
diff.readOperations = readOperations - inputStats.readOperations;
diff.readFullyOperations =
readFullyOperations - inputStats.readFullyOperations;
diff.readsIncomplete = readsIncomplete - inputStats.readsIncomplete;
diff.bytesReadInClose = bytesReadInClose - inputStats.bytesReadInClose;
diff.bytesDiscardedInAbort =
bytesDiscardedInAbort - inputStats.bytesDiscardedInAbort;
diff.policySetCount = policySetCount - inputStats.policySetCount;
diff.inputPolicy = inputPolicy - inputStats.inputPolicy;
diff.versionMismatches.set(versionMismatches.longValue() -
inputStats.versionMismatches.longValue());
return diff;
}

/**
* Returns a new {@link InputStreamStatistics} instance with all the same
* values as this {@link InputStreamStatistics}.
*/
private InputStreamStatistics copy() {
InputStreamStatistics copy = new InputStreamStatistics();
copy.openOperations = openOperations;
copy.closeOperations = closeOperations;
copy.closed = closed;
copy.aborted = aborted;
copy.seekOperations = seekOperations;
copy.readExceptions = readExceptions;
copy.forwardSeekOperations = forwardSeekOperations;
copy.backwardSeekOperations = backwardSeekOperations;
copy.bytesRead = bytesRead;
copy.bytesSkippedOnSeek = bytesSkippedOnSeek;
copy.bytesBackwardsOnSeek = bytesBackwardsOnSeek;
copy.readOperations = readOperations;
copy.readFullyOperations = readFullyOperations;
copy.readsIncomplete = readsIncomplete;
copy.bytesReadInClose = bytesReadInClose;
copy.bytesDiscardedInAbort = bytesDiscardedInAbort;
copy.policySetCount = policySetCount;
copy.inputPolicy = inputPolicy;
return copy;
}
}

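The merge/diff/copy trio above implements a snapshot-and-delta scheme: each merge pushes only the change since the previous merge into the filesystem's instrumentation, so merging on every unbuffer() never double counts, and merge(true) from close() skips the final snapshot because no further diff will be taken. Below is a minimal standalone model of that scheme, reduced to a single bytesRead counter; the Aggregate and StreamStats names are invented and this is not the Hadoop code, just the shape of it.

/** Simplified model of the snapshot-and-delta merge pattern. */
public class DeltaMergeModel {

  /** Stand-in for the fs-wide instrumentation: one aggregate counter. */
  static class Aggregate {
    long bytesRead;
    void add(long delta) { bytesRead += delta; }
  }

  /** Stand-in for InputStreamStatistics: one counter plus a snapshot. */
  static class StreamStats {
    long bytesRead;           // grows as the stream is read
    private Long lastMerged;  // value of bytesRead at the previous merge

    /** Merge only the delta since the last merge into the aggregate. */
    void merge(Aggregate aggregate, boolean isClosed) {
      long delta = (lastMerged == null) ? bytesRead : bytesRead - lastMerged;
      aggregate.add(delta);
      if (!isClosed) {
        lastMerged = bytesRead;  // keep a snapshot for the next diff
      }
    }
  }

  public static void main(String[] args) {
    Aggregate fsStats = new Aggregate();
    StreamStats stream = new StreamStats();

    stream.bytesRead = 8;
    stream.merge(fsStats, false);  // unbuffer(): merges 8
    stream.bytesRead = 12;
    stream.merge(fsStats, false);  // unbuffer(): merges only the delta, 4
    stream.merge(fsStats, true);   // close(): nothing new, merges 0

    System.out.println(fsStats.bytesRead);  // 12, not 8 + 12 = 20
  }
}

The numbers mirror the integration test below: 8 bytes merged on the first unbuffer(), a further 4 on the second, and nothing more at close().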
ITestS3AUnbuffer.java

@@ -19,14 +19,16 @@
package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;

import org.junit.Test;

import java.io.IOException;

import static org.apache.hadoop.fs.s3a.Statistic.STREAM_SEEK_BYTES_READ;

/**
* Integration test for calling
* {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}.
@@ -38,20 +40,27 @@
*/
public class ITestS3AUnbuffer extends AbstractS3ATestBase {

private Path dest;

@Override
public void setup() throws Exception {
super.setup();
dest = path("ITestS3AUnbuffer");
describe("ITestS3AUnbuffer");

byte[] data = ContractTestUtils.dataset(16, 'a', 26);
ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
16, true);
}

@Test
public void testUnbuffer() throws IOException {
// Setup test file
Path dest = path("testUnbuffer");
describe("testUnbuffer");
try (FSDataOutputStream outputStream = getFileSystem().create(dest, true)) {
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
outputStream.write(data);
}

// Open file, read half the data, and then call unbuffer
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
assertEquals(8, inputStream.read(new byte[8]));
readAndAssertBytesRead(inputStream, 8);
assertTrue(isObjectStreamOpen(inputStream));
inputStream.unbuffer();

@@ -60,7 +69,61 @@ public void testUnbuffer() throws IOException {
}
}

/**
* Test that calling {@link S3AInputStream#unbuffer()} merges a stream's
* {@link org.apache.hadoop.fs.s3a.S3AInstrumentation.InputStreamStatistics}
* into the {@link S3AFileSystem}'s {@link S3AInstrumentation} instance.
*/
@Test
public void testUnbufferStreamStatistics() throws IOException {
describe("testUnbufferStreamStatistics");

// Validate bytesRead is updated correctly
S3ATestUtils.MetricDiff bytesRead = new S3ATestUtils.MetricDiff(
getFileSystem(), STREAM_SEEK_BYTES_READ);

// Open file, read half the data, and then call unbuffer
FSDataInputStream inputStream = null;
try {
inputStream = getFileSystem().open(dest);

readAndAssertBytesRead(inputStream, 8);
inputStream.unbuffer();

// Validate that calling unbuffer updates the input stream statistics
bytesRead.assertDiffEquals(8);

// Validate that calling unbuffer twice in a row updates the statistics
// correctly
readAndAssertBytesRead(inputStream, 4);
inputStream.unbuffer();
bytesRead.assertDiffEquals(12);
} finally {
IOUtils.closeStream(inputStream);
}

// Validate that closing the file does not further change the statistics
bytesRead.assertDiffEquals(12);

// Validate that the input stream stats are correct when the file is closed
assertEquals("S3AInputStream statistics were not updated properly", 12,
((S3AInputStream) inputStream.getWrappedStream())
.getS3AStreamStatistics().bytesRead);
}

private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
}

/**
* Read the specified number of bytes from the given
* {@link FSDataInputStream} and assert that
* {@link FSDataInputStream#read(byte[])} read the specified number of bytes.
*/
private static void readAndAssertBytesRead(FSDataInputStream inputStream,
int bytesToRead) throws IOException {
assertEquals("S3AInputStream#read did not read the correct number of " +
"bytes", bytesToRead,
inputStream.read(new byte[bytesToRead]));
}
}
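Beyond the S3ATestUtils.MetricDiff helper used above, a plain application could inspect filesystem-level counters through the public FileSystem#getStorageStatistics() API. The sketch below simply dumps every long counter after an unbuffer() call; which individual Statistic values are mirrored into StorageStatistics is an implementation detail of S3AFileSystem, so nothing here is hard-coded to STREAM_SEEK_BYTES_READ, and the bucket, object and class name are invented for illustration.

import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;

public class DumpS3AStatsSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical bucket and object, for illustration only.
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"),
        new Configuration());
    try (FSDataInputStream in =
             fs.open(new Path("s3a://example-bucket/data/sample.bin"))) {
      in.read(new byte[1024]);
      in.unbuffer();  // merges this stream's statistics into the fs-wide ones

      // Print whatever long counters the filesystem publishes.
      StorageStatistics stats = fs.getStorageStatistics();
      for (Iterator<LongStatistic> it = stats.getLongStatistics(); it.hasNext();) {
        LongStatistic s = it.next();
        System.out.println(s.getName() + " = " + s.getValue());
      }
    }
  }
}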