Skip to content

HADOOP-17812. NPE in S3AInputStream read() after failure to reconnect to store #3222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,21 @@ public synchronized int read() throws IOException {
int byteRead = invoker.retry("read", pathStr, true,
() -> {
int b;
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
reopen("failure recovery", getPos(), 1, false);
}
try {
b = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, 1, true);
onReadFailure(e, true);
throw e;
} catch (IOException e) {
onReadFailure(e, 1, false);
onReadFailure(e, false);
throw e;
}
return b;
Expand All @@ -444,15 +450,12 @@ public synchronized int read() throws IOException {
}

/**
* Handle an IOE on a read by attempting to re-open the stream.
* Close the stream on read failure.
* The filesystem's readException count will be incremented.
* @param ioe exception caught.
* @param length length of data being attempted to read
* @throws IOException any exception thrown on the re-open attempt.
*/
@Retries.OnceTranslated
private void onReadFailure(IOException ioe, int length, boolean forceAbort)
throws IOException {
private void onReadFailure(IOException ioe, boolean forceAbort) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: ",
Expand All @@ -463,7 +466,7 @@ private void onReadFailure(IOException ioe, int length, boolean forceAbort)
uri, client, object);
}
streamStatistics.readException();
reopen("failure recovery", pos, length, forceAbort);
closeStream("failure recovery", contentRangeFinish, forceAbort);
}

/**
Expand Down Expand Up @@ -506,16 +509,22 @@ public synchronized int read(byte[] buf, int off, int len)
int bytesRead = invoker.retry("read", pathStr, true,
() -> {
int bytes;
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
reopen("failure recovery", getPos(), 1, false);
}
try {
bytes = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, len, true);
onReadFailure(e, true);
throw e;
} catch (IOException e) {
onReadFailure(e, len, false);
onReadFailure(e, false);
throw e;
}
return bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.SocketException;
import java.nio.charset.Charset;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
Expand Down Expand Up @@ -120,10 +121,28 @@ private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
return new S3AInputStream.InputStreamCallbacks() {

private final S3Object mockedS3Object = getMockedS3Object();
private Integer mockedS3ObjectIndex = 0;

@Override
public S3Object getObject(GetObjectRequest request) {
// Set s3 client to return mocked s3object with defined read behavior.
mockedS3ObjectIndex++;
// open() -> lazySeek() -> reopen()
// -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
// read() -> objectInputStreamBad1 throws exception
// -> onReadFailure -> close wrappedStream
// -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
// -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
// -> wrappedStream.read -> objectInputStreamBad2 throws exception
// -> onReadFailure -> close wrappedStream
// -> retry(2) -> wrappedStream==null -> reopen
// -> getObject (mockedS3ObjectIndex=3) throws exception
// -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
// -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
// -> wrappedStream.read
if (mockedS3ObjectIndex == 3) {
throw new SdkClientException("Failed to get S3Object");
}
return mockedS3Object;
}

Expand Down