Skip to content

Commit

Permalink
[mysql-cdc] Add handler for catching async exceptions in snapshot rea…
Browse files Browse the repository at this point in the history
…ding executor

This closes apache#2016.
  • Loading branch information
leonardBang committed Jun 7, 2023
1 parent 70db0d2 commit 29713db
Showing 1 changed file with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("snapshot-reader-" + subtaskId).build();
new ThreadFactoryBuilder()
.setNameFormat("debezium-reader-" + subtaskId)
.setUncaughtExceptionHandler(
(thread, throwable) -> setReadException(throwable))
.build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = false;
this.hasNextElement = new AtomicBoolean(false);
Expand All @@ -120,7 +124,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
statefulTaskContext.getSnapshotReceiver(),
StatefulTaskContext.getClock(),
currentSnapshotSplit);
executorService.submit(
executorService.execute(
() -> {
try {
currentTaskRunning = true;
Expand Down Expand Up @@ -160,20 +164,14 @@ public void submitSplit(MySqlSplit mySqlSplit) {
new SnapshotBinlogSplitChangeEventSourceContextImpl(),
mySqlOffsetContext);
} else {
readException =
setReadException(
new IllegalStateException(
String.format(
"Read snapshot for mysql split %s fail",
currentSnapshotSplit));
currentSnapshotSplit)));
}
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail",
currentSnapshotSplit),
e);
readException = e;
setReadException(e);
}
});
}
Expand Down Expand Up @@ -335,6 +333,19 @@ private void assertLowWatermark(SourceRecord lowWatermark) {
lowWatermark));
}

private void setReadException(Throwable throwable) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail", currentSnapshotSplit),
throwable);
if (readException == null) {
readException = throwable;
} else {
readException.addSuppressed(throwable);
}
}

@Override
public void close() {
try {
Expand Down

0 comments on commit 29713db

Please sign in to comment.