Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -49,7 +50,8 @@ public PaimonCommitter(Options catalogOptions, String commitUser) {
}

@Override
public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests) {
public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests)
throws IOException {
if (commitRequests.isEmpty()) {
return;
}
Expand All @@ -60,27 +62,23 @@ public void commit(Collection<CommitRequest<MultiTableCommittable>> commitReques
.collect(Collectors.toList());
// All CommitRequest shared the same checkpointId.
long checkpointId = committables.get(0).checkpointId();
int retriedNumber = commitRequests.stream().findFirst().get().getNumberOfRetries();
WrappedManifestCommittable wrappedManifestCommittable =
storeMultiCommitter.combine(checkpointId, 1L, committables);
try {
if (retriedNumber > 0) {
storeMultiCommitter.filterAndCommit(
Collections.singletonList(wrappedManifestCommittable));
} else {
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
}
storeMultiCommitter.filterAndCommit(
Collections.singletonList(wrappedManifestCommittable));
commitRequests.forEach(CommitRequest::signalAlreadyCommitted);
LOGGER.info(
String.format(
"Commit succeeded for %s with %s committable",
checkpointId, committables.size()));
"Commit succeeded for {} with {} committable",
checkpointId,
committables.size());
} catch (Exception e) {
commitRequests.forEach(CommitRequest::retryLater);
Copy link
Contributor Author

@beryllw beryllw Oct 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific purpose for retrying later in this context? @lvyanquan

LOGGER.warn(
String.format(
"Commit failed for %s with %s committable",
checkpointId, committables.size()));
"Commit failed for {} with {} committable",
checkpointId,
committables.size(),
e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class PaimonSinkITCase {

private BinaryRecordDataGenerator generator;

private static int checkpointId = 1;

public static final String TEST_DATABASE = "test";
private static final String HADOOP_CONF_DIR =
Objects.requireNonNull(
Expand Down Expand Up @@ -188,6 +190,7 @@ public void testSinkWithDataChange(String metastore)
writer.flush(false);
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand All @@ -214,6 +217,7 @@ public void testSinkWithDataChange(String metastore)
writer.flush(false);
commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand Down Expand Up @@ -243,6 +247,7 @@ public void testSinkWithDataChange(String metastore)
writer.flush(false);
commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand Down Expand Up @@ -274,6 +279,7 @@ public void testSinkWithSchemaChange(String metastore)
writer.flush(false);
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand Down Expand Up @@ -324,6 +330,7 @@ public void testSinkWithSchemaChange(String metastore)
writer.flush(false);
commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand Down Expand Up @@ -371,6 +378,7 @@ public void testSinkWithSchemaChange(String metastore)
writer.flush(false);
commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand Down Expand Up @@ -433,6 +441,7 @@ public void testSinkWithMultiTables(String metastore)
writer.flush(false);
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
Expand All @@ -454,6 +463,99 @@ public void testSinkWithMultiTables(String metastore)
Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result);
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testDuplicateCommitAfterRestore(String metastore)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
PaimonSink<Event> paimonSink =
new PaimonSink<>(
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// insert
for (Event event : createTestEvents()) {
writer.write(event, null);
}
writer.flush(false);
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);

// We add a loop for restore 7 times
for (int i = 2; i < 9; i++) {
// We've two steps in checkpoint: 1. snapshotState(ckp); 2.
// notifyCheckpointComplete(ckp).
// It's possible that flink job will restore from a checkpoint with only step#1 finished
// and
// step#2 not.
// CommitterOperator will try to re-commit recovered transactions.
committer.commit(commitRequests);
List<DataChangeEvent> events =
Arrays.asList(
DataChangeEvent.insertEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString(Integer.toString(i)),
BinaryStringData.fromString(Integer.toString(i))
})));
Assertions.assertDoesNotThrow(
() -> {
for (Event event : events) {
writer.write(event, null);
}
});
writer.flush(false);
// Checkpoint id start from 1
committer.commit(
writer.prepareCommit().stream()
.map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList()));
}

List<Row> result = new ArrayList<>();
tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
.execute()
.collect()
.forEachRemaining(result::add);
// 8 APPEND and 1 COMPACT
Assertions.assertEquals(result.size(), 9);
result.clear();

tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")
.execute()
.collect()
.forEachRemaining(result::add);
Assertions.assertEquals(
Arrays.asList(
Row.ofKind(RowKind.INSERT, "1", "1"),
Row.ofKind(RowKind.INSERT, "2", "2"),
Row.ofKind(RowKind.INSERT, "3", "3"),
Row.ofKind(RowKind.INSERT, "4", "4"),
Row.ofKind(RowKind.INSERT, "5", "5"),
Row.ofKind(RowKind.INSERT, "6", "6"),
Row.ofKind(RowKind.INSERT, "7", "7"),
Row.ofKind(RowKind.INSERT, "8", "8")),
result);
}

private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
// update the right checkpointId for MultiTableCommittable
return new MultiTableCommittable(
committable.getDatabase(),
committable.getTable(),
checkpointId++,
committable.kind(),
committable.wrappedCommittable());
}

private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {

protected MockCommitRequestImpl(CommT committable) {
Expand Down
Loading