HADOOP-18888. S3A. createS3AsyncClient() always enables multipart uploads (#6056)


* The multipart flag fs.s3a.multipart.uploads.enabled is now passed down to the S3 async client created by the factory.
* The S3A connector bypasses the transfer manager entirely when multipart uploads are disabled or the file is smaller than the multipart threshold.

Contributed by Steve Loughran
steveloughran authored Sep 15, 2023
1 parent 510a7dc commit 120620c
Showing 6 changed files with 117 additions and 64 deletions.
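
Context for the diffs below: the flag named in the summary is the standard fs.s3a.multipart.uploads.enabled option. The following is a minimal, illustrative sketch (not part of the commit) of disabling it for a single client; the bucket URI and paths are hypothetical, and FileSystem.newInstance is used so the setting is not lost to the FileSystem cache.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ASinglePartCopyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // one flag now controls both multipart uploads and multipart (transfer manager) copies
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
    // newInstance() bypasses the FileSystem cache so the modified configuration takes effect
    try (FileSystem fs = FileSystem.newInstance(new URI("s3a://example-bucket/"), conf)) {
      // with the flag off, this rename copies the object with a single COPY
      // request; the transfer manager is never invoked
      fs.rename(new Path("/data/file.bin"), new Path("/data/file-renamed.bin"));
    }
  }
}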
@@ -112,7 +112,7 @@ public S3AsyncClient createS3AsyncClient(
return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
.httpClientBuilder(httpClientBuilder)
.multipartConfiguration(multipartConfiguration)
.multipartEnabled(true)
.multipartEnabled(parameters.isMultipartCopy())
.build();
}

@@ -440,6 +440,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;

/**
* Should file copy operations use the S3 transfer manager?
* True unless multipart upload is disabled.
*/
private boolean isMultipartCopyEnabled;

/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
@@ -576,6 +582,9 @@ public void initialize(URI name, Configuration originalConf)
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;

initThreadPools(conf);

int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@@ -982,6 +991,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
.withExecutionInterceptors(auditManager.createExecutionInterceptors())
.withMinimumPartSize(partSize)
.withMultipartCopyEnabled(isMultipartCopyEnabled)
.withMultipartThreshold(multiPartThreshold)
.withTransferManagerExecutor(unboundedThreadPool)
.withRegion(region);
@@ -1468,6 +1478,11 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
LOG.debug("Sharing credentials for: {}", purpose);
return credentials.share();
}

@Override
public boolean isMultipartCopyEnabled() {
return S3AFileSystem.this.isMultipartUploadEnabled;
}
}

/**
@@ -4436,37 +4451,56 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
e);
}

return readInvoker.retry(
action, srcKey,
true,
() -> {
CopyObjectRequest.Builder copyObjectRequestBuilder =
getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
incrementStatistic(OBJECT_COPY_REQUESTS);

Copy copy = transferManager.copy(
CopyRequest.builder()
.copyObjectRequest(copyObjectRequestBuilder.build())
.build());
CopyObjectRequest.Builder copyObjectRequestBuilder =
getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
CopyObjectResponse response;

try {
CompletedCopy completedCopy = copy.completionFuture().join();
CopyObjectResponse result = completedCopy.response();
changeTracker.processResponse(result);
incrementWriteOperations();
instrumentation.filesCopied(1, size);
return result;
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof SdkException) {
SdkException awsException = (SdkException)cause;
changeTracker.processException(awsException, "copy");
throw awsException;
// transfer manager is skipped if disabled or the file is too small to worry about
final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
if (useTransferManager) {
// use transfer manager
response = readInvoker.retry(
action, srcKey,
true,
() -> {
incrementStatistic(OBJECT_COPY_REQUESTS);

Copy copy = transferManager.copy(
CopyRequest.builder()
.copyObjectRequest(copyObjectRequestBuilder.build())
.build());

try {
CompletedCopy completedCopy = copy.completionFuture().join();
return completedCopy.response();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof SdkException) {
SdkException awsException = (SdkException)cause;
changeTracker.processException(awsException, "copy");
throw awsException;
}
throw extractException(action, srcKey, e);
}
throw extractException(action, srcKey, e);
}
});
});
} else {
// single part copy bypasses transfer manager
// note, this helps with some mock testing, e.g. HBoss, as there is less to mock.
response = readInvoker.retry(
action, srcKey,
true,
() -> {
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
incrementStatistic(OBJECT_COPY_REQUESTS);
return s3Client.copyObject(copyObjectRequestBuilder.build());
});
}

changeTracker.processResponse(response);
incrementWriteOperations();
instrumentation.filesCopied(1, size);
return response;
}

/**
@@ -115,4 +115,10 @@ public interface S3AInternals {
@AuditEntryPoint
@Retries.RetryTranslated
HeadBucketResponse getBucketMetadata() throws IOException;

/**
* Is multipart copy enabled?
* @return true if the transfer manager is used to copy files.
*/
boolean isMultipartCopyEnabled();
}
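
The probe added above is what the huge-files tests later in this diff call through getS3AInternals(); any caller holding an S3AFileSystem reference can use it the same way. A minimal sketch, with a helper class and method name of our own choosing:

import org.apache.hadoop.fs.s3a.S3AFileSystem;

/** Illustrative helper; the class and method names are not part of the commit. */
final class MultipartCopyProbe {
  private MultipartCopyProbe() {
  }

  /** @return true if copies on this filesystem may go through the transfer manager. */
  static boolean usesTransferManager(S3AFileSystem fs) {
    return fs.getS3AInternals().isMultipartCopyEnabled();
  }
}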
@@ -156,6 +156,11 @@ final class S3ClientCreationParameters {
*/
private long multiPartThreshold;

/**
* Multipart upload enabled.
*/
private boolean multipartCopy = true;

/**
* Executor that the transfer manager will use to execute background tasks.
*/
@@ -399,5 +404,24 @@ public S3ClientCreationParameters withRegion(
public Region getRegion() {
return region;
}

/**
* Set the multipart flag.
*
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withMultipartCopyEnabled(final boolean value) {
this.multipartCopy = value;
return this;
}

/**
* Get the multipart flag.
* @return multipart flag
*/
public boolean isMultipartCopy() {
return multipartCopy;
}
}
}
@@ -355,10 +355,10 @@ public long getFilesize() {
/**
* Is this expected to be a multipart upload?
* Assertions will change if not.
* @return true by default.
* @return what the filesystem expects.
*/
protected boolean expectMultipartUpload() {
return true;
return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
}

/**
@@ -18,13 +18,10 @@

package org.apache.hadoop.fs.s3a.scale;

import org.assertj.core.api.Assertions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
@@ -33,19 +30,13 @@
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
* Use a single PUT for the whole upload/rename/delete workflow; include verification
* that the transfer manager will fail fast unless the multipart threshold is huge.
*/
public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {

/**
* Size to ensure MPUs don't happen in transfer manager.
*/
public static final String S_1T = "1T";

public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";

/**
@@ -56,11 +47,23 @@ protected String getBlockOutputBufferName() {
return Constants.FAST_UPLOAD_BUFFER_DISK;
}

/**
* Multipart upload is always disabled.
* @return false
*/
@Override
protected boolean expectMultipartUpload() {
return false;
}

/**
* Is multipart copy enabled?
* @return true if the transfer manager is used to copy files.
*/
private boolean isMultipartCopyEnabled() {
return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
}

/**
* Create a configuration without multipart upload,
* and a long request timeout to allow for a very slow
@@ -77,35 +80,21 @@ protected Configuration createScaleConfiguration() {
MULTIPART_SIZE,
REQUEST_TIMEOUT);
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
conf.set(MULTIPART_SIZE, S_1T);
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
return conf;
}

/**
* After the file is created, attempt a rename with an FS
* instance with a small multipart threshold;
* this MUST be rejected.
* Verify multipart copy is disabled.
*/
@Override
public void test_030_postCreationAssertions() throws Throwable {
assumeHugeFileExists();
final Path hugefile = getHugefile();
final Path hugefileRenamed = getHugefileRenamed();
describe("renaming %s to %s", hugefile, hugefileRenamed);
S3AFileSystem fs = getFileSystem();
fs.delete(hugefileRenamed, false);
// create a new fs with a small multipart threshold; expect rename failure.
final Configuration conf = new Configuration(fs.getConf());
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
S3ATestUtils.disableFilesystemCaching(conf);

try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
intercept(UnsupportedRequestException.class, () ->
fs2.rename(hugefile, hugefileRenamed));
}
super.test_030_postCreationAssertions();
Assertions.assertThat(isMultipartCopyEnabled())
.describedAs("Multipart copy should be disabled in %s", getFileSystem())
.isFalse();
}
}
