Skip to content

Commit

Permalink
addres comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
  • Loading branch information
Sandeep Kumawat committed Apr 5, 2024
1 parent c17b19d commit d2ed0a3
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
);
}
}
listener.onResponse(
new ReadContext.Builder().blobSize(blobSize)
.asyncPartStreams(blobPartInputStreamFutures)
.blobChecksum(blobChecksum)
.build()
);
listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build());
} catch (Exception ex) {
listener.onFailure(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
}
ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength).asyncPartStreams(blobPartStreams).build();
ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public interface BlobContainer {
*
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@code InputStream} to read the blob.
* @return The {@link BlobDownloadResponse} of the blob.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public List<StreamPartCreator> getPartStreams() {
@ExperimentalApi
public interface StreamPartCreator extends Supplier<CompletableFuture<InputStreamContainer>> {
/**
* Kicks off a async process to start streaming.
* Kicks off an async process to start streaming.
*
* @return When the returned future is completed, streaming has
* just begun. Clients must fully consume the resulting stream.
Expand All @@ -87,19 +87,14 @@ public interface StreamPartCreator extends Supplier<CompletableFuture<InputStrea
* @opensearch.experimental
*/
public static class Builder {
private long blobSize;
private List<StreamPartCreator> asyncPartStreams;
private final long blobSize;
private final List<StreamPartCreator> asyncPartStreams;
private String blobChecksum;
private Map<String, String> metadata;

public Builder blobSize(long blobSize) {
public Builder(long blobSize, List<StreamPartCreator> asyncPartStreams) {
this.blobSize = blobSize;
return this;
}

public Builder asyncPartStreams(List<StreamPartCreator> asyncPartStreams) {
this.asyncPartStreams = asyncPartStreams;
return this;
}

public Builder blobChecksum(String blobChecksum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class RemoteTransferContainer implements Closeable {
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean readBlock = new AtomicBoolean();
private Map<String, String> metadata = null;
private final Map<String, String> metadata;

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

Expand Down Expand Up @@ -90,6 +90,7 @@ public RemoteTransferContainer(
this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier;
this.expectedChecksum = expectedChecksum;
this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported;
this.metadata = null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public void testReadBlobAsync() throws Exception {
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
final ReadContext readContext = new ReadContext.Builder().blobSize(size)
.asyncPartStreams(List.of(() -> streamContainerFuture))
.build();
final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build();

Mockito.doAnswer(invocation -> {
ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
Expand Down Expand Up @@ -105,9 +103,7 @@ public void testReadBlobAsyncException() throws Exception {
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
final ReadContext readContext = new ReadContext.Builder().blobSize(size)
.asyncPartStreams(List.of(() -> streamContainerFuture))
.build();
final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build();

Mockito.doAnswer(invocation -> {
ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ public void testReadContextListener() throws InterruptedException, IOException {
UnaryOperator.identity(),
MAX_CONCURRENT_STREAMS
);
ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS)
.asyncPartStreams(blobPartStreams)
.build();
ReadContext readContext = new ReadContext.Builder((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams).build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand Down Expand Up @@ -127,9 +125,7 @@ public int available() {
threadPool.generic()
)
);
ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS)
.asyncPartStreams(blobPartStreams)
.build();
ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand Down Expand Up @@ -182,9 +178,7 @@ public int read(byte[] b) throws IOException {
threadPool.generic()
)
);
ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1)
.asyncPartStreams(blobPartStreams)
.build();
ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams).build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand All @@ -209,9 +203,7 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception {
UnaryOperator.identity(),
MAX_CONCURRENT_STREAMS
);
ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS)
.asyncPartStreams(blobPartStreams)
.build();
ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand Down

0 comments on commit d2ed0a3

Please sign in to comment.