Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce interface changes to support read/write blob with object metadata #13023

Merged
merged 21 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
);
}
}
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum, null));
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception ex) {
listener.onFailure(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
}, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
// wait for completableFuture to finish
Expand Down Expand Up @@ -533,7 +533,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
}, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException || throwExceptionOnFinalizeUpload) {
Expand Down Expand Up @@ -644,7 +644,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener);
}, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener);
}, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null, null);
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ public interface BlobContainer {
*/
InputStream readBlob(String blobName) throws IOException;

/**
* Creates a new {@link BlobDownloadResponse} for the given blob name.
*
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@code InputStream} to read the blob.
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
return null;
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
};

/**
* Creates a new {@link InputStream} that can be used to read the given blob starting from
* a specific {@code position} in the blob. The {@code length} is an indication of the
Expand Down Expand Up @@ -128,6 +141,33 @@ default long readBlobPreferredLength() {
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {};

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
Expand All @@ -149,6 +189,35 @@ default long readBlobPreferredLength() {
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
* using an atomic write operation if the implementation supports it.
* <p>
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobAtomicWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {};

/**
* Deletes this container and all its contents from the repository.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore;

import java.io.InputStream;
import java.util.Map;

/**
* A class for blob download response
*
* @opensearch.internal
*/
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
public class BlobDownloadResponse {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Downloaded blob InputStream
*/
private InputStream inputStream;

/**
* Metadata of the downloaded blob
*/
private Map<String, String> metadata;
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved

public InputStream getInputStream() {
return inputStream;
}

public Map<String, String> getMetadata() {
return metadata;
}

public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
this.inputStream = inputStream;
this.metadata = metadata;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.io.InputStreamContainer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand All @@ -25,23 +26,30 @@ public class ReadContext {
private final long blobSize;
private final List<StreamPartCreator> asyncPartStreams;
private final String blobChecksum;
private final Map<String, String> metadata;

public ReadContext(long blobSize, List<StreamPartCreator> asyncPartStreams, String blobChecksum) {
public ReadContext(long blobSize, List<StreamPartCreator> asyncPartStreams, String blobChecksum, Map<String, String> metadata) {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
this.blobSize = blobSize;
this.asyncPartStreams = asyncPartStreams;
this.blobChecksum = blobChecksum;
this.metadata = metadata;
}

public ReadContext(ReadContext readContext) {
this.blobSize = readContext.blobSize;
this.asyncPartStreams = readContext.asyncPartStreams;
this.blobChecksum = readContext.blobChecksum;
this.metadata = readContext.metadata;
}

public String getBlobChecksum() {
return blobChecksum;
}

public Map<String, String> getMetadata() {
return metadata;
}

public int getNumberOfParts() {
return asyncPartStreams.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.StreamContext;

import java.io.IOException;
import java.util.Map;

/**
* WriteContext is used to encapsulate all data needed by <code>BlobContainer#writeStreams</code>
Expand All @@ -29,6 +30,7 @@ public class WriteContext {
private final CheckedConsumer<Boolean, IOException> uploadFinalizer;
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;
private final Map<String, String> metadata;

/**
* Construct a new WriteContext object
Expand All @@ -49,7 +51,8 @@ public WriteContext(
WritePriority writePriority,
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
@Nullable Long expectedChecksum
@Nullable Long expectedChecksum,
Map<String, String> metadata
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
) {
this.fileName = fileName;
this.streamContextSupplier = streamContextSupplier;
Expand All @@ -59,6 +62,7 @@ public WriteContext(
this.uploadFinalizer = uploadFinalizer;
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.metadata = metadata;
}

/**
Expand All @@ -73,6 +77,7 @@ protected WriteContext(WriteContext writeContext) {
this.uploadFinalizer = writeContext.uploadFinalizer;
this.doRemoteDataIntegrityCheck = writeContext.doRemoteDataIntegrityCheck;
this.expectedChecksum = writeContext.expectedChecksum;
this.metadata = writeContext.getMetadata();
}

/**
Expand Down Expand Up @@ -131,4 +136,11 @@ public boolean doRemoteDataIntegrityCheck() {
public Long getExpectedChecksum() {
return expectedChecksum;
}

/**
* @return the upload metadata.
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class RemoteTransferContainer implements Closeable {
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean readBlock = new AtomicBoolean();
private Map<String, String> metadata = null;
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

Expand Down Expand Up @@ -90,6 +92,41 @@ public RemoteTransferContainer(
this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported;
}

/**
* Construct a new RemoteTransferContainer object with metadata.
*
* @param fileName Name of the local file
* @param remoteFileName Name of the remote file
* @param contentLength Total content length of the file to be uploaded
* @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists
* @param writePriority The {@link WritePriority} of current upload
* @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams
* @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks
* @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification
* @param metadata Object metadata to be store with the file.
*/
public RemoteTransferContainer(
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
String fileName,
String remoteFileName,
long contentLength,
boolean failTransferIfFileExists,
WritePriority writePriority,
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
long expectedChecksum,
boolean isRemoteDataIntegritySupported,
Map<String, String> metadata
) {
this.fileName = fileName;
this.remoteFileName = remoteFileName;
this.contentLength = contentLength;
this.failTransferIfFileExists = failTransferIfFileExists;
this.writePriority = writePriority;
this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier;
this.expectedChecksum = expectedChecksum;
this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported;
this.metadata = metadata;
}

/**
* @return The {@link WriteContext} for the current upload
*/
Expand All @@ -102,7 +139,8 @@ public WriteContext createWriteContext() {
writePriority,
this::finalizeUpload,
isRemoteDataIntegrityCheckPossible(),
isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null
isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null,
metadata
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
Expand Down Expand Up @@ -164,6 +165,11 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
}

@Override
public BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
}

@Override
public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException {
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.translog.transfer;

import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -125,6 +126,15 @@ void uploadBlobs(
*/
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;

/**
*
* @param path the remote path from where download should be made
* @param fileName the name of the file
* @return {@link BlobDownloadResponse} of the remote file
* @throws IOException the exception while reading the data
*/
BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException;
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener);

void listAllInSortedOrderAsync(
Expand Down
Loading
Loading