AWS: add retry logic to S3InputStream #4912
Conversation
}

@Test
-public void testRead() throws Exception {
+public void testReadWithNormalClient() throws Exception {
I cannot really figure out a way to do a parameterized test here due to the use of S3MockRule; parameterization always gives me an initialization failure. I will continue to see if I can remove the redundant tests; meanwhile I will also update the integration test to run against both a normal client and a fuzzy client.
 * Number of times to retry S3 read operation.
 */
public static final String S3_READ_RETRY_NUM_RETRIES = "s3.read.retry.num-retries";
public static final int S3_READ_RETRY_NUM_RETRIES_DEFAULT = 6;
Just want to confirm how these default values were selected. From the Hadoop S3A configuration?
+1, would be really nice to know
I think they differ from the S3A constants: Code Pointer
- retry limit: 7
- retry interval: 500ms
thanks for the pointer, updated
positionStream();
AtomicInteger byteRef = new AtomicInteger(0);
try {
  Tasks.foreach(0)
Any reason we don't pass the stream into the foreach? I guess it's a private field, so it's a bit awkward to pass it in as an argument, but that seems more readable than Tasks.foreach(0) IMO.
I think we cannot, because the input stream needs to be closed and re-opened; if we passed it in, the retry would always run against the same stream, which is already closed after the first failure.
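For readers following the thread, the retry roughly takes this shape (a sketch, not the final code from this PR: the retry count, backoff values, and the assumption that closeStream() no longer throws are placeholders). The point is that positionStream() runs inside the retried lambda, so a failed attempt can discard the broken stream and the next attempt re-opens it:

try {
  Tasks.foreach(0)
      .retry(3)                                        // placeholder for s3.read.retry.num-retries
      .exponentialBackoff(100, 10_000, 600_000, 2.0)   // placeholder backoff values (ms)
      .shouldRetryTest(S3InputStream::shouldRetry)
      .throwFailureWhenFinished()
      .run(ignored -> {
        try {
          positionStream();                            // re-opens the object stream at pos if needed
          byteRef.set(stream.read());
        } catch (IOException e) {
          closeStream();                               // assumed not to throw here, as discussed below
          throw new UncheckedIOException(e);
        }
      });
} catch (UncheckedIOException e) {
  throw e.getCause();                                  // surface the original IOException
}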
if (exception instanceof UncheckedIOException) {
  if (exception.getCause() instanceof EOFException) {
    return false;
  }
}

if (exception instanceof AwsServiceException) {
  switch (((AwsServiceException) exception).statusCode()) {
    case HttpURLConnection.HTTP_FORBIDDEN:
    case HttpURLConnection.HTTP_BAD_REQUEST:
      return false;
  }
}

if (exception instanceof S3Exception) {
  switch (((S3Exception) exception).statusCode()) {
    case HttpURLConnection.HTTP_NOT_FOUND:
    case 416: // range not satisfied
      return false;
  }
}

return true;
Curious, is this the same retry policy that Hadoop S3A has?
It's not. It's closer to the ones in Presto and Trino. Basically it retries almost all IO exceptions except for EOF, because they are most likely network issues. For AWS-side exceptions, this logic seems sufficient to me if it has proven sufficient in Presto and Trino. I am not sure we need to list every single possible exception class like S3A does.
@@ -88,23 +97,68 @@ public void seek(long newPos) {

@Override
public int read() throws IOException {
  Preconditions.checkState(!closed, "Cannot read: already closed");
  positionStream();
  AtomicInteger byteRef = new AtomicInteger(0);
[question] Do we need an AtomicInteger here? I think the requirement only arises because we use it in the lambda passed to the Tasks call below; we could also use final int[] byteRef = {0}. Does that seem reasonable, considering it's single-threaded access?
Yes, good point. It's only used to hold the value, not to prevent any multi-threaded access.
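For illustration, the plain holder suggested above would look roughly like this (a sketch; readOnce() is a hypothetical stand-in for the actual read call):

// A single-element array is enough to carry the result out of the lambda
// when access is single-threaded; no atomicity is required.
int[] byteRef = new int[1];
Tasks.foreach(0)
    .retry(3)                                   // placeholder retry count
    .throwFailureWhenFinished()
    .run(ignored -> byteRef[0] = readOnce());   // readOnce() is hypothetical
return byteRef[0];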
private static boolean shouldRetry(Exception exception) {
  // (same shouldRetry body as in the hunk above: skip EOF, 403/400, 404/416; retry everything else)
Should we add this to a separate util class, considering it can be extended to all S3 interactions?
Also, any pointers on whether we know this is the complete list, considering the APIs we use to connect to S3? For example: (sample list: S3A)
Let's discuss this in the thread above #4912 (comment)
closeStream();
-stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+stream = readRange(String.format("bytes=%s-", pos));
}

private void closeStream() throws IOException {
[minor] Now that we are catching the IOException in closeServerSideStream, closeStream no longer actually throws IOException, so we can remove it from the signatures of closeStream / openStream.
[doubt] SDK V2 also makes it possible to define a retry policy / condition by overriding the client configuration while building the S3 client itself; why are we opting to do it here?
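For reference on the [doubt] above, configuring retries on the client itself in SDK v2 looks roughly like this (a sketch; the retry count and backoff values are placeholders, not something this PR proposes):

import java.time.Duration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.services.s3.S3Client;

static S3Client buildS3ClientWithRetries() {
  return S3Client.builder()
      .overrideConfiguration(
          ClientOverrideConfiguration.builder()
              .retryPolicy(
                  RetryPolicy.builder()
                      .numRetries(6)  // placeholder
                      .backoffStrategy(
                          EqualJitterBackoffStrategy.builder()
                              .baseDelay(Duration.ofMillis(100))
                              .maxBackoffTime(Duration.ofSeconds(20))
                              .build())
                      .build())
              .build())
      .build();
}

Note that this only covers the request/response cycle handled by the SDK; as discussed below, failures while reading an already-open response stream are not retried by this policy.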
I'd like to take a closer look at this. So please hold off on merging.
Ok, I understand what you're trying to achieve here, but I think this is best handled by the S3Client via configuration for Retry Policy and not custom handling in Iceberg.
Introducing our own retries can ultimately conflict with other retries and can have some bad side-effects (like failures resulting in retries on top of retries).
Unless there's some reason we can't handle this via the SDK, I don't think we should introduce retries here. @jackye1995, did you look into the retry policy? It seems like we could take many of these configs and just apply them in the AwsClientFactory.
Yes, you are right, I actually plan to add one more PR for configuring the retry policy. The issue here is a bit different; I think there are 2 issues that cannot be handled at the SDK level:
  }
}

public void setSkipSize(int skipSize) {
  this.skipSize = skipSize;
private static boolean shouldRetry(Exception exception) {
I think we need to be more explicit about the cases where we do want to retry rather than just defaulting to retry. I think there were specific cases that were mentioned as known issues (e.g. socket timeout). However, the problem we've had in other retry scenarios is that the retry cases are overly broad and retry when they really shouldn't.
I think the default here should return false and only return true for the known exceptions.
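A sketch of the allowlist-style check being suggested here; the specific classes (SocketTimeoutException / SocketException from java.net, SSLException from javax.net.ssl) are examples of known transient failures, not an agreed-upon list:

private static boolean shouldRetry(Exception exception) {
  // Default to NOT retrying; only retry failure modes known to be transient.
  Throwable cause = exception instanceof UncheckedIOException ? exception.getCause() : exception;
  return cause instanceof SocketTimeoutException
      || cause instanceof SSLException
      || cause instanceof SocketException;
}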
Reasonably confident it's a lot more complicated than this, especially as SDK-level failures often create deep chains with the underlying cause at the bottom.
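To illustrate that point, any classification probably has to walk the whole cause chain rather than inspect only the top-level exception; a sketch with a hypothetical helper:

// Walk the cause chain, since the SDK often wraps the real network failure
// several levels deep inside its own wrapper exceptions.
private static boolean hasRetryableCause(Throwable failure) {
  Throwable current = failure;
  while (current != null) {
    if (current instanceof SocketTimeoutException || current instanceof SSLException) {
      return true;  // example transient causes; a real list needs more care
    }
    current = current.getCause();
  }
  return false;
}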
positionStream();
int[] byteRef = new int[1];
try {
  Tasks.foreach(0)
This is a lot of duplicate code for the retry logic. Could we consolidate it into a method that wraps just the logic we want to execute?
For example:
retry(() -> {
  Preconditions.checkState(!closed, "Cannot read: already closed");
  positionStream();
  byteRef[0] = stream.read();
})
(you may have to use a consumer/function for the range versions, but it seems like there might be a more concise way to write this)
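A sketch of what such a wrapper could look like on top of the Tasks-based retry already used in this PR; the fields holding the retry limits are assumptions:

// Hypothetical helper: runs one read action under the shared retry policy.
private void retry(Runnable readAction) throws IOException {
  try {
    Tasks.foreach(0)
        .retry(maxRetries)                                               // assumed field for the configured limit
        .exponentialBackoff(minSleepMs, maxSleepMs, maxRetryTimeMs, 2.0) // assumed fields
        .shouldRetryTest(S3InputStream::shouldRetry)
        .throwFailureWhenFinished()
        .run(ignored -> readAction.run());
  } catch (UncheckedIOException e) {
    throw e.getCause();  // surface the original IOException to the caller
  }
}

The read() / read(byte[], int, int) bodies would then shrink to roughly the lambda shown in the comment above.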
private void closeStream() throws IOException {
  if (stream != null) {
    stream.close();
private static void closeServerSideStream(InputStream streamToClose) {
I'd just put this in closeStream (not have a separate method for it).
@@ -88,23 +96,69 @@ public void seek(long newPos) {

@Override
public int read() throws IOException {
  Preconditions.checkState(!closed, "Cannot read: already closed");
  positionStream();
  int[] byteRef = new int[1];
This is less than ideal, but Tasks is a little limited in this area because it's really based on threaded execution and we're reusing it here for retry. It might be worth exploring whether we can tweak Tasks to support this use case.
Ideally we'd have something like:
int read = Tasks.single()
    .exponentialBackoff(...)
    ...
    .run(<function>);
We could also consider bringing in Failsafe, which is more closely aligned with what we want for these cases. It is a new dependency, though (but it has zero dependencies of its own and is Apache licensed).
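For comparison, a minimal sketch of the Failsafe version (Failsafe 3.x, dev.failsafe; the retry bounds and handled exception types are placeholders):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;

private int readWithRetry() {
  RetryPolicy<Integer> retryPolicy = RetryPolicy.<Integer>builder()
      .handle(IOException.class, UncheckedIOException.class)  // or a narrower allowlist of network failures
      .withBackoff(Duration.ofMillis(100), Duration.ofSeconds(10))
      .withMaxRetries(6)                                      // placeholder
      .build();
  return Failsafe.with(retryPolicy).get(() -> {
    positionStream();                                         // S3InputStream members, as in this PR
    return stream.read();
  });
}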
Why was this PR closed? We are experiencing exactly the same problem, and S3FileIO is literally unusable for us at the moment. No matter what we did to tune the Apache HTTP client, such as TcpKeepAlive or socket timeout, it still hits this often and it breaks the whole pipeline. We created our own S3FileIO and all the classes down to S3InputStream, where the SocketException arises, took the inspiration from this PR to retry, and our problem is solved. But it is annoying to maintain our own version of those classes. Is it not possible to revive this PR?
@jackye1995 Hi, I also encountered the same issue as described in #10340. Can we continue the PR to address the issue here? Also, I wonder, have you seen similar AWS clients having the same issue? How did they end up solving it? Thanks. @xiaoxuandev Hi, you added #8221, and I wonder, are you interested in pushing it forward? @danielcweeks @rdblue @amogh-jahagirdar @nastra I wonder if you can comment / suggest a path forward?
@jackye1995 @xiaoxuandev @danielcweeks @rdblue @amogh-jahagirdar @nastra appreciate your feedback on this, thank you.
@puchengy I spoke with @amogh-jahagirdar the other day and he's taking a look at this. There have been a few different attempts to address this issue and we're trying to find a solution that fits cleanly into the implementation.
Thank you @danielcweeks and @amogh-jahagirdar!
@amogh-jahagirdar Hello, I wonder, is there any update on this? Our clients hit the issue from time to time, and it causes some interruptions to their workloads. Thanks!
Hey @puchengy, I'm working on it! I am aiming to have a PR out sometime this week to address this.
@amogh-jahagirdar Hi, I am checking in to see if there is any update. Thank you!
@amogh-jahagirdar Hi, a gentle ping: may I know if you are still working on this? Thanks.
Hi, is there any update on this PR? We have been facing the same issue while reading data from Iceberg tables in a Glue job, blocking the entire pipeline. Would really appreciate an update and a probable fix for this.
Commented. It's a start.
@@ -178,12 +192,16 @@ private byte[] randomData(int size) {
}

private void writeS3Data(S3URI uri, byte[] data) throws IOException {
-  s3.putObject(
+  s3Client.putObject(
      PutObjectRequest.builder()
          .bucket(uri.bucket())
          .key(uri.key())
          .contentLength((long) data.length)
          .build(),
      RequestBody.fromBytes(data));
FWIW, this triggers a full array copy of the data.
  byteRef[0] = stream.read();
} catch (IOException e) {
  closeStream();
  throw new UncheckedIOException(e);
On an unrecoverable network error, abortStream() is required to avoid the failed TLS connection being recycled. This adds a new challenge: identifying an unrecoverable network error in the deeply nested stacks that can come from the SDK, and distinguishing it from other failures.
// Stated in the ResponseInputStream javadoc:
// If it is not desired to read remaining data from the stream,
// you can explicitly abort the connection via abort().
((Abortable) streamToClose).abort();
abort() removes the connection from the HTTP pool.
- This is good if you are closing after an unrecoverable network error.
- This is absolutely what you do not want to do on a normal read.
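Putting those two points together, the close path would need to know which case it is in; a rough sketch, where the shouldAbort flag and the stream field are assumptions and Abortable is software.amazon.awssdk.http.Abortable:

// Sketch: abort only when discarding a broken stream; otherwise close normally
// so the underlying connection can go back to the HTTP pool.
private void closeStream(boolean shouldAbort) throws IOException {
  if (stream == null) {
    return;
  }
  if (shouldAbort && stream instanceof Abortable) {
    ((Abortable) stream).abort();  // drops the (possibly broken) connection from the pool
  } else {
    stream.close();                // normal close path
  }
  stream = null;
}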
@danielcweeks @rajarshisarkar @amogh-jahagirdar @xiaoxuandev @singhpk234
Add retry to the S3InputStream so that when we encounter network failures (mostly SSLException for server-side connection reset and SocketTimeoutException for client-side connection reset), we can quickly retry the read without failing the entire operation.