AWS: add retry logic to S3InputStream #4912

Closed

Conversation

jackye1995
Contributor

@danielcweeks @rajarshisarkar @amogh-jahagirdar @xiaoxuandev @singhpk234

Add retry for the S3InputStream so that when we encounter network failures (mostly SSLException for server-side connection reset and SocketTimeoutException for client-side connection reset), we can quickly retry the read without failing the entire operation.
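
[Editor's note] A minimal sketch of the approach (helper names and the retry count are illustrative, not the PR's actual code; the PR drives the loop through Iceberg's Tasks utility): on a transient failure, close the dead stream, re-open at the same position, and retry the read.

import java.io.IOException;
import java.io.InputStream;

class S3RetryReadSketch {
  private static final int MAX_RETRIES = 3; // illustrative, not the PR's default

  interface PositionedOpener {
    InputStream openAt(long pos) throws IOException; // e.g. a ranged GetObject
  }

  static int readWithRetry(PositionedOpener opener, long pos) throws IOException {
    IOException lastFailure = null;
    for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
      // In S3InputStream the stream is a field that stays open across reads;
      // here it is re-opened per attempt to keep the sketch self-contained.
      InputStream stream = opener.openAt(pos);
      try {
        return stream.read();
      } catch (IOException e) {
        // typically SSLException or SocketTimeoutException from a reset connection
        lastFailure = e;
        stream.close(); // the PR aborts the underlying connection here instead
      }
    }
    throw lastFailure;
  }
}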

@jackye1995 jackye1995 requested a review from danielcweeks May 31, 2022 06:43
@github-actions github-actions bot added the AWS label May 31, 2022
}

@Test
public void testRead() throws Exception {
public void testReadWithNormalClient() throws Exception {
Contributor Author

I cannot really figure out a way to do a parameterized test here due to the use of S3MockRule; parameterization always gives me an initialization failure... I will continue to see if I can remove the redundant tests; meanwhile I will also update the integration test to run against both a normal client and a fuzzy client.

* Number of times to retry S3 read operation.
*/
public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 6;
Contributor

Just want to confirm how these default values were selected. From Hadoop S3A configurations?

Contributor

+1, would be really nice to know

I think they differ from the S3A constants (Code Pointer):

  1. retry limit - 7
  2. retry interval - 500ms

Contributor Author

thanks for the pointer, updated

positionStream();
AtomicInteger byteRef = new AtomicInteger(0);
try {
Tasks.foreach(0)
Contributor

Any reason we don't pass in the stream in the foreach? I guess it's a private field so it's a bit awkward to pass it in as an argument but seems more readable than Tasks.foreach(0) imo

Contributor Author

I think we cannot, because the input stream needs to be closed and re-opened, but if we put it here then the retry will always retry against the same stream that is already closed after the first failure.

Comment on lines +311 to +332
if (exception instanceof UncheckedIOException) {
if (exception.getCause() instanceof EOFException) {
return false;
}
}

if (exception instanceof AwsServiceException) {
switch (((AwsServiceException) exception).statusCode()) {
case HttpURLConnection.HTTP_FORBIDDEN:
case HttpURLConnection.HTTP_BAD_REQUEST:
return false;
}
}

if (exception instanceof S3Exception) {
switch (((S3Exception) exception).statusCode()) {
case HttpURLConnection.HTTP_NOT_FOUND:
case 416: // range not satisfied
return false;
}
}

return true;
Contributor

Curious, is this the same retry policy that Hadoop S3A has?

Contributor Author

It's not. It's closer to the ones in Presto and Trino. Basically it retries almost all IO exceptions except for EOF, because they are most likely network issues. For AWS-side exceptions, this logic seems sufficient to me if it has proven sufficient in Presto and Trino. I am not sure we need to list every single possible exception class like S3A did.

@@ -88,23 +97,68 @@ public void seek(long newPos) {

@Override
public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();
AtomicInteger byteRef = new AtomicInteger(0);
Contributor

[question] Do we need an AtomicInteger here? I think the requirement arises because we use it in the lambda provided to Tasks below; we could also use final int[] byteRef = {0}. Does that seem reasonable, considering it's single-threaded access?

Contributor Author

Yes, good point; it's only used to hold the value, not to prevent any multi-threaded access.
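
[Editor's note] For illustration, a minimal sketch of the holder pattern being discussed (names are hypothetical): a one-element array gives the lambda an effectively final reference to write through, with no atomicity guarantees needed for single-threaded access.

import java.util.function.Supplier;

class ByteHolderSketch {
  static int runOnce(Supplier<Integer> readCall) {
    // A local captured by a lambda must be effectively final, so a one-element
    // array (or an AtomicInteger) serves as the mutable holder for the result.
    int[] byteRef = new int[1];
    Runnable attempt = () -> byteRef[0] = readCall.get();
    attempt.run(); // in the PR, this body runs inside the Tasks retry loop
    return byteRef[0];
  }
}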

Comment on lines +310 to +332
private static boolean shouldRetry(Exception exception) {
if (exception instanceof UncheckedIOException) {
if (exception.getCause() instanceof EOFException) {
return false;
}
}

if (exception instanceof AwsServiceException) {
switch (((AwsServiceException) exception).statusCode()) {
case HttpURLConnection.HTTP_FORBIDDEN:
case HttpURLConnection.HTTP_BAD_REQUEST:
return false;
}
}

if (exception instanceof S3Exception) {
switch (((S3Exception) exception).statusCode()) {
case HttpURLConnection.HTTP_NOT_FOUND:
case 416: // range not satisfied
return false;
}
}

return true;
Contributor

@singhpk234 singhpk234 May 31, 2022

Should we add this in a separate util class? Considering it can be extended to all S3 interactions.

Also, any pointers on whether we know this is the complete list, considering the APIs we use to connect to S3? For example: (sample list: S3A)

Contributor Author

Let's discuss this in the thread above #4912 (comment)

closeStream();
stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
stream = readRange(String.format("bytes=%s-", pos));
}

private void closeStream() throws IOException {
Contributor

[minor] Now that we are catching the IOException in closeServerSideStream, closeStream actually doesn't throw IOException, so we can remove it from the signature of closeStream / openStream.

@singhpk234
Contributor

[doubt] SDK V2 also has the ability to define a retry policy / condition by overriding the client configuration while building the S3 client itself; why are we opting to do it via Tasks here?
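
[Editor's note] For reference, a minimal sketch of SDK-level retry configuration in AWS SDK v2; the retry count and backoff values are illustrative, and as discussed below this only covers the client call itself, not reads on the returned stream.

import java.time.Duration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;
import software.amazon.awssdk.services.s3.S3Client;

class SdkRetryConfigSketch {
  static S3Client buildClientWithRetries() {
    // Retries configured here apply to SDK calls such as getObject; they do
    // not cover IOExceptions thrown later while reading the response stream.
    return S3Client.builder()
        .overrideConfiguration(
            ClientOverrideConfiguration.builder()
                .retryPolicy(
                    RetryPolicy.builder()
                        .numRetries(6) // illustrative
                        .backoffStrategy(
                            FullJitterBackoffStrategy.builder()
                                .baseDelay(Duration.ofMillis(500))
                                .maxBackoffTime(Duration.ofSeconds(20))
                                .build())
                        .build())
                .build())
        .build();
  }
}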

@danielcweeks
Contributor

I'd like to take a closer look at this. So please hold off on merging.

Contributor

@danielcweeks danielcweeks left a comment

Ok, I understand what you're trying to achieve here, but I think this is best handled by the S3Client via configuration for Retry Policy and not custom handling in Iceberg.

Introducing our own retries can ultimately conflict with other retries and can have some bad side-effects (like failures resulting in retries on top of retries).

Unless there's some reason we can't handle this via the SDK, I don't think we should introduce retries here. @jackye1995 did you look into the retry policy? It seems like we could use many of these configs and just apply them in the AwsClientFactory.

@jackye1995
Contributor Author

jackye1995 commented Jun 5, 2022

Introducing our own retries can ultimately conflict with other retries and can have some bad side-effects (like failures resulting in retries on top of retries).

Yes you are right, I actually plan to add one more PR for configuring retry policy. The issue here is a bit different. I think there are 2 issues that cannot be handled at SDK level:

  1. The AWS SDK only handles exceptions up to the AWS client call, in this case s3.getObject. Once you get the input stream, further exceptions are not handled during operations like read and seek, and that's typically when network issues like connection timeouts or socket timeouts happen. I haven't found logic in the SDK that could close a stream, re-seek to the right position, and retry a read operation, but maybe I missed some code places.
  2. When closing the input stream, the close path should call abort instead of close to terminate the HTTP connection. This should be done both at retry time and at stream close time; see this javadoc for more details (a minimal sketch follows below).
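
[Editor's note] A minimal sketch of point 2, assuming only the SDK's Abortable interface (the helper name is hypothetical): abort the underlying connection when abandoning a stream after a failure or with unread data, rather than draining it via close().

import java.io.InputStream;
import software.amazon.awssdk.http.Abortable;

class AbortOnAbandonSketch {
  // Hypothetical helper: use abort() when giving up on a stream after a
  // failure or with unread data; a normal, fully-read stream should just be
  // closed so the HTTP connection can go back to the pool.
  static void abandon(InputStream stream) {
    if (stream instanceof Abortable) {
      ((Abortable) stream).abort();
    }
  }
}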

}
}

public void setSkipSize(int skipSize) {
this.skipSize = skipSize;
private static boolean shouldRetry(Exception exception) {
Contributor

I think we need to be more explicit about the cases where we do want to retry rather than just defaulting to retry. I think there were specific cases that were mentioned as known issues (e.g. socket timeout). However, the problem we've had in other retry scenarios is that the retry cases are overly broad and retry when they really shouldn't.

I think the default here should return false and only return true for the known Exceptions.

Contributor

Reasonably confident it's a lot more complicated than this, especially as SDK-level failures often create deep chains with the underlying cause at the bottom.
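
[Editor's note] A minimal sketch combining the two comments above (default to not retrying, and walk the possibly deep cause chain to find the underlying failure); the exception classes treated as transient here are illustrative assumptions, not a vetted list.

import java.net.SocketTimeoutException;
import javax.net.ssl.SSLException;

class RetryDecisionSketch {
  // Default to false; only retry when a known-transient failure is found
  // somewhere in the cause chain of the (often deeply wrapped) exception.
  static boolean shouldRetry(Throwable failure) {
    for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
      if (cause instanceof SocketTimeoutException || cause instanceof SSLException) {
        return true;
      }
    }
    return false;
  }
}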

positionStream();
int[] byteRef = new int[1];
try {
Tasks.foreach(0)
Contributor

This is a lot of duplicate code for the retry logic. Could we consolidate to a method that wraps just the logic we want to execute?

For example:

retry(() -> {
    Preconditions.checkState(!closed, "Cannot read: already closed");
    positionStream();

    byteRef[0] =  stream.read();
})

(you may have to use a consumer/function for the range versions, but it seems like there might be a more concise way to write this)
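
[Editor's note] A minimal sketch of such a consolidation (names are hypothetical and the retry bookkeeping is reduced to a plain loop rather than Tasks): the read variants share one retry(...) helper that wraps just the logic to execute.

import java.io.IOException;
import java.io.InputStream;

class ConsolidatedRetrySketch {
  @FunctionalInterface
  interface IORunnable {
    void run() throws IOException;
  }

  private InputStream stream; // stands in for the S3 response stream

  public int read() throws IOException {
    int[] byteRef = new int[1];
    retry(() -> byteRef[0] = stream.read());
    return byteRef[0];
  }

  public int read(byte[] b, int off, int len) throws IOException {
    int[] readRef = new int[1];
    retry(() -> readRef[0] = stream.read(b, off, len));
    return readRef[0];
  }

  private void retry(IORunnable action) throws IOException {
    IOException lastFailure = null;
    for (int attempt = 0; attempt < 3; attempt++) { // illustrative retry count
      try {
        action.run();
        return;
      } catch (IOException e) {
        // a real implementation would check retryability, back off, and re-open the stream
        lastFailure = e;
      }
    }
    throw lastFailure;
  }
}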

private void closeStream() throws IOException {
if (stream != null) {
stream.close();
private static void closeServerSideStream(InputStream streamToClose) {
Contributor

I'd just put this in the close stream (not have a separate method for it).

@@ -88,23 +96,69 @@ public void seek(long newPos) {

@Override
public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();
int[] byteRef = new int[1];
Contributor

This is less than ideal, but Tasks is a little limited in this area because it's really based on threaded execution and we're reusing it here for retry. It might be worth exploring whether we can tweak Tasks to support this use case:

Ideally we'd have something like:

int read = Tasks.single()
    .exponentialBackoff(...)
    ...
    .run(<function>);

Contributor

We could also consider bringing in Failsafe, which is more closely aligned with what we want for these cases. Though it is a new dependency (but zero dependencies of its own and Apache-licensed).
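
[Editor's note] For reference, a minimal sketch of what a Failsafe-based retry might look like (Failsafe 3.x API; the handled exception type, retry count, and backoff are illustrative assumptions):

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;

class FailsafeRetrySketch {
  static int readWithRetry(InputStream stream) {
    // Retry IOExceptions with capped exponential backoff; a real policy would
    // also exclude non-retryable failures such as EOF or 403/404 responses.
    RetryPolicy<Integer> retryPolicy =
        RetryPolicy.<Integer>builder()
            .handle(IOException.class)
            .withBackoff(Duration.ofMillis(500), Duration.ofSeconds(20))
            .withMaxRetries(3)
            .build();

    return Failsafe.with(retryPolicy).get(() -> stream.read());
  }
}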

@javrasya
Contributor

javrasya commented Jan 31, 2024

Why was this PR closed? We are experiencing exactly the same problem, and S3FileIO is literally unusable for us at the moment. No matter what we did to tune the Apache HTTP client, such as TcpKeepAlive or socket timeout, it still hits this often and it breaks the whole pipeline.

We created our own S3FileIO and all the classes down to S3InputStream, where the SocketException arises, took the inspiration from this PR to retry, and our problem is solved. But it is annoying to maintain our own version of those classes.

Is it not possible to revive this PR?

@puchengy
Contributor

puchengy commented May 16, 2024

@jackye1995 Hi, I also encountered the same issue as described in #10340; can we continue the PR to address the issue here? Also, I wonder, have you seen similar AWS clients having the same issue? How did they end up solving it? Thanks.

@xiaoxuandev Hi, you added PR #8221, and I wonder, are you interested in pushing it forward?

@danielcweeks @rdblue @amogh-jahagirdar @nastra I wonder if you can comment / suggest the path forward?

@puchengy
Contributor

@jackye1995 @xiaoxuandev @danielcweeks @rdblue @amogh-jahagirdar @nastra appreciate your feedback on this, thank you.

@danielcweeks
Contributor

@puchengy I spoke with @amogh-jahagirdar the other day and he's taking a look at this. There have been a few different attempts to address this issue and we're trying to find a solution that fits cleanly into the implementation.

@puchengy
Contributor

thank you @danielcweeks and @amogh-jahagirdar !

@puchengy
Contributor

@amogh-jahagirdar Hello, I wonder, is there any update on this? Our clients hit the issue from time to time, and it causes some interruptions to their workloads. Thanks!

@amogh-jahagirdar
Contributor

Hey @puchengy, I'm working on it! I am aiming to have a PR out sometime this week to address this.

@puchengy
Contributor

@amogh-jahagirdar Hi, I am checking in to see if there is any update. Thank you!

@puchengy
Contributor

puchengy commented Jul 1, 2024

@amogh-jahagirdar Hi, a gentle ping, may I know if you are still working on this? thanks

@SandeepSinghGahir

Hi,

Is there any update on this PR? We have been facing the same issue while reading data from Iceberg tables in a Glue job, blocking the entire pipeline. Would really appreciate an update and a probable fix for this.

Contributor

@steveloughran steveloughran left a comment

commented. It's a start

@@ -178,12 +192,16 @@ private byte[] randomData(int size) {
}

private void writeS3Data(S3URI uri, byte[] data) throws IOException {
s3.putObject(
s3Client.putObject(
PutObjectRequest.builder()
.bucket(uri.bucket())
.key(uri.key())
.contentLength((long) data.length)
.build(),
RequestBody.fromBytes(data));
Contributor

FWIW, this triggers a full array copy of the data.

byteRef[0] = stream.read();
} catch (IOException e) {
closeStream();
throw new UncheckedIOException(e);
Contributor

On an unrecoverable network error, abortStream() is required to avoid the failed TLS connection being recycled. This adds a new challenge: identifying an unrecoverable network error from the deep nested stacks which can come from the SDK, and distinguishing it from other failures.

// Stated in the ResponseInputStream javadoc:
// If it is not desired to read remaining data from the stream,
// you can explicitly abort the connection via abort().
((Abortable) streamToClose).abort();
Contributor

abort() removes it from the HTTP pool.

  • this is good if you are closing with an unrecoverable network error
  • this is absolutely what you do not want to do on a normal read.
