Commit a7398ab

Flink: Fix duplicate data with upsert writer in case of aborted checkpoints (#10526)

zhongqishang authored Aug 26, 2024
1 parent 2ed61a1 commit a7398ab

Showing 7 changed files with 299 additions and 125 deletions.
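
For orientation, a reading of the change as reconstructed from the diff below (the commit carries no further description here): the Iceberg stream writer flushes its open files at every checkpoint barrier and emits the resulting data and delete files to the committer. Previously the committer buffered everything it received in a single "current checkpoint" list, so files flushed for a checkpoint that was later aborted were folded into the next successful commit; with the upsert writer those rows can no longer be shadowed by the later cycle's equality deletes, which shows up as duplicate data. The fix wraps every emitted result with the checkpoint id that produced it and has the committer bucket results per checkpoint. Condensed from the diff:

// Writer side (IcebergStreamWriter.flush): tag each flushed result with its checkpoint.
output.collect(new StreamRecord<>(new FlinkWriteResult(checkpointId, result)));

// Committer side (IcebergFilesCommitter.processElement): bucket results by the checkpoint
// that produced them instead of appending everything to one flat list.
writeResultsSinceLastSnapshot
    .computeIfAbsent(flinkWriteResult.checkpointId(), k -> Lists.newArrayList())
    .add(flinkWriteResult.writeResult());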
FlinkSink.java

@@ -67,7 +67,6 @@
import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -420,7 +419,7 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
distributeDataStream(rowDataInput, equalityFieldIds, flinkRowType, writerParallelism);

// Add parallel writers that append rows to files
SingleOutputStreamOperator<WriteResult> writerStream =
SingleOutputStreamOperator<FlinkWriteResult> writerStream =
appendWriter(distributeStream, flinkRowType, equalityFieldIds, writerParallelism);

// Add single-parallelism committer that commits files
@@ -487,7 +486,7 @@ private <T> DataStreamSink<T> appendDummySink(
}

private SingleOutputStreamOperator<Void> appendCommitter(
SingleOutputStreamOperator<WriteResult> writerStream) {
SingleOutputStreamOperator<FlinkWriteResult> writerStream) {
IcebergFilesCommitter filesCommitter =
new IcebergFilesCommitter(
tableLoader,
@@ -507,7 +506,7 @@ private SingleOutputStreamOperator<Void> appendCommitter(
return committerStream;
}

private SingleOutputStreamOperator<WriteResult> appendWriter(
private SingleOutputStreamOperator<FlinkWriteResult> appendWriter(
DataStream<RowData> input,
RowType flinkRowType,
List<Integer> equalityFieldIds,
@@ -545,11 +544,11 @@ private SingleOutputStreamOperator<WriteResult> appendWriter(
IcebergStreamWriter<RowData> streamWriter =
createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds);

SingleOutputStreamOperator<WriteResult> writerStream =
SingleOutputStreamOperator<FlinkWriteResult> writerStream =
input
.transform(
operatorName(ICEBERG_STREAM_WRITER_NAME),
TypeInformation.of(WriteResult.class),
TypeInformation.of(FlinkWriteResult.class),
streamWriter)
.setParallelism(writerParallelism);
if (uidPrefix != null) {
FlinkWriteResult.java (new file)

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.io.Serializable;
import org.apache.iceberg.io.WriteResult;

public class FlinkWriteResult implements Serializable {
private final long checkpointId;
private final WriteResult writeResult;

public FlinkWriteResult(long checkpointId, WriteResult writeResult) {
this.checkpointId = checkpointId;
this.writeResult = writeResult;
}

public long checkpointId() {
return checkpointId;
}

public WriteResult writeResult() {
return writeResult;
}
}
IcebergFilesCommitter.java

@@ -63,7 +63,7 @@
import org.slf4j.LoggerFactory;

class IcebergFilesCommitter extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
implements OneInputStreamOperator<FlinkWriteResult, Void>, BoundedOneInput {

private static final long serialVersionUID = 1L;
private static final long INITIAL_CHECKPOINT_ID = -1L;
@@ -96,7 +96,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>

// The completed files cache for current checkpoint. Once the snapshot barrier received, it will
// be flushed to the 'dataFilesPerCheckpoint'.
private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
private final Map<Long, List<WriteResult>> writeResultsSinceLastSnapshot = Maps.newHashMap();
private final String branch;

// It will have an unique identifier for one job.
@@ -212,16 +212,15 @@ public void snapshotState(StateSnapshotContext context) throws Exception {

// Update the checkpoint state.
long startNano = System.nanoTime();
dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
writeToManifestUptoLatestCheckpoint(checkpointId);

// Reset the snapshot state to the latest state.
checkpointsState.clear();
checkpointsState.add(dataFilesPerCheckpoint);

jobIdState.clear();
jobIdState.add(flinkJobId);

// Clear the local buffer for current checkpoint.
writeResultsOfCurrentCkpt.clear();
committerMetrics.checkpointDuration(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
}
@@ -426,30 +425,45 @@ private void commitOperation(
}

@Override
public void processElement(StreamRecord<WriteResult> element) {
this.writeResultsOfCurrentCkpt.add(element.getValue());
public void processElement(StreamRecord<FlinkWriteResult> element) {
FlinkWriteResult flinkWriteResult = element.getValue();
List<WriteResult> writeResults =
writeResultsSinceLastSnapshot.computeIfAbsent(
flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
writeResults.add(flinkWriteResult.writeResult());
}

@Override
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
long currentCheckpointId = Long.MAX_VALUE;
dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
writeResultsOfCurrentCkpt.clear();

long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
writeToManifestUptoLatestCheckpoint(currentCheckpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
}

private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
}

for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
writeResultsSinceLastSnapshot.entrySet()) {
dataFilesPerCheckpoint.put(
writeResultsOfCheckpoint.getKey(),
writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
}

// Clear the local buffer for current checkpoint.
writeResultsSinceLastSnapshot.clear();
}

/**
* Write all the complete data files to a newly created manifest file and return the manifest's
* avro serialized bytes.
*/
private byte[] writeToManifest(long checkpointId) throws IOException {
if (writeResultsOfCurrentCkpt.isEmpty()) {
return EMPTY_MANIFEST_DATA;
}

WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
private byte[] writeToManifest(long checkpointId, List<WriteResult> writeResults)
throws IOException {
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
result, () -> manifestOutputFileFactory.create(checkpointId), spec);
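A hedged walk-through of why the per-checkpoint bucketing matters; the checkpoint ids and rows are invented, and the per-checkpoint commit behaviour of the existing commitDeltaTxn path is recalled from the surrounding class rather than shown in this diff:

// Hypothetical upsert timeline:
//   barrier for checkpoint 1 -> writer emits FlinkWriteResult(1, insert id=1)
//   checkpoint 1 is aborted, so nothing is committed for it
//   barrier for checkpoint 2 -> writer emits FlinkWriteResult(2, eq-delete id=1 + insert id=1)
//   checkpoint 2 completes, and the committer commits everything up to checkpoint 2
//
// Before this change both results sat in one flat buffer and were written into checkpoint 2's
// manifest, so they landed in a single snapshot; an equality delete cannot shadow a data file
// committed in the same snapshot, so both versions of id=1 survived (the duplicate data).
//
// After this change the buffered state stays keyed by the producing checkpoint, e.g.
//   dataFilesPerCheckpoint = { 1 -> manifest(ckpt-1 files), 2 -> manifest(ckpt-2 files) }
// so the results can still be committed in checkpoint order, one write cycle at a time, and
// the later equality delete takes effect on the earlier insert.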
IcebergStreamWriter.java

@@ -29,10 +29,11 @@
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
class IcebergStreamWriter<T> extends AbstractStreamOperator<FlinkWriteResult>
implements OneInputStreamOperator<T, FlinkWriteResult>, BoundedOneInput {

private static final long serialVersionUID = 1L;
static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;

private final String fullTableName;
private final TaskWriterFactory<T> taskWriterFactory;
@@ -63,7 +64,7 @@ public void open() {

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
flush();
flush(checkpointId);
this.writer = taskWriterFactory.create();
}

@@ -89,7 +90,7 @@ public void endInput() throws IOException {
// Note that if the task is not closed after calling endInput, checkpoint may be triggered again
// causing files to be sent repeatedly, the writer is marked as null after the last file is sent
// to guard against duplicated writes.
flush();
flush(END_INPUT_CHECKPOINT_ID);
}

@Override
@@ -102,15 +103,15 @@ public String toString() {
}

/** close all open files and emit files to downstream committer operator */
private void flush() throws IOException {
private void flush(long checkpointId) throws IOException {
if (writer == null) {
return;
}

long startNano = System.nanoTime();
WriteResult result = writer.complete();
writerMetrics.updateFlushResult(result);
output.collect(new StreamRecord<>(result));
output.collect(new StreamRecord<>(new FlinkWriteResult(checkpointId, result)));
writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));

// Set writer to null to prevent duplicate flushes in the corner case of
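One detail worth noting (my reading of the constant, not text from the commit): END_INPUT_CHECKPOINT_ID is Long.MAX_VALUE, so the synthetic id used for the end-of-input flush sorts after every real checkpoint id. On the committer side (repeating the endInput() change shown earlier, with comments added):

// Write any still-buffered results under Long.MAX_VALUE and commit "up to" it; since the
// map of manifests is keyed by checkpoint id, this drains every pending checkpoint.
long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
writeToManifestUptoLatestCheckpoint(currentCheckpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);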
TestCompressionSettings.java

@@ -40,7 +40,6 @@
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
@@ -210,16 +209,18 @@ public void testCompressionOrc() throws Exception {
.containsEntry(TableProperties.ORC_COMPRESSION_STRATEGY, "speed");
}

private static OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
Table icebergTable, TableSchema flinkSchema, Map<String, String> override) throws Exception {
private static OneInputStreamOperatorTestHarness<RowData, FlinkWriteResult>
createIcebergStreamWriter(
Table icebergTable, TableSchema flinkSchema, Map<String, String> override)
throws Exception {
RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
FlinkWriteConf flinkWriteConfig =
new FlinkWriteConf(
icebergTable, override, new org.apache.flink.configuration.Configuration());

IcebergStreamWriter<RowData> streamWriter =
FlinkSink.createStreamWriter(() -> icebergTable, flinkWriteConfig, flinkRowType, null);
OneInputStreamOperatorTestHarness<RowData, WriteResult> harness =
OneInputStreamOperatorTestHarness<RowData, FlinkWriteResult> harness =
new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0);

harness.setup();
@@ -230,7 +231,7 @@ private static OneInputStreamOperatorTestHarness<RowData, WriteResult> createIce

private static Map<String, String> appenderProperties(
Table table, TableSchema schema, Map<String, String> override) throws Exception {
try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness =
try (OneInputStreamOperatorTestHarness<RowData, FlinkWriteResult> testHarness =
createIcebergStreamWriter(table, schema, override)) {
testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);

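A hedged sketch of how the retyped harness could be used to check the new output type; this test is not part of the commit, it reuses the createIcebergStreamWriter helper above and assumes a `table`, SimpleDataUtil.FLINK_SCHEMA, and AssertJ assertions as in the surrounding test class:

try (OneInputStreamOperatorTestHarness<RowData, FlinkWriteResult> harness =
    createIcebergStreamWriter(table, SimpleDataUtil.FLINK_SCHEMA, ImmutableMap.of())) {
  harness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
  // Barrier for checkpoint 1: the writer flushes and tags the result with checkpointId = 1.
  harness.prepareSnapshotPreBarrier(1L);

  harness.processElement(SimpleDataUtil.createRowData(2, "world"), 2);
  // Barrier for checkpoint 2: a second result tagged with checkpointId = 2.
  harness.prepareSnapshotPreBarrier(2L);

  List<FlinkWriteResult> results = harness.extractOutputValues();
  assertThat(results).hasSize(2);
  // Each result now carries the checkpoint that produced it, which is what lets the
  // committer keep results from an aborted checkpoint separate from the next one.
  assertThat(results.get(0).checkpointId()).isEqualTo(1L);
  assertThat(results.get(1).checkpointId()).isEqualTo(2L);
}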
(Diffs for the remaining two of the seven changed files did not load in this view.)