Skip to content

HADOOP-19256. S3A: Support Conditional Writes #7594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -710,4 +710,114 @@ private OpenFileOptions() {
public static final String FS_OPTION_OPENFILE_EC_POLICY =
FS_OPTION_OPENFILE + "ec.policy";
}

/**
* The standard {@code createFile()} options.
* <p>
* If an option is not supported during file creation and it is considered
* part of a commit protocol, then, when supplied in a must() option,
* it MUST be rejected.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CreateFileOptionKeys {

/**
* {@code createFile()} option to write a file in the close() operation iff
* there is nothing at the destination.
* this is the equivalent of {@code create(path, overwrite=true)}
* <i>except that the existence check is postponed to the end of the write</i>.
* <p>
* Value {@value}.
* </p>
* <p>
* This can be set in the builder.
* </p>
* <ol>
* <li>It is for object stores stores which only upload/manifest files
* at the end of the stream write.</li>
* <li>Streams which support it SHALL not manifest any object to
* the destination path until close()</li>
* <li>It MUST be declared as a stream capability in streams for which
* this overwrite is enabled.</li>
* <li>It MUST be exported as a path capability for all stores where
* the feature is available <i>and</i> enabled</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and it is supported/enabled,
* the FS SHALL omit all overwrite checks in {@code create},
* including for the existence of an object or a directory underneath.
* Instead, during {@code close()} the object will only be manifest
* at the target path if there is no object at the destination.
* </li>
* <li>The existence check and object creation SHALL be atomic.</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and the FS does not recognise
* the feature, or it is recognized but disabled on this FS instance,
* the filesystem SHALL reject the request.
* </li>
* <li>If passed to a filesystem as a {@code opt()} parameter where
* the option value is {@code true}, the filesystem MAY ignore
* the request, or it MAY enable the feature.
* Any filesystem which does not support the feature, including
* from older releases, SHALL ignore it.
* </li>
* </ol>
*/
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite";

/**
* Overwrite a file only if there is an Etag match. This option takes a string,
*
* Value {@value}.
* <p>
* This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
* <ol>
* <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li>
* <li>If supported and enabled, it SHALL be declared as a capability of the stream</li>
* <li>The string passed as the value SHALL be the etag value as returned by
* {@code EtagSource.getEtag()}</li>
* <li>This value MUST NOT be empty</li>
* <li>If passed to a filesystem which supports it, then when the file is created,
* the store SHALL check for the existence of a file/object at the destination
* path.
* </li>
* <li>If there is no object there, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a{@code java.nio.file.FileAlreadyExistsException}
* </li>
* <li>If there is an object there, its Etag SHALL be compared to the
* value passed here.</li>
* <li>If there is no match, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a{@code java.nio.file.FileAlreadyExistsException}
* </li>
* <li>If the etag does match, the file SHALL be created.</li>
* <li>The check and create SHALL be atomic</li>
* <li>The check and create MAY be at the end of the write, in {@code close()},
* or it MAY be in the {@code create()} operation. That is: some stores
* MAY perform the check early</li>
* <li>If supported and enabled, stores MAY check for the existence of subdirectories;
* this behavior is implementation-specific.</li>
* </ol>
*/
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
"fs.option.create.conditional.overwrite.etag";

/**
* A flag which requires the filesystem to create files/objects in close(),
* rather than create/createFile.
* <p>
* Object stores with this behavior should also export it as a path capability.
*
* Value {@value}.
*/
String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";

/**
* String to define the content filetype.
* Value {@value}.
*/
String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,12 @@ public final class StoreStatisticNames {
public static final String MULTIPART_UPLOAD_LIST
= "multipart_upload_list";

public static final String CONDITIONAL_CREATE
= "conditional_create";

public static final String CONDITIONAL_CREATE_FAILED
= "conditional_create_failed";

private StoreStatisticNames() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@
|-----------------------------|-----------|----------------------------------------|
| `fs.s3a.create.performance` | `boolean` | create a file with maximum performance |
| `fs.s3a.create.header` | `string` | prefix for user supplied headers |
| `fs.s3a.create.multipart` | `boolean` | create a multipart file |

### `fs.s3a.create.performance`

Expand All @@ -200,7 +201,8 @@
This:
1. Skips the `LIST` call which makes sure a file is being created over a directory.
Risk: a file is created over a directory.
2. Ignores the overwrite flag.
2. If the overwrite flag is false and filesystem flag`fs.s3a.create.conditional.enabled` is true,

Check failure on line 204 in hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md#L204

blanks: end of line
uses conditional creation to prevent the overwrite of any object at the destination.
3. Never issues a `DELETE` call to delete parent directory markers.

It is possible to probe an S3A Filesystem instance for this capability through
Expand Down Expand Up @@ -243,3 +245,17 @@

It is possible to probe an S3A Filesystem instance for this capability through
the `hasPathCapability(path, "fs.s3a.create.header")` check.

### `fs.s3a.create.multipart` Create a multipart file

Initiate a multipart upload when a file is created, rather
than only when the amount of data buffered reaches the threshold
set in `fs.s3a.multipart.size`.

This is only relevant during testing, as it allows for multipart
operation to be initiated without writing any data, so
reducing test time.

It is not recommended for production use, because as well as adding
more network IO, it is not compatible with third-party stores which
do not supprt multipart uploads.
Original file line number Diff line number Diff line change
Expand Up @@ -1505,6 +1505,29 @@ private Constants() {
*/
public static final String FS_S3A_PERFORMANCE_FLAGS =
"fs.s3a.performance.flags";

/**
* Is the create overwrite feature enabled or not?
* A configuration option and a path status probe.
* Value {@value}.
*/
public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED =
"fs.s3a.create.conditional.enabled";

/**
* Default value for {@link #FS_S3A_CONDITIONAL_CREATE_ENABLED}.
* Value {@value}.
*/
public static final boolean DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED = true;

/**
* createFile() boolean option toreate a multipart file, always: {@value}.
* <p>
* This is inefficient and will not work on a store which doesn't support that feature,
* so is primarily for testing.
*/
public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";

/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.
Expand Down Expand Up @@ -1811,4 +1834,11 @@ private Constants() {
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
"fs.s3a.analytics.accelerator";

/**
* Value for the {@code If-None-Match} HTTP header in S3 requests.
* Value: {@value}.
* More information: <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html">
* AWS S3 PutObject API Documentation</a>
*/
public static final String IF_NONE_MATCH_STAR = "*";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -52,6 +53,7 @@
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -224,6 +226,11 @@ class S3ABlockOutputStream extends OutputStream implements
/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;

/**
* Object write option flags.
*/
private final EnumSet<WriteObjectFlags> writeObjectFlags;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
Expand All @@ -249,6 +256,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.iostatistics = statistics.getIOStatistics();
this.writeOperationHelper = builder.writeOperations;
this.putTracker = builder.putTracker;
this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
this.executorService = MoreExecutors.listeningDecorator(
builder.executorService);
this.multiPartUpload = null;
Expand All @@ -266,9 +274,19 @@ class S3ABlockOutputStream extends OutputStream implements
? builder.blockSize
: -1;

// if required to be multipart by the committer put tracker or
// write flags (i.e createFile() options, initiate multipart uploads.
// this will fail fast if the store doesn't support multipart uploads
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
} else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
// this not merged simply to avoid confusion
// to what to do it both are set, so as to guarantee
// the put tracker initialization always takes priority
// over any file flag.
LOG.debug("Multipart initiated from createFile() options");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
Expand Down Expand Up @@ -772,7 +790,8 @@ BlockOutputStreamStatistics getStatistics() {
@SuppressWarnings("deprecation")
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
final String cap = capability.toLowerCase(Locale.ENGLISH);
switch (cap) {

// does the output stream have delayed visibility
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
Expand All @@ -797,6 +816,12 @@ public boolean hasCapability(String capability) {
return true;

default:
// scan flags for the capability
for (WriteObjectFlags flag : writeObjectFlags) {
if (flag.hasKey(cap)) {
return true;
}
}
return false;
}
}
Expand Down
Loading