Skip to content

Commit

Permalink
Copy directly between 2 ByteBlockPool to avoid double-copy
Browse files Browse the repository at this point in the history
  • Loading branch information
dungba88 committed Nov 8, 2023
1 parent ddb01ca commit 79edd58
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 27 deletions.
41 changes: 41 additions & 0 deletions lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,46 @@ public void append(final BytesRef bytes) {
append(bytes.bytes, bytes.offset, bytes.length);
}

/**
 * Copies {@code length} bytes out of another {@link ByteBlockPool}, starting at {@code srcOffset}
 * in that pool, and appends them at this pool's current write position. Whenever the current
 * head buffer of this pool becomes full, {@link #nextBuffer()} is called before copying
 * continues.
 *
 * @param srcPool the source pool to copy from
 * @param srcOffset the source pool offset
 * @param length the number of bytes to copy
 */
public void append(ByteBlockPool srcPool, long srcOffset, int length) {
  int remaining = length;
  while (remaining > 0) {
    final int room = BYTE_BLOCK_SIZE - byteUpto;
    if (remaining < room) {
      // everything that is left fits strictly inside the current head buffer
      appendBytesSingleBuffer(srcPool, srcOffset, remaining);
      return;
    }
    // top up the current head buffer (if it has any room at all), then advance to the next one
    if (room > 0) {
      appendBytesSingleBuffer(srcPool, srcOffset, room);
      remaining -= room;
      srcOffset += room;
    }
    nextBuffer();
  }
}

// Copies `length` bytes from the source pool into the current head buffer of this pool.
// The caller must ensure that `length` fits within the space left in the head buffer.
private void appendBytesSingleBuffer(ByteBlockPool srcPool, long srcOffset, int length) {
  assert length <= BYTE_BLOCK_SIZE - byteUpto;
  // the requested range may straddle several byte[] blocks of srcPool, so copy chunk by chunk
  for (int remaining = length; remaining > 0; ) {
    final byte[] srcBlock = srcPool.buffers[Math.toIntExact(srcOffset >> BYTE_BLOCK_SHIFT)];
    final int posInSrcBlock = Math.toIntExact(srcOffset & BYTE_BLOCK_MASK);
    // never read past the end of the current source block
    final int chunk = Math.min(remaining, BYTE_BLOCK_SIZE - posInSrcBlock);
    System.arraycopy(srcBlock, posInSrcBlock, buffer, byteUpto, chunk);
    remaining -= chunk;
    srcOffset += chunk;
    byteUpto += chunk;
  }
}

/**
* Append the provided byte array at the current position.
*
Expand Down Expand Up @@ -283,6 +323,7 @@ public void readBytes(final long offset, final byte[] bytes, int bytesOffset, in
int pos = (int) (offset & BYTE_BLOCK_MASK);
while (bytesLeft > 0) {
byte[] buffer = buffers[bufferIndex++];
assert buffer != null;
int chunk = Math.min(bytesLeft, BYTE_BLOCK_SIZE - pos);
System.arraycopy(buffer, pos, bytes, bytesOffset, chunk);
bytesOffset += chunk;
Expand Down
47 changes: 23 additions & 24 deletions lucene/core/src/java/org/apache/lucene/util/fst/NodeHash.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,9 @@ public long add(FSTCompiler.UnCompiledNode<T> nodeIn) throws IOException {
assert lastFallbackHashSlot != -1 && lastFallbackNodeLength != -1;

// it was already in fallback -- promote to primary
// TODO: Copy directly between 2 ByteBlockPool to avoid double-copy
primaryTable.setNode(
hashSlot,
nodeAddress,
fallbackTable.getBytes(lastFallbackHashSlot, lastFallbackNodeLength));
primaryTable.setNodeAddress(hashSlot, nodeAddress);
primaryTable.copyFallbackNodeBytes(
hashSlot, fallbackTable, lastFallbackHashSlot, lastFallbackNodeLength);
} else {
// not in fallback either -- freeze & add the incoming node

Expand All @@ -142,7 +140,8 @@ public long add(FSTCompiler.UnCompiledNode<T> nodeIn) throws IOException {
byte[] buf = new byte[Math.toIntExact(nodeAddress - startAddress + 1)];
fstCompiler.bytes.copyBytes(startAddress, buf, 0, buf.length);

primaryTable.setNode(hashSlot, nodeAddress, buf);
primaryTable.setNodeAddress(hashSlot, nodeAddress);
primaryTable.copyNodeBytes(hashSlot, buf);

// confirm frozen hash and unfrozen hash are the same
assert primaryTable.hash(nodeAddress, hashSlot) == hash
Expand Down Expand Up @@ -263,21 +262,6 @@ public PagedGrowableHash(long lastNodeAddress, long size) {
bytesReader = new ByteBlockPoolReverseBytesReader(copiedNodes);
}

/**
 * Read back the bytes copied for the node stored at the provided hash slot. The address recorded
 * for the slot is that of the node's last byte, so the read starts {@code length - 1} bytes
 * before it.
 *
 * @param hashSlot the hash slot to read from
 * @param length the number of bytes to read
 * @return a newly allocated array holding the {@code length} bytes read
 */
public byte[] getBytes(long hashSlot, int length) {
  final long lastByteAddress = copiedNodeAddress.get(hashSlot);
  final long firstByteAddress = lastByteAddress - length + 1;
  assert firstByteAddress >= 0;
  final byte[] nodeBytes = new byte[length];
  copiedNodes.readBytes(firstByteAddress, nodeBytes, 0, length);
  return nodeBytes;
}

/**
* Get the node address from the provided hash slot
*
Expand All @@ -289,21 +273,36 @@ public long getNodeAddress(long hashSlot) {
}

/**
* Set the node address and bytes from the provided hash slot
* Set the node address from the provided hash slot
*
* @param hashSlot the hash slot to write to
* @param nodeAddress the node address
* @param bytes the node bytes to be copied
*/
public void setNode(long hashSlot, long nodeAddress, byte[] bytes) {
public void setNodeAddress(long hashSlot, long nodeAddress) {
assert fstNodeAddress.get(hashSlot) == 0;
fstNodeAddress.set(hashSlot, nodeAddress);
count++;
}

/** Append the given node bytes to the copied-nodes pool and record their address for the slot. */
private void copyNodeBytes(long hashSlot, byte[] bytes) {
  assert copiedNodeAddress.get(hashSlot) == 0;
  copiedNodes.append(bytes);
  // the node is later read in reverse, so the stored address is that of the last byte just
  // written rather than the first
  final long lastByteAddress = copiedNodes.getPosition() - 1;
  copiedNodeAddress.set(hashSlot, lastByteAddress);
}

/**
 * Promote a node from the fallback table: copy its {@code nodeLength} bytes straight from the
 * fallback table's pool into this table's pool and record the new address for {@code hashSlot}.
 */
private void copyFallbackNodeBytes(
    long hashSlot, PagedGrowableHash fallbackTable, long fallbackHashSlot, int nodeLength) {
  assert copiedNodeAddress.get(hashSlot) == 0;
  // the fallback table stores the address of the node's last byte; step back to its first byte
  final long srcLastByte = fallbackTable.copiedNodeAddress.get(fallbackHashSlot);
  final long srcFirstByte = srcLastByte - nodeLength + 1;
  assert srcFirstByte >= 0;
  copiedNodes.append(fallbackTable.copiedNodes, srcFirstByte, nodeLength);
  // as in the source table, remember the address of the last byte written, because the node is
  // later read in reverse
  final long lastByteAddress = copiedNodes.getPosition() - 1;
  copiedNodeAddress.set(hashSlot, lastByteAddress);
}

Expand Down
36 changes: 33 additions & 3 deletions lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,46 @@
*/
package org.apache.lucene.util;

import java.io.IOException;
import com.carrotsearch.randomizedtesting.generators.RandomBytes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;

public class TestByteBlockPool extends LuceneTestCase {

public void testReadAndWrite() throws IOException {
public void testAppendFromOtherPool() {
  Random random = random();

  // source pool, filled with random content
  ByteBlockPool sourcePool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
  final int numBytes = atLeast(2 << 16);
  byte[] sourceBytes = RandomBytes.randomBytesOfLength(random, numBytes);
  sourcePool.append(sourceBytes);

  // destination pool, which already holds some bytes before the cross-pool append
  ByteBlockPool destPool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
  byte[] prefix = new byte[atLeast(500)];
  destPool.append(prefix);

  // choose a slice of the source: either everything after the offset, or a random-length part
  int offset = TestUtil.nextInt(random, 1, 2 << 15);
  int length = sourceBytes.length - offset;
  if (random.nextBoolean()) {
    length = TestUtil.nextInt(random, 1, length);
  }
  destPool.append(sourcePool, offset, length);

  // the destination must have grown by exactly the slice length
  assertEquals(prefix.length + length, destPool.getPosition());

  // read back what was appended after the prefix and compare it with the source slice
  byte[] copied = new byte[length];
  destPool.readBytes(prefix.length, copied, 0, copied.length);
  for (int i = 0; i < length; i++) {
    final byte expected = sourceBytes[offset + i];
    assertEquals("byte @ index=" + i, expected, copied[i]);
  }
}

public void testReadAndWrite() {
Counter bytesUsed = Counter.newCounter();
ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectTrackingAllocator(bytesUsed));
pool.nextBuffer();
Expand Down Expand Up @@ -74,7 +104,7 @@ public void testReadAndWrite() throws IOException {
}
}

public void testLargeRandomBlocks() throws IOException {
public void testLargeRandomBlocks() {
Counter bytesUsed = Counter.newCounter();
ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectTrackingAllocator(bytesUsed));
pool.nextBuffer();
Expand Down

0 comments on commit 79edd58

Please sign in to comment.