
Commit 40a8b87

HDFS-13934. Multipart uploaders to be created through FileSystem/FileContext.
This is my proposal for an API for multipart uploads through the FS, rather than via service loading. It builds on the work done in createFile() and openFile() for builders and futures.

* A builder API for configuring the uploader, including specifying the permissions of uploaded files.
* MultipartUploader is now an interface; org.apache.hadoop.fs.impl.AbstractMultipartUploader is the base class for implementations to extend.
* All the operations in the interface now return CompletableFuture<>; implementations can perform the operations synchronously or asynchronously.
* There is a new common path capability for the hasPathCapability() probe.

FileSystemMultipartUploader and its builder moved to fs.impl; I needed a class org.apache.hadoop.fs.InternalOperations to get at rename/3 from the new package. It really is time to make the good rename method public.

S3AMultipartUploader moved to fs.s3a.impl, alongside its builder. The uploader does not get a reference to the base FS; it gets a StoreContext and a WriteOperationHelper and has to work with them. All operations are now async and executed in the executor offered by the StoreContext.

Tests are all fixed up to cope with the changed API.

The builders can be created from the file system; we can also extend FileContext and the filter classes... though I would not pass the operation through checksum FS, as the uploads will not be checksummed.

One thing I am unsure about is ViewFS: even though we can do the remapping when the builder is created, the actual uploads need the full path in the store. That is, unless this and the WiP copy operation both take a path remapper in the builder, which is then used to map from ViewFS paths to ones in their store. Suggestions here are welcome.

No changes to the specification yet; let's get feedback first.

Change-Id: Ib526fe6db8e9282c634181be231d4124ea6c78fc
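For orientation, here is a rough end-to-end sketch of how a client might drive the new API, using only the signatures visible in the diffs below. The paths are illustrative, and the assumption that MultipartUploaderBuilder.build() returns the uploader directly (rather than a future) is mine, not something this page confirms:

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.MultipartUploader;
    import org.apache.hadoop.fs.PartHandle;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathHandle;
    import org.apache.hadoop.fs.UploadHandle;

    public class MultipartUploadSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path base = new Path("/tmp/uploads");      // illustrative paths
        Path dest = new Path(base, "file.bin");

        // Probe the store before asking for a builder.
        if (!fs.hasPathCapability(base, "fs.capability.multipart.uploader")) {
          throw new UnsupportedOperationException("no multipart upload support");
        }

        // Build the uploader from the filesystem, then block on each future.
        try (MultipartUploader uploader =
                 fs.createMultipartUploader(base).build()) {
          UploadHandle upload = uploader.initialize(dest).get();

          byte[] data = "part one".getBytes(StandardCharsets.UTF_8);
          PartHandle part = uploader.putPart(dest,
              new ByteArrayInputStream(data), 1, upload, data.length).get();

          Map<Integer, PartHandle> parts = new TreeMap<>();
          parts.put(1, part);
          PathHandle result = uploader.complete(dest, parts, upload).get();
          System.out.println("Uploaded to " + dest + ": " + result);
        }
      }
    }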
1 parent 5157118 commit 40a8b87

File tree

23 files changed: +818 -329 lines changed


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java

Lines changed: 8 additions & 0 deletions
@@ -131,4 +131,12 @@ private CommonPathCapabilities() {
   @InterfaceStability.Unstable
   public static final String FS_EXPERIMENTAL_BATCH_LISTING =
       "fs.capability.batch.listing";
+
+  /**
+   * Does the store support multipart uploading?
+   * Value: {@value}.
+   */
+  public static final String FS_MULTIPART_UPLOADER =
+      "fs.capability.multipart.uploader";
+
 }
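Callers would typically probe through the new constant rather than the raw string; a one-line sketch, with the path purely illustrative:

    boolean canUpload = fs.hasPathCapability(new Path("/uploads"),
        CommonPathCapabilities.FS_MULTIPART_UPLOADER);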

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 13 additions & 0 deletions
@@ -4644,4 +4644,17 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
 
   }
 
+  /**
+   * Create a multipart uploader.
+   * @param basePath file path under which all files are uploaded
+   * @return a MultipartUploaderBuilder object to build the uploader
+   * @throws IOException if some early checks cause IO failures.
+   * @throws UnsupportedOperationException if support is checked early.
+   */
+  @InterfaceStability.Unstable
+  public MultipartUploaderBuilder createMultipartUploader(Path basePath)
+      throws IOException {
+    methodNotSupported();
+    return null;
+  }
 }
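The base implementation rejects the call via methodNotSupported(); a store that supports the operation overrides it. A hedged sketch of such an override, where FileSystemMultipartUploaderBuilder and its (FileSystem, Path) constructor are assumptions based on the fs.impl move described in the commit message:

    // Hypothetical override in a concrete FileSystem with multipart support;
    // the builder class and constructor arguments are assumed, not confirmed.
    @Override
    public MultipartUploaderBuilder createMultipartUploader(Path basePath)
        throws IOException {
      return new FileSystemMultipartUploaderBuilder(this, basePath);
    }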
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InternalOperations.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,26 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemMultipartUploader;
-import org.apache.hadoop.fs.MultipartUploader;
-import org.apache.hadoop.fs.MultipartUploaderFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
 
 /**
- * Support for HDFS multipart uploads, built on
- * {@link FileSystem#concat(Path, Path[])}.
+ * This method allows access to Package-scoped operations from classes
+ * in org.apache.hadoop.fs.impl and other file system implementations
+ * in the hadoop modules.
+ * This is absolutely not for used by any other application or library.
  */
-public class DFSMultipartUploaderFactory extends MultipartUploaderFactory {
-  protected MultipartUploader createMultipartUploader(FileSystem fs,
-      Configuration conf) {
-    if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) {
-      return new FileSystemMultipartUploader(fs);
-    }
-    return null;
+@InterfaceAudience.Private
+public class InternalOperations {
+
+  @SuppressWarnings("deprecation") // rename w/ OVERWRITE
+  public void rename(FileSystem fs, final Path src, final Path dst,
+      final Options.Rename...options) throws IOException {
+    fs.rename(src, dst, options);
   }
 }
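Code in org.apache.hadoop.fs.impl can then reach the overwrite-capable, package-scoped rename through this shim; a minimal sketch with illustrative paths:

    // commit an uploaded temporary file into place, overwriting any old copy
    new InternalOperations().rename(fs,
        new Path("/tmp/upload-in-progress"),   // illustrative source
        new Path("/data/final-file"),          // illustrative destination
        Options.Rename.OVERWRITE);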

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java

Lines changed: 11 additions & 65 deletions
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,28 +41,19 @@
  * FS.</li>
  * </ol>
  */
-@InterfaceAudience.Private
+@InterfaceAudience.Public
 @InterfaceStability.Unstable
-public abstract class MultipartUploader implements Closeable {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(MultipartUploader.class);
+public interface MultipartUploader extends Closeable {
 
-  /**
-   * Perform any cleanup.
-   * The upload is not required to support any operations after this.
-   * @throws IOException problems on close.
-   */
-  @Override
-  public void close() throws IOException {
-  }
 
   /**
    * Initialize a multipart upload.
    * @param filePath Target path for upload.
    * @return unique identifier associating part uploads.
    * @throws IOException IO failure
    */
-  public abstract UploadHandle initialize(Path filePath) throws IOException;
+  CompletableFuture<UploadHandle> initialize(Path filePath)
+      throws IOException;
 
   /**
    * Put part as part of a multipart upload.
@@ -75,7 +67,8 @@ public void close() throws IOException {
    * @return unique PartHandle identifier for the uploaded part.
    * @throws IOException IO failure
    */
-  public abstract PartHandle putPart(Path filePath, InputStream inputStream,
+  CompletableFuture<PartHandle> putPart(Path filePath,
+      InputStream inputStream,
       int partNumber, UploadHandle uploadId, long lengthInBytes)
       throws IOException;
 
@@ -88,7 +81,7 @@ public abstract PartHandle putPart(Path filePath, InputStream inputStream,
    * @return unique PathHandle identifier for the uploaded file.
    * @throws IOException IO failure
    */
-  public abstract PathHandle complete(Path filePath,
+  CompletableFuture<PathHandle> complete(Path filePath,
       Map<Integer, PartHandle> handles,
       UploadHandle multipartUploadId)
       throws IOException;
@@ -98,57 +91,10 @@ public abstract PathHandle complete(Path filePath,
    * @param filePath Target path for upload (same as {@link #initialize(Path)}.
    * @param multipartUploadId Identifier from {@link #initialize(Path)}.
    * @throws IOException IO failure
+   * @return
    */
-  public abstract void abort(Path filePath, UploadHandle multipartUploadId)
+  CompletableFuture<Void> abort(Path filePath,
+      UploadHandle multipartUploadId)
       throws IOException;
 
-  /**
-   * Utility method to validate uploadIDs.
-   * @param uploadId Upload ID
-   * @throws IllegalArgumentException invalid ID
-   */
-  protected void checkUploadId(byte[] uploadId)
-      throws IllegalArgumentException {
-    checkArgument(uploadId != null, "null uploadId");
-    checkArgument(uploadId.length > 0,
-        "Empty UploadId is not valid");
-  }
-
-  /**
-   * Utility method to validate partHandles.
-   * @param partHandles handles
-   * @throws IllegalArgumentException if the parts are invalid
-   */
-  protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
-    checkArgument(!partHandles.isEmpty(),
-        "Empty upload");
-    partHandles.keySet()
-        .stream()
-        .forEach(key ->
-            checkArgument(key > 0,
-                "Invalid part handle index %s", key));
-  }
-
-  /**
-   * Check all the arguments to the
-   * {@link #putPart(Path, InputStream, int, UploadHandle, long)} operation.
-   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
-   * @param inputStream Data for this part. Implementations MUST close this
-   *     stream after reading in the data.
-   * @param partNumber Index of the part relative to others.
-   * @param uploadId Identifier from {@link #initialize(Path)}.
-   * @param lengthInBytes Target length to read from the stream.
-   * @throws IllegalArgumentException invalid argument
-   */
-  protected void checkPutArguments(Path filePath,
-      InputStream inputStream,
-      int partNumber,
-      UploadHandle uploadId,
-      long lengthInBytes) throws IllegalArgumentException {
-    checkArgument(filePath != null, "null filePath");
-    checkArgument(inputStream != null, "null inputStream");
-    checkArgument(partNumber > 0, "Invalid part number: %d", partNumber);
-    checkArgument(uploadId != null, "null uploadId");
-    checkArgument(lengthInBytes >= 0, "Invalid part length: %d", lengthInBytes);
-  }
 }
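Since every operation in the interface now returns a CompletableFuture, callers either block with get()/join() or compose the futures. A small sketch of the blocking style, with cleanup through abort(); the helper name, single part, and surrounding imports are illustrative, not part of this change:

    // Upload one part and complete the upload; abort it if anything fails.
    static PathHandle uploadSinglePart(MultipartUploader uploader, Path dest,
        byte[] data) throws Exception {
      UploadHandle upload = uploader.initialize(dest).get();
      try {
        PartHandle part = uploader.putPart(dest,
            new java.io.ByteArrayInputStream(data), 1, upload,
            data.length).get();
        java.util.Map<Integer, PartHandle> parts =
            java.util.Collections.singletonMap(1, part);
        return uploader.complete(dest, parts, upload).get();
      } catch (Exception e) {
        // best-effort cleanup of the pending upload; abort() is async too
        uploader.abort(dest, upload).get();
        throw e;
      }
    }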
