Implement interface changes for s3 plugin to read/write blob with obj… #13113

Merged
rename BlobDownloadResponse to FetchBlobResults
Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com>
skumawat2025 committed Apr 11, 2024
commit 349c1c0a0a771b2ceead30314cfcc874737bd4d9
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -73,11 +73,11 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -141,9 +141,9 @@

@ExperimentalApi
@Override
public BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
return new BlobDownloadResponse(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());

}

@Override
@@ -343,8 +343,8 @@
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
writeBlobWithMetadata(blobName, inputStream, metadata, blobSize, failIfAlreadyExists);
}

@Override
public DeleteResult delete() throws IOException {
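The hunk above shows the plain writeBlob overload delegating to writeBlobWithMetadata, so a caller can attach arbitrary key/value metadata when writing a blob. Below is a minimal caller sketch: the helper class, local file, and metadata entries are hypothetical, the container is assumed to expose writeBlobWithMetadata as introduced by this PR, and the argument order mirrors the delegation in the hunk (blobName, inputStream, metadata, blobSize, failIfAlreadyExists).

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import org.opensearch.common.blobstore.BlobContainer;

// Hypothetical helper: uploads a local file as a blob with custom user metadata attached.
final class WriteWithMetadataSketch {
    static void upload(BlobContainer container, Path localFile) throws IOException {
        Map<String, String> metadata = Map.of("writer-node", "node-1"); // hypothetical metadata entries
        try (InputStream in = Files.newInputStream(localFile)) {
            // Argument order follows the delegation shown above:
            // blobName, inputStream, metadata, blobSize, failIfAlreadyExists
            container.writeBlobWithMetadata(localFile.getFileName().toString(), in, metadata, Files.size(localFile), true);
        }
    }
}
```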
plugins/repository-s3 test file (AsyncTransferManager upload tests)
@@ -40,7 +40,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -78,11 +80,14 @@ public void testOneChunkUpload() {
);

AtomicReference<InputStream> streamRef = new AtomicReference<>();
Map<String, String> metadata = new HashMap<>();
metadata.put("key1", "value1");
metadata.put("key2", "value2");
CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, false, null, true, null),
}, false, null, true, metadata),
new StreamContext((partIdx, partSize, position) -> {
streamRef.set(new ZeroInputStream(partSize));
return new InputStreamContainer(streamRef.get(), partSize, position);
@@ -123,11 +128,15 @@ public void testOneChunkUploadCorruption() {
deleteObjectResponseCompletableFuture.complete(DeleteObjectResponse.builder().build());
when(s3AsyncClient.deleteObject(any(DeleteObjectRequest.class))).thenReturn(deleteObjectResponseCompletableFuture);

Map<String, String> metadata = new HashMap<>();
metadata.put("key1", "value1");
metadata.put("key2", "value2");

CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, false, null, true, null),
}, false, null, true, metadata),
new StreamContext(
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
ByteSizeUnit.MB.toBytes(1),
@@ -180,7 +189,7 @@ public void testMultipartUpload() {
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, true, 3376132981L, true, null),
}, true, 3376132981L, true, new HashMap<>()),
new StreamContext((partIdx, partSize, position) -> {
InputStream stream = new ZeroInputStream(partSize);
streams.add(stream);
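The test changes above replace the null metadata argument in UploadRequest with a populated map. For context, the map's eventual destination is S3 user metadata: in the AWS SDK v2 such a map is carried on the request via the builder's metadata(...) field and stored and returned by S3 as x-amz-meta-* headers. A standalone SDK sketch follows (bucket, key, and values are hypothetical; this illustrates the SDK mechanism, not the transfer manager's internals):

```java
import java.util.Map;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;

// Standalone AWS SDK v2 sketch: user metadata supplied at upload time is stored by S3
// and echoed back on GetObject/HeadObject as x-amz-meta-* headers.
final class UserMetadataSketch {
    static PutObjectRequest requestWithMetadata() {
        Map<String, String> metadata = Map.of("key1", "value1", "key2", "value2");
        return PutObjectRequest.builder()
            .bucket("bucket")      // hypothetical bucket
            .key("key")            // hypothetical object key
            .metadata(metadata)    // sent as x-amz-meta-key1 / x-amz-meta-key2
            .build();
    }
}
```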
server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java
@@ -80,16 +80,16 @@ public interface BlobContainer {
InputStream readBlob(String blobName) throws IOException;

/**
* Creates a new {@link BlobDownloadResponse} for the given blob name.
* Creates a new {@link FetchBlobResult} for the given blob name.
*
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@link BlobDownloadResponse} of the blob.
* @return The {@link FetchBlobResult} of the blob.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
@ExperimentalApi
default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
default FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");
};

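The renamed readBlobWithMetadata returns a FetchBlobResult that pairs the blob's InputStream with its metadata map. A minimal caller sketch follows; it assumes FetchBlobResult exposes a getInputStream() accessor alongside the getMetadata() getter visible in the FetchBlobResult hunk below, and that the caller is responsible for closing the stream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.FetchBlobResult;

// Hypothetical caller: fetches a blob and its user metadata in a single call.
final class ReadWithMetadataSketch {
    static Map<String, String> read(BlobContainer container, String blobName) throws IOException {
        FetchBlobResult result = container.readBlobWithMetadata(blobName);
        Map<String, String> metadata = result.getMetadata();        // getter shown in the FetchBlobResult hunk
        try (InputStream in = result.getInputStream()) {            // assumed accessor; caller closes the stream
            in.transferTo(OutputStream.nullOutputStream());         // placeholder for real consumption of the blob
        }
        return metadata;
    }
}
```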
server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java
@@ -17,7 +17,7 @@
*
* @opensearch.experimental
*/
public class BlobDownloadResponse {
public class FetchBlobResult {

/**
* Downloaded blob InputStream
Expand All @@ -37,7 +37,7 @@
return metadata;
}

public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
public FetchBlobResult(InputStream inputStream, Map<String, String> metadata) {

this.inputStream = inputStream;
this.metadata = metadata;
}
server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
@@ -15,10 +15,10 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
@@ -168,7 +168,7 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException {

@Override
@ExperimentalApi
public BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
public FetchBlobResult downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
}

server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java
@@ -9,9 +9,9 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
@@ -131,11 +131,11 @@ void uploadBlobs(
*
* @param path the remote path from where download should be made
* @param fileName the name of the file
* @return {@link BlobDownloadResponse} of the remote file
* @return {@link FetchBlobResult} of the remote file
* @throws IOException the exception while reading the data
*/
@ExperimentalApi
BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException;
FetchBlobResult downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException;

void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener);

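At the translog-transfer layer the same result type flows through downloadBlobWithMetadata, which BlobStoreTransferService above implements by delegating to BlobContainer#readBlobWithMetadata. A short hypothetical caller sketch follows; the remote path segments and file name are illustrative, and getInputStream() is assumed as in the earlier sketch.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.index.translog.transfer.TransferService;

// Hypothetical caller at the translog layer; path segments and file name are illustrative.
final class TranslogDownloadSketch {
    static Map<String, String> fetch(TransferService transferService) throws IOException {
        BlobPath remotePath = BlobPath.cleanPath().add("indexUUID").add("0").add("translog");
        FetchBlobResult result = transferService.downloadBlobWithMetadata(remotePath, "translog-5.tlog");
        try (InputStream in = result.getInputStream()) { // assumed accessor; caller owns the stream
            // consume the translog bytes here
        }
        return result.getMetadata();
    }
}
```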