Skip to content

Commit 651bd64

Browse files
committed
[FLINK-35893][cdc-connect][paimon] use filterAndCommit API for retried Committables.
1 parent a39959f commit 651bd64

File tree

1 file changed

+27
-4
lines changed

1 file changed

+27
-4
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import org.apache.paimon.flink.sink.StoreMultiCommitter;
2525
import org.apache.paimon.manifest.WrappedManifestCommittable;
2626
import org.apache.paimon.options.Options;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

28-
import java.io.IOException;
2930
import java.util.Collection;
3031
import java.util.Collections;
3132
import java.util.List;
@@ -34,6 +35,8 @@
3435
/** A {@link Committer} to commit write results for multiple tables. */
3536
public class PaimonCommitter implements Committer<MultiTableCommittable> {
3637

38+
private static final Logger LOGGER = LoggerFactory.getLogger(PaimonCommitter.class);
39+
3740
private final StoreMultiCommitter storeMultiCommitter;
3841

3942
public PaimonCommitter(Options catalogOptions, String commitUser) {
@@ -46,19 +49,39 @@ public PaimonCommitter(Options catalogOptions, String commitUser) {
4649
}
4750

4851
@Override
49-
public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests)
50-
throws IOException, InterruptedException {
52+
public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests) {
5153
if (commitRequests.isEmpty()) {
5254
return;
5355
}
56+
5457
List<MultiTableCommittable> committables =
5558
commitRequests.stream()
5659
.map(CommitRequest::getCommittable)
5760
.collect(Collectors.toList());
61+
// All CommitRequest shared the same checkpointId.
5862
long checkpointId = committables.get(0).checkpointId();
63+
int retriedNumber = commitRequests.stream().findFirst().get().getNumberOfRetries();
5964
WrappedManifestCommittable wrappedManifestCommittable =
6065
storeMultiCommitter.combine(checkpointId, 1L, committables);
61-
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
66+
try {
67+
if (retriedNumber > 0) {
68+
storeMultiCommitter.filterAndCommit(
69+
Collections.singletonList(wrappedManifestCommittable));
70+
} else {
71+
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
72+
}
73+
commitRequests.forEach(CommitRequest::signalAlreadyCommitted);
74+
LOGGER.info(
75+
String.format(
76+
"Commit succeeded for %s with %s committable",
77+
checkpointId, committables.size()));
78+
} catch (Exception e) {
79+
commitRequests.forEach(CommitRequest::retryLater);
80+
LOGGER.warn(
81+
String.format(
82+
"Commit failed for %s with %s committable",
83+
checkpointId, committables.size()));
84+
}
6285
}
6386

6487
@Override

0 commit comments

Comments
 (0)