-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Design Proposal] Enhancement of Repository Plugin #6632
Comments
Tagging @elfisher @muralikpbhat @reta @mch2 @dreamer-89 @andrross @Bukhtawar @sachinpkale @itiyamas @dblock @shwetathareja @saratvemulapalli @ashking94 for feedback. Pls do tag others who can review this. |
@vikasvb90 I think the suggested API is not complete and even in this shape raises a number of questions:
I believe the API get the inspiration from S3 multipart upload, which (afaik) requires prior knowledge of the size of the data being uploaded. |
@reta The idea is not to make it purely stream oriented but to provide support of parallel uploads using streams where each stream can be responsible for reading a specific part of the file. Sample code below : CheckedInputStream[] checkedInputStreams = new CheckedInputStream[numberOfParts];
StreamContainer[] streamContainers = new StreamContainer[numberOfParts];
for (int partNo=0; partNo<numberOfParts; partNo++) {
long position = rawPartSize * (partNo);
long size = partNo == numberOfParts-1 ? rawLastPartSize : rawPartSize;
checkedInputStreams[partNo] = getMultiPartStreamSupplier().apply(size, position);
if (checkedInputStreams[partNo] == null) {
throw new IOException("Error creating multipart stream during opening streams for read");
}
}
We would not want to omit any detail from the abstraction which could compromise on the functioning or performance of the underlying plugin. |
Thanks @vikasvb90
Fair enough (if that is by design)
I believe it is not and highly depends on the algorithm (the proposal you mentioned talks about rough estimations). If you look into original comment, it mentions client side compression as well - the size of the compressed content is unknown ahead of time. In any case, it would be useful to see how the content length is being used to determine possible alternatives or limitations.
You just showed that in your answer. The description mentioned some isolated pieces without large picture. F.e., your own code snippet uses
|
@reta Can you please re-post encryption related question on the encryption design proposal? I can answer all your encryption related questions there. Also, by
I have updated the description with the structure of
public StreamContext openMultipartStreams(long partSize) throws IOException {
if (inputStreams.get() != null) {
throw new IOException("Multi-part streams are already created.");
}
this.partSize = partSize;
this.lastPartSize = (contentLength % partSize) != 0 ? contentLength % partSize : partSize;
this.numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize
: (contentLength / partSize) + 1);
InputStream[] streams = new InputStream[numberOfParts];
List<Supplier<Stream>> streamSuppliers = new ArrayList<>();
for (int partNo = 0; partNo < numberOfParts; partNo++) {
long position = partSize * partNo;
long size = partNo == numberOfParts - 1 ? lastPartSize : partSize;
streams[partNo] = localFile != null
? getMultiPartStreamSupplierForFile().apply(size, position)
: getMultiPartStreamSupplierForIndexInput().apply(size, position);
if (streams[partNo] == null) {
throw new IOException("Error creating multipart stream during opening streams for read");
}
final int finalPartNo = partNo;
streamSuppliers.add(() -> new Stream(streams[finalPartNo], size, position));
}
inputStreams.set(streams);
return new StreamContext(
streamSuppliers,
contentLength
);
}
private BiFunction<Long, Long, OffsetRangeFileInputStream> getMultiPartStreamSupplierForFile() {
return (size, position) -> {
OffsetRangeFileInputStream offsetRangeInputStream;
try {
offsetRangeInputStream = new OffsetRangeFileInputStream(localFile.toFile(), size, position);
} catch (IOException e) {
log.error("Failed to create input stream", e);
return null;
}
return offsetRangeInputStream;
};
}
private BiFunction<Long, Long, OffsetRangeIndexInputStream> getMultiPartStreamSupplierForIndexInput() {
return (size, position) -> {
OffsetRangeIndexInputStream offsetRangeInputStream;
try {
offsetRangeInputStream = new OffsetRangeIndexInputStream(indexInput, fileName, size, position);
} catch (IOException e) {
log.error("Failed to create input stream", e);
return null;
}
return offsetRangeInputStream;
};
} |
Thanks @vikasvb90
No, by
My question was not really related to encryption or compression but stream composition. We can leave this part aside since the design you are suggestion is not purely stream oriented (as you mentioned). |
For stream context, do we always need to divide all data into a number of streams ahead of time, or would it make sense to make stream suppliers a stream-like interface, so that we can keep supplying another stream until the caller runs out of data to write? In the compression example the stream compressor could be processing incoming stream data, and every time it has N bytes it would queue those for multi-part writing. In the non-compressed example I could start writing while still constructing further streams. |
Personally, I think the stream(s) on demand is a better option (right now the whole source has to be divided into streams before hand, taking an example, 60Gb divided into 5Mb streams, that would take noticeble amount of heap). But I do not see the clear way to accommodate that in the current design that needs content length knowledge. |
Aren't those orthogonal? Just because I know the total size of data, doesn't mean I need to create all streams ahead of time. Of course it would be better if I didn't need to know / have the entire data already loaded. |
Thank would be the best option I think |
|
Objective
This document captures low level details of enhancements to the OpenSearch repository plugin to support parallel uploads of different parts of a file.
RFC: #6354
Design Factors
This design is based on following factors:
Design details
Low Level Details
Settings
Following settings will be used to control transfers
Constructs
Encapsulation or a container to hold fields required for a write operation by multiple parts parallel upload.
StreamContextSupplier consists of stream suppliers which will be responsible for providing streams depending on the part size. This can look like following
Different uploads can then be triggered in parallel each consuming one stream.
New methods in BlobContainer to accept a WriteContext. Depending upon the support, this will be invoked in OpenSearch remote store flows.
Performance Results
On carrying out some performance runs on POC code providing capability of parallel upload of multiple parts of a file, we obtained following results :
Note: Observations below are taken from prolonged runs (>15min) of repeated transfers of a file.
Instance Type : m5.2xlarge
Observations
Following can be deduced from the tables :
The text was updated successfully, but these errors were encountered: