From 623b9b7238fed229853049b132fe06c0a6665c20 Mon Sep 17 00:00:00 2001 From: Steven Zhen Wu Date: Fri, 7 Jul 2023 21:44:54 -0700 Subject: [PATCH] Flink: switch to FileScanTaskParser for JSON serialization of IcebergSourceSplit (#7978) --- .../iceberg/flink/source/IcebergSource.java | 4 +- .../IcebergEnumeratorStateSerializer.java | 10 ++-- .../source/split/IcebergSourceSplit.java | 54 +++++++++++++++++++ .../split/IcebergSourceSplitSerializer.java | 17 +++--- .../TestIcebergEnumeratorStateSerializer.java | 2 +- .../TestIcebergSourceSplitSerializer.java | 50 +++++++++++++++-- 6 files changed, 117 insertions(+), 20 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index d3859452a284..f85f6277263b 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -169,12 +169,12 @@ public SplitEnumerator restoreEnumer @Override public SimpleVersionedSerializer getSplitSerializer() { - return IcebergSourceSplitSerializer.INSTANCE; + return new IcebergSourceSplitSerializer(scanContext.caseSensitive()); } @Override public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { - return IcebergEnumeratorStateSerializer.INSTANCE; + return new IcebergEnumeratorStateSerializer(scanContext.caseSensitive()); } private SplitEnumerator createEnumerator( diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java index 9998bee99fef..95d6db2cfbc4 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java @@ -35,9 +35,6 @@ public class IcebergEnumeratorStateSerializer implements SimpleVersionedSerializer { - public static final IcebergEnumeratorStateSerializer INSTANCE = - new IcebergEnumeratorStateSerializer(); - private static final int VERSION = 2; private static final ThreadLocal SERIALIZER_CACHE = @@ -45,8 +42,11 @@ public class IcebergEnumeratorStateSerializer private final IcebergEnumeratorPositionSerializer positionSerializer = IcebergEnumeratorPositionSerializer.INSTANCE; - private final IcebergSourceSplitSerializer splitSerializer = - IcebergSourceSplitSerializer.INSTANCE; + private final IcebergSourceSplitSerializer splitSerializer; + + public IcebergEnumeratorStateSerializer(boolean caseSensitive) { + this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive); + } @Override public int getVersion() { diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java index 35f8ade9843d..e4bfbf1452e2 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java @@ -21,19 +21,28 @@ import java.io.IOException; import java.io.Serializable; import java.util.Collection; +import java.util.List; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.util.InstantiationUtil; +import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.FileScanTaskParser; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; @Internal public class IcebergSourceSplit implements SourceSplit, Serializable { private static final long serialVersionUID = 1L; + private static final ThreadLocal SERIALIZER_CACHE = + ThreadLocal.withInitial(() -> new DataOutputSerializer(1024)); private final CombinedScanTask task; @@ -109,6 +118,7 @@ byte[] serializeV1() throws IOException { if (serializedBytesCache == null) { serializedBytesCache = InstantiationUtil.serializeObject(this); } + return serializedBytesCache; } @@ -120,4 +130,48 @@ static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException { throw new RuntimeException("Failed to deserialize the split.", e); } } + + byte[] serializeV2() throws IOException { + if (serializedBytesCache == null) { + DataOutputSerializer out = SERIALIZER_CACHE.get(); + Collection fileScanTasks = task.tasks(); + Preconditions.checkArgument( + fileOffset >= 0 && fileOffset < fileScanTasks.size(), + "Invalid file offset: %s. Should be within the range of [0, %s)", + fileOffset, + fileScanTasks.size()); + + out.writeInt(fileOffset); + out.writeLong(recordOffset); + out.writeInt(fileScanTasks.size()); + + for (FileScanTask fileScanTask : fileScanTasks) { + String taskJson = FileScanTaskParser.toJson(fileScanTask); + out.writeUTF(taskJson); + } + + serializedBytesCache = out.getCopyOfBuffer(); + out.clear(); + } + + return serializedBytesCache; + } + + static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive) + throws IOException { + DataInputDeserializer in = new DataInputDeserializer(serialized); + int fileOffset = in.readInt(); + long recordOffset = in.readLong(); + int taskCount = in.readInt(); + + List tasks = Lists.newArrayListWithCapacity(taskCount); + for (int i = 0; i < taskCount; ++i) { + String taskJson = in.readUTF(); + FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive); + tasks.add(task); + } + + CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks); + return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset); + } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java index ee0f364e17d6..8c089819e731 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java @@ -22,14 +22,15 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; -/** - * TODO: use Java serialization for now. Will switch to more stable serializer from issue-1698. - */ @Internal public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer { - public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer(); - private static final int VERSION = 1; + private static final int VERSION = 2; + + private final boolean caseSensitive; + + public IcebergSourceSplitSerializer(boolean caseSensitive) { + this.caseSensitive = caseSensitive; + } @Override public int getVersion() { @@ -38,7 +39,7 @@ public int getVersion() { @Override public byte[] serialize(IcebergSourceSplit split) throws IOException { - return split.serializeV1(); + return split.serializeV2(); } @Override @@ -46,6 +47,8 @@ public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOE switch (version) { case 1: return IcebergSourceSplit.deserializeV1(serialized); + case 2: + return IcebergSourceSplit.deserializeV2(serialized, caseSensitive); default: throw new IOException( String.format( diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java index 0082e25add70..1d12d9f66a8a 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java @@ -40,7 +40,7 @@ public class TestIcebergEnumeratorStateSerializer { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); private final IcebergEnumeratorStateSerializer serializer = - IcebergEnumeratorStateSerializer.INSTANCE; + new IcebergEnumeratorStateSerializer(true); protected final int version; diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java index 046b0c31ce2e..cd778309f90d 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java @@ -21,7 +21,9 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.flink.source.SplitHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -31,7 +33,7 @@ public class TestIcebergSourceSplitSerializer { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE; + private final IcebergSourceSplitSerializer serializer = new IcebergSourceSplitSerializer(true); @Test public void testLatestVersion() throws Exception { @@ -81,6 +83,34 @@ private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws } } + @Test + public void testV2() throws Exception { + serializeAndDeserializeV2(1, 1); + serializeAndDeserializeV2(10, 2); + } + + private void serializeAndDeserializeV2(int splitCount, int filesPerSplit) throws Exception { + final List splits = + SplitHelpers.createSplitsFromTransientHadoopTable( + TEMPORARY_FOLDER, splitCount, filesPerSplit); + for (IcebergSourceSplit split : splits) { + byte[] result = split.serializeV2(); + IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV2(result, true); + assertSplitEquals(split, deserialized); + } + } + + @Test + public void testDeserializeV1() throws Exception { + final List splits = + SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1); + for (IcebergSourceSplit split : splits) { + byte[] result = split.serializeV1(); + IcebergSourceSplit deserialized = serializer.deserialize(1, result); + assertSplitEquals(split, deserialized); + } + } + @Test public void testCheckpointedPosition() throws Exception { final AtomicInteger index = new AtomicInteger(); @@ -90,9 +120,7 @@ public void testCheckpointedPosition() throws Exception { split -> { IcebergSourceSplit result; if (index.get() % 2 == 0) { - result = - IcebergSourceSplit.fromCombinedScanTask( - split.task(), index.get(), index.get()); + result = IcebergSourceSplit.fromCombinedScanTask(split.task(), 1, 1); } else { result = split; } @@ -115,7 +143,19 @@ public void testCheckpointedPosition() throws Exception { } private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) { - Assert.assertEquals(expected.splitId(), actual.splitId()); + List expectedTasks = Lists.newArrayList(expected.task().tasks().iterator()); + List actualTasks = Lists.newArrayList(actual.task().tasks().iterator()); + Assert.assertEquals(expectedTasks.size(), actualTasks.size()); + for (int i = 0; i < expectedTasks.size(); ++i) { + FileScanTask expectedTask = expectedTasks.get(i); + FileScanTask actualTask = actualTasks.get(i); + Assert.assertEquals(expectedTask.file().path(), actualTask.file().path()); + Assert.assertEquals(expectedTask.sizeBytes(), actualTask.sizeBytes()); + Assert.assertEquals(expectedTask.filesCount(), actualTask.filesCount()); + Assert.assertEquals(expectedTask.start(), actualTask.start()); + Assert.assertEquals(expectedTask.length(), actualTask.length()); + } + Assert.assertEquals(expected.fileOffset(), actual.fileOffset()); Assert.assertEquals(expected.recordOffset(), actual.recordOffset()); }