Skip to content

Commit aa31af9

Browse files
committed
HDFS-13934. S3A MPUploader uses bulk operation state to avoid
excessive DDB IO. Change-Id: Icbab3ef1e93ad3da87a3b29d11698213d1f0e26c
1 parent de84eed commit aa31af9

File tree

3 files changed

+48
-7
lines changed

3 files changed

+48
-7
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ private CompleteMultipartUploadResult finalizeMultipartUpload(
254254
"No upload parts in multipart upload to " + destKey);
255255
}
256256
CompleteMultipartUploadResult uploadResult =
257-
invoker.retry("Completing multipart commit", destKey,
257+
invoker.retry("Completing multipart upload", destKey,
258258
true,
259259
retrying,
260260
() -> {
@@ -539,8 +539,20 @@ public CompleteMultipartUploadResult commitUpload(
539539
*/
540540
public BulkOperationState initiateCommitOperation(
541541
Path path) throws IOException {
542+
return initiateOperation(path, BulkOperationState.OperationType.Commit);
543+
}
544+
545+
/**
546+
* Initiate a commit operation through any metastore.
547+
* @param path path under which the writes will all take place.
548+
* @param operationType operation to initiate
549+
* @return an possibly null operation state from the metastore.
550+
* @throws IOException failure to instantiate.
551+
*/
552+
public BulkOperationState initiateOperation(final Path path,
553+
final BulkOperationState.OperationType operationType) throws IOException {
542554
return S3Guard.initiateBulkWrite(owner.getMetadataStore(),
543-
BulkOperationState.OperationType.Commit, path);
555+
operationType, path);
544556
}
545557

546558
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.concurrent.CompletableFuture;
33-
import java.util.concurrent.atomic.AtomicInteger;
3433

3534
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
3635
import com.amazonaws.services.s3.model.PartETag;
@@ -52,6 +51,7 @@
5251
import org.apache.hadoop.fs.UploadHandle;
5352
import org.apache.hadoop.fs.impl.AbstractMultipartUploader;
5453
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
54+
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
5555

5656
/**
5757
* MultipartUploader for S3AFileSystem. This uses the S3 multipart
@@ -71,15 +71,40 @@ public class S3AMultipartUploader extends AbstractMultipartUploader {
7171

7272
private final StoreContext context;
7373

74+
private final Path path;
75+
76+
/**
77+
* Bulk state -demand created during completion operations.
78+
*/
79+
private BulkOperationState operationState;
80+
7481
public S3AMultipartUploader(final S3AMultipartUploaderBuilder builder,
7582
final WriteOperationHelper writeHelper,
7683
final StoreContext context) {
7784
this.builder = builder;
7885
this.writeHelper = writeHelper;
86+
path = builder.getPath();
7987

8088
this.context = context;
8189
}
8290

91+
@Override
92+
public void close() throws IOException {
93+
if (operationState != null) {
94+
operationState.close();
95+
}
96+
super.close();
97+
}
98+
99+
private synchronized BulkOperationState retrieveOperationState()
100+
throws IOException {
101+
if (operationState == null) {
102+
operationState = writeHelper.initiateOperation(path,
103+
BulkOperationState.OperationType.Upload);
104+
}
105+
return operationState;
106+
}
107+
83108
@Override
84109
public CompletableFuture<UploadHandle> initialize(Path filePath)
85110
throws IOException {
@@ -142,13 +167,14 @@ public CompletableFuture<PathHandle> complete(Path filePath,
142167
totalLength += result.getLeft();
143168
eTags.add(new PartETag(handle.getKey(), result.getRight()));
144169
}
145-
AtomicInteger errorCount = new AtomicInteger(0);
170+
// retrieve/create operation state for scalability of completion.
171+
final BulkOperationState state = retrieveOperationState();
146172
long finalLen = totalLength;
147173
return context.submit(new CompletableFuture<>(),
148174
() -> {
149175
CompleteMultipartUploadResult result
150-
= writeHelper.completeMPUwithRetries(
151-
key, uploadIdStr, eTags, finalLen, errorCount);
176+
= writeHelper.commitUpload(
177+
key, uploadIdStr, eTags, finalLen, state);
152178

153179
byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
154180
return (PathHandle) () -> ByteBuffer.wrap(eTag);
@@ -170,7 +196,6 @@ public CompletableFuture<Void> abort(Path filePath, UploadHandle uploadId)
170196
});
171197
}
172198

173-
174199
/**
175200
* Build the payload for marshalling.
176201
* @param eTag upload etag

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,5 +102,9 @@ public enum OperationType {
102102
* Mkdir operation.
103103
*/
104104
Mkdir,
105+
/**
106+
* Multipart upload operation.
107+
*/
108+
Upload
105109
}
106110
}

0 commit comments

Comments
 (0)