Skip to content

Commit

Permalink
[mysql] Optimize pure binlog phase check logic to improve performance (
Browse files Browse the repository at this point in the history
…apache#1392)

Co-authored-by: sammieliu <sammieliu@tencent.com>
  • Loading branch information
lzshlzsh and sammieliu authored Jul 28, 2022
1 parent 2c3962b commit 74f0a65
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -74,6 +76,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
private Map<TableId, List<FinishedSnapshotSplitInfo>> finishedSplitsInfo;
// tableId -> the max splitHighWatermark
private Map<TableId, BinlogOffset> maxSplitHighWatermarkMap;
private final Set<TableId> pureBinlogPhaseTables;
private Tables.TableFilter capturedTableFilter;

public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
Expand All @@ -82,6 +85,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId)
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = true;
this.pureBinlogPhaseTables = new HashSet<>();
}

public void submitSplit(MySqlSplit mySqlSplit) {
Expand Down Expand Up @@ -226,9 +230,13 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
}

private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
if (pureBinlogPhaseTables.contains(tableId)) {
return true;
}
// the existed tables those have finished snapshot reading
if (maxSplitHighWatermarkMap.containsKey(tableId)
&& position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
pureBinlogPhaseTables.add(tableId);
return true;
}
// capture dynamically new added tables
Expand Down Expand Up @@ -269,6 +277,7 @@ private void configureFilter() {
}
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
this.pureBinlogPhaseTables.clear();
}

public void stopBinlogReadTask() {
Expand Down

0 comments on commit 74f0a65

Please sign in to comment.