Skip to content

Commit 63c30eb

Browse files
committed
HADOOP-19256. filesystem options for overwrite, etag and small multipart files
createFile() options are passed down all the way to object creation * fs.option.create.conditional.overwrite: sets the header. MUST be in close(). After all, create(overwrite=false) is eager. * fs.option.create.conditional.overwrite.etag requests etag writes. MAY be in create(); may be in close(). * fs.option.create.content.type for mime type. * fs.option.create.in.close to ask for create in close, as fs capability. (maybe make this an fs path capability only?) * fs.s3a.create.multipart : allows tests to create small multipart files Javadocs of the fs.option.create try to define semantics; will need strict fs specification soon. Bool params come down in an enumset of flags; I'm going to do that in more code as it is more flexible over time than many booleans. - etag values are passed down but not wired up/tested - content type flag exists but is ignored New WriteObjectFlags enum is in new package o.a.h.fs.s3a.write; to match the streams package in another ongoing PR. Goal, as we maintain things, all code related to writing can go in here. also: options set by WriteObjectFlags return true in hasCapability() of the stream Change-Id: I301abd7397accbd278d05f42f858223ba1349fc8
1 parent 26811e6 commit 63c30eb

File tree

15 files changed

+317
-99
lines changed

15 files changed

+317
-99
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -723,34 +723,87 @@ private OpenFileOptions() {
723723
public interface CreateFileOptionKeys {
724724

725725
/**
726-
* {code createFile()} option to write a file iff there is nothing at the destination.
727-
* This may happen during create() or in the close.
726+
* {code createFile()} option to write a file in the close() operation iff
727+
* there is nothing at the destination.
728+
* this is the equivalent of {@code create(path, overwrite=true}
729+
* <i>except that the existence check is postponed to the end of the write</i>.
728730
* <p>
729-
* Explicitly set {@link #FS_OPTION_CREATE_IN_CLOSE} if you want to force the end of file
730-
* creation.
731-
*
732731
* Value {@value}.
733732
* <p>
734733
* This can be set in the builder.
735734
* <p>
736-
* It should be exported as a path capability for all stores where
737-
* the feature is available *and* enabled.
735+
* <ol>
736+
* <li>It is for object stores stores which only upload/manifest files
737+
* at the end of the stream write.</li>
738+
* <li>Streams which support it SHALL not manifest any object to
739+
* the destination path until close()</li>
740+
* <li>It MUST be declared as a stream capability in streams for which
741+
* this overwrite is enabled.</li>
742+
* <li>It MUST be exported as a path capability for all stores where
743+
* the feature is available <i>and</i> enabled</li>
744+
* <li>If passed to a filesystem as a {@code must()} parameter where
745+
* the option value is {@code true}, and it is supported/enabled,
746+
* the FS SHALL omit all overwrite checks in {@code create},
747+
* including for the existence of an object or a directory underneath.
748+
* Instead, during {close()} the object will only be manifest
749+
* at the target path if there is no object at the destination.
750+
* </li>
751+
* <li>The existence check and object creation SHALL be atomic.</li>
752+
* <li>If passed to a filesystem as a {@code must()} parameter where
753+
* the option value is {@code true}, and the FS does not recognise
754+
* the feature, or it is recognized but disabled on this FS instance,
755+
* the filesystem SHALL reject the request.
756+
* </li>
757+
* <li>If passed to a filesystem as a {@code opt()} parameter where
758+
* the option value is {@code true}, the filesystem MAY ignore
759+
* the request, or it MAY enable the feature.
760+
* Any filesystem which does not support the feature, including
761+
* from older releases, SHALL ignore it.
762+
* </ol>
763+
738764
*/
739765
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite";
740766

741767
/**
742-
* Overwrite a file only if there is an Etag match. This option takes a string.
768+
* Overwrite a file only if there is an Etag match. This option takes a string,
769+
*
743770
* Value {@value}.
771+
* <p>
772+
* This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
773+
* <ol>
774+
* <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li>
775+
* <li>If supported and enabled, it SHALL be declared as a capability of the stream</li>
776+
* <li>The string passed as the value SHALL be the etag value as returned by
777+
* {@code EtagSource.getEtag()}</li>
778+
* <li>This value MUST NOT be empty</li>
779+
* <li>If passed to a filesystem which supports it, then when the file is created,
780+
* the store SHALL check for the existence of a file/object at the destination
781+
* path.
782+
* </li>
783+
* <li>If there is no object there, the operation SHALL be rejected by raising
784+
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
785+
* exception, or a{@code java.nio.file.FileAlreadyExistsException}
786+
* </li>
787+
* <li>If there is an object there, its Etag SHALL be compared to the
788+
* value passed here.<li>
789+
* <li>If there is no match, the operation SHALL be rejected by raising
790+
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
791+
* exception, or a{@code java.nio.file.FileAlreadyExistsException}
792+
* </li> *
793+
* <li>If the etag does match, the file SHALL be created.</li>
794+
* <li>The check and create SHALL be atomic</li>
795+
* <li>The check and create MAY be at the end of the write, in {@code close()},
796+
* or it MAY be in the {@code create()} operation. That is: some stores
797+
* MAY perform the check early</li>
798+
* <li>If supported and enabled, stores MAY check for the existence of subdirectories;
799+
* this behavior is implementation-specific.</li>
800+
* <li></li>
801+
* <li></li>
802+
* </ol>
744803
*/
745804
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
746805
"fs.option.create.conditional.overwrite.etag";
747806

748-
/**
749-
* String to define the content filetype.
750-
* Value {@value}.
751-
*/
752-
String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";
753-
754807
/**
755808
* A flag which requires the filesystem to create files/objects in close(),
756809
* rather than create/createFile.
@@ -760,5 +813,12 @@ public interface CreateFileOptionKeys {
760813
* Value {@value}.
761814
*/
762815
String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";
816+
817+
/**
818+
* String to define the content filetype.
819+
* Value {@value}.
820+
*/
821+
String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";
822+
763823
}
764824
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.time.Instant;
2727
import java.util.ArrayList;
2828
import java.util.Collections;
29+
import java.util.EnumSet;
2930
import java.util.List;
3031
import java.util.Locale;
3132
import java.util.Map;
@@ -52,6 +53,7 @@
5253
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
5354
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
5455
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
56+
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
5557
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
5658
import org.apache.hadoop.util.Preconditions;
5759
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -226,6 +228,11 @@ class S3ABlockOutputStream extends OutputStream implements
226228
/** Is multipart upload enabled? */
227229
private final boolean isMultipartUploadEnabled;
228230

231+
/**
232+
* Object write option flags.
233+
*/
234+
private final EnumSet<WriteObjectFlags> writeObjectFlags;
235+
229236
/**
230237
* An S3A output stream which uploads partitions in a separate pool of
231238
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -251,6 +258,7 @@ class S3ABlockOutputStream extends OutputStream implements
251258
this.iostatistics = statistics.getIOStatistics();
252259
this.writeOperationHelper = builder.writeOperations;
253260
this.putTracker = builder.putTracker;
261+
this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
254262
this.executorService = MoreExecutors.listeningDecorator(
255263
builder.executorService);
256264
this.multiPartUpload = null;
@@ -268,9 +276,19 @@ class S3ABlockOutputStream extends OutputStream implements
268276
? builder.blockSize
269277
: -1;
270278

279+
// if required to be multipart by the committer put tracker or
280+
// write flags (i.e createFile() options, initiate multipart uploads.
281+
// this will fail fast if the store doesn't support multipart uploads
271282
if (putTracker.initialize()) {
272283
LOG.debug("Put tracker requests multipart upload");
273284
initMultipartUpload();
285+
} else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
286+
// this not merged simply to avoid confusion
287+
// to what to do it both are set, so as to guarantee
288+
// the put tracker initialization always takes priority
289+
// over any file flag.
290+
LOG.debug("Multipart initiated from createFile() options");
291+
initMultipartUpload();
274292
}
275293
this.isCSEEnabled = builder.isCSEEnabled;
276294
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
@@ -782,7 +800,8 @@ BlockOutputStreamStatistics getStatistics() {
782800
@SuppressWarnings("deprecation")
783801
@Override
784802
public boolean hasCapability(String capability) {
785-
switch (capability.toLowerCase(Locale.ENGLISH)) {
803+
final String cap = capability.toLowerCase(Locale.ENGLISH);
804+
switch (cap) {
786805

787806
// does the output stream have delayed visibility
788807
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
@@ -807,6 +826,12 @@ public boolean hasCapability(String capability) {
807826
return true;
808827

809828
default:
829+
// scan flags for the capability
830+
for (WriteObjectFlags flag : writeObjectFlags) {
831+
if (flag.hasKey(cap)) {
832+
return true;
833+
}
834+
}
810835
return false;
811836
}
812837
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2177,7 +2177,10 @@ private FSDataOutputStream innerCreateFile(
21772177
// put options are derived from the option builder.
21782178
boolean conditionalCreate = options.isConditionalOverwrite();
21792179
final PutObjectOptions putOptions =
2180-
new PutObjectOptions(null, options.getHeaders(), conditionalCreate, null);
2180+
new PutObjectOptions(null,
2181+
options.getHeaders(),
2182+
options.writeObjectFlags(),
2183+
options.etag());
21812184

21822185

21832186
validateOutputStreamConfiguration(path, getConf());
@@ -3252,8 +3255,7 @@ private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
32523255
public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
32533256
long length,
32543257
boolean isDirectoryMarker) {
3255-
return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker,
3256-
PutObjectOptions.defaultOptions());
3258+
return requestFactory.newPutObjectRequestBuilder(key, PutObjectOptions.defaultOptions(), length, isDirectoryMarker);
32573259
}
32583260

32593261
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public PutObjectRequest createPutObjectRequest(String destKey,
245245
activateAuditSpan();
246246

247247
return getRequestFactory()
248-
.newPutObjectRequestBuilder(destKey, options, length, false, options)
248+
.newPutObjectRequestBuilder(destKey, options, length, false)
249249
.build();
250250
}
251251

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,12 @@ CopyObjectRequest.Builder newCopyObjectRequestBuilder(String srcKey,
118118
* @param options options for the request
119119
* @param length length of object to be uploaded
120120
* @param isDirectoryMarker true if object to be uploaded is a directory marker
121-
* @param putOptions
122121
* @return the request builder
123122
*/
124123
PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
125124
PutObjectOptions options,
126125
long length,
127-
boolean isDirectoryMarker,
128-
PutObjectOptions putOptions);
126+
boolean isDirectoryMarker);
129127

130128
/**
131129
* Create a {@link PutObjectRequest} request for creating

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.fs.s3a.commit.magic;
2020

2121
import java.io.IOException;
22+
import java.util.EnumSet;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
@@ -33,13 +34,15 @@
3334
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
3435
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
3536
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
37+
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
3638
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
3739
import org.apache.hadoop.fs.statistics.IOStatistics;
3840
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
3941
import org.apache.hadoop.util.Preconditions;
4042

4143
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
4244
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
45+
import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions;
4346
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
4447

4548
/**
@@ -79,7 +82,10 @@ public boolean aboutToComplete(String uploadId,
7982
PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
8083
getOriginalDestKey(),
8184
0,
82-
new PutObjectOptions(null, headers, false, null));
85+
new PutObjectOptions(null,
86+
headers,
87+
EnumSet.noneOf(WriteObjectFlags.class),
88+
""));
8389
upload(originalDestPut, EMPTY);
8490

8591
// build the commit summary
@@ -103,7 +109,8 @@ public boolean aboutToComplete(String uploadId,
103109
getPath(), getPendingPartKey(), commitData);
104110
PutObjectRequest put = getWriter().createPutObjectRequest(
105111
getPendingPartKey(),
106-
bytes.length, null);
112+
bytes.length,
113+
defaultOptions());
107114
upload(put, bytes);
108115
return false;
109116
}
@@ -117,7 +124,7 @@ public boolean aboutToComplete(String uploadId,
117124
@Retries.RetryTranslated
118125
private void upload(PutObjectRequest request, byte[] bytes) throws IOException {
119126
trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
120-
() -> getWriter().putObject(request, PutObjectOptions.defaultOptions(),
127+
() -> getWriter().putObject(request, defaultOptions(),
121128
new S3ADataBlocks.BlockUploadData(bytes, null), null));
122129
}
123130
}

0 commit comments

Comments
 (0)