Skip to content

HADOOP-18175. fix test failures with prefetching s3a input stream #4212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
Validate.checkLessOrEqual(offset, "offset", size(), "size()");
Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");

streamStatistics.streamOpened();
final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
.withRange(offset, offset + size - 1);
this.changeTracker.maybeApplyConstraint(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ public void seek(long pos) throws IOException {
public int read() throws IOException {
this.throwIfClosed();

if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
return -1;
}

if (!ensureCurrentBuffer()) {
return -1;
}
Expand Down Expand Up @@ -296,6 +300,10 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
return 0;
}

if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
return -1;
}

if (!ensureCurrentBuffer()) {
return -1;
}
Expand Down Expand Up @@ -427,18 +435,8 @@ protected void throwIfClosed() throws IOException {
}

protected void throwIfInvalidSeek(long pos) throws EOFException {
long fileSize = this.s3File.size();
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
} else {
if (fileSize == 0 && pos == 0) {
// Do nothing. Valid combination.
return;
}

if (pos >= fileSize) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public synchronized int available() throws IOException {
*/
@Override
public synchronized long getPos() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method no longer throws an IOException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.inputStream.getPos() below could throw it, so I have to pass it up here

this.throwIfClosed();
return this.inputStream.getPos();
return this.isClosed() ? 0 : this.inputStream.getPos();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

Expand Down Expand Up @@ -71,11 +72,16 @@ public void testRequesterPaysOptionSuccess() throws Throwable {
inputStream.seek(0);
inputStream.readByte();

// Verify > 1 call was made, so we're sure it is correctly configured for each request
IOStatisticAssertions
.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED)
.isGreaterThan(1);
if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
// For S3PrefetchingInputStream, verify a call was made
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
} else {
// For S3AInputStream, verify > 1 call was made,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be For S3AInputStream ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, updated the comment

// so we're sure it is correctly configured for each request
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
}

// Check list calls work without error
fs.listFiles(requesterPaysPath.getParent(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
Expand All @@ -32,7 +33,9 @@
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;

import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
Expand Down Expand Up @@ -72,6 +75,7 @@ public void testUnbuffer() throws IOException {
IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
// Open file, read half the data, and then call unbuffer
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
skipIfCannotUnbuffer(inputStream.getWrappedStream());
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
int bytesToRead = 8;
readAndAssertBytesRead(inputStream, bytesToRead);
Expand Down Expand Up @@ -138,6 +142,7 @@ public void testUnbufferStreamStatistics() throws IOException {
Object streamStatsStr;
try {
inputStream = fs.open(dest);
skipIfCannotUnbuffer(inputStream.getWrappedStream());
streamStatsStr = demandStringifyIOStatisticsSource(inputStream);

LOG.info("initial stream statistics {}", streamStatsStr);
Expand Down Expand Up @@ -192,6 +197,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
}

private void skipIfCannotUnbuffer(InputStream inputStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should just be able to use inputStream.hasCapability(StreamCapabilities.UNBUFFER) instead of this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we still don't know what kind of inputStream it is, the method hasCapability only visible after we cast it to S3AInputStream or S3PrefetchingInputStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just noticed we can use FSDataInputStream.hasCapability(...), I will change to it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can also use StoreImplementationUtils when needed -which it isn't here

if (!(inputStream instanceof CanUnbuffer)) {
skip("input stream does not support unbuffer");
}
}

/**
* Read the specified number of bytes from the given
* {@link FSDataInputStream} and assert that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ private void testSeekHelper(S3InputStream inputStream, int bufferSize, int fileS
EOFException.class,
FSExceptionMessages.NEGATIVE_SEEK,
() -> inputStream.seek(-1));

ExceptionAsserts.assertThrows(
EOFException.class,
FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
() -> inputStream.seek(fileSize + 1));
}

@Test
Expand Down