Skip to content

Commit

Permalink
Add async read support for S3 plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
(cherry picked from commit 03ddc8a)
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Sep 1, 2023
1 parent f5d3fd2 commit 7963644
Show file tree
Hide file tree
Showing 7 changed files with 476 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
Expand All @@ -63,6 +64,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -75,10 +77,12 @@
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.async.UploadRequest;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -212,9 +216,50 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

@ExperimentalApi
@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
throw new UnsupportedOperationException();
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client();
final String bucketName = blobStore.bucket();
final AsyncTransferManager transferManager = blobStore.getAsyncTransferManager();

final GetObjectAttributesResponse blobMetadata = transferManager.getBlobMetadata(s3AsyncClient, bucketName, blobName).get();

final long blobSize = blobMetadata.objectSize();
final int numberOfParts = blobMetadata.objectParts().totalPartsCount();
final String blobChecksum = blobMetadata.checksum().checksumCRC32();

final List<InputStreamContainer> blobPartStreams = new ArrayList<>();
final List<CompletableFuture<InputStreamContainer>> blobPartInputStreamFutures = new ArrayList<>();
// S3 multipart files use 1 to n indexing
for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
int finalPartNumber = partNumber - 1;
CompletableFuture<InputStreamContainer> partInputStreamFuture = transferManager.getBlobPartInputStreamContainer(
s3AsyncClient,
bucketName,
blobName,
partNumber
).whenComplete((inputStreamContainer, error) -> {
if (error == null) {
blobPartStreams.add(finalPartNumber, inputStreamContainer);
}
});

blobPartInputStreamFutures.add(partInputStreamFuture);
}

CompletableFuture.allOf(blobPartInputStreamFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> {
if (throwable == null) {
listener.onResponse(new ReadContext(blobSize, blobPartStreams, blobChecksum));
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
listener.onFailure(ex);
}
});
} catch (Exception ex) {
listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex));
}
}

// package private for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

package org.opensearch.repositories.s3.async;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
Expand All @@ -20,6 +23,11 @@
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CompletableFutureUtils;
Expand All @@ -29,13 +37,16 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.StreamContext;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.exception.CorruptFileException;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.io.CheckedContainer;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -353,4 +364,67 @@ private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest upl
return null;
});
}

/**
* Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds
* the stream and its related metadata.
* @param s3AsyncClient Async client to be utilized to fetch the object part
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the parts will be fetched
* @param partNumber Part number for the blob to be retrieved
* @return A future of {@link InputStreamContainer} containing the stream and stream metadata.
*/
@ExperimentalApi
public CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
S3AsyncClient s3AsyncClient,
String bucketName,
String blobName,
int partNumber
) {
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobName)
.partNumber(partNumber);

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(this::transformResponseToInputStreamContainer)
);
}

/**
* Transforms the stream response object from S3 into an {@link InputStreamContainer}
* @param streamResponse Response stream object from S3
* @return {@link InputStreamContainer} containing the stream and stream metadata
*/
// Package-Private for testing.
InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream<GetObjectResponse> streamResponse) {
final GetObjectResponse getObjectResponse = streamResponse.response();
final String contentRange = getObjectResponse.contentRange();
final Long contentLength = getObjectResponse.contentLength();
if (contentRange == null || contentLength == null) {
throw SdkException.builder().message("Failed to fetch required metadata for blob part").build();
}
final Tuple<Long, Long> s3ResponseRange = HttpRangeUtils.fromHttpRangeHeader(getObjectResponse.contentRange());
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), s3ResponseRange.v1());
}

/**
* Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket.
* @param s3AsyncClient Async client to be utilized to fetch the metadata
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the metadata will be fetched
* @return A future containing the metadata within {@link GetObjectAttributesResponse}
*/
@ExperimentalApi
public CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3AsyncClient, String bucketName, String blobName) {
// Fetch blob metadata - part info, size, checksum
final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.build();

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,31 @@

package org.opensearch.repositories.s3.utils;

import software.amazon.awssdk.core.exception.SdkException;

import org.opensearch.common.collect.Tuple;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class HttpRangeUtils {
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-(\\d+)/(\\d+|.*)$");

/**
* Parses the content range header string value to calculate the start and end of the HTTP response.
* Tests against the RFC9110 specification of content range string.
* Sample values: "bytes 0-10/200", "bytes 0-10/*"
* <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-content-range">Details here</a>
* @param headerValue Header content range string value from the HTTP response
* @return Pair of values where v1 represents the lower and v2 represents the upper bound of the stream
*/
public static Tuple<Long, Long> fromHttpRangeHeader(String headerValue) {
Matcher matcher = RANGE_PATTERN.matcher(headerValue);
if (!matcher.find()) {
throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException());
}
return new Tuple<>(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2)));
}

/**
* Provides a byte range string per <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-byte-ranges">RFC 9110</a>
Expand Down
Loading

0 comments on commit 7963644

Please sign in to comment.