Skip to content

Commit ebf3550

Browse files
authored
Add SharedBytes.copyToCacheFileAligned without length method (#106193)
This change adds a variant of the existing SharedBytes.copyToCacheFileAligned that is not limited to copy only a given length of bytes but copies all bytes that can be read from the InputStream. It also adds a variant of the existing BlobCacheUtils.computeRange() method that computes range of bytes to expand over a complete regions, instead of being limited to the length of the blob. Finally, it removes an assertion that a CacheFile cannot write a range larger than the blob length.
1 parent 1af4428 commit ebf3550

File tree

3 files changed

+38
-1
lines changed

3 files changed

+38
-1
lines changed

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public static ByteRange computeRange(long rangeSize, long position, long size, l
6363
);
6464
}
6565

66+
public static ByteRange computeRange(long rangeSize, long position, long size) {
67+
return ByteRange.of((position / rangeSize) * rangeSize, (((position + size - 1) / rangeSize) + 1) * rangeSize);
68+
}
69+
6670
public static void ensureSlice(String sliceName, long sliceOffset, long sliceLength, IndexInput input) {
6771
if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > input.length()) {
6872
throw new IllegalArgumentException(

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,8 @@ public int populateAndRead(
10061006
final RangeAvailableHandler reader,
10071007
final RangeMissingHandler writer
10081008
) throws Exception {
1009-
assert assertOffsetsWithinFileLength(rangeToWrite.start(), rangeToWrite.length(), length);
1009+
// some cache files can grow after being created, so rangeToWrite can be larger than the initial {@code length}
1010+
assert rangeToWrite.start() >= 0 : rangeToWrite;
10101011
assert assertOffsetsWithinFileLength(rangeToRead.start(), rangeToRead.length(), length);
10111012
// We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start
10121013
// our measurement here.

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.unit.ByteSizeValue;
1515
import org.elasticsearch.core.AbstractRefCounted;
1616
import org.elasticsearch.core.IOUtils;
17+
import org.elasticsearch.core.Streams;
1718
import org.elasticsearch.core.SuppressForbidden;
1819
import org.elasticsearch.env.Environment;
1920
import org.elasticsearch.env.NodeEnvironment;
@@ -184,6 +185,37 @@ public static void copyToCacheFileAligned(
184185
}
185186
}
186187

188+
/**
189+
* Copy all bytes from {@code input} to {@code fc}, only doing writes aligned along {@link #PAGE_SIZE}.
190+
*
191+
* @param fc output cache file reference
192+
* @param input stream to read from
193+
* @param fileChannelPos position in {@code fc} to write to
194+
* @param progressUpdater callback to invoke with the number of copied bytes as they are copied
195+
* @param buffer bytebuffer to use for writing
196+
* @return the number of bytes copied
197+
* @throws IOException on failure
198+
*/
199+
public static int copyToCacheFileAligned(IO fc, InputStream input, int fileChannelPos, IntConsumer progressUpdater, ByteBuffer buffer)
200+
throws IOException {
201+
int bytesCopied = 0;
202+
while (true) {
203+
final int bytesRead = Streams.read(input, buffer, buffer.remaining());
204+
if (bytesRead <= 0) {
205+
break;
206+
}
207+
if (buffer.hasRemaining()) {
208+
// ensure that last write is aligned on 4k boundaries (= page size)
209+
final int remainder = buffer.position() % PAGE_SIZE;
210+
final int adjustment = remainder == 0 ? 0 : PAGE_SIZE - remainder;
211+
buffer.position(buffer.position() + adjustment);
212+
}
213+
bytesCopied += positionalWrite(fc, fileChannelPos + bytesCopied, buffer);
214+
progressUpdater.accept(bytesCopied);
215+
}
216+
return bytesCopied;
217+
}
218+
187219
private static int positionalWrite(IO fc, int start, ByteBuffer byteBuffer) throws IOException {
188220
byteBuffer.flip();
189221
int written = fc.write(byteBuffer, start);

0 commit comments

Comments
 (0)