HADOOP-16499. S3A retry policy to be exponential #1246

Merged
Changes from all commits
@@ -1660,7 +1660,7 @@

<property>
<name>fs.s3a.retry.limit</name>
<value>${fs.s3a.attempts.maximum}</value>
<value>7</value>
<description>
Number of times to retry any repeatable S3 client request on failure,
excluding throttling requests.
@@ -1671,7 +1671,7 @@
<name>fs.s3a.retry.interval</name>
<value>500ms</value>
<description>
Interval between attempts to retry operations for any reason other
Initial retry interval when retrying operations for any reason other
than S3 throttle errors.
</description>
</property>
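
As a rough illustration of what these defaults imply, assuming the sleep simply doubles between attempts and ignoring any jitter the underlying Hadoop retry policy may add, the worst-case schedule for 7 retries starting at 500ms works out as below (a hypothetical sketch, not S3A code):

```java
import java.time.Duration;

public class RetryScheduleSketch {
  public static void main(String[] args) {
    // Illustrative values only: fs.s3a.retry.limit = 7, fs.s3a.retry.interval = 500ms.
    int retryLimit = 7;
    Duration sleep = Duration.ofMillis(500);
    Duration total = Duration.ZERO;
    for (int attempt = 1; attempt <= retryLimit; attempt++) {
      System.out.printf("retry %d: sleep %d ms%n", attempt, sleep.toMillis());
      total = total.plus(sleep);
      sleep = sleep.multipliedBy(2); // assume plain doubling, no jitter
    }
    // 500 + 1000 + 2000 + 4000 + 8000 + 16000 + 32000 = 63500 ms
    System.out.printf("worst-case cumulative sleep: %d ms%n", total.toMillis());
  }
}
```
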
@@ -635,7 +635,7 @@ private Constants() {
/**
* Default retry limit: {@value}.
*/
public static final int RETRY_LIMIT_DEFAULT = DEFAULT_MAX_ERROR_RETRIES;
public static final int RETRY_LIMIT_DEFAULT = 7;

/**
* Interval between retry attempts.: {@value}.
@@ -109,7 +109,7 @@ public S3ARetryPolicy(Configuration conf) {
Preconditions.checkArgument(conf != null, "Null configuration");

// base policy from configuration
fixedRetries = retryUpToMaximumCountWithFixedSleep(
fixedRetries = exponentialBackoffRetry(
conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT),
conf.getTimeDuration(RETRY_INTERVAL,
RETRY_INTERVAL_DEFAULT,
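
The hunk above is truncated by the diff view. A self-contained sketch of what switching from the fixed-sleep policy to `exponentialBackoffRetry` amounts to is shown below; the class and method names are hypothetical, and the real S3ARetryPolicy composes this base policy with additional handling (throttling, specific exception types):

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class ExponentialRetrySketch {

  /** Build a base retry policy from the fs.s3a.retry.* settings. */
  public static RetryPolicy basePolicy(Configuration conf) {
    int limit = conf.getInt("fs.s3a.retry.limit", 7);
    long intervalMillis = conf.getTimeDuration("fs.s3a.retry.interval",
        500, TimeUnit.MILLISECONDS);
    // Previously: RetryPolicies.retryUpToMaximumCountWithFixedSleep(...)
    // Now the sleep between attempts grows roughly exponentially from the
    // configured interval, up to the configured retry limit.
    return RetryPolicies.exponentialBackoffRetry(
        limit, intervalMillis, TimeUnit.MILLISECONDS);
  }
}
```

In the patch itself the same two values are read through the `RETRY_LIMIT` and `RETRY_INTERVAL` constants, as the truncated hunk shows.
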
@@ -1018,7 +1018,7 @@ is unrecoverable; it's the generic "No" response. Very rarely it
does recover, which is why it is in this category, rather than that
of unrecoverable failures.

These failures will be retried with a fixed sleep interval set in
These failures will be retried with an exponential sleep interval set in
`fs.s3a.retry.interval`, up to the limit set in `fs.s3a.retry.limit`.


@@ -1033,7 +1033,7 @@ after the request was processed by S3.
* "No response from Server" (443, 444) HTTP responses.
* Any other AWS client, service or S3 exception.

These failures will be retried with a fixed sleep interval set in
These failures will be retried with an exponential sleep interval set in
`fs.s3a.retry.interval`, up to the limit set in `fs.s3a.retry.limit`.

*Important*: DELETE is considered idempotent, hence: `FileSystem.delete()`
@@ -1233,17 +1233,20 @@ The number of retries and interval between each retry can be configured:

```xml
<property>
<name>fs.s3a.attempts.maximum</name>
<value>20</value>
<description>How many times we should retry commands on transient errors,
excluding throttling errors.</description>
<name>fs.s3a.retry.limit</name>
<value>7</value>
<description>
Number of times to retry any repeatable S3 client request on failure,
excluding throttling requests.
</description>
</property>

<property>
<name>fs.s3a.retry.interval</name>
<value>500ms</value>
<description>
Interval between retry attempts.
Initial retry interval when retrying operations for any reason other
than S3 throttle errors.
</description>
</property>
```
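
The same options can also be set programmatically on a Hadoop `Configuration`, which is how the tests in this patch shorten retries; a minimal sketch with a hypothetical class name:

```java
import org.apache.hadoop.conf.Configuration;

public class RetryConfigSketch {
  public static Configuration withDocumentedDefaults() {
    Configuration conf = new Configuration();
    // mirrors the XML above: up to 7 retries, starting from a 500ms interval
    conf.setInt("fs.s3a.retry.limit", 7);
    conf.set("fs.s3a.retry.interval", "500ms");
    return conf;
  }
}
```
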
@@ -123,8 +123,7 @@ public void testEndpoint() throws Exception {

@Test
public void testProxyConnection() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
useFailFastConfiguration();
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
String proxy =
@@ -133,6 +132,16 @@ public void testProxyConnection() throws Exception {
conf, "when using proxy " + proxy);
}

/**
* Create a configuration designed to fail fast on network problems.
*/
protected void useFailFastConfiguration() {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.setInt(Constants.RETRY_LIMIT, 2);
conf.set(RETRY_INTERVAL, "100ms");
}

/**
* Expect a filesystem to not be created from a configuration
* @return the exception intercepted
@@ -153,9 +162,8 @@ private <E extends Throwable> E expectFSCreateFailure(

@Test
public void testProxyPortWithoutHost() throws Exception {
conf = new Configuration();
useFailFastConfiguration();
conf.unset(Constants.PROXY_HOST);
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.setInt(Constants.PROXY_PORT, 1);
IllegalArgumentException e = expectFSCreateFailure(
IllegalArgumentException.class,
@@ -169,9 +177,8 @@ public void testProxyPortWithoutHost() throws Exception {

@Test
public void testAutomaticProxyPortSelection() throws Exception {
conf = new Configuration();
useFailFastConfiguration();
conf.unset(Constants.PROXY_PORT);
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.set(Constants.SECURE_CONNECTIONS, "true");
expectFSCreateFailure(AWSClientIOException.class,
@@ -183,8 +190,7 @@ public void testAutomaticProxyPortSelection() throws Exception {

@Test
public void testUsernameInconsistentWithPassword() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
useFailFastConfiguration();
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
conf.set(Constants.PROXY_USERNAME, "user");
@@ -204,8 +210,7 @@ private void assertIsProxyUsernameError(final IllegalArgumentException e) {

@Test
public void testUsernameInconsistentWithPassword2() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
useFailFastConfiguration();
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
conf.set(Constants.PROXY_PASSWORD, "password");
@@ -18,8 +18,8 @@

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
@@ -30,14 +30,35 @@
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.concurrent.Callable;

import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_MODE;
import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_SOURCE;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

/**
* Tests behavior of a FileNotFound error that happens after open(), i.e. on
* the first read.
*/
public class ITestS3ADelayedFNF extends AbstractS3ATestBase {

@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
// reduce retry limit so FileNotFoundException cases time out faster,
// speeding up the tests
removeBaseAndBucketOverrides(conf,
CHANGE_DETECT_SOURCE,
CHANGE_DETECT_MODE,
RETRY_LIMIT,
RETRY_INTERVAL,
METADATASTORE_AUTHORITATIVE);
conf.setInt(RETRY_LIMIT, 2);
conf.set(RETRY_INTERVAL, "1ms");
return conf;
}

/**
* See debugging documentation
@@ -46,9 +67,9 @@ public class ITestS3ADelayedFNF extends AbstractS3ATestBase {
*/
@Test
public void testNotFoundFirstRead() throws Exception {
FileSystem fs = getFileSystem();
S3AFileSystem fs = getFileSystem();
ChangeDetectionPolicy changeDetectionPolicy =
((S3AFileSystem) fs).getChangeDetectionPolicy();
fs.getChangeDetectionPolicy();
Assume.assumeFalse("FNF not expected when using a bucket with"
+ " object versioning",
changeDetectionPolicy.getSource() == Source.VersionId);
@@ -61,12 +82,7 @@ public void testNotFoundFirstRead() throws Exception {

// This should fail since we deleted after the open.
LambdaTestUtils.intercept(FileNotFoundException.class,
new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return in.read();
}
});
() -> in.read());
}

}
@@ -21,9 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
@@ -40,6 +38,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

@@ -53,16 +52,40 @@
*/
public class ITestS3AInconsistency extends AbstractS3ATestBase {

private static final int OPEN_READ_ITERATIONS = 20;
private static final int OPEN_READ_ITERATIONS = 10;

public static final int INCONSISTENCY_MSEC = 800;

private static final int INITIAL_RETRY = 128;

private static final int RETRIES = 4;

/** By using a power of 2 for the initial time, the total is a shift left. */
private static final int TOTAL_RETRY_DELAY = INITIAL_RETRY << RETRIES;
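// 128 << 4 = 2048 ms, an upper bound on the doubling sleeps 128 + 256 + 512 + 1024 = 1920 ms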

@Override
protected AbstractFSContract createContract(Configuration conf) {
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
// reduce retry limit so FileNotFoundException cases time out faster,
// speeding up the tests
removeBaseAndBucketOverrides(conf,
CHANGE_DETECT_SOURCE,
CHANGE_DETECT_MODE,
RETRY_LIMIT,
RETRY_INTERVAL,
METADATASTORE_AUTHORITATIVE,
S3_CLIENT_FACTORY_IMPL);
conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
S3ClientFactory.class);
conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
// the reads are always inconsistent
conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
return new S3AContract(conf);
// but the inconsistent time is less than exponential growth of the
// retry interval (128 -> 256 -> 512 -> 1024)
conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, INCONSISTENCY_MSEC);
conf.setInt(RETRY_LIMIT, RETRIES);
conf.set(RETRY_INTERVAL, String.format("%dms", INITIAL_RETRY));
return conf;
}

@Test
@@ -111,7 +134,7 @@ public void testGetFileStatus() throws Exception {
public void testOpenDeleteRead() throws Exception {
S3AFileSystem fs = getFileSystem();
ChangeDetectionPolicy changeDetectionPolicy =
((S3AFileSystem) fs).getChangeDetectionPolicy();
fs.getChangeDetectionPolicy();
Assume.assumeFalse("FNF not expected when using a bucket with"
+ " object versioning",
changeDetectionPolicy.getSource() == Source.VersionId);
@@ -124,7 +147,7 @@ public void testOpenDeleteRead() throws Exception {
fs.setMetadataStore(new NullMetadataStore());
fs.delete(p, false);
fs.setMetadataStore(metadataStore);
eventually(1000, 200, () -> {
eventually(TOTAL_RETRY_DELAY * 2, INITIAL_RETRY * 2, () -> {
intercept(FileNotFoundException.class, () -> s.read());
});
}
@@ -61,8 +61,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.readUTF8;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.CHANGE_DETECTED;
import static org.apache.hadoop.fs.s3a.select.SelectConstants.S3_SELECT_CAPABILITY;
import static org.apache.hadoop.fs.s3a.select.SelectConstants.SELECT_SQL;
@@ -123,8 +122,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {

private static final byte[] TEST_DATA_BYTES = TEST_DATA.getBytes(
Charsets.UTF_8);
private static final int TEST_MAX_RETRIES = 5;
private static final String TEST_RETRY_INTERVAL = "10ms";
private static final int TEST_MAX_RETRIES = 4;
private static final String TEST_RETRY_INTERVAL = "1ms";
private static final String QUOTED_TEST_DATA =
"\"" + TEST_DATA + "\"";

@@ -276,8 +275,7 @@ public void teardown() throws Exception {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
String bucketName = getTestBucketName(conf);
removeBucketOverrides(bucketName, conf,
removeBaseAndBucketOverrides(conf,
CHANGE_DETECT_SOURCE,
CHANGE_DETECT_MODE,
RETRY_LIMIT,
@@ -53,9 +53,13 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_MODE;
import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_SOURCE;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath;
@@ -115,7 +119,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {

public static final int STABILIZATION_TIME = 20_000;

public static final int PROBE_INTERVAL_MILLIS = 500;
public static final int PROBE_INTERVAL_MILLIS = 2500;

private S3AFileSystem guardedFs;
private S3AFileSystem rawFS;
@@ -153,6 +157,19 @@ protected String getMethodName() {
(authoritative ? "-auth" : "-nonauth");
}

@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
// reduce retry limit so FileNotFoundException cases time out faster,
// speeding up the tests
removeBaseAndBucketOverrides(conf,
RETRY_LIMIT,
RETRY_INTERVAL);
conf.setInt(RETRY_LIMIT, 3);
conf.set(RETRY_INTERVAL, "10ms");
return conf;
}

@Before
public void setup() throws Exception {
super.setup();
@@ -769,6 +769,20 @@ public static void removeBaseAndBucketOverrides(final String bucket,
removeBucketOverrides(bucket, conf, options);
}

/**
* Remove the given options from the base configuration and from the test bucket's per-bucket overrides.
* @param conf config
* @param options list of fs.s3a options to remove
*/
public static void removeBaseAndBucketOverrides(
final Configuration conf,
final String... options) {
for (String option : options) {
conf.unset(option);
}
removeBaseAndBucketOverrides(getTestBucketName(conf), conf, options);
}

/**
* Call a function; any exception raised is logged at info.
* This is for test teardowns.