HADOOP-16823. Large DeleteObject requests are their own Thundering Herd #1826

@@ -1698,17 +1698,20 @@

<property>
<name>fs.s3a.retry.throttle.limit</name>
<value>${fs.s3a.attempts.maximum}</value>
<value>20</value>
<description>
Number of times to retry any throttled request.
</description>
</property>

<property>
<name>fs.s3a.retry.throttle.interval</name>
<value>1000ms</value>
<value>100ms</value>
<description>
Interval between retry attempts on throttled requests.
Initial interval between retry attempts on throttled requests, +/- 50%,
chosen at random.
i.e. for an initial value of 3000ms, the initial delay would be in the
range 1500ms to 4500ms.
Backoffs are exponential; again, randomness is used to avoid the
thundering herd problem.
500ms is the default value used by the AWS S3 retry policy.
</description>
</property>
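A minimal sketch of the jittered exponential backoff the description above refers to; the class and method names are illustrative, not the patch's implementation:

import java.util.concurrent.ThreadLocalRandom;

/** Illustrative jittered-backoff sketch; not the S3A implementation. */
public final class ThrottleBackoffSketch {

  /**
   * Delay before retry number {@code retryCount}: the configured base
   * interval is doubled per attempt, then scaled by a random factor in
   * [0.5, 1.5) so that concurrent clients do not retry in lockstep.
   */
  static long throttleDelayMs(long baseIntervalMs, int retryCount) {
    // cap the shift to avoid overflowing the long
    long exponential = baseIntervalMs << Math.min(retryCount, 20);
    double jitter = 0.5 + ThreadLocalRandom.current().nextDouble();
    return (long) (exponential * jitter);
  }
}

For a 3000ms interval and retryCount of 0 this yields a delay in the range 1500ms to 4500ms, matching the range quoted in the description.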

@@ -204,9 +204,7 @@ public void testRenameWithNonEmptySubDir() throws Throwable {
assertPathExists("not created in src/sub dir",
new Path(srcSubDir, "subfile.txt"));

boolean rename = fs.rename(srcDir, finalDir);
assertTrue("rename(" + srcDir + ", " + finalDir + ") failed",
rename);
rename(srcDir, finalDir);

// Accept both POSIX rename behavior and CLI rename behavior
if (renameRemoveEmptyDest) {
@@ -418,8 +418,9 @@ public static boolean rm(FileSystem fileSystem,
public static void rename(FileSystem fileSystem, Path src, Path dst)
throws IOException {
rejectRootOperation(src, false);
assertTrue(fileSystem.rename(src, dst));
assertPathDoesNotExist(fileSystem, "renamed", src);
assertTrue("rename(" + src + ", " + dst + ") failed",
fileSystem.rename(src, dst));
assertPathDoesNotExist(fileSystem, "renamed source dir", src);
}

/**
@@ -174,10 +174,42 @@ private Constants() {
public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";

// number of times we should retry errors
/**
* Number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
*/
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";

/**
* Default number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
*/
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;

/**
* Experimental/Unstable feature: should the AWS client library retry
* throttle responses before escalating to the S3A code: {@value}.
*
* When set to false, the S3A connector sees all S3 throttle events,
* and so can update its counters and metrics, and use its own retry
* policy.
* However, this may have adverse effects on some operations where the S3A
* code cannot retry as efficiently as the AWS client library.
*
* This only applies to S3 operations, not to DynamoDB or other services.
*/
@InterfaceStability.Unstable
public static final String EXPERIMENTAL_AWS_INTERNAL_THROTTLING =
"fs.s3a.experimental.aws.s3.throttling";

/**
* Default value of {@link #EXPERIMENTAL_AWS_INTERNAL_THROTTLING},
* value: {@value}.
*/
@InterfaceStability.Unstable
public static final boolean EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT =
true;
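As a usage sketch of this option (bucket name hypothetical), routing all throttle handling through the S3A connector by disabling the SDK's internal throttle retries might look like:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SdkThrottlingOff {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hand every 503 throttle response to the S3A connector, which
    // updates its counters/metrics and applies its own retry policy.
    conf.setBoolean("fs.s3a.experimental.aws.s3.throttling", false);
    try (FileSystem fs = FileSystem.get(
        new URI("s3a://example-bucket/"), conf)) {
      // throttle events from operations here surface in S3A metrics
      fs.getFileStatus(new Path("/"));
    }
  }
}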

// seconds until we give up trying to establish a connection to s3
public static final String ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
@@ -225,6 +257,33 @@ private Constants() {
public static final String ENABLE_MULTI_DELETE =
"fs.s3a.multiobjectdelete.enable";

/**
* Number of objects to delete in a single multi-object delete: {@value}.
* Max: 1000.
*
* A bigger value means fewer POST requests when deleting a directory
* tree with many objects.
* However, as you are limited to only a few thousand requests per
* second against a single partition of an S3 bucket,
* a large page size can easily overload the bucket and so trigger
* throttling.
*
* Furthermore, as the reaction to a throttled request is simply to
* retry it, it can take a while for the situation to go away.
* While a large value may give better numbers on tests and benchmarks
* where only a single operation is being executed, once multiple
* applications start working with the same bucket, these large
* deletes can be highly disruptive.
*/
public static final String BULK_DELETE_PAGE_SIZE =
"fs.s3a.bulk.delete.page.size";

/**
* Default number of objects to delete in a single multi-object
* delete: {@value}.
*/
public static final int BULK_DELETE_PAGE_SIZE_DEFAULT = 250;
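To make the trade-off concrete: with the default page size of 250, deleting a 10,000-object directory tree issues 40 bulk-delete POSTs; at the maximum of 1000 it would be only 10, but each POST then counts as 1000 mutating operations against the bucket partition. A generic paging sketch, written for this page rather than taken from the patch's DeleteOperation code:

import java.util.ArrayList;
import java.util.List;

/** Sketch of bulk-delete paging; illustrative only. */
public final class DeletePagingSketch {

  /**
   * Split {@code keys} into pages of at most {@code pageSize} entries,
   * each of which would become one multi-object delete request.
   */
  static <T> List<List<T>> pages(List<T> keys, int pageSize) {
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < keys.size(); i += pageSize) {
      // copy the sublist so each page is independent of the source list
      result.add(new ArrayList<>(
          keys.subList(i, Math.min(i + pageSize, keys.size()))));
    }
    return result;
  }
}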

// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";

@@ -733,8 +792,7 @@ private Constants() {
/**
* Default throttled retry limit: {@value}.
*/
public static final int RETRY_THROTTLE_LIMIT_DEFAULT =
DEFAULT_MAX_ERROR_RETRIES;
public static final int RETRY_THROTTLE_LIMIT_DEFAULT = 20;

/**
* Interval between retry attempts on throttled requests: {@value}.
@@ -34,7 +34,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;

/**
@@ -56,7 +58,17 @@ public AmazonS3 createS3Client(URI name,
final String userAgentSuffix) throws IOException {
Configuration conf = getConf();
final ClientConfiguration awsConf = S3AUtils
.createAwsConf(getConf(), bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);
.createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);

// When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
// throttling is explicitly disabled on the S3 client so that
// all failures are collected in S3A instrumentation, and its
// retry policy is the only one used.
// This may cause problems in copy/rename.
awsConf.setUseThrottleRetries(
conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING,
EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT));

if (!StringUtils.isEmpty(userAgentSuffix)) {
awsConf.setUserAgentSuffix(userAgentSuffix);
}
@@ -99,6 +99,7 @@
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
@@ -170,6 +171,8 @@
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -273,6 +276,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

private ITtlTimeProvider ttlTimeProvider;

/**
* Page size for deletions.
*/
private int pageSize;

/**
* Specific operations used by rename and delete operations.
*/
@@ -440,6 +448,9 @@ public void initialize(URI name, Configuration originalConf)
}

initMultipartUploads(conf);

pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
stopAllServices();
@@ -1388,7 +1399,8 @@ private long innerRename(Path source, Path dest)
createStoreContext(),
src, srcKey, p.getLeft(),
dst, dstKey, p.getRight(),
operationCallbacks);
operationCallbacks,
pageSize);
return renameOperation.execute();
}

@@ -1648,10 +1660,11 @@ protected void incrementGauge(Statistic statistic, long count) {
* @param ex exception.
*/
public void operationRetried(Exception ex) {
Statistic stat = isThrottleException(ex)
? STORE_IO_THROTTLED
: IGNORED_ERRORS;
incrementStatistic(stat);
if (isThrottleException(ex)) {
operationThrottled(false);
} else {
incrementStatistic(IGNORED_ERRORS);
}
}

/**
@@ -1684,11 +1697,28 @@ public void operationRetried(
public void metastoreOperationRetried(Exception ex,
int retries,
boolean idempotent) {
operationRetried(ex);
incrementStatistic(S3GUARD_METADATASTORE_RETRY);
if (isThrottleException(ex)) {
operationThrottled(true);
} else {
incrementStatistic(IGNORED_ERRORS);
}
}

/**
* Note that an operation was throttled; this will update
* specific counters/metrics.
* @param metastore was the throttling observed in the S3Guard metastore?
*/
private void operationThrottled(boolean metastore) {
LOG.debug("Request throttled on {}", metastore ? "S3": "DynamoDB");
if (metastore) {
incrementStatistic(S3GUARD_METADATASTORE_THROTTLED);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
1);
} else {
incrementStatistic(STORE_IO_THROTTLED);
instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
}
}

@@ -1917,6 +1947,13 @@ private void blockRootDelete(String key) throws InvalidRequestException {
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
* of invocations of the delete operation.
* This is because S3 considers each key as one mutating operation on
* the store when updating its load counters on a specific partition
* of an S3 bucket.
* If only the request itself were counted, this operation would under-report.
* @param deleteRequest keys to delete on the s3-backend
* @return the AWS response
* @throws MultiObjectDeleteException one or more of the keys could not
@@ -1927,17 +1964,24 @@ private void blockRootDelete(String key) throws InvalidRequestException {
private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, AmazonClientException, IOException {
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
try (DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
deleteRequest.getKeys().size())) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
// handle the failure
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
() -> {
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
return s3.deleteObjects(deleteRequest);
});
} catch (MultiObjectDeleteException e) {
// one or more of the operations failed.
// one or more of the keys could not be deleted.
// log and rethrow
List<MultiObjectDeleteException.DeleteError> errors = e.getErrors();
LOG.debug("Partial failure of delete, {} errors", errors.size(), e);
for (MultiObjectDeleteException.DeleteError error : errors) {
@@ -2254,7 +2298,7 @@ private void noteDeleted(final int count, final boolean deleteFakeDir) {
*/
@VisibleForTesting
@Retries.RetryMixed
void removeKeys(
public void removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir,
final BulkOperationState operationState)
@@ -2349,7 +2393,7 @@ public boolean delete(Path f, boolean recursive) throws IOException {
innerGetFileStatus(f, true, StatusProbeEnum.ALL),
recursive,
operationCallbacks,
InternalConstants.MAX_ENTRIES_TO_DELETE);
pageSize);
boolean outcome = deleteOperation.execute();
if (outcome) {
try {
@@ -2830,7 +2874,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
S3AFileStatus s3GetFileStatus(final Path path,
final String key,
final Set<StatusProbeEnum> probes,
final Set<Path> tombstones) throws IOException {
@Nullable Set<Path> tombstones) throws IOException {
if (!key.isEmpty()) {
if (probes.contains(StatusProbeEnum.Head) && !key.endsWith("/")) {
try {
@@ -3515,7 +3559,14 @@ void finishedWrite(String key, long length, String eTag, String versionId,
key, length, eTag, versionId);
Path p = keyToQualifiedPath(key);
Preconditions.checkArgument(length >= 0, "content length is negative");
deleteUnnecessaryFakeDirectories(p.getParent());
final boolean isDir = objectRepresentsDirectory(key, length);
// kick off an async delete
final CompletableFuture<?> deletion = submit(
unboundedThreadPool,
() -> {
deleteUnnecessaryFakeDirectories(p.getParent());
return null;
});
// this is only set if there is a metastore to update and the
// operationState parameter passed in was null.
BulkOperationState stateToClose = null;
@@ -3529,12 +3580,13 @@ void finishedWrite(String key, long length, String eTag, String versionId,
// information gleaned from addAncestors is preserved into the
// subsequent put.
stateToClose = S3Guard.initiateBulkWrite(metadataStore,
BulkOperationState.OperationType.Mkdir,
isDir
? BulkOperationState.OperationType.Mkdir
: BulkOperationState.OperationType.Put,
keyToPath(key));
activeState = stateToClose;
}
S3Guard.addAncestors(metadataStore, p, ttlTimeProvider, activeState);
final boolean isDir = objectRepresentsDirectory(key, length);
S3AFileStatus status = createUploadFileStatus(p,
isDir, length,
getDefaultBlockSize(p), username, eTag, versionId);
@@ -3557,6 +3609,8 @@ void finishedWrite(String key, long length, String eTag, String versionId,
activeState);
}
}
// and catch up with any delete operation.
waitForCompletionIgnoringExceptions(deletion);
} catch (IOException e) {
if (failOnMetadataWriteError) {
throw new MetadataPersistenceException(p.toString(), e);
@@ -123,6 +123,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
private final MutableCounterLong ignoredErrors;
private final MutableQuantiles putLatencyQuantile;
private final MutableQuantiles throttleRateQuantile;
private final MutableQuantiles s3GuardThrottleRateQuantile;
private final MutableCounterLong numberOfFilesCreated;
private final MutableCounterLong numberOfFilesCopied;
private final MutableCounterLong bytesOfFilesCopied;
@@ -248,7 +249,9 @@ public S3AInstrumentation(URI name) {
int interval = 1;
putLatencyQuantile = quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
"ops", "latency", interval);
throttleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
s3GuardThrottleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
"events", "frequency (Hz)", interval);
throttleRateQuantile = quantiles(STORE_IO_THROTTLE_RATE,
"events", "frequency (Hz)", interval);

registerAsMetricsSource(name);
@@ -617,6 +620,7 @@ public void close() {
// task in a shared thread pool.
putLatencyQuantile.stop();
throttleRateQuantile.stop();
s3GuardThrottleRateQuantile.stop();
metricsSystem.unregisterSource(metricsSourceName);
int activeSources = --metricsSourceActiveCounter;
if (activeSources == 0) {