diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index 7b038cecef53f..e9f7d707286a8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
@@ -724,7 +725,7 @@ public static final class RequestFactoryBuilder {
      * This will be set on data put/post operations only.
      * A zero value means "no custom timeout"
      */
-    private Duration partUploadTimeout = Duration.ZERO;
+    private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
 
     private RequestFactoryBuilder() {
     }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java
index e4ce053176702..d1fb28257f2ab 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java
@@ -282,11 +282,11 @@ public final InputStream newStream() {
       close();
       checkOpen();
       streamCreationCount++;
-      if (streamCreationCount > 1) {
+      if (streamCreationCount == 2) {
+        // the stream has been recreated for the first time.
+        // notify only once for this stream, so as not to flood
+        // the logs.
         LOG.info("Stream recreated: {}", this);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Stream creation stack", new Exception("here"));
-        }
       }
       return setCurrentStream(createNewStream());
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 5e127050fe65b..a4eba8a96478e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
   public void testPutObjectDirect() throws Throwable {
     final S3AFileSystem fs = getFileSystem();
     try (AuditSpan span = span()) {
-      RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
+      RequestFactory factory = RequestFactoryImpl.builder()
+          .withBucket(fs.getBucket())
+          .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
+          .build();
       Path path = path("putDirect");
       PutObjectRequest.Builder putObjectRequestBuilder =
           factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
index 3c53fd6081663..44864cd67089a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -54,6 +54,7 @@
 
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
           .withRequestPreparer(MockS3AFileSystem::prepareRequest)
           .withBucket(BUCKET)
           .withEncryptionSecrets(new EncryptionSecrets())
+          .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
           .build();
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
index 3dc5ddf29a083..9ca12e4f31a60 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -73,7 +72,7 @@
  * The likely cause is actually -Dprefetch test runs as these return connections to
 * the pool.
 * However, it is also important to have a non-brittle FS for creating the test file
- * and teardow, again, this makes for a flaky test..
+ * and teardown, again, this makes for a flaky test.
  */
 public class ITestConnectionTimeouts extends AbstractS3ATestBase {
 
@@ -248,7 +247,8 @@ public void testObjectUploadTimeouts() throws Throwable {
       // the exact IOE depends on what failed; if it is in the http read it will be a
       // software.amazon.awssdk.thirdparty.org.apache.http.ConnectionClosedException
       // which is too low level to safely assert about.
-      intercept(IOException.class, () ->
+      // it can also surface as an UncheckedIOException wrapping the inner cause.
+      intercept(Exception.class, () ->
           ContractTestUtils.readUTF8(brittleFS, file, DATASET.length));
       Assertions.assertThat(totalSleepTime.get())
           .describedAs("total sleep time of read")
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
index 9fee2fd63a0ef..b864cd3b63982 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
@@ -19,16 +19,21 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
 import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.SdkRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Request;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 import org.apache.hadoop.fs.PathIOException;
@@ -38,6 +43,7 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -109,8 +115,6 @@ public void testRequestFactoryWithCannedACL() throws Throwable {
         .isEqualTo(acl);
   }
 
-
-
   /**
    * Now add a processor and verify that it was invoked for
    * exactly as many requests as were analyzed.
@@ -207,4 +211,64 @@ public void testMultipartUploadRequest() throws Throwable {
         .isEqualTo(requestsAnalyzed);
   }
 
+  /**
+   * Assertion for Request timeouts.
+   * @param duration expected duration.
+   * @param request request.
+   */
+  private void assertApiTimeouts(Duration duration, S3Request request) {
+    Assertions.assertThat(request.overrideConfiguration())
+        .describedAs("request %s", request)
+        .isNotEmpty();
+    final AwsRequestOverrideConfiguration override =
+        request.overrideConfiguration().get();
+    Assertions.assertThat(override.apiCallAttemptTimeout())
+        .describedAs("apiCallAttemptTimeout")
+        .hasValue(duration);
+    Assertions.assertThat(override.apiCallTimeout())
+        .describedAs("apiCallTimeout")
+        .hasValue(duration);
+  }
+
+  /**
+   * If not overridden, timeouts are set to the default part upload timeout.
+   */
+  @Test
+  public void testDefaultUploadTimeouts() throws Throwable {
+
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withMultipartPartCountLimit(2)
+        .build();
+    final UploadPartRequest upload =
+        factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
+    assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload);
+  }
+
+  /**
+   * Verify that when upload request timeouts are set,
+   * they are passed down.
+   */
+  @Test
+  public void testUploadTimeouts() throws Throwable {
+    Duration partDuration = Duration.ofDays(1);
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withPartUploadTimeout(partDuration)
+        .build();
+
+    String path = "path";
+
+    // A simple PUT
+    final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
+        PutObjectOptions.deletingDirs(), 1024, false).build();
+    assertApiTimeouts(partDuration, put);
+
+    // multipart part
+    final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path,
+        "1", 3, 128_000_000)
+        .build();
+    assertApiTimeouts(partDuration, upload);
+
+  }
 }
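The new assertApiTimeouts() checks above rely on the request factory propagating the part upload timeout into each request's AwsRequestOverrideConfiguration as both the API call timeout and the per-attempt timeout. Below is a minimal sketch of that propagation, assuming only the AWS SDK v2 AwsRequestOverrideConfiguration API; the class and method names are illustrative and this is not the actual AWSClientConfig.setRequestTimeout() implementation referenced in RequestFactoryImpl.

// Illustrative sketch only: shows how a Duration such as DEFAULT_PART_UPLOAD_TIMEOUT
// can be attached to an SDK request builder so that the assertions in
// assertApiTimeouts() would hold. Not the production Hadoop code.
import java.time.Duration;

import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;

public final class PartUploadTimeoutSketch {

  private PartUploadTimeoutSketch() {
  }

  /**
   * Attach a timeout to a request builder.
   * A null, zero or negative duration means "no custom timeout" and leaves
   * the builder unchanged.
   * @param builder request builder to modify
   * @param timeout timeout to apply to the request
   */
  public static void applyPartUploadTimeout(AwsRequest.Builder builder, Duration timeout) {
    if (timeout == null || timeout.isZero() || timeout.isNegative()) {
      return;
    }
    // set both the overall API call timeout and the per-attempt timeout,
    // which is what the test assertions read back
    builder.overrideConfiguration(
        AwsRequestOverrideConfiguration.builder()
            .apiCallTimeout(timeout)
            .apiCallAttemptTimeout(timeout)
            .build());
  }
}

The tests then retrieve the configuration through request.overrideConfiguration().get() and assert that apiCallTimeout() and apiCallAttemptTimeout() both equal the configured duration, which is exactly what assertApiTimeouts() does.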