Skip to content

Commit b025580

Browse files
committed
HDFS-13934 Multipart Uploader APIa
This is ready to g in pending review and the final builder spec. Specification: * update the docs to all but the builder * builder has interface/impl split; extends existing builder implementation * FileContext support * add an optional abortUploadsUnderPath(); invaluable for S3A. Big rework of S3A implementation * has new S3AMultipartUploaderStatistics interface (implementation from Instrumentation) * factor WriteOperations interface out of WriteOperationsHelper and use that * Found and fixed DDB s3guard bug which would fail with >1 file write to the same test. Doesn't show up in existing workflows, but you can upload two files to the same test, and then it surfaces. Tests * more! Change-Id: I1b4aefef51141a53d755e96628d68cebf285f06c
1 parent 89d35a1 commit b025580

File tree

23 files changed

+1369
-471
lines changed

23 files changed

+1369
-471
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ public abstract class AbstractFileSystem implements PathCapabilities {
7979

8080
/** Recording statistics per a file system class. */
8181
private static final Map<URI, Statistics>
82-
STATISTICS_TABLE = new HashMap<URI, Statistics>();
82+
STATISTICS_TABLE = new HashMap<>();
8383

8484
/** Cache of constructors for each file system class. */
8585
private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
86-
new ConcurrentHashMap<Class<?>, Constructor<?>>();
86+
new ConcurrentHashMap<>();
8787

8888
private static final Class<?>[] URI_CONFIG_ARGS =
8989
new Class[]{URI.class, Configuration.class};
@@ -1383,4 +1383,34 @@ public boolean hasPathCapability(final Path path,
13831383
return false;
13841384
}
13851385
}
1386+
1387+
/**
1388+
* Create a multipart uploader.
1389+
* @param basePath file path under which all files are uploaded
1390+
* @return a MultipartUploaderBuilder object to build the uploader
1391+
* @throws IOException if some early checks cause IO failures.
1392+
* @throws UnsupportedOperationException if support is checked early.
1393+
*/
1394+
@InterfaceStability.Unstable
1395+
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
1396+
throws IOException {
1397+
methodNotSupported();
1398+
return null;
1399+
}
1400+
1401+
/**
1402+
* Helper method that throws an {@link UnsupportedOperationException} for the
1403+
* current {@link FileSystem} method being called.
1404+
*/
1405+
protected void methodNotSupported() {
1406+
// The order of the stacktrace elements is (from top to bottom):
1407+
// - java.lang.Thread.getStackTrace
1408+
// - org.apache.hadoop.fs.FileSystem.methodNotSupported
1409+
// - <the FileSystem method>
1410+
// therefore, to find out the current method name, we use the element at
1411+
// index 2.
1412+
String name = Thread.currentThread().getStackTrace()[2].getMethodName();
1413+
throw new UnsupportedOperationException(getClass().getCanonicalName() +
1414+
" does not support method " + name);
1415+
}
13861416
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2957,4 +2957,30 @@ public boolean hasPathCapability(Path path, String capability)
29572957
(fs, p) -> fs.hasPathCapability(p, capability));
29582958
}
29592959

2960+
/**
2961+
* Return a set of server default configuration values based on path.
2962+
* @param path path to fetch server defaults
2963+
* @return server default configuration values for path
2964+
* @throws IOException an I/O error occurred
2965+
*/
2966+
public FsServerDefaults getServerDefaults(final Path path) throws IOException {
2967+
return FsLinkResolution.resolve(this,
2968+
fixRelativePart(path),
2969+
(fs, p) -> fs.getServerDefaults(p));
2970+
}
2971+
2972+
/**
2973+
* Create a multipart uploader.
2974+
* @param basePath file path under which all files are uploaded
2975+
* @return a MultipartUploaderBuilder object to build the uploader
2976+
* @throws IOException if some early checks cause IO failures.
2977+
* @throws UnsupportedOperationException if support is checked early.
2978+
*/
2979+
@InterfaceStability.Unstable
2980+
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
2981+
throws IOException {
2982+
return FsLinkResolution.resolve(this,
2983+
fixRelativePart(basePath),
2984+
(fs, p) -> fs.createMultipartUploader(p));
2985+
}
29602986
}
Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -15,6 +15,7 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
1819
package org.apache.hadoop.fs;
1920

2021
import java.io.Closeable;
@@ -23,23 +24,12 @@
2324
import java.util.Map;
2425
import java.util.concurrent.CompletableFuture;
2526

26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
28-
2927
import org.apache.hadoop.classification.InterfaceAudience;
3028
import org.apache.hadoop.classification.InterfaceStability;
3129

32-
import static com.google.common.base.Preconditions.checkArgument;
33-
3430
/**
3531
* MultipartUploader is an interface for copying files multipart and across
36-
* multiple nodes. Users should:
37-
* <ol>
38-
* <li>Initialize an upload.</li>
39-
* <li>Upload parts in any order.</li>
40-
* <li>Complete the upload in order to have it materialize in the destination
41-
* FS.</li>
42-
* </ol>
32+
* multiple nodes.
4333
*/
4434
@InterfaceAudience.Public
4535
@InterfaceStability.Unstable
@@ -52,49 +42,63 @@ public interface MultipartUploader extends Closeable {
5242
* @return unique identifier associating part uploads.
5343
* @throws IOException IO failure
5444
*/
55-
CompletableFuture<UploadHandle> initialize(Path filePath)
45+
CompletableFuture<UploadHandle> startUpload(Path filePath)
5646
throws IOException;
5747

5848
/**
5949
* Put part as part of a multipart upload.
6050
* It is possible to have parts uploaded in any order (or in parallel).
61-
* @param filePath Target path for upload (same as {@link #initialize(Path)}).
51+
* @param uploadId Identifier from {@link #startUpload(Path)}.
52+
* @param partNumber Index of the part relative to others.
53+
* @param filePath Target path for upload (same as {@link #startUpload(Path)}).
6254
* @param inputStream Data for this part. Implementations MUST close this
6355
* stream after reading in the data.
64-
* @param partNumber Index of the part relative to others.
65-
* @param uploadId Identifier from {@link #initialize(Path)}.
6656
* @param lengthInBytes Target length to read from the stream.
6757
* @return unique PartHandle identifier for the uploaded part.
6858
* @throws IOException IO failure
6959
*/
70-
CompletableFuture<PartHandle> putPart(Path filePath,
60+
CompletableFuture<PartHandle> putPart(
61+
UploadHandle uploadId,
62+
int partNumber,
63+
Path filePath,
7164
InputStream inputStream,
72-
int partNumber, UploadHandle uploadId, long lengthInBytes)
65+
long lengthInBytes)
7366
throws IOException;
7467

7568
/**
7669
* Complete a multipart upload.
77-
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
70+
* @param uploadId Identifier from {@link #startUpload(Path)}.
71+
* @param filePath Target path for upload (same as {@link #startUpload(Path)}.
7872
* @param handles non-empty map of part number to part handle.
79-
* from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
80-
* @param multipartUploadId Identifier from {@link #initialize(Path)}.
73+
* from {@link #putPart(UploadHandle, int, Path, InputStream, long)}.
8174
* @return unique PathHandle identifier for the uploaded file.
8275
* @throws IOException IO failure
8376
*/
84-
CompletableFuture<PathHandle> complete(Path filePath,
85-
Map<Integer, PartHandle> handles,
86-
UploadHandle multipartUploadId)
77+
CompletableFuture<PathHandle> complete(
78+
UploadHandle uploadId,
79+
Path filePath,
80+
Map<Integer, PartHandle> handles)
8781
throws IOException;
8882

8983
/**
9084
* Aborts a multipart upload.
91-
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
92-
* @param multipartUploadId Identifier from {@link #initialize(Path)}.
85+
* @param uploadId Identifier from {@link #startUpload(Path)}.
86+
* @param filePath Target path for upload (same as {@link #startUpload(Path)}.
9387
* @throws IOException IO failure
94-
* @return
88+
* @return a future; the operation will have completed
9589
*/
96-
CompletableFuture<Void> abort(Path filePath,
97-
UploadHandle multipartUploadId)
90+
CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)
9891
throws IOException;
9992

93+
/**
94+
* Best effort attempt to aborts multipart uploads under a path.
95+
* Not all implementations support this, and those which do may
96+
* be vulnerable to eventually consistent listings of current uploads
97+
* -some may be missed.
98+
* @param path path to abort uploads under.
99+
* @return a future of the number of entries found; 1 if aborting is unsupported.
100+
* @throws IOException IO failure
101+
*/
102+
CompletableFuture<Integer> abortUploadsUnderPath(Path path) throws IOException;
103+
100104
}

0 commit comments

Comments
 (0)