Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Make PositionDeleteIndex serializable #11463

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@
*/
package org.apache.iceberg.deletes;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.zip.CRC32;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class BitmapPositionDeleteIndex implements PositionDeleteIndex {
private static final int LENGTH_SIZE_BYTES = 4;
private static final int MAGIC_NUMBER_SIZE_BYTES = 4;
private static final int CRC_SIZE_BYTES = 4;
private static final int BITMAP_DATA_OFFSET = 4;
private static final int MAGIC_NUMBER = 1681511377;

private final RoaringPositionBitmap bitmap;
private final List<DeleteFile> deleteFiles;

Expand All @@ -43,6 +53,11 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) : Lists.newArrayList();
}

BitmapPositionDeleteIndex(RoaringPositionBitmap bitmap, DeleteFile deleteFile) {
this.bitmap = bitmap;
this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) : Lists.newArrayList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where we can have a deserialized bitmap, but no delete file? Just wondering if we should have a separate constructor or if it makes sense to allow passing null for those cases.

Copy link
Contributor Author

@aokolnychyi aokolnychyi Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. I have this check here to match the constructor above. There are some legacy code paths that create an index and pass null as DeleteFile. Given that it is a new constructor, we can probably require the file to be non-null, but it won't be consistent with the constructor above.

}

void merge(BitmapPositionDeleteIndex that) {
bitmap.setAll(that.bitmap);
deleteFiles.addAll(that.deleteFiles);
Expand Down Expand Up @@ -92,4 +107,113 @@ public Collection<DeleteFile> deleteFiles() {
public long cardinality() {
return bitmap.cardinality();
}

/**
* Serializes the index using the following format:
*
* <ul>
* <li>The length of the magic bytes and bitmap stored as 4 bytes (big-endian).
* <li>A 4-byte {@link #MAGIC_NUMBER} (little-endian).
* <li>The bitmap serialized using the portable Roaring spec (little-endian).
* <li>A CRC-32 checksum of the magic bytes and bitmap as 4-bytes (big-endian).
* </ul>
*
* Note that the length and the checksum are computed for the bitmap data, which includes the
* magic bytes and bitmap for compatibility with Delta.
*/
@Override
public ByteBuffer serialize() {
Copy link
Contributor Author

@aokolnychyi aokolnychyi Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Puffin writers currently assume we will create and pass ByteBuffer. We also compute a checksum on serialized magic bytes and bitmap, so I am allocating a byte array upfront.

bitmap.runLengthEncode(); // run-length encode the bitmap before serializing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we more eagerly run length encode the bitmap in the constructor instead of at serialization time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, I don't think it'll make any difference and probably more readable the way it's written at the moment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually want to accumulate all values prior to applying run-length encoding, so doing this right before the serialization seems like a good place. We don't anticipate the bitmap to change after serialization.

int bitmapDataLength = computeBitmapDataLength(bitmap); // magic bytes + bitmap
byte[] bytes = new byte[LENGTH_SIZE_BYTES + bitmapDataLength + CRC_SIZE_BYTES];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.putInt(bitmapDataLength);
serializeBitmapData(bytes, bitmapDataLength, bitmap);
int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
int crc = computeChecksum(bytes, bitmapDataLength);
buffer.putInt(crcOffset, crc);
buffer.rewind();
return buffer;
}

/**
* Deserializes the index from bytes, assuming the format described in {@link #serialize()}.
*
* @param bytes an array containing the serialized index
* @param deleteFile the DV file
* @return the deserialized index
*/
public static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFile) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
int bitmapDataLength = readBitmapDataLength(buffer, deleteFile);
RoaringPositionBitmap bitmap = deserializeBitmap(bytes, bitmapDataLength, deleteFile);
int crc = computeChecksum(bytes, bitmapDataLength);
int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
int expectedCrc = buffer.getInt(crcOffset);
Preconditions.checkArgument(crc == expectedCrc, "Invalid CRC");
return new BitmapPositionDeleteIndex(bitmap, deleteFile);
}

// computes and validates the length of the bitmap data (magic bytes + bitmap)
private static int computeBitmapDataLength(RoaringPositionBitmap bitmap) {
long length = MAGIC_NUMBER_SIZE_BYTES + bitmap.serializedSizeInBytes();
long bufferSize = LENGTH_SIZE_BYTES + length + CRC_SIZE_BYTES;
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Can't serialize index > 2GB");
return (int) length;
}

// serializes the bitmap data (magic bytes + bitmap) using the little-endian byte order
private static void serializeBitmapData(
byte[] bytes, int bitmapDataLength, RoaringPositionBitmap bitmap) {
ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
bitmapData.putInt(MAGIC_NUMBER);
bitmap.serialize(bitmapData);
}

// points to the bitmap data in the blob
private static ByteBuffer pointToBitmapData(byte[] bytes, int bitmapDataLength) {
ByteBuffer bitmapData = ByteBuffer.wrap(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
bitmapData.order(ByteOrder.LITTLE_ENDIAN);
return bitmapData;
}

// checks the blob size is equal to the bitmap data length + extra bytes for length and CRC
private static int readBitmapDataLength(ByteBuffer buffer, DeleteFile deleteFile) {
int length = buffer.getInt();
long expectedLength = deleteFile.contentSizeInBytes() - LENGTH_SIZE_BYTES - CRC_SIZE_BYTES;
Preconditions.checkArgument(
length == expectedLength,
"Invalid bitmap data length: %s, expected %s",
length,
expectedLength);
return length;
}

// validates magic bytes and deserializes the bitmap
private static RoaringPositionBitmap deserializeBitmap(
byte[] bytes, int bitmapDataLength, DeleteFile deleteFile) {
ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
int magicNumber = bitmapData.getInt();
Preconditions.checkArgument(
magicNumber == MAGIC_NUMBER,
"Invalid magic number: %s, expected %s",
magicNumber,
MAGIC_NUMBER);
RoaringPositionBitmap bitmap = RoaringPositionBitmap.deserialize(bitmapData);
long cardinality = bitmap.cardinality();
long expectedCardinality = deleteFile.recordCount();
Comment on lines +203 to +204
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIt: I feel like we could inline bitMap.cardinality() == deleteFile.recordCount() in the check below and the intent of the check would still be clear

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this for consistency with the method above. It shouldn't be expensive to call recordCount() twice, so I am OK changing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me actually keep it to avoid one more round of CI.

Preconditions.checkArgument(
cardinality == expectedCardinality,
"Invalid cardinality: %s, expected %s",
cardinality,
expectedCardinality);
return bitmap;
}

// generates a 32-bit unsigned checksum for the magic bytes and serialized bitmap
private static int computeChecksum(byte[] bytes, int bitmapDataLength) {
CRC32 crc = new CRC32();
crc.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
return (int) crc.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.deletes;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.function.LongConsumer;
import org.apache.iceberg.DeleteFile;
Expand Down Expand Up @@ -92,6 +93,26 @@ default long cardinality() {
throw new UnsupportedOperationException(getClass().getName() + " does not support cardinality");
}

/**
* Serializes this index.
*
* @return a buffer containing the serialized index
*/
default ByteBuffer serialize() {
throw new UnsupportedOperationException(getClass().getName() + " does not support serialize");
}

/**
* Deserializes a position delete index.
*
* @param bytes an array containing the serialized index
* @param deleteFile the delete file that the index is created for
* @return the deserialized index
*/
static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFile) {
return BitmapPositionDeleteIndex.deserialize(bytes, deleteFile);
}

/** Returns an empty immutable position delete index. */
static PositionDeleteIndex empty() {
return EmptyPositionDeleteIndex.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestBitmapPositionDeleteIndex {

private static final long BITMAP_OFFSET = 0xFFFFFFFFL + 1L;
private static final long CONTAINER_OFFSET = Character.MAX_VALUE + 1L;

@Test
public void testForEach() {
long pos1 = 10L; // Container 0 (high bits = 0)
Expand Down Expand Up @@ -105,6 +114,102 @@ public void testMergeBitmapIndexWithEmpty() {
assertThat(positions).containsExactly(pos1, pos2, pos3, pos4);
}

@Test
public void testEmptyIndexSerialization() throws Exception {
PositionDeleteIndex index = new BitmapPositionDeleteIndex();
validate(index, "empty-position-index.bin");
}

@Test
public void testSmallAlternatingValuesIndexSerialization() throws Exception {
PositionDeleteIndex index = new BitmapPositionDeleteIndex();
index.delete(1L);
index.delete(3L);
index.delete(5L);
index.delete(7L);
index.delete(9L);
validate(index, "small-alternating-values-position-index.bin");
}

@Test
public void testSmallAndLargeValuesIndexSerialization() throws Exception {
PositionDeleteIndex index = new BitmapPositionDeleteIndex();
index.delete(100L);
index.delete(101L);
index.delete(Integer.MAX_VALUE + 100L);
index.delete(Integer.MAX_VALUE + 101L);
validate(index, "small-and-large-values-position-index.bin");
}

@Test
public void testAllContainerTypesIndexSerialization() throws Exception {
PositionDeleteIndex index = new BitmapPositionDeleteIndex();

// bitmap 0, container 0 (array)
index.delete(position(0 /* bitmap */, 0 /* container */, 5L));
index.delete(position(0 /* bitmap */, 0 /* container */, 7L));

// bitmap 0, container 1 (array that can be compressed)
index.delete(
position(0 /* bitmap */, 1 /* container */, 1L),
position(0 /* bitmap */, 1 /* container */, 1000L));

// bitmap 1, container 2 (bitset)
index.delete(
position(0 /* bitmap */, 2 /* container */, 1L),
position(0 /* bitmap */, 2 /* container */, CONTAINER_OFFSET - 1L));

// bitmap 1, container 0 (array)
index.delete(position(1 /* bitmap */, 0 /* container */, 10L));
index.delete(position(1 /* bitmap */, 0 /* container */, 20L));

// bitmap 1, container 1 (array that can be compressed)
index.delete(
position(1 /* bitmap */, 1 /* container */, 10L),
position(1 /* bitmap */, 1 /* container */, 500L));

// bitmap 1, container 2 (bitset)
index.delete(
position(1 /* bitmap */, 2 /* container */, 1L),
position(1 /* bitmap */, 2 /* container */, CONTAINER_OFFSET - 1));

validate(index, "all-container-types-position-index.bin");
}

private static void validate(PositionDeleteIndex index, String goldenFile) throws Exception {
ByteBuffer buffer = index.serialize();
byte[] bytes = buffer.array();
DeleteFile dv = mockDV(bytes.length, index.cardinality());
PositionDeleteIndex indexCopy = PositionDeleteIndex.deserialize(bytes, dv);
assertEqual(index, indexCopy);
byte[] goldenBytes = readTestResource(goldenFile);
assertThat(bytes).isEqualTo(goldenBytes);
PositionDeleteIndex goldenIndex = PositionDeleteIndex.deserialize(goldenBytes, dv);
assertEqual(index, goldenIndex);
}

private static DeleteFile mockDV(long contentSize, long cardinality) {
DeleteFile mock = Mockito.mock(DeleteFile.class);
Mockito.when(mock.contentSizeInBytes()).thenReturn(contentSize);
Mockito.when(mock.recordCount()).thenReturn(cardinality);
return mock;
}

private static void assertEqual(PositionDeleteIndex index, PositionDeleteIndex thatIndex) {
assertThat(index.cardinality()).isEqualTo(thatIndex.cardinality());
index.forEach(position -> assertThat(thatIndex.isDeleted(position)).isTrue());
thatIndex.forEach(position -> assertThat(index.isDeleted(position)).isTrue());
}

private static long position(int bitmapIndex, int containerIndex, long value) {
return bitmapIndex * BITMAP_OFFSET + containerIndex * CONTAINER_OFFSET + value;
}

private static byte[] readTestResource(String resourceName) throws IOException {
URL resource = Resources.getResource(TestRoaringPositionBitmap.class, resourceName);
return Resources.toByteArray(resource);
}

private List<Long> collect(PositionDeleteIndex index) {
List<Long> positions = Lists.newArrayList();
index.forEach(positions::add);
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.