Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry when killing s3 based segments #14776

Merged
merged 9 commits into from
Aug 10, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,50 @@
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.retry.RetryUtils;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.google.common.collect.ImmutableSet;

import java.io.IOException;
import java.util.Set;

public class AWSClientUtil
{
/**
* This list of error code come from {@link RetryUtils}, and
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html">...</a>. At the moment, aws sdk
* does not expose a good way of retrying
* {@link com.amazonaws.services.s3.AmazonS3#deleteObjects(DeleteObjectsRequest)} requests. This request is used in
* org.apache.druid.storage.s3.S3DataSegmentKiller to delete a batch of segments from deep storage.
*/
private static final Set<String> RECOVERABLE_ERROR_CODES = ImmutableSet.of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add "InternalError" to this list?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"ServiceUnavailable" too

Copy link
Contributor

@maytasm maytasm Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"503 SlowDown" too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks added

"503 SlowDown",
"AuthFailure",
"BandwidthLimitExceeded",
"EC2ThrottledException",
"IDPCommunicationError",
"InternalError",
"InvalidSignatureException",
"PriorRequestNotComplete",
"ProvisionedThroughputExceededException",
"RequestExpired",
"RequestInTheFuture",
"RequestLimitExceeded",
"RequestThrottled",
"RequestThrottledException",
"RequestTimeTooSkewed",
"RequestTimeout",
"RequestTimeoutException",
"ServiceUnavailable",
"SignatureDoesNotMatch",
"SlowDown",
"ThrottledException",
"ThrottlingException",
"TooManyRequestsException",
"TransactionInProgressException",
"Throttling"
);

/**
* Checks whether an exception can be retried or not. Implementation is copied
* from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition} except deprecated methods
Expand Down Expand Up @@ -54,6 +93,19 @@ public static boolean isClientExceptionRecoverable(AmazonClientException excepti
return true;
}

return RetryUtils.isClockSkewError(exception);
if (RetryUtils.isClockSkewError(exception)) {
return true;
}

if (exception instanceof MultiObjectDeleteException) {
MultiObjectDeleteException multiObjectDeleteException = (MultiObjectDeleteException) exception;
for (MultiObjectDeleteException.DeleteError error : multiObjectDeleteException.getErrors()) {
if (RECOVERABLE_ERROR_CODES.contains(error.getCode())) {
zachjsh marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
}
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
Expand All @@ -50,6 +51,8 @@ public class S3DataSegmentKiller implements DataSegmentKiller
// AWS has max limit of 1000 objects that can be requested to be deleted at a time.
private static final int MAX_MULTI_OBJECT_DELETE_SIZE = 1000;

private static final String MULTI_OBJECT_DELETE_EXEPTION_ERROR_FORMAT = "message: [%s], code: [%s]";

/**
* Any implementation of DataSegmentKiller is initialized when an ingestion job starts if the extension is loaded,
* even when the implementation of DataSegmentKiller is not used. As a result, if we have a s3 client instead
Expand Down Expand Up @@ -150,13 +153,23 @@ private boolean deleteKeysForBucket(
s3Bucket,
keysToDeleteStrings
);
s3Client.deleteObjects(deleteObjectsRequest);
S3Utils.retryS3Operation(
() -> {
s3Client.deleteObjects(deleteObjectsRequest);
return null;
},
3
);
}
catch (MultiObjectDeleteException e) {
hadException = true;
Map<String, List<String>> errorToKeys = new HashMap<>();
for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
errorToKeys.computeIfAbsent(error.getMessage(), k -> new ArrayList<>()).add(error.getKey());
errorToKeys.computeIfAbsent(StringUtils.format(
MULTI_OBJECT_DELETE_EXEPTION_ERROR_FORMAT,
error.getMessage(),
error.getCode()
), k -> new ArrayList<>()).add(error.getKey());
}
errorToKeys.forEach((key, value) -> log.error(
"Unable to delete from bucket [%s], the following keys [%s], because [%s]",
Expand All @@ -173,6 +186,14 @@ private boolean deleteKeysForBucket(
chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(", "))
);
}
catch (Exception e) {
hadException = true;
log.noStackTrace().warn(e,
"Unexpected exception occurred when deleting from bucket [%s], the following keys [%s]",
s3Bucket,
chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(", "))
);
}
}
return hadException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,4 +405,47 @@ public void test_kill_listOfSegments_amazonServiceExceptionExceptionIsThrown()
);
Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
}

@Test
public void test_kill_listOfSegments_retryableExceptionThrown() throws SegmentLoadingException
{
// NOTE(review): this request object is never passed to the mock expectation below
// (the expectation matches anyObject), so it appears to be dead setup — confirm intent.
// Also, KEY_1_PATH is listed twice; presumably KEY_2_PATH was intended for the second
// key — TODO confirm against the other tests in this class.
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_1_PATH);
// Record phase: the first deleteObjects call on the mocked client will throw a
// MultiObjectDeleteException containing one retryable and one non-retryable error.
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
// "RequestLimitExceeded" is in AWSClientUtil.RECOVERABLE_ERROR_CODES, so this error
// marks the batch delete as retryable.
MultiObjectDeleteException.DeleteError retryableError = new MultiObjectDeleteException.DeleteError();
retryableError.setCode("RequestLimitExceeded");
// An error code not in the recoverable set; on its own it would not trigger a retry.
MultiObjectDeleteException.DeleteError nonRetryableError = new MultiObjectDeleteException.DeleteError();
nonRetryableError.setCode("nonRetryableError");
EasyMock.expectLastCall()
.andThrow(new MultiObjectDeleteException(
ImmutableList.of(retryableError, nonRetryableError),
ImmutableList.of()
))
.once();
// Subsequent (retried) deleteObjects calls succeed; expecting two void completions.
// NOTE(review): this expectLastCall() is not preceded by its own mock invocation —
// verify it actually re-binds to s3Client.deleteObjects as intended under EasyMock.
EasyMock.expectLastCall().andVoid().times(2);


EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
// kill() should swallow the retryable failure by retrying; no exception expected.
// NOTE(review): DATA_SEGMENT_1 is passed twice — presumably DATA_SEGMENT_2 was meant
// as the second element; confirm whether de-duplication is part of what's under test.
segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_1));
}

@Test
public void test_kill_listOfSegments_unexpectedExceptionIsThrown()
{
// This request object is only illustrative; the mock expectation below deliberately
// matches any DeleteObjectsRequest rather than this exact instance (exact equality
// matching on the request was considered and rejected).
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
// Record phase: the batch delete throws a generic (non-AWS, non-retryable) exception.
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
EasyMock.expectLastCall().andThrow(new RuntimeException("")).once();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);

// An unexpected exception during deletion should surface to the caller as a
// SegmentLoadingException with a generic message (details go to task logs).
SegmentLoadingException thrown = Assert.assertThrows(
SegmentLoadingException.class,
() -> segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2))
);
Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
}
}
Loading