HADOOP-19295. review feedback
* default timeout set in builder
* tune logging of content provider on recovery
* new tests to verify timeout propagation
* discovered a new wrapping of failures in read(),
  so relaxed the intercepted exception class further

Change-Id: I43e2822e4dbd684d2c0469650b07369b731a2e7c
steveloughran committed Oct 4, 2024
1 parent b5833bb commit 04a18d3
Showing 6 changed files with 82 additions and 11 deletions.
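
As context for the diffs below: the change makes the request factory always carry a part upload timeout, the default unless a caller overrides it, and propagate it into each data upload request's override configuration. A minimal sketch of that flow, not part of the commit, reusing only the builder and factory calls that appear in the diffs (the import locations for RequestFactory and RequestFactoryImpl are assumed from the hadoop-aws module):

import java.time.Duration;

import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;

public class TimeoutPropagationSketch {
  public static void main(String[] args) throws Exception {
    // timeout set on the builder; if withPartUploadTimeout() is never
    // called, DEFAULT_PART_UPLOAD_TIMEOUT now applies instead of Duration.ZERO
    RequestFactory factory = RequestFactoryImpl.builder()
        .withBucket("bucket")
        .withPartUploadTimeout(Duration.ofDays(1))
        .build();

    // any part upload request built by the factory carries the timeout
    UploadPartRequest upload =
        factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
    AwsRequestOverrideConfiguration override =
        upload.overrideConfiguration().get();

    // both the per-attempt and the total API call timeout are set
    System.out.println(override.apiCallAttemptTimeout()); // Optional[PT24H]
    System.out.println(override.apiCallTimeout());        // Optional[PT24H]
  }
}

The new tests at the end of this commit verify exactly this propagation, plus the default case.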
@@ -60,6 +60,7 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
@@ -724,7 +725,7 @@ public static final class RequestFactoryBuilder {
      * This will be set on data put/post operations only.
      * A zero value means "no custom timeout"
      */
-    private Duration partUploadTimeout = Duration.ZERO;
+    private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
 
     private RequestFactoryBuilder() {
     }

@@ -282,11 +282,11 @@ public final InputStream newStream() {
       close();
       checkOpen();
       streamCreationCount++;
-      if (streamCreationCount > 1) {
+      if (streamCreationCount == 2) {
+        // the stream has been recreated for the first time.
+        // notify only once for this stream, so as not to flood
+        // the logs.
         LOG.info("Stream recreated: {}", this);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Stream creation stack", new Exception("here"));
-        }
       }
       return setCurrentStream(createNewStream());
     }
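
The switch from "> 1" to "== 2" is what turns "log every recreation" into "log only the first recreation". A generic sketch of the notify-once pattern, illustrative only and not the commit's code:

import java.util.concurrent.atomic.AtomicInteger;

// The counter increments on every stream creation, but the message
// fires only when the count first passes the initial creation.
public class LogOnceSketch {
  private final AtomicInteger creations = new AtomicInteger();

  void newStream() {
    if (creations.incrementAndGet() == 2) {
      // first recreation: visible in logs exactly once per provider
      System.out.println("Stream recreated: " + this);
    }
  }

  public static void main(String[] args) {
    LogOnceSketch s = new LogOnceSketch();
    s.newStream();  // initial creation: silent
    s.newStream();  // first recreation: logs
    s.newStream();  // later recreations: silent
  }
}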

@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
   public void testPutObjectDirect() throws Throwable {
     final S3AFileSystem fs = getFileSystem();
     try (AuditSpan span = span()) {
-      RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
+      RequestFactory factory = RequestFactoryImpl.builder()
+          .withBucket(fs.getBucket())
+          .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
+          .build();
       Path path = path("putDirect");
       PutObjectRequest.Builder putObjectRequestBuilder =
           factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);

@@ -54,6 +54,7 @@
 import org.apache.hadoop.util.Progressable;
 
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
       .withRequestPreparer(MockS3AFileSystem::prepareRequest)
       .withBucket(BUCKET)
       .withEncryptionSecrets(new EncryptionSecrets())
+      .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
       .build();
 
   /**

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -73,7 +72,7 @@
  * The likely cause is actually -Dprefetch test runs as these return connections to
  * the pool.
  * However, it is also important to have a non-brittle FS for creating the test file
- * and teardow, again, this makes for a flaky test..
+ * and teardown, again, this makes for a flaky test.
  */
public class ITestConnectionTimeouts extends AbstractS3ATestBase {
 
@@ -248,7 +247,8 @@ public void testObjectUploadTimeouts() throws Throwable {
       // the exact IOE depends on what failed; if it is in the http read it will be a
       // software.amazon.awssdk.thirdparty.org.apache.http.ConnectionClosedException
       // which is too low level to safely assert about.
-      intercept(IOException.class, () ->
+      // it can also surface as an UncheckedIOException wrapping the inner cause.
+      intercept(Exception.class, () ->
           ContractTestUtils.readUTF8(brittleFS, file, DATASET.length));
       Assertions.assertThat(totalSleepTime.get())
           .describedAs("total sleep time of read")
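
One detail behind the relaxed intercept above: the same low-level failure may reach the caller either as the IOException itself or wrapped in java.io.UncheckedIOException, so pinning the assertion to a specific IOException class is brittle. A standalone sketch of the two shapes, illustrative only and not part of the commit:

import java.io.IOException;
import java.io.UncheckedIOException;

public class WrappedFailureSketch {
  // stand-in for a read that fails at the http layer (hypothetical)
  static void failingRead(boolean wrap) throws IOException {
    IOException cause = new IOException("timeout on upload");
    if (wrap) {
      // stream adapters and lambda-based code paths rethrow unchecked
      throw new UncheckedIOException(cause);
    }
    throw cause;
  }

  public static void main(String[] args) {
    for (boolean wrap : new boolean[]{false, true}) {
      try {
        failingRead(wrap);
      } catch (Exception e) {
        // intercept(Exception.class, ...) matches both shapes;
        // intercept(IOException.class, ...) would reject the wrapped one
        System.out.println("caught " + e.getClass().getSimpleName());
      }
    }
  }
}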

@@ -19,16 +19,21 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
 import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.SdkRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Request;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 
 import org.apache.hadoop.fs.PathIOException;
@@ -38,6 +43,7 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -109,8 +115,6 @@ public void testRequestFactoryWithCannedACL() throws Throwable {
         .isEqualTo(acl);
   }
 
-
-
   /**
    * Now add a processor and verify that it was invoked for
    * exactly as many requests as were analyzed.
@@ -207,4 +211,64 @@ public void testMultipartUploadRequest() throws Throwable {
         .isEqualTo(requestsAnalyzed);
   }
 
+  /**
+   * Assertion for Request timeouts.
+   * @param duration expected duration.
+   * @param request request.
+   */
+  private void assertApiTimeouts(Duration duration, S3Request request) {
+    Assertions.assertThat(request.overrideConfiguration())
+        .describedAs("request %s", request)
+        .isNotEmpty();
+    final AwsRequestOverrideConfiguration override =
+        request.overrideConfiguration().get();
+    Assertions.assertThat(override.apiCallAttemptTimeout())
+        .describedAs("apiCallAttemptTimeout")
+        .hasValue(duration);
+    Assertions.assertThat(override.apiCallTimeout())
+        .describedAs("apiCallTimeout")
+        .hasValue(duration);
+  }
+
+  /**
+   * If not overridden, timeouts are set to the default part upload timeout.
+   */
+  @Test
+  public void testDefaultUploadTimeouts() throws Throwable {
+
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withMultipartPartCountLimit(2)
+        .build();
+    final UploadPartRequest upload =
+        factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
+    assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload);
+  }
+
+  /**
+   * Verify that when upload request timeouts are set,
+   * they are passed down.
+   */
+  @Test
+  public void testUploadTimeouts() throws Throwable {
+    Duration partDuration = Duration.ofDays(1);
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withPartUploadTimeout(partDuration)
+        .build();
+
+    String path = "path";
+
+    // A simple PUT
+    final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
+        PutObjectOptions.deletingDirs(), 1024, false).build();
+    assertApiTimeouts(partDuration, put);
+
+    // multipart part
+    final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path,
+        "1", 3, 128_000_000)
+        .build();
+    assertApiTimeouts(partDuration, upload);
+
+  }
 }
