diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 2256d1e874ce..5cd43a46de37 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -67,7 +67,6 @@ import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord; import org.apache.iceberg.flink.sink.shuffle.StatisticsType; import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; -import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -420,7 +419,7 @@ private DataStreamSink chainIcebergOperators() { distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism); // Add parallel writers that append rows to files - SingleOutputStreamOperator writerStream = + SingleOutputStreamOperator writerStream = appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism); // Add single-parallelism committer that commits files @@ -487,7 +486,7 @@ private DataStreamSink appendDummySink( } private SingleOutputStreamOperator appendCommitter( - SingleOutputStreamOperator writerStream) { + SingleOutputStreamOperator writerStream) { IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter( tableLoader, @@ -507,7 +506,7 @@ private SingleOutputStreamOperator appendCommitter( return committerStream; } - private SingleOutputStreamOperator appendWriter( + private SingleOutputStreamOperator appendWriter( DataStream input, RowType flinkRowType, List equalityFieldIds, @@ -545,11 +544,11 @@ private SingleOutputStreamOperator appendWriter( IcebergStreamWriter streamWriter = createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds); - SingleOutputStreamOperator writerStream = + SingleOutputStreamOperator writerStream = input .transform( operatorName(ICEBERG_STREAM_WRITER_NAME), - TypeInformation.of(WriteResult.class), + TypeInformation.of(FlinkWriteResult.class), streamWriter) .setParallelism(writerParallelism); if (uidPrefix != null) { diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkWriteResult.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkWriteResult.java new file mode 100644 index 000000000000..317fb169ae1b --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkWriteResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.io.Serializable; +import org.apache.iceberg.io.WriteResult; + +public class FlinkWriteResult implements Serializable { + private final long checkpointId; + private final WriteResult writeResult; + + public FlinkWriteResult(long checkpointId, WriteResult writeResult) { + this.checkpointId = checkpointId; + this.writeResult = writeResult; + } + + public long checkpointId() { + return checkpointId; + } + + public WriteResult writeResult() { + return writeResult; + } +} diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index b9bceaa9311d..7108c2008341 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory; class IcebergFilesCommitter extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { + implements OneInputStreamOperator, BoundedOneInput { private static final long serialVersionUID = 1L; private static final long INITIAL_CHECKPOINT_ID = -1L; @@ -96,7 +96,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator // The completed files cache for current checkpoint. Once the snapshot barrier received, it will // be flushed to the 'dataFilesPerCheckpoint'. - private final List writeResultsOfCurrentCkpt = Lists.newArrayList(); + private final Map> writeResultsSinceLastSnapshot = Maps.newHashMap(); private final String branch; // It will have an unique identifier for one job. @@ -212,7 +212,8 @@ public void snapshotState(StateSnapshotContext context) throws Exception { // Update the checkpoint state. long startNano = System.nanoTime(); - dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId)); + writeToManifestUptoLatestCheckpoint(checkpointId); + // Reset the snapshot state to the latest state. checkpointsState.clear(); checkpointsState.add(dataFilesPerCheckpoint); @@ -220,8 +221,6 @@ public void snapshotState(StateSnapshotContext context) throws Exception { jobIdState.clear(); jobIdState.add(flinkJobId); - // Clear the local buffer for current checkpoint. - writeResultsOfCurrentCkpt.clear(); committerMetrics.checkpointDuration( TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano)); } @@ -426,30 +425,45 @@ private void commitOperation( } @Override - public void processElement(StreamRecord element) { - this.writeResultsOfCurrentCkpt.add(element.getValue()); + public void processElement(StreamRecord element) { + FlinkWriteResult flinkWriteResult = element.getValue(); + List writeResults = + writeResultsSinceLastSnapshot.computeIfAbsent( + flinkWriteResult.checkpointId(), k -> Lists.newArrayList()); + writeResults.add(flinkWriteResult.writeResult()); } @Override public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. 
- long currentCheckpointId = Long.MAX_VALUE; - dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); - writeResultsOfCurrentCkpt.clear(); - + long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID; + writeToManifestUptoLatestCheckpoint(currentCheckpointId); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId); } + private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException { + if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) { + dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA); + } + + for (Map.Entry> writeResultsOfCheckpoint : + writeResultsSinceLastSnapshot.entrySet()) { + dataFilesPerCheckpoint.put( + writeResultsOfCheckpoint.getKey(), + writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue())); + } + + // Clear the local buffer for current checkpoint. + writeResultsSinceLastSnapshot.clear(); + } + /** * Write all the complete data files to a newly created manifest file and return the manifest's * avro serialized bytes. */ - private byte[] writeToManifest(long checkpointId) throws IOException { - if (writeResultsOfCurrentCkpt.isEmpty()) { - return EMPTY_MANIFEST_DATA; - } - - WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build(); + private byte[] writeToManifest(long checkpointId, List writeResults) + throws IOException { + WriteResult result = WriteResult.builder().addAll(writeResults).build(); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( result, () -> manifestOutputFileFactory.create(checkpointId), spec); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index 9ea0349fb057..bb5efe982ee1 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -29,10 +29,11 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -class IcebergStreamWriter extends AbstractStreamOperator - implements OneInputStreamOperator, BoundedOneInput { +class IcebergStreamWriter extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { private static final long serialVersionUID = 1L; + static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE; private final String fullTableName; private final TaskWriterFactory taskWriterFactory; @@ -63,7 +64,7 @@ public void open() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - flush(); + flush(checkpointId); this.writer = taskWriterFactory.create(); } @@ -89,7 +90,7 @@ public void endInput() throws IOException { // Note that if the task is not closed after calling endInput, checkpoint may be triggered again // causing files to be sent repeatedly, the writer is marked as null after the last file is sent // to guard against duplicated writes. 
- flush(); + flush(END_INPUT_CHECKPOINT_ID); } @Override @@ -102,7 +103,7 @@ public String toString() { } /** close all open files and emit files to downstream committer operator */ - private void flush() throws IOException { + private void flush(long checkpointId) throws IOException { if (writer == null) { return; } @@ -110,7 +111,7 @@ private void flush() throws IOException { long startNano = System.nanoTime(); WriteResult result = writer.complete(); writerMetrics.updateFlushResult(result); - output.collect(new StreamRecord<>(result)); + output.collect(new StreamRecord<>(new FlinkWriteResult(checkpointId, result))); writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano)); // Set writer to null to prevent duplicate flushes in the corner case of diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 8faae1b05a4e..3299e7a97776 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -40,7 +40,6 @@ import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.io.BaseTaskWriter; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; @@ -210,8 +209,10 @@ public void testCompressionOrc() throws Exception { .containsEntry(TableProperties.ORC_COMPRESSION_STRATEGY, "speed"); } - private static OneInputStreamOperatorTestHarness createIcebergStreamWriter( - Table icebergTable, TableSchema flinkSchema, Map override) throws Exception { + private static OneInputStreamOperatorTestHarness + createIcebergStreamWriter( + Table icebergTable, TableSchema flinkSchema, Map override) + throws Exception { RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema); FlinkWriteConf flinkWriteConfig = new FlinkWriteConf( @@ -219,7 +220,7 @@ private static OneInputStreamOperatorTestHarness createIce IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(() -> icebergTable, flinkWriteConfig, flinkRowType, null); - OneInputStreamOperatorTestHarness harness = + OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); harness.setup(); @@ -230,7 +231,7 @@ private static OneInputStreamOperatorTestHarness createIce private static Map appenderProperties( Table table, TableSchema schema, Map override) throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter(table, schema, override)) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 948c7b31430c..ac5babe11943 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -129,7 +129,8 @@ public void testCommitTxnWithoutDataFiles() throws Exception { long timestamp = 0; JobID jobId = new JobID(); OperatorID 
operatorId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -161,7 +162,8 @@ public void testMaxContinuousEmptyCommits() throws Exception { JobID jobId = new JobID(); long checkpointId = 0; long timestamp = 0; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); @@ -176,8 +178,8 @@ public void testMaxContinuousEmptyCommits() throws Exception { } } - private WriteResult of(DataFile dataFile) { - return WriteResult.builder().addDataFiles(dataFile).build(); + private FlinkWriteResult of(long checkpointId, DataFile dataFile) { + return new FlinkWriteResult(checkpointId, WriteResult.builder().addDataFiles(dataFile).build()); } @TestTemplate @@ -193,7 +195,8 @@ public void testCommitTxn() throws Exception { JobID jobID = new JobID(); OperatorID operatorId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobID)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobID)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -204,7 +207,7 @@ public void testCommitTxn() throws Exception { for (int i = 1; i <= 3; i++) { RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i); DataFile dataFile = writeDataFile("data-" + i, ImmutableList.of(rowData)); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(i, dataFile), ++timestamp); rows.add(rowData); harness.snapshot(i, ++timestamp); @@ -233,7 +236,8 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { JobID jobId = new JobID(); OperatorID operatorId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -243,21 +247,21 @@ public void testOrderedEventsBetweenCheckpoints() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(of(dataFile1), ++timestamp); + long firstCheckpointId = 1; + harness.processElement(of(firstCheckpointId, dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); // 1. snapshotState for checkpoint#1 - long firstCheckpointId = 1; harness.snapshot(firstCheckpointId, ++timestamp); assertFlinkManifests(1); RowData row2 = SimpleDataUtil.createRowData(2, "world"); DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2)); - harness.processElement(of(dataFile2), ++timestamp); + long secondCheckpointId = 2; + harness.processElement(of(secondCheckpointId, dataFile2), ++timestamp); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); // 2. 
snapshotState for checkpoint#2 - long secondCheckpointId = 2; harness.snapshot(secondCheckpointId, ++timestamp); assertFlinkManifests(2); @@ -286,7 +290,8 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { JobID jobId = new JobID(); OperatorID operatorId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -296,21 +301,21 @@ public void testDisorderedEventsBetweenCheckpoints() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(of(dataFile1), ++timestamp); + long firstCheckpointId = 1; + harness.processElement(of(firstCheckpointId, dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); // 1. snapshotState for checkpoint#1 - long firstCheckpointId = 1; harness.snapshot(firstCheckpointId, ++timestamp); assertFlinkManifests(1); RowData row2 = SimpleDataUtil.createRowData(2, "world"); DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2)); - harness.processElement(of(dataFile2), ++timestamp); + long secondCheckpointId = 2; + harness.processElement(of(secondCheckpointId, dataFile2), ++timestamp); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); // 2. snapshotState for checkpoint#2 - long secondCheckpointId = 2; harness.snapshot(secondCheckpointId, ++timestamp); assertFlinkManifests(2); @@ -337,7 +342,8 @@ public void testRecoveryFromValidSnapshot() throws Exception { JobID jobId = new JobID(); OperatorID operatorId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -349,8 +355,8 @@ public void testRecoveryFromValidSnapshot() throws Exception { expectedRows.add(row); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row)); - harness.processElement(of(dataFile1), ++timestamp); - snapshot = harness.snapshot(++checkpointId, ++timestamp); + harness.processElement(of(++checkpointId, dataFile1), ++timestamp); + snapshot = harness.snapshot(checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); @@ -362,7 +368,8 @@ public void testRecoveryFromValidSnapshot() throws Exception { } // Restore from the given snapshot - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.getStreamConfig().setOperatorID(operatorId); harness.setup(); harness.initializeState(snapshot); @@ -375,9 +382,9 @@ public void testRecoveryFromValidSnapshot() throws Exception { RowData row = SimpleDataUtil.createRowData(2, "world"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row)); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(++checkpointId, dataFile), ++timestamp); - harness.snapshot(++checkpointId, ++timestamp); + harness.snapshot(checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); @@ -400,7 +407,8 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except List expectedRows = Lists.newArrayList(); JobID jobId = new JobID(); OperatorID operatorId; 
- try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -411,15 +419,16 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except RowData row = SimpleDataUtil.createRowData(1, "hello"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-1", ImmutableList.of(row)); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(++checkpointId, dataFile), ++timestamp); - snapshot = harness.snapshot(++checkpointId, ++timestamp); + snapshot = harness.snapshot(checkpointId, ++timestamp); SimpleDataUtil.assertTableRows(table, ImmutableList.of(), branch); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); assertFlinkManifests(1); } - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.getStreamConfig().setOperatorID(operatorId); harness.setup(); harness.initializeState(snapshot); @@ -446,15 +455,15 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except RowData row = SimpleDataUtil.createRowData(2, "world"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row)); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(++checkpointId, dataFile), ++timestamp); - snapshot = harness.snapshot(++checkpointId, ++timestamp); + snapshot = harness.snapshot(checkpointId, ++timestamp); assertFlinkManifests(1); } // Redeploying flink job from external checkpoint. JobID newJobId = new JobID(); - try (OneInputStreamOperatorTestHarness harness = + try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { harness.setup(); harness.initializeState(snapshot); @@ -473,9 +482,9 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except RowData row = SimpleDataUtil.createRowData(3, "foo"); expectedRows.add(row); DataFile dataFile = writeDataFile("data-3", ImmutableList.of(row)); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(++checkpointId, dataFile), ++timestamp); - harness.snapshot(++checkpointId, ++timestamp); + harness.snapshot(checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); @@ -496,7 +505,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { JobID oldJobId = new JobID(); OperatorID oldOperatorId; - try (OneInputStreamOperatorTestHarness harness = + try (OneInputStreamOperatorTestHarness harness = createStreamSink(oldJobId)) { harness.setup(); harness.open(); @@ -510,8 +519,8 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile(String.format("data-%d", i), rows); - harness.processElement(of(dataFile), ++timestamp); - harness.snapshot(++checkpointId, ++timestamp); + harness.processElement(of(++checkpointId, dataFile), ++timestamp); + harness.snapshot(checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); @@ -528,7 +537,7 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { timestamp = 0; JobID newJobId = new JobID(); OperatorID newOperatorId; - try (OneInputStreamOperatorTestHarness harness = + try (OneInputStreamOperatorTestHarness harness = createStreamSink(newJobId)) { 
harness.setup(); harness.open(); @@ -542,8 +551,8 @@ public void testStartAnotherJobToWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile("data-new-1", rows); - harness.processElement(of(dataFile), ++timestamp); - harness.snapshot(++checkpointId, ++timestamp); + harness.processElement(of(++checkpointId, dataFile), ++timestamp); + harness.snapshot(checkpointId, ++timestamp); assertFlinkManifests(1); harness.notifyOfCompletedCheckpoint(checkpointId); @@ -567,7 +576,8 @@ public void testMultipleJobsWriteSameTable() throws Exception { int checkpointId = i / 3; JobID jobId = jobs[jobIndex]; OperatorID operatorId = operatorIds[jobIndex]; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.getStreamConfig().setOperatorID(operatorId); harness.setup(); harness.open(); @@ -579,7 +589,7 @@ public void testMultipleJobsWriteSameTable() throws Exception { tableRows.addAll(rows); DataFile dataFile = writeDataFile(String.format("data-%d", i), rows); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(checkpointId + 1, dataFile), ++timestamp); harness.snapshot(checkpointId + 1, ++timestamp); assertFlinkManifests(1); @@ -603,8 +613,10 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception { JobID jobId = new JobID(); OperatorID operatorId1 = new OperatorID(); OperatorID operatorId2 = new OperatorID(); - try (OneInputStreamOperatorTestHarness harness1 = createStreamSink(jobId); - OneInputStreamOperatorTestHarness harness2 = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness1 = + createStreamSink(jobId); + OneInputStreamOperatorTestHarness harness2 = + createStreamSink(jobId)) { harness1.getStreamConfig().setOperatorID(operatorId1); harness1.setup(); harness1.open(); @@ -620,14 +632,14 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception { expectedRows.add(row1); DataFile dataFile1 = writeDataFile("data-1-1", ImmutableList.of(row1)); - harness1.processElement(of(dataFile1), ++timestamp); - snapshot1 = harness1.snapshot(++checkpointId, ++timestamp); + harness1.processElement(of(++checkpointId, dataFile1), ++timestamp); + snapshot1 = harness1.snapshot(checkpointId, ++timestamp); RowData row2 = SimpleDataUtil.createRowData(1, "hello2"); expectedRows.add(row2); DataFile dataFile2 = writeDataFile("data-1-2", ImmutableList.of(row2)); - harness2.processElement(of(dataFile2), ++timestamp); + harness2.processElement(of(checkpointId, dataFile2), ++timestamp); snapshot2 = harness2.snapshot(checkpointId, ++timestamp); assertFlinkManifests(2); @@ -643,8 +655,10 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception { } // Restore from the given snapshot - try (OneInputStreamOperatorTestHarness harness1 = createStreamSink(jobId); - OneInputStreamOperatorTestHarness harness2 = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness1 = + createStreamSink(jobId); + OneInputStreamOperatorTestHarness harness2 = + createStreamSink(jobId)) { harness1.getStreamConfig().setOperatorID(operatorId1); harness1.setup(); harness1.initializeState(snapshot1); @@ -668,13 +682,13 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception { expectedRows.add(row1); DataFile dataFile1 = writeDataFile("data-2-1", ImmutableList.of(row1)); - harness1.processElement(of(dataFile1), ++timestamp); - harness1.snapshot(++checkpointId, 
++timestamp); + harness1.processElement(of(++checkpointId, dataFile1), ++timestamp); + harness1.snapshot(checkpointId, ++timestamp); RowData row2 = SimpleDataUtil.createRowData(2, "world2"); expectedRows.add(row2); DataFile dataFile2 = writeDataFile("data-2-2", ImmutableList.of(row2)); - harness2.processElement(of(dataFile2), ++timestamp); + harness2.processElement(of(checkpointId, dataFile2), ++timestamp); harness2.snapshot(checkpointId, ++timestamp); assertFlinkManifests(2); @@ -694,7 +708,8 @@ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception { public void testBoundedStream() throws Exception { JobID jobId = new JobID(); OperatorID operatorId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -706,13 +721,14 @@ public void testBoundedStream() throws Exception { List tableRows = Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1")); DataFile dataFile = writeDataFile("data-1", tableRows); - harness.processElement(of(dataFile), 1); + harness.processElement(of(IcebergStreamWriter.END_INPUT_CHECKPOINT_ID, dataFile), 1); ((BoundedOneInput) harness.getOneInputOperator()).endInput(); assertFlinkManifests(0); SimpleDataUtil.assertTableRows(table, tableRows, branch); assertSnapshotSize(1); - assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE); + assertMaxCommittedCheckpointId( + jobId, operatorId, IcebergStreamWriter.END_INPUT_CHECKPOINT_ID); assertThat(SimpleDataUtil.latestSnapshot(table, branch).summary()) .containsEntry("flink.test", TestIcebergFilesCommitter.class.getName()); } @@ -725,7 +741,8 @@ public void testFlinkManifests() throws Exception { JobID jobId = new JobID(); OperatorID operatorId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -735,7 +752,7 @@ public void testFlinkManifests() throws Exception { RowData row1 = SimpleDataUtil.createRowData(1, "hello"); DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1)); - harness.processElement(of(dataFile1), ++timestamp); + harness.processElement(of(checkpoint, dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); // 1. snapshotState for checkpoint#1 @@ -775,7 +792,8 @@ public void testDeleteFiles() throws Exception { OperatorID operatorId; FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -784,7 +802,7 @@ public void testDeleteFiles() throws Exception { RowData row1 = SimpleDataUtil.createInsert(1, "aaa"); DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(row1)); - harness.processElement(of(dataFile1), ++timestamp); + harness.processElement(of(checkpoint, dataFile1), ++timestamp); assertMaxCommittedCheckpointId(jobId, operatorId, -1L); // 1. 
snapshotState for checkpoint#1 @@ -816,13 +834,15 @@ public void testDeleteFiles() throws Exception { RowData delete1 = SimpleDataUtil.createDelete(1, "aaa"); DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete1)); + assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint); harness.processElement( - WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build(), + new FlinkWriteResult( + ++checkpoint, + WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build()), ++timestamp); - assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint); // 5. snapshotState for checkpoint#2 - harness.snapshot(++checkpoint, ++timestamp); + harness.snapshot(checkpoint, ++timestamp); assertFlinkManifests(2); // 6. notifyCheckpointComplete for checkpoint#2 @@ -846,7 +866,8 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { OperatorID operatorId; FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -860,7 +881,9 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete3)); harness.processElement( - WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build(), + new FlinkWriteResult( + checkpoint, + WriteResult.builder().addDataFiles(dataFile1).addDeleteFiles(deleteFile1).build()), ++timestamp); // The 1th snapshotState. @@ -872,11 +895,13 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { DeleteFile deleteFile2 = writeEqDeleteFile(appenderFactory, "delete-file-2", ImmutableList.of(delete2)); harness.processElement( - WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build(), + new FlinkWriteResult( + ++checkpoint, + WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile2).build()), ++timestamp); // The 2nd snapshotState. - harness.snapshot(++checkpoint, ++timestamp); + harness.snapshot(checkpoint, ++timestamp); // Notify the 2nd snapshot to complete. harness.notifyOfCompletedCheckpoint(checkpoint); @@ -887,6 +912,79 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { } } + /** + * The testcase is to simulate upserting to an Iceberg V2 table, and facing the following + * scenario: + * + *
+   * <ul>
+   *   <li>A specific row is updated
+   *   <li>The prepareSnapshotPreBarrier triggered
+   *   <li>Checkpoint failed for reasons outside of the Iceberg connector
+   *   <li>The specific row is updated again in the second checkpoint as well
+   *   <li>Second snapshot is triggered, and finished
+   * </ul>
+   *
+   * <p>
Previously the files from the 2 snapshots were committed in a single Iceberg commit, as a + * results duplicate rows were created in the table. + * + * @throws Exception Exception + */ + @TestTemplate + public void testCommitMultipleCheckpointsForV2Table() throws Exception { + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(1); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + OperatorID operatorId; + + FileAppenderFactory appenderFactory = + new FlinkAppenderFactory( + table, + table.schema(), + FlinkSchemaUtil.convert(table.schema()), + table.properties(), + table.spec(), + new int[] {table.schema().findField("id").fieldId()}, + table.schema(), + null); + + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { + harness.setup(); + harness.open(); + operatorId = harness.getOperator().getOperatorID(); + + assertMaxCommittedCheckpointId(jobId, operatorId, -1L); + + RowData insert1 = null; + RowData insert2 = null; + for (int i = 1; i <= 3; i++) { + insert1 = SimpleDataUtil.createInsert(1, "aaa" + i); + insert2 = SimpleDataUtil.createInsert(2, "bbb" + i); + DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2)); + DeleteFile deleteFile = + writeEqDeleteFile( + appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2)); + harness.processElement( + new FlinkWriteResult( + ++checkpoint, + WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()), + ++timestamp); + } + + harness.snapshot(checkpoint, ++timestamp); + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch); + assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint); + assertFlinkManifests(0); + assertThat(table.snapshots()).hasSize(3); + } + } + @TestTemplate public void testSpecEvolution() throws Exception { long timestamp = 0; @@ -899,7 +997,8 @@ public void testSpecEvolution() throws Exception { DataFile dataFile; int specId; - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { harness.setup(); harness.open(); operatorId = harness.getOperator().getOperatorID(); @@ -910,7 +1009,7 @@ public void testSpecEvolution() throws Exception { RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId); // table unpartitioned dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData)); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(checkpointId, dataFile), ++timestamp); rows.add(rowData); harness.snapshot(checkpointId, ++timestamp); @@ -929,7 +1028,7 @@ public void testSpecEvolution() throws Exception { rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId); // write data with old partition spec dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(checkpointId, dataFile), ++timestamp); rows.add(rowData); snapshot = harness.snapshot(checkpointId, ++timestamp); @@ -947,7 +1046,8 @@ public void testSpecEvolution() throws Exception { } // Restore from the given snapshot - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { 
harness.getStreamConfig().setOperatorID(operatorId); harness.setup(); harness.initializeState(snapshot); @@ -963,7 +1063,7 @@ public void testSpecEvolution() throws Exception { partition.set(0, checkpointId); dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(row), table.spec(), partition); - harness.processElement(of(dataFile), ++timestamp); + harness.processElement(of(checkpointId, dataFile), ++timestamp); rows.add(row); harness.snapshot(checkpointId, ++timestamp); assertFlinkManifests(1); @@ -1089,7 +1189,7 @@ private void assertSnapshotSize(int expectedSnapshotSize) { assertThat(table.snapshots()).hasSize(expectedSnapshotSize); } - private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) + private OneInputStreamOperatorTestHarness createStreamSink(JobID jobID) throws Exception { TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch, table.spec()); return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID)); @@ -1109,7 +1209,7 @@ private static MockEnvironment createEnvironment(JobID jobID) { } private static class TestOperatorFactory extends AbstractStreamOperatorFactory - implements OneInputStreamOperatorFactory { + implements OneInputStreamOperatorFactory { private final String tablePath; private final String branch; private final PartitionSpec spec; diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index 50283f7ad215..e13721a9f170 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -28,6 +28,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.api.DataTypes; @@ -102,7 +103,7 @@ public void before() throws IOException { @TestTemplate public void testWritingTable() throws Exception { long checkpointId = 1L; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { // The first checkpoint testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); @@ -111,7 +112,8 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); int expectedDataFiles = partitioned ? 2 : 1; - WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + WriteResult result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(expectedDataFiles); @@ -123,7 +125,8 @@ public void testWritingTable() throws Exception { testHarness.prepareSnapshotPreBarrier(checkpointId); expectedDataFiles = partitioned ? 
4 : 2; - result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(expectedDataFiles); @@ -148,14 +151,15 @@ public void testWritingTable() throws Exception { public void testSnapshotTwice() throws Exception { long checkpointId = 1; long timestamp = 1; - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), timestamp++); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), timestamp); testHarness.prepareSnapshotPreBarrier(checkpointId++); int expectedDataFiles = partitioned ? 2 : 1; - WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + WriteResult result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(expectedDataFiles); @@ -163,7 +167,10 @@ public void testSnapshotTwice() throws Exception { for (int i = 0; i < 5; i++) { testHarness.prepareSnapshotPreBarrier(checkpointId++); - result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + result = + WriteResult.builder() + .addAll(getWriteResults(testHarness.extractOutputValues())) + .build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(expectedDataFiles); } @@ -172,14 +179,14 @@ public void testSnapshotTwice() throws Exception { @TestTemplate public void testTableWithoutSnapshot() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { assertThat(testHarness.extractOutputValues()).isEmpty(); } // Even if we closed the iceberg stream writer, there's no orphan data file. assertThat(scanDataFiles()).isEmpty(); - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); // Still not emit the data file yet, because there is no checkpoint. @@ -212,7 +219,7 @@ private Set scanDataFiles() throws IOException { @TestTemplate public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2); @@ -221,13 +228,15 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); int expectedDataFiles = partitioned ? 
2 : 1; - WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + WriteResult result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(expectedDataFiles); ((BoundedOneInput) testHarness.getOneInputOperator()).endInput(); - result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); // Datafiles should not be sent again assertThat(result.dataFiles()).hasSize(expectedDataFiles); @@ -236,7 +245,7 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { @TestTemplate public void testBoundedStreamTriggeredEndInputBeforeTriggeringCheckpoint() throws Exception { - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1); testHarness.processElement(SimpleDataUtil.createRowData(2, "world"), 2); @@ -244,13 +253,15 @@ public void testBoundedStreamTriggeredEndInputBeforeTriggeringCheckpoint() throw testHarness.endInput(); int expectedDataFiles = partitioned ? 2 : 1; - WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + WriteResult result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(expectedDataFiles); testHarness.prepareSnapshotPreBarrier(1L); - result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); // It should be ensured that after endInput is triggered, when prepareSnapshotPreBarrier // is triggered, write should only send WriteResult once @@ -275,7 +286,7 @@ public void testTableWithTargetFileSize() throws Exception { } } - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter()) { for (RowData row : rows) { testHarness.processElement(row, 1); @@ -283,7 +294,8 @@ public void testTableWithTargetFileSize() throws Exception { // snapshot the operator. 
testHarness.prepareSnapshotPreBarrier(1); - WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + WriteResult result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(8); @@ -346,13 +358,14 @@ public void testPromotedFlinkDataType() throws Exception { record.copy(ImmutableMap.of("tinyint", 2, "smallint", 0, "int", 102)), record.copy(ImmutableMap.of("tinyint", 3, "smallint", 32767, "int", 103))); - try (OneInputStreamOperatorTestHarness testHarness = + try (OneInputStreamOperatorTestHarness testHarness = createIcebergStreamWriter(icebergTable, flinkSchema)) { for (RowData row : rows) { testHarness.processElement(row, 1); } testHarness.prepareSnapshotPreBarrier(1); - WriteResult result = WriteResult.builder().addAll(testHarness.extractOutputValues()).build(); + WriteResult result = + WriteResult.builder().addAll(getWriteResults(testHarness.extractOutputValues())).build(); assertThat(result.deleteFiles()).isEmpty(); assertThat(result.dataFiles()).hasSize(partitioned ? 3 : 1); @@ -365,12 +378,18 @@ public void testPromotedFlinkDataType() throws Exception { SimpleDataUtil.assertTableRecords(location, expected); } - private OneInputStreamOperatorTestHarness createIcebergStreamWriter() + private static List getWriteResults(List flinkWriteResults) { + return flinkWriteResults.stream() + .map(FlinkWriteResult::writeResult) + .collect(Collectors.toList()); + } + + private OneInputStreamOperatorTestHarness createIcebergStreamWriter() throws Exception { return createIcebergStreamWriter(table, SimpleDataUtil.FLINK_SCHEMA); } - private OneInputStreamOperatorTestHarness createIcebergStreamWriter( + private OneInputStreamOperatorTestHarness createIcebergStreamWriter( Table icebergTable, TableSchema flinkSchema) throws Exception { RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema); FlinkWriteConf flinkWriteConfig = @@ -379,7 +398,7 @@ private OneInputStreamOperatorTestHarness createIcebergStr IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(() -> icebergTable, flinkWriteConfig, flinkRowType, null); - OneInputStreamOperatorTestHarness harness = + OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); harness.setup();