-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HADOOP-18175. fix test failures with prefetching s3a input stream #4212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
2097657
c0c6bd8
59ea8ac
0fb1192
9516d27
4d3b4cc
3314e5e
ce15ebc
f0d25f7
e1eb23e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import org.apache.hadoop.fs.statistics.StreamStatisticNames; | ||
|
||
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS; | ||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; | ||
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; | ||
import static org.apache.hadoop.test.LambdaTestUtils.intercept; | ||
|
||
|
@@ -71,11 +72,16 @@ public void testRequesterPaysOptionSuccess() throws Throwable { | |
inputStream.seek(0); | ||
inputStream.readByte(); | ||
|
||
// Verify > 1 call was made, so we're sure it is correctly configured for each request | ||
IOStatisticAssertions | ||
.assertThatStatisticCounter(inputStream.getIOStatistics(), | ||
StreamStatisticNames.STREAM_READ_OPENED) | ||
.isGreaterThan(1); | ||
if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) { | ||
// For S3PrefetchingInputStream, verify a call was made | ||
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), | ||
StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1); | ||
} else { | ||
// For S3InputStream, verify > 1 call was made, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be For S3AInputStream ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're right, updated the comment |
||
// so we're sure it is correctly configured for each request | ||
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), | ||
StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1); | ||
} | ||
|
||
// Check list calls work without error | ||
fs.listFiles(requesterPaysPath.getParent(), false); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
|
||
package org.apache.hadoop.fs.s3a; | ||
|
||
import org.apache.hadoop.fs.CanUnbuffer; | ||
import org.apache.hadoop.fs.FSDataInputStream; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.fs.contract.ContractTestUtils; | ||
|
@@ -32,7 +33,9 @@ | |
import org.junit.Test; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; | ||
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES; | ||
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; | ||
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES; | ||
|
@@ -72,6 +75,7 @@ public void testUnbuffer() throws IOException { | |
IOStatisticsSnapshot iostats = new IOStatisticsSnapshot(); | ||
// Open file, read half the data, and then call unbuffer | ||
try (FSDataInputStream inputStream = getFileSystem().open(dest)) { | ||
skipIfCannotUnbuffer(inputStream.getWrappedStream()); | ||
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream); | ||
int bytesToRead = 8; | ||
readAndAssertBytesRead(inputStream, bytesToRead); | ||
|
@@ -138,6 +142,7 @@ public void testUnbufferStreamStatistics() throws IOException { | |
Object streamStatsStr; | ||
try { | ||
inputStream = fs.open(dest); | ||
skipIfCannotUnbuffer(inputStream.getWrappedStream()); | ||
streamStatsStr = demandStringifyIOStatisticsSource(inputStream); | ||
|
||
LOG.info("initial stream statistics {}", streamStatsStr); | ||
|
@@ -192,6 +197,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) { | |
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen(); | ||
} | ||
|
||
private void skipIfCannotUnbuffer(InputStream inputStream) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should just be able to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At this point we still don't know what kind of inputStream it is, the method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just noticed we can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can also use StoreImplementationUtils when needed -which it isn't here |
||
if (!(inputStream instanceof CanUnbuffer)) { | ||
skip("input stream does not support unbuffer"); | ||
} | ||
} | ||
|
||
/** | ||
* Read the specified number of bytes from the given | ||
* {@link FSDataInputStream} and assert that | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method no longer throws an IOException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.inputStream.getPos()
below could throw it, so I have to pass it up here