Skip to content

Commit

Permalink
Flink: switch to FileScanTaskParser for JSON serialization of IcebergSourceSplit (#7978)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu authored Jul 8, 2023
1 parent 3a9516f commit 623b9b7
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumer

@Override
public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
  // Returns a fresh serializer carrying the scan's case sensitivity, which the
  // split's JSON (FileScanTaskParser) deserialization path needs.
  // NOTE(review): the rendered fragment contained both the pre- and post-change
  // return statements (the second was unreachable); kept only the current one.
  return new IcebergSourceSplitSerializer(scanContext.caseSensitive());
}

@Override
public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
  // Enumerator-state serialization nests split serialization, so the same
  // case-sensitivity flag is threaded through here as well.
  // NOTE(review): removed the unreachable pre-change return left over from the
  // diff overlay; only the constructor-based serializer remains.
  return new IcebergEnumeratorStateSerializer(scanContext.caseSensitive());
}

private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@
public class IcebergEnumeratorStateSerializer
implements SimpleVersionedSerializer<IcebergEnumeratorState> {

public static final IcebergEnumeratorStateSerializer INSTANCE =
new IcebergEnumeratorStateSerializer();

private static final int VERSION = 2;

private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));

private final IcebergEnumeratorPositionSerializer positionSerializer =
IcebergEnumeratorPositionSerializer.INSTANCE;
private final IcebergSourceSplitSerializer splitSerializer =
IcebergSourceSplitSerializer.INSTANCE;
private final IcebergSourceSplitSerializer splitSerializer;

/**
 * Creates a serializer for enumerator checkpoint state.
 *
 * @param caseSensitive forwarded to the nested {@code IcebergSourceSplitSerializer}, which
 *     uses it when deserializing file scan tasks from JSON
 */
public IcebergEnumeratorStateSerializer(boolean caseSensitive) {
this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);
}

@Override
public int getVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,28 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.FileScanTaskParser;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@Internal
public class IcebergSourceSplit implements SourceSplit, Serializable {
private static final long serialVersionUID = 1L;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));

private final CombinedScanTask task;

Expand Down Expand Up @@ -109,6 +118,7 @@ byte[] serializeV1() throws IOException {
if (serializedBytesCache == null) {
serializedBytesCache = InstantiationUtil.serializeObject(this);
}

return serializedBytesCache;
}

Expand All @@ -120,4 +130,48 @@ static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
throw new RuntimeException("Failed to deserialize the split.", e);
}
}

/**
 * Serializes this split in the version-2 layout: file offset, record offset, task count, then
 * each {@link FileScanTask} as a JSON string produced by {@link FileScanTaskParser}. The result
 * is cached so repeated calls reuse the same byte array.
 *
 * @return the serialized bytes (cached after the first call)
 * @throws IOException if writing to the serialization buffer fails
 */
byte[] serializeV2() throws IOException {
  if (serializedBytesCache != null) {
    return serializedBytesCache;
  }

  Collection<FileScanTask> fileTasks = task.tasks();
  int taskCount = fileTasks.size();
  Preconditions.checkArgument(
      fileOffset >= 0 && fileOffset < taskCount,
      "Invalid file offset: %s. Should be within the range of [0, %s)",
      fileOffset,
      taskCount);

  // Reuse the thread-local buffer to avoid re-allocating on every checkpoint.
  DataOutputSerializer buffer = SERIALIZER_CACHE.get();
  buffer.writeInt(fileOffset);
  buffer.writeLong(recordOffset);
  buffer.writeInt(taskCount);
  for (FileScanTask fileTask : fileTasks) {
    buffer.writeUTF(FileScanTaskParser.toJson(fileTask));
  }

  serializedBytesCache = buffer.getCopyOfBuffer();
  buffer.clear();
  return serializedBytesCache;
}

/**
 * Restores a split from its version-2 byte layout (see {@code serializeV2()}).
 *
 * @param serialized bytes previously produced by {@code serializeV2()}
 * @param caseSensitive forwarded to {@link FileScanTaskParser#fromJson} for column-name matching
 * @return the reconstructed split, positioned at the encoded file/record offsets
 * @throws IOException if the bytes cannot be read
 */
static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
    throws IOException {
  DataInputDeserializer input = new DataInputDeserializer(serialized);
  int fileOffset = input.readInt();
  long recordOffset = input.readLong();
  int taskCount = input.readInt();

  List<FileScanTask> fileTasks = Lists.newArrayListWithCapacity(taskCount);
  for (int remaining = taskCount; remaining > 0; --remaining) {
    fileTasks.add(FileScanTaskParser.fromJson(input.readUTF(), caseSensitive));
  }

  return IcebergSourceSplit.fromCombinedScanTask(
      new BaseCombinedScanTask(fileTasks), fileOffset, recordOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/**
* TODO: use Java serialization for now. Will switch to more stable serializer from <a
* href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
*/
@Internal
public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
private static final int VERSION = 1;
private static final int VERSION = 2;

private final boolean caseSensitive;

/**
 * Creates a split serializer.
 *
 * @param caseSensitive passed through to {@code IcebergSourceSplit.deserializeV2}, which uses it
 *     when parsing file scan tasks from JSON
 */
public IcebergSourceSplitSerializer(boolean caseSensitive) {
this.caseSensitive = caseSensitive;
}

@Override
public int getVersion() {
Expand All @@ -38,14 +39,16 @@ public int getVersion() {

@Override
public byte[] serialize(IcebergSourceSplit split) throws IOException {
  // Always write the newest (v2, JSON-based) format; deserialize() below still
  // accepts v1 bytes for backward compatibility.
  // NOTE(review): the rendered fragment contained both the pre-change (v1) and
  // post-change (v2) return statements; the unreachable v1 line was dropped.
  return split.serializeV2();
}

@Override
public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
return IcebergSourceSplit.deserializeV1(serialized);
case 2:
return IcebergSourceSplit.deserializeV2(serialized, caseSensitive);
default:
throw new IOException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class TestIcebergEnumeratorStateSerializer {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final IcebergEnumeratorStateSerializer serializer =
IcebergEnumeratorStateSerializer.INSTANCE;
new IcebergEnumeratorStateSerializer(true);

protected final int version;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -31,7 +33,7 @@ public class TestIcebergSourceSplitSerializer {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
private final IcebergSourceSplitSerializer serializer = new IcebergSourceSplitSerializer(true);

@Test
public void testLatestVersion() throws Exception {
Expand Down Expand Up @@ -81,6 +83,34 @@ private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws
}
}

@Test
public void testV2() throws Exception {
  // Round-trip both a minimal layout and a multi-split, multi-file layout.
  int[][] splitAndFileCounts = {{1, 1}, {10, 2}};
  for (int[] counts : splitAndFileCounts) {
    serializeAndDeserializeV2(counts[0], counts[1]);
  }
}

// Round-trips each generated split through the v2 byte format and verifies the
// reconstructed split matches the original field by field.
private void serializeAndDeserializeV2(int splitCount, int filesPerSplit) throws Exception {
  List<IcebergSourceSplit> splits =
      SplitHelpers.createSplitsFromTransientHadoopTable(
          TEMPORARY_FOLDER, splitCount, filesPerSplit);
  for (IcebergSourceSplit original : splits) {
    byte[] bytes = original.serializeV2();
    IcebergSourceSplit roundTripped = IcebergSourceSplit.deserializeV2(bytes, true);
    assertSplitEquals(original, roundTripped);
  }
}

@Test
public void testDeserializeV1() throws Exception {
  // Bytes written in the legacy v1 format must remain readable through the
  // versioned serializer after the switch to v2 as the write format.
  List<IcebergSourceSplit> splits =
      SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
  for (IcebergSourceSplit original : splits) {
    byte[] v1Bytes = original.serializeV1();
    IcebergSourceSplit restored = serializer.deserialize(1, v1Bytes);
    assertSplitEquals(original, restored);
  }
}

@Test
public void testCheckpointedPosition() throws Exception {
final AtomicInteger index = new AtomicInteger();
Expand All @@ -90,9 +120,7 @@ public void testCheckpointedPosition() throws Exception {
split -> {
IcebergSourceSplit result;
if (index.get() % 2 == 0) {
result =
IcebergSourceSplit.fromCombinedScanTask(
split.task(), index.get(), index.get());
result = IcebergSourceSplit.fromCombinedScanTask(split.task(), 1, 1);
} else {
result = split;
}
Expand All @@ -115,7 +143,19 @@ public void testCheckpointedPosition() throws Exception {
}

// Compares two splits by observable task properties rather than object equality:
// a JSON round-trip does not preserve FileScanTask instance identity.
// NOTE(review): dropped the stale pre-change splitId assertion that the diff
// overlay left merged into this body; the field-by-field checks below are the
// current contract.
private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) {
  List<FileScanTask> expectedTasks = Lists.newArrayList(expected.task().tasks().iterator());
  List<FileScanTask> actualTasks = Lists.newArrayList(actual.task().tasks().iterator());
  Assert.assertEquals(expectedTasks.size(), actualTasks.size());
  for (int i = 0; i < expectedTasks.size(); ++i) {
    FileScanTask expectedTask = expectedTasks.get(i);
    FileScanTask actualTask = actualTasks.get(i);
    Assert.assertEquals(expectedTask.file().path(), actualTask.file().path());
    Assert.assertEquals(expectedTask.sizeBytes(), actualTask.sizeBytes());
    Assert.assertEquals(expectedTask.filesCount(), actualTask.filesCount());
    Assert.assertEquals(expectedTask.start(), actualTask.start());
    Assert.assertEquals(expectedTask.length(), actualTask.length());
  }

  Assert.assertEquals(expected.fileOffset(), actual.fileOffset());
  Assert.assertEquals(expected.recordOffset(), actual.recordOffset());
}
Expand Down

0 comments on commit 623b9b7

Please sign in to comment.