Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
Expand All @@ -62,6 +64,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
Expand All @@ -72,6 +75,8 @@
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteOptions;
import org.opensearch.common.blobstore.ConditionalWrite.ConditionalWriteResponse;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.InputStreamWithMetadata;
import org.opensearch.common.blobstore.stream.read.ReadContext;
Expand All @@ -96,6 +101,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -117,6 +123,7 @@

private final S3BlobStore blobStore;
private final String keyPath;
public static final int HTTP_STATUS_PRECONDITION_FAILED = 412;

S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path);
Expand Down Expand Up @@ -521,6 +528,99 @@
return keyPath + blobName;
}

/**
* Executes a upload to S3 using conditional write options.
* The upload can proceed based on various conditional scenarios like If-Match, If-None-Match, etc.
*
* @param blobStore the S3 blob store
* @param blobName the key (name) of the blob
* @param input the input stream containing the blob data
* @param blobSize the size of the blob in bytes
* @param metadata optional metadata to be associated with the blob
* @param options conditional write options for the upload
* @param listener listener to handle the resulting response or error notifications
* @throws IOException if an error occurs during upload or if validations fail
*/
void executeSingleUploadConditionally(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata,
final ConditionalWriteOptions options,
final ActionListener<ConditionalWriteResponse> listener
) throws IOException {
// Extra safety checks remain the same
if (blobSize > MAX_FILE_SIZE.getBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
}
if (blobSize > blobStore.bufferSizeInBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
}

PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(blobStore.bucket())
.key(blobName)
.contentLength(blobSize)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher))
.expectedBucketOwner(blobStore.expectedBucketOwner());

// Apply conditional logic based on options
if (options.isIfMatch()) {
putObjectRequestBuilder.ifMatch(options.getVersionIdentifier());
} else if (options.isIfNotExists()) {
putObjectRequestBuilder.ifNoneMatch("*");

Check warning on line 574 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L574

Added line #L574 was not covered by tests
}

if (CollectionUtils.isNotEmpty(metadata)) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}

// Use extracted encryption configuration helper
configureEncryptionSettings(putObjectRequestBuilder, blobStore);

PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();

try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final InputStream requestInputStream = blobStore.isUploadRetryEnabled()
? new BufferedInputStream(input, (int) (blobSize + 1))

Check warning on line 588 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L588

Added line #L588 was not covered by tests
: input;

PutObjectResponse response = SocketAccess.doPrivileged(
() -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize))
);

if (response.eTag() != null) {
listener.onResponse(ConditionalWriteResponse.success(response.eTag()));
} else {
IOException exception = new IOException(
"S3 upload for [" + blobName + "] returned null ETag, violating data integrity expectations"
);
listener.onFailure(exception);
throw exception;
}

} catch (S3Exception e) {
if (e.statusCode() == HTTP_STATUS_PRECONDITION_FAILED) {
listener.onFailure(new OpenSearchException("stale_primary_shard", e, "Precondition Failed : Etag Mismatch", blobName));
throw new IOException("Unable to upload object [" + blobName + "] due to ETag mismatch", e);
} else {
IOException exception = new IOException(
String.format(Locale.ROOT, "S3 error during upload [%s]: %s", blobName, e.getMessage()),
e
);
listener.onFailure(exception);
throw exception;
}
} catch (SdkException e) {
IOException exception = new IOException(String.format(Locale.ROOT, "S3 upload failed for [%s]", blobName), e);
listener.onFailure(exception);
throw exception;
}
}

/**
* Uploads a blob using a single upload request
*/
Expand Down
Loading