Skip to content

Commit

Permalink
[mysql] Fix assigning duplicate snapshot splits when enable scan newly added tables (#2326)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 authored Jul 20, 2023
1 parent e8b62ba commit 67c445f
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -206,15 +208,24 @@ private void captureNewlyAddedTables() {
if (sourceConfig.isScanNewlyAddedTableEnabled()) {
// check whether we got newly added tables
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> newlyAddedTables = discoverCapturedTables(jdbc, sourceConfig);
final List<TableId> currentCapturedTables =
discoverCapturedTables(jdbc, sourceConfig);
final Set<TableId> previousCapturedTables = new HashSet<>();
List<TableId> tablesInRemainingSplits =
remainingSplits.stream()
.map(MySqlSnapshotSplit::getTableId)
.collect(Collectors.toList());
previousCapturedTables.addAll(tablesInRemainingSplits);
previousCapturedTables.addAll(alreadyProcessedTables);
previousCapturedTables.addAll(remainingTables);

// Get the removed tables with the new table filter
List<TableId> tablesToRemove = new LinkedList<>(alreadyProcessedTables);
tablesToRemove.addAll(remainingTables);
tablesToRemove.removeAll(newlyAddedTables);
Set<TableId> tablesToRemove = new HashSet<>(previousCapturedTables);
tablesToRemove.removeAll(currentCapturedTables);

newlyAddedTables.removeAll(alreadyProcessedTables);
newlyAddedTables.removeAll(remainingTables);
// Get the newly added tables
currentCapturedTables.removeAll(previousCapturedTables);
List<TableId> newlyAddedTables = currentCapturedTables;

// case 1: there are old tables to remove from state
if (!tablesToRemove.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@
package com.ververica.cdc.connectors.mysql.source.assigners;

import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.ExceptionUtils;

import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -41,6 +48,7 @@

import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -416,7 +424,8 @@ public void testEnumerateTablesLazily() {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_even_dist"},
"id");
"id",
false);

final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
Expand All @@ -428,6 +437,18 @@ public void testEnumerateTablesLazily() {
assertFalse(assigner.needToDiscoveryTables());
}

/**
 * Checks the snapshot splits handed out by an assigner that was restored from a checkpoint
 * while scanning of newly added tables is enabled.
 *
 * <p>The expected list holds the still-pending {@code customers_sparse_dist} split, the three
 * {@code customers_even_dist} splits, and finally one split covering the whole
 * {@code customer_card_single_line} table, each listed exactly once.
 */
@Test
public void testScanNewlyAddedTableStartFromCheckpoint() {
    final List<String> expectedSplits =
            Arrays.asList(
                    "customers_sparse_dist [109] null",
                    "customers_even_dist null [10]",
                    "customers_even_dist [10] [18]",
                    "customers_even_dist [18] null",
                    "customer_card_single_line null null");
    final List<String> actualSplits = getTestAssignSnapshotSplitsFromCheckpoint();

    assertEquals(expectedSplits, actualSplits);
}

private List<String> getTestAssignSnapshotSplits(
int splitSize,
double distributionFactorUpper,
Expand Down Expand Up @@ -456,7 +477,8 @@ private List<String> getTestAssignSnapshotSplits(
distributionFactorUpper,
distributionFactorLower,
captureTables,
chunkKeyColumn);
chunkKeyColumn,
false);
List<TableId> remainingTables =
Arrays.stream(captureTables)
.map(t -> database.getDatabaseName() + "." + t)
Expand All @@ -465,7 +487,103 @@ private List<String> getTestAssignSnapshotSplits(
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, remainingTables, false);
return getSplitsFromAssigner(assigner);
}

/**
 * Restores a {@link MySqlSnapshotSplitAssigner} from a hand-built
 * {@link SnapshotPendingSplitsState} and returns whatever {@code getSplitsFromAssigner}
 * reports for it.
 *
 * <p>State placed in the checkpoint:
 *
 * <ul>
 *   <li>{@code customers_sparse_dist} is listed as already processed: its splits {@code :0}
 *       and {@code :1} are in the assigned map, {@code :0} has a finished offset, and
 *       {@code :2} is still among the remaining splits;
 *   <li>{@code customers_even_dist} appears only through its three remaining splits;
 *   <li>{@code customer_card_single_line} is absent from the state and shows up only in the
 *       captured-table list of the config, which is created with
 *       {@code scanNewlyAddedTableEnabled = true}.
 * </ul>
 *
 * @return the splits produced by the restored assigner, as formatted by
 *     {@code getSplitsFromAssigner}
 */
private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
    // Tables taking part in the scenario.
    final TableId addedTableId =
            TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line");
    final TableId processedTableId =
            TableId.parse(customerDatabase.getDatabaseName() + ".customers_sparse_dist");
    final TableId pendingTableId =
            TableId.parse(customerDatabase.getDatabaseName() + ".customers_even_dist");

    final String[] capturedTableNames =
            new String[] {
                addedTableId.table(), processedTableId.table(), pendingTableId.table()
            };
    final MySqlSourceConfig sourceConfig =
            getConfig(
                    customerDatabase,
                    4, // presumably the split size - confirm against getConfig
                    CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                    CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                    capturedTableNames,
                    null,
                    true);

    // Chunk key type shared by every split below; JDBC type code 4 is java.sql.Types.INTEGER.
    final RowType chunkKeyType =
            ChunkUtils.getChunkKeyColumnType(
                    Column.editor().name("id").type("INT").jdbcType(4).create());

    // Splits that were created before the checkpoint but not yet handed out.
    final MySqlSchemalessSnapshotSplit processedTailSplit =
            new MySqlSchemalessSnapshotSplit(
                    processedTableId,
                    processedTableId + ":2",
                    chunkKeyType,
                    new Object[] {109},
                    null,
                    null);
    final MySqlSchemalessSnapshotSplit pendingHeadSplit =
            new MySqlSchemalessSnapshotSplit(
                    pendingTableId,
                    pendingTableId + ":0",
                    chunkKeyType,
                    null,
                    new Object[] {10},
                    null);
    final MySqlSchemalessSnapshotSplit pendingMiddleSplit =
            new MySqlSchemalessSnapshotSplit(
                    pendingTableId,
                    pendingTableId + ":1",
                    chunkKeyType,
                    new Object[] {10},
                    new Object[] {18},
                    null);
    final MySqlSchemalessSnapshotSplit pendingTailSplit =
            new MySqlSchemalessSnapshotSplit(
                    pendingTableId,
                    pendingTableId + ":2",
                    chunkKeyType,
                    new Object[] {18},
                    null,
                    null);
    final List<MySqlSchemalessSnapshotSplit> pendingSplits =
            Arrays.asList(
                    processedTailSplit, pendingHeadSplit, pendingMiddleSplit, pendingTailSplit);

    // Splits of the processed table that had already been assigned.
    final String firstAssignedId = processedTableId + ":0";
    final String secondAssignedId = processedTableId + ":1";
    final Map<String, MySqlSchemalessSnapshotSplit> assignedById = new HashMap<>();
    assignedById.put(
            firstAssignedId,
            new MySqlSchemalessSnapshotSplit(
                    processedTableId,
                    firstAssignedId,
                    chunkKeyType,
                    null,
                    new Object[] {105},
                    null));
    assignedById.put(
            secondAssignedId,
            new MySqlSchemalessSnapshotSplit(
                    processedTableId,
                    secondAssignedId,
                    chunkKeyType,
                    new Object[] {105},
                    new Object[] {109},
                    null));

    // Only the first assigned split has a finished offset recorded.
    final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
    finishedOffsets.put(firstAssignedId, ofEarliest());

    final List<TableId> processedTables = new ArrayList<>();
    processedTables.add(processedTableId);
    final List<TableId> tablesStillToSplit = new ArrayList<>();

    final SnapshotPendingSplitsState restoredState =
            new SnapshotPendingSplitsState(
                    processedTables,
                    pendingSplits,
                    assignedById,
                    new HashMap<>(),
                    finishedOffsets,
                    AssignerStatus.INITIAL_ASSIGNING,
                    tablesStillToSplit,
                    false,
                    true,
                    ChunkSplitterState.NO_SPLITTING_TABLE_STATE);

    return getSplitsFromAssigner(
            new MySqlSnapshotSplitAssigner(sourceConfig, DEFAULT_PARALLELISM, restoredState));
}

private List<String> getSplitsFromAssigner(final MySqlSnapshotSplitAssigner assigner) {
assigner.open();
List<MySqlSplit> sqlSplits = new ArrayList<>();
while (true) {
Expand Down Expand Up @@ -500,7 +618,8 @@ private MySqlSourceConfig getConfig(
double distributionFactorUpper,
double distributionLower,
String[] captureTables,
String chunkKeyColumn) {
String chunkKeyColumn,
boolean scanNewlyAddedTableEnabled) {
Map<ObjectPath, String> chunkKeys = new HashMap<>();
for (String table : captureTables) {
chunkKeys.put(new ObjectPath(database.getDatabaseName(), table), chunkKeyColumn);
Expand All @@ -523,6 +642,7 @@ private MySqlSourceConfig getConfig(
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.chunkKeyColumn(chunkKeys)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.createConfig(0);
}
}

0 comments on commit 67c445f

Please sign in to comment.