HADOOP-19256. steve's pr of conditional writes #7362

Open: wants to merge 33 commits into base: trunk

Commits (33):
fd57fac
HADOOP-19256. S3A: Upgrade aws sdk v2 to 2.27.14
Aug 29, 2024
20e45fd
Integrate PutIfNotExist functionality into S3A
Aug 23, 2024
c46cd62
addressing PR feedback
Sep 27, 2024
c559e4d
addressing review comments, added flag to PutObjectOptions
Oct 10, 2024
aaeb09e
reverting to using optionheaders
Oct 15, 2024
e82ba25
reverting to using optionheaders
Oct 15, 2024
07969e3
redundant set for multipart_size
Oct 15, 2024
bb5267c
redundant set for multipart_size
Oct 15, 2024
64e7365
fixing merge conflict
Jan 16, 2025
0bc4efa
Fixing tests: testPutIfAbsentConflict, testPutIfAbsentLargeFileConflict
Jan 27, 2025
2280cbe
Changes based on review comments
Feb 6, 2025
f5335aa
HADOOP-19256. review of conditional write
steveloughran Feb 6, 2025
789f4d4
HADOOP-19256: add tests
Feb 7, 2025
acb8833
HADOOP-19256. filesystem options for overwrite, etag and small multip…
steveloughran Feb 10, 2025
fd14698
HADOOP-19256: Use forced multipart in tests, add If-Match header in r…
Feb 13, 2025
9a8b727
HADOOP-19256: Add tests for If-Match
Feb 18, 2025
b724d91
HADOOP-19256: Fix javadoc
Feb 19, 2025
7d3cd93
HADOOP-19256: Add statistics for conditional writes
Feb 25, 2025
2ddf036
HADOOP-19256: Add etag in BlockOutputStreamBuilder
Feb 25, 2025
979aa46
HADOOP-19256: Add tests for conditional_write statistics
Feb 27, 2025
3534336
HADOOP-19478. S3A: pull out new configuration load/probes under S3AStore
steveloughran Mar 3, 2025
6a59bb1
HADOOP-19256. Automatic use of conditional create.
steveloughran Mar 3, 2025
19f3d58
HADOOP-19256. conditional create.
steveloughran Feb 20, 2025
7c4db3c
HADOOP-19256: Changes as per review comments
Mar 5, 2025
4739e17
HADOOP-19256: Add tests
Mar 5, 2025
a8612e7
HADOOP-19256: Update tests
Mar 6, 2025
fe5b9f9
HADOOP-19256: Remove incorrect tests.
Mar 6, 2025
11ad3ee
HADOOP-19256: Update tests, fix spotBugs
Mar 6, 2025
c11bbd1
HADOOP-19256: Ignore condition_write statistics test
Mar 7, 2025
6fc0203
HADOOP-19256: Merge branch 'trunk' into s3-conditional-writes
saikatroy038 Apr 2, 2025
d1a82ec
HADOOP-19256. Final style checks for yetus
steveloughran Apr 3, 2025
cac4470
HADOOP-19256. Nits from code review
steveloughran Apr 3, 2025
6a511c9
HADOOP-19256. fix accidental edit; style
steveloughran Apr 4, 2025

@@ -710,4 +710,118 @@ private OpenFileOptions() {
public static final String FS_OPTION_OPENFILE_EC_POLICY =
FS_OPTION_OPENFILE + "ec.policy";
}

/**
* The standard {@code createFile()} options.
* <p>
* If an option is not supported during file creation and it is considered
* part of a commit protocol, then, when supplied in a must() option,
* it MUST be rejected.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static final class CreateFileOptionKeys {

private CreateFileOptionKeys() {
}

/**
* {@code createFile()} option to write a file in the close() operation iff
* there is nothing at the destination.
* This is the equivalent of {@code create(path, overwrite=false)}
* <i>except that the existence check is postponed to the end of the write</i>.
* <p>
* Value {@value}.
* </p>
* <p>
* This can be set in the builder.
* </p>
* <ol>
* <li>It is for object stores which only upload/manifest files
* at the end of the stream write.</li>
* <li>Streams which support it SHALL NOT manifest any object to
* the destination path until close()</li>
* <li>It MUST be declared as a stream capability in streams for which
* this overwrite is enabled.</li>
* <li>It MUST be exported as a path capability for all stores where
* the feature is available <i>and</i> enabled</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and it is supported/enabled,
* the FS SHALL omit all overwrite checks in {@code create},
* including for the existence of an object or a directory underneath.
* Instead, during {@code close()} the object will only be manifest
* at the target path if there is no object at the destination.
* </li>
* <li>The existence check and object creation SHALL be atomic.</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and the FS does not recognise
* the feature, or it is recognized but disabled on this FS instance,
* the filesystem SHALL reject the request.
* </li>
* <li>If passed to a filesystem as a {@code opt()} parameter where
* the option value is {@code true}, the filesystem MAY ignore
* the request, or it MAY enable the feature.
* Any filesystem which does not support the feature, including
* from older releases, SHALL ignore it.
* </li>
* </ol>
*/
public static final String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE =
"fs.option.create.conditional.overwrite";

/**
* Overwrite a file only if there is an Etag match. This option takes a string.
*
* Value {@value}.
* <p>
* This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
* <ol>
* <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li>
* <li>If supported and enabled, it SHALL be declared as a capability of the stream</li>
* <li>The string passed as the value SHALL be the etag value as returned by
* {@code EtagSource.getEtag()}</li>
* <li>This value MUST NOT be empty</li>
* <li>If passed to a filesystem which supports it, then when the file is created,
* the store SHALL check for the existence of a file/object at the destination
* path.
* </li>
* <li>If there is no object there, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a {@code java.nio.file.FileAlreadyExistsException}
* </li>
* <li>If there is an object there, its Etag SHALL be compared to the
* value passed here.</li>
* <li>If there is no match, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a {@code java.nio.file.FileAlreadyExistsException}
* </li>
* <li>If the etag does match, the file SHALL be created.</li>
* <li>The check and create SHALL be atomic</li>
* <li>The check and create MAY be at the end of the write, in {@code close()},
* or it MAY be in the {@code create()} operation. That is: some stores
* MAY perform the check early</li>
* <li>If supported and enabled, stores MAY check for the existence of subdirectories;
* this behavior is implementation-specific.</li>
* </ol>
*/
public static final String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
"fs.option.create.conditional.overwrite.etag";

/**
* A flag which requires the filesystem to create files/objects in close(),
* rather than create/createFile.
* <p>
* Object stores with this behavior should also export it as a path capability.
*
* Value {@value}.
*/
public static final String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";

/**
* String to define the content filetype.
* Value {@value}.
*/
public static final String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";

}
}
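
The contract in the two conditional-overwrite Javadoc blocks can be modelled with a minimal, self-contained sketch, assuming an in-memory map stands in for the object store. `InMemoryStoreSketch`, `createIfAbsent` and `overwriteIfEtagMatches` are hypothetical names used only for illustration; they are not part of Hadoop or of this PR, and the etag is just a hash of the content rather than anything a real store would return.

```java
import java.nio.file.FileAlreadyExistsException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of the conditional write contract; not Hadoop code. */
class InMemoryStoreSketch {

  /** A stored object: its bytes plus an illustrative etag derived from them. */
  static final class Entry {
    final byte[] data;
    final String etag;

    Entry(byte[] data) {
      this.data = data.clone();
      this.etag = Integer.toHexString(Arrays.hashCode(data));
    }
  }

  private final ConcurrentHashMap<String, Entry> objects = new ConcurrentHashMap<>();

  /**
   * Models fs.option.create.conditional.overwrite: invoked at close(),
   * the object only becomes visible if nothing is at the path.
   */
  String createIfAbsent(String path, byte[] data) throws FileAlreadyExistsException {
    Entry created = new Entry(data);
    // putIfAbsent makes the existence check and the create one atomic step
    if (objects.putIfAbsent(path, created) != null) {
      throw new FileAlreadyExistsException(path);
    }
    return created.etag;
  }

  /**
   * Models fs.option.create.conditional.overwrite.etag: replace the object
   * only if one exists and its etag equals the expected value.
   */
  String overwriteIfEtagMatches(String path, String expectedEtag, byte[] data)
      throws FileAlreadyExistsException {
    if (expectedEtag == null || expectedEtag.isEmpty()) {
      throw new IllegalArgumentException("etag must not be empty");
    }
    Entry replacement = new Entry(data);
    // computeIfPresent runs atomically per key: compare the etag, then swap
    Entry result = objects.computeIfPresent(path,
        (k, current) -> current.etag.equals(expectedEtag) ? replacement : current);
    if (result != replacement) {
      // no object at the path, or a different etag: rejected as the Javadoc describes
      throw new FileAlreadyExistsException(path);
    }
    return replacement.etag;
  }

  /** Returns a copy of the stored bytes, or null if nothing is at the path. */
  byte[] read(String path) {
    Entry e = objects.get(path);
    return e == null ? null : e.data.clone();
  }

  public static void main(String[] args) throws Exception {
    InMemoryStoreSketch store = new InMemoryStoreSketch();
    String etag = store.createIfAbsent("/data/part-0000", "v1".getBytes());
    store.overwriteIfEtagMatches("/data/part-0000", etag, "v2".getBytes());
    System.out.println(new String(store.read("/data/part-0000")));
  }
}
```

In this model a second `createIfAbsent` on the same path, or an `overwriteIfEtagMatches` with a stale etag, raises `java.nio.file.FileAlreadyExistsException`, which is one of the two exception types the Javadoc permits.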
@@ -467,6 +467,13 @@ public final class StoreStatisticNames {
public static final String MULTIPART_UPLOAD_LIST
= "multipart_upload_list";

public static final String CONDITIONAL_CREATE
= "conditional_create";

public static final String CONDITIONAL_CREATE_FAILED
= "conditional_create_failed";


private StoreStatisticNames() {
}
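
A hedged sketch of how the two new statistic names might be used. The diff shown here does not say which events increment them, so this assumes `conditional_create` counts every attempt and `conditional_create_failed` counts the rejected ones. `ConditionalCreateCounters` is a hypothetical stand-in; the connector itself uses Hadoop's IOStatistics machinery, not this class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical counter holder, for illustration only. */
class ConditionalCreateCounters {
  static final String CONDITIONAL_CREATE = "conditional_create";
  static final String CONDITIONAL_CREATE_FAILED = "conditional_create_failed";

  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  /** Assumption: every attempt is counted; rejected attempts are counted again as failures. */
  void recordAttempt(boolean succeeded) {
    increment(CONDITIONAL_CREATE);
    if (!succeeded) {
      increment(CONDITIONAL_CREATE_FAILED);
    }
  }

  /** Current value of a counter; zero if it was never incremented. */
  long get(String name) {
    AtomicLong c = counters.get(name);
    return c == null ? 0L : c.get();
  }

  private void increment(String name) {
    counters.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
  }

  public static void main(String[] args) {
    ConditionalCreateCounters stats = new ConditionalCreateCounters();
    stats.recordAttempt(true);
    stats.recordAttempt(false);
    System.out.println(CONDITIONAL_CREATE + "=" + stats.get(CONDITIONAL_CREATE));
    System.out.println(CONDITIONAL_CREATE_FAILED + "=" + stats.get(CONDITIONAL_CREATE_FAILED));
  }
}
```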

@@ -192,6 +192,7 @@ Here are the custom options which the S3A Connector supports.
|-----------------------------|-----------|----------------------------------------|
| `fs.s3a.create.performance` | `boolean` | create a file with maximum performance |
| `fs.s3a.create.header` | `string` | prefix for user supplied headers |
| `fs.s3a.create.multipart` | `boolean` | create a multipart file |

### `fs.s3a.create.performance`

@@ -200,9 +201,11 @@ Prioritize file creation performance over safety checks for filesystem consisten
This:
1. Skips the `LIST` call which makes sure a file is not being created over a directory.
Risk: a file is created over a directory.
2. Ignores the overwrite flag.
2. If the overwrite flag is false, uses conditional creation to prevent the overwrite.
3. Never issues a `DELETE` call to delete parent directory markers.



It is possible to probe an S3A Filesystem instance for this capability through
the `hasPathCapability(path, "fs.s3a.create.performance")` check.

@@ -243,3 +246,17 @@ When an object is renamed, the metadata is propagated the copy created.

It is possible to probe an S3A Filesystem instance for this capability through
the `hasPathCapability(path, "fs.s3a.create.header")` check.

### `fs.s3a.create.multipart` Create a multipart file

Initiate a multipart upload when a file is created, rather
than only when the amount of data buffered reaches the threshold
set in `fs.s3a.multipart.size`.

This is only relevant during testing, as it allows for multipart
operation to be initiated without writing any data, so
reducing test time.

It is not recommended for production use: as well as adding
more network IO, it is incompatible with third-party stores which
do not support multipart uploads.
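
The difference between the two initiation points can be sketched as follows. This is a simplified model, not the S3A output stream: `MultipartStartSketch` is a hypothetical class, and it assumes the threshold is reached when the buffered byte count is greater than or equal to the configured multipart size.

```java
/** Hypothetical model of when a multipart upload is initiated; not S3A code. */
class MultipartStartSketch {
  private final long multipartSize;
  private long buffered;
  private boolean multipartStarted;

  MultipartStartSketch(boolean createMultipart, long multipartSize) {
    this.multipartSize = multipartSize;
    // with the create flag set, the upload is initiated before any data is written
    this.multipartStarted = createMultipart;
  }

  /** Buffer more data; without the flag, the upload starts once the threshold is reached. */
  void write(long bytes) {
    buffered += bytes;
    if (!multipartStarted && buffered >= multipartSize) {
      multipartStarted = true;
    }
  }

  boolean isMultipartStarted() {
    return multipartStarted;
  }

  public static void main(String[] args) {
    MultipartStartSketch eager = new MultipartStartSketch(true, 8L * 1024 * 1024);
    MultipartStartSketch lazy = new MultipartStartSketch(false, 8L * 1024 * 1024);
    System.out.println("flag set, no data written: " + eager.isMultipartStarted());
    System.out.println("flag unset, no data written: " + lazy.isMultipartStarted());
  }
}
```

This is why the option shortens tests: a test can exercise the multipart code path without writing a full block of data first.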
@@ -1522,6 +1522,29 @@ private Constants() {
*/
public static final String FS_S3A_PERFORMANCE_FLAGS =
"fs.s3a.performance.flags";

/**
* Is the create overwrite feature enabled or not?
* A configuration option and a path status probe.
* Value {@value}.
*/
public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED =
"fs.s3a.conditional.create.enabled";

/**
* Default value for {@link #FS_S3A_CONDITIONAL_CREATE_ENABLED}.
* Value {@value}.
*/
public static final boolean DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED = true;

/**
* createFile() boolean option to create a multipart file, always: {@value}.
* <p>
* This is inefficient and will not work on a store which doesn't support that feature,
* so is primarily for testing.
*/
public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";

/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.
@@ -1845,4 +1868,11 @@ private Constants() {
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
"fs.s3a.analytics.accelerator";

/**
* Value for the {@code If-None-Match} HTTP header in S3 requests.
* Value: {@value}.
* More information: <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html">
* AWS S3 PutObject API Documentation</a>
*/
public static final String IF_NONE_MATCH_STAR = "*";
}
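
A minimal sketch of how the `*` value relates to the HTTP conditional headers. S3 `PutObject` accepts `If-None-Match: *` to write only when no object exists, and `If-Match: <etag>` to write only when the current object's etag matches. `ConditionalHeadersSketch` and `conditionalHeaders` are hypothetical names, and giving the etag priority when both inputs are supplied is an assumption of this sketch, not something the diff states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that picks the conditional header for a write; not S3A code. */
class ConditionalHeadersSketch {
  static final String IF_NONE_MATCH_STAR = "*";

  /**
   * @param createOnlyIfAbsent true if the write must fail when an object already exists
   * @param etag etag the existing object must have, or null/empty if not required
   * @return the headers to attach to the write request (possibly empty)
   */
  static Map<String, String> conditionalHeaders(boolean createOnlyIfAbsent, String etag) {
    Map<String, String> headers = new LinkedHashMap<>();
    if (etag != null && !etag.isEmpty()) {
      // assumption: an explicit etag takes priority over the create-if-absent flag
      headers.put("If-Match", etag);
    } else if (createOnlyIfAbsent) {
      headers.put("If-None-Match", IF_NONE_MATCH_STAR);
    }
    return headers;
  }

  public static void main(String[] args) {
    System.out.println(conditionalHeaders(true, null));
    System.out.println(conditionalHeaders(false, "\"abc123\""));
  }
}
```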
@@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -52,6 +53,7 @@
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -224,6 +226,11 @@ class S3ABlockOutputStream extends OutputStream implements
/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;

/**
* Object write option flags.
*/
private final EnumSet<WriteObjectFlags> writeObjectFlags;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -249,6 +256,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.iostatistics = statistics.getIOStatistics();
this.writeOperationHelper = builder.writeOperations;
this.putTracker = builder.putTracker;
this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
this.executorService = MoreExecutors.listeningDecorator(
builder.executorService);
this.multiPartUpload = null;
@@ -266,9 +274,19 @@ class S3ABlockOutputStream extends OutputStream implements
? builder.blockSize
: -1;

// if required to be multipart by the committer put tracker or
// write flags (i.e. createFile() options), initiate multipart uploads.
// this will fail fast if the store doesn't support multipart uploads
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
} else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
// this is not merged with the branch above, simply to avoid confusion
// about what to do if both are set, and so to guarantee that
// the put tracker initialization always takes priority
// over any file flag.
LOG.debug("Multipart initiated from createFile() options");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
@@ -772,7 +790,8 @@ BlockOutputStreamStatistics getStatistics() {
@SuppressWarnings("deprecation")
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
final String cap = capability.toLowerCase(Locale.ENGLISH);
switch (cap) {

// does the output stream have delayed visibility
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
@@ -797,6 +816,12 @@ public boolean hasCapability(String capability) {
return true;

default:
// scan flags for the capability
for (WriteObjectFlags flag : writeObjectFlags) {
if (flag.hasKey(cap)) {
return true;
}
}
return false;
}
}
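
The `default` branch above relies on each flag knowing its own option key. The `WriteObjectFlags` enum itself is not shown in this part of the diff, so the following is only a sketch of the pattern under that assumption: `SketchWriteFlag` and `CapabilityScanSketch` are hypothetical, and the keys are the option names defined elsewhere in this PR.

```java
import java.util.EnumSet;
import java.util.Locale;

/** Hypothetical stand-in for a flag enum whose members carry an option key. */
enum SketchWriteFlag {
  CONDITIONAL_OVERWRITE("fs.option.create.conditional.overwrite"),
  CONDITIONAL_OVERWRITE_ETAG("fs.option.create.conditional.overwrite.etag"),
  CREATE_MULTIPART("fs.s3a.create.multipart");

  private final String key;

  SketchWriteFlag(String key) {
    this.key = key;
  }

  /** Does this flag's option key equal the (already lower-cased) capability? */
  boolean hasKey(String capability) {
    return key.equals(capability);
  }
}

/** Illustrative capability probe: true only for flags set on this stream. */
class CapabilityScanSketch {
  static boolean hasCapability(EnumSet<SketchWriteFlag> flags, String capability) {
    final String cap = capability.toLowerCase(Locale.ENGLISH);
    // scan the flags actually set for this stream, not every possible flag
    for (SketchWriteFlag flag : flags) {
      if (flag.hasKey(cap)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    EnumSet<SketchWriteFlag> flags = EnumSet.of(SketchWriteFlag.CREATE_MULTIPART);
    System.out.println(hasCapability(flags, "fs.s3a.create.multipart"));
    System.out.println(hasCapability(flags, "fs.option.create.conditional.overwrite"));
  }
}
```

Because only the flags set for the stream are scanned, a capability is reported only when the corresponding option was enabled at creation time.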