Skip to content

Commit 1d37e6a

Browse files
committed
HDFS-13934 final tuning of multipart API
especially patch qualification. I had to add a new operation to the S3A StoreContext class; took the opportunity to move to a builder API. Why so -even though I'd argue before we didn't need it? Well, because S3A DelegationTokens plugin point now takes it, so must their tests. also tagged as @limitedPrivate for the same reason. Thinking maybe I should separate that change -and avoid having a guava class in the fields ...won't work with shading, will it? Change-Id: I19e8f5219d6fe51002c97694af8001453d8eb6b0
1 parent b025580 commit 1d37e6a

File tree

10 files changed

+262
-58
lines changed

10 files changed

+262
-58
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ protected void checkPutArguments(Path filePath,
131131
* be vulnerable to eventually consistent listings of current uploads
132132
* -some may be missed.
133133
* @param path path to abort uploads under.
134-
* @return a future of the number of entries found; 1 if aborting is unsupported.
134+
* @return a future of the number of entries found; -1 if aborting is unsupported.
135135
* @throws IOException IO failure
136136
*/
137137
public CompletableFuture<Integer> abortUploadsUnderPath(Path path)

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,20 @@ public CompletableFuture<PathHandle> complete(
179179

180180
checkPath(filePath);
181181
return FutureIOSupport.eval(() ->
182-
innerComplete(filePath, handleMap, uploadId));
182+
innerComplete(uploadId, filePath, handleMap));
183183
}
184184

185-
public PathHandle innerComplete(
186-
Path filePath,
187-
Map<Integer, PartHandle> handleMap,
188-
UploadHandle multipartUploadId) throws IOException {
185+
/**
186+
* The upload complete operation.
187+
* @param multipartUploadId the ID of the upload
188+
* @param filePath path
189+
* @param handleMap map of handles
190+
* @return the path handle
191+
* @throws IOException failure
192+
*/
193+
private PathHandle innerComplete(
194+
UploadHandle multipartUploadId, Path filePath,
195+
Map<Integer, PartHandle> handleMap) throws IOException {
189196

190197
checkPath(filePath);
191198

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/MultipartUploaderBuilderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ protected MultipartUploaderBuilderImpl(@Nonnull FileContext fc,
9191
*/
9292
protected MultipartUploaderBuilderImpl(@Nonnull FileSystem fileSystem,
9393
@Nonnull Path p) {
94-
super(checkNotNull(p));
94+
super(fileSystem.makeQualified(checkNotNull(p)));
9595
checkNotNull(fileSystem);
9696
fs = fileSystem;
9797
bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
113113
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
114114
import org.apache.hadoop.fs.s3a.impl.StoreContext;
115+
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
115116
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
116117
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
117118
import org.apache.hadoop.io.IOUtils;
@@ -4726,24 +4727,24 @@ public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
47264727
*/
47274728
@InterfaceAudience.Private
47284729
public StoreContext createStoreContext() {
4729-
return new StoreContext(
4730-
getUri(),
4731-
getBucket(),
4732-
getConf(),
4733-
getUsername(),
4734-
owner,
4735-
boundedThreadPool,
4736-
executorCapacity,
4737-
invoker,
4738-
getInstrumentation(),
4739-
getStorageStatistics(),
4740-
getInputPolicy(),
4741-
changeDetectionPolicy,
4742-
enableMultiObjectsDelete,
4743-
metadataStore,
4744-
useListV1,
4745-
new ContextAccessorsImpl(),
4746-
getTtlTimeProvider());
4730+
return new StoreContextBuilder().setFsURI(getUri())
4731+
.setBucket(getBucket())
4732+
.setConfiguration(getConf())
4733+
.setUsername(getUsername())
4734+
.setOwner(owner)
4735+
.setExecutor(boundedThreadPool)
4736+
.setExecutorCapacity(executorCapacity)
4737+
.setInvoker(invoker)
4738+
.setInstrumentation(getInstrumentation())
4739+
.setStorageStatistics(getStorageStatistics())
4740+
.setInputPolicy(getInputPolicy())
4741+
.setChangeDetectionPolicy(changeDetectionPolicy)
4742+
.setMultiObjectDeleteEnabled(enableMultiObjectsDelete)
4743+
.setMetadataStore(metadataStore)
4744+
.setUseListV1(useListV1)
4745+
.setContextAccessors(new ContextAccessorsImpl())
4746+
.setTimeProvider(getTtlTimeProvider())
4747+
.build();
47474748
}
47484749

47494750
/**
@@ -4771,5 +4772,10 @@ public File createTempFile(final String prefix, final long size)
47714772
public String getBucketLocation() throws IOException {
47724773
return S3AFileSystem.this.getBucketLocation();
47734774
}
4775+
4776+
@Override
4777+
public Path makeQualified(final Path path) {
4778+
return S3AFileSystem.this.makeQualified(path);
4779+
}
47744780
}
47754781
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,12 @@ public interface ContextAccessors {
7373
*/
7474
@Retries.RetryTranslated
7575
String getBucketLocation() throws IOException;
76+
77+
/**
78+
* Qualify a path.
79+
*
80+
* @param path path to qualify/normalize
81+
* @return possibly new path.
82+
*/
83+
Path makeQualified(Path path);
7684
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
9898
final WriteOperations writeOperations,
9999
final StoreContext context,
100100
final S3AMultipartUploaderStatistics statistics) {
101-
super(builder.getPath());
101+
super(context.makeQualified(builder.getPath()));
102102
this.builder = builder;
103103
this.writeOperations = writeOperations;
104104
this.context = context;
@@ -132,8 +132,9 @@ private synchronized BulkOperationState retrieveOperationState()
132132
@Override
133133
public CompletableFuture<UploadHandle> startUpload(Path filePath)
134134
throws IOException {
135-
checkPath(filePath);
136-
String key = context.pathToKey(filePath);
135+
Path dest = context.makeQualified(filePath);
136+
checkPath(dest);
137+
String key = context.pathToKey(dest);
137138
return context.submit(new CompletableFuture<>(),
138139
() -> {
139140
String uploadId = writeOperations.initiateMultiPartUpload(key);
@@ -150,11 +151,12 @@ public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
150151
InputStream inputStream,
151152
long lengthInBytes)
152153
throws IOException {
153-
checkPutArguments(filePath, inputStream, partNumber, uploadId,
154+
Path dest = context.makeQualified(filePath);
155+
checkPutArguments(dest, inputStream, partNumber, uploadId,
154156
lengthInBytes);
155157
byte[] uploadIdBytes = uploadId.toByteArray();
156158
checkUploadId(uploadIdBytes);
157-
String key = context.pathToKey(filePath);
159+
String key = context.pathToKey(dest);
158160
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
159161
Charsets.UTF_8);
160162
return context.submit(new CompletableFuture<>(),
@@ -180,15 +182,16 @@ public CompletableFuture<PathHandle> complete(
180182
Path filePath,
181183
Map<Integer, PartHandle> handleMap)
182184
throws IOException {
183-
checkPath(filePath);
185+
Path dest = context.makeQualified(filePath);
186+
checkPath(dest);
184187
byte[] uploadIdBytes = uploadHandle.toByteArray();
185188
checkUploadId(uploadIdBytes);
186189
checkPartHandles(handleMap);
187190
List<Map.Entry<Integer, PartHandle>> handles =
188191
new ArrayList<>(handleMap.entrySet());
189192
handles.sort(Comparator.comparingInt(Map.Entry::getKey));
190193
int count = handles.size();
191-
String key = context.pathToKey(filePath);
194+
String key = context.pathToKey(dest);
192195

193196
String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length,
194197
Charsets.UTF_8);
@@ -233,15 +236,16 @@ public CompletableFuture<Void> abort(
233236
UploadHandle uploadId,
234237
Path filePath)
235238
throws IOException {
236-
checkPath(filePath);
239+
Path dest = context.makeQualified(filePath);
240+
checkPath(dest);
237241
final byte[] uploadIdBytes = uploadId.toByteArray();
238242
checkUploadId(uploadIdBytes);
239243
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
240244
Charsets.UTF_8);
241245
return context.submit(new CompletableFuture<>(),
242246
() -> {
243247
writeOperations.abortMultipartCommit(
244-
context.pathToKey(filePath),
248+
context.pathToKey(dest),
245249
uploadIdString);
246250
statistics.uploadAborted();
247251
return null;

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploaderBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,5 @@ public S3AMultipartUploader build()
6262
return new S3AMultipartUploader(this, writeOperations, context, statistics);
6363
}
6464

65+
6566
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,10 @@
5252
* their own.
5353
*
5454
* <i>Warning:</i> this really is private and unstable. Do not use
55-
* outside the org.apache.hadoop.fs.s3a package.
55+
* outside the org.apache.hadoop.fs.s3a package, or in extension points
56+
* such as DelegationTokens.
5657
*/
57-
@InterfaceAudience.Private
58+
@InterfaceAudience.LimitedPrivate("S3A Filesystem and extensions")
5859
@InterfaceStability.Unstable
5960
public class StoreContext {
6061

@@ -117,8 +118,7 @@ public class StoreContext {
117118

118119
/**
119120
* Instantiate.
120-
* No attempt to use a builder here as outside tests
121-
* this should only be created in the S3AFileSystem.
121+
* @deprecated as public method: use {@link StoreContextBuilder}.
122122
*/
123123
public StoreContext(
124124
final URI fsURI,
@@ -229,6 +229,16 @@ public String pathToKey(Path path) {
229229
return contextAccessors.pathToKey(path);
230230
}
231231

232+
/**
233+
* Qualify a path.
234+
*
235+
* @param path path to qualify/normalize
236+
* @return possibly new path.
237+
*/
238+
public Path makeQualified(Path path) {
239+
return contextAccessors.makeQualified(path);
240+
}
241+
232242
/**
233243
* Get the storage statistics of this filesystem.
234244
* @return the storage statistics

0 commit comments

Comments
 (0)