Skip to content

Commit

Permalink
[fix][mysql] Fix: when starting in LATEST_OFFSET mode, if the task generates…
Browse files Browse the repository at this point in the history
… an exception, data repetition will occur when restarting
  • Loading branch information
fuyun2024 committed Nov 3, 2022
1 parent da5b322 commit 8191be6
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.ververica.cdc.connectors.mysql.source.assigners;

import org.apache.flink.util.CollectionUtil;

import com.ververica.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
Expand Down Expand Up @@ -86,8 +88,10 @@ public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {

/**
 * Takes back splits that are handed to this assigner again.
 *
 * <p>The splits themselves are not stored. A non-empty hand-back only clears the
 * "binlog split assigned" flag so that the binlog split is re-created later. A {@code null} or
 * empty collection leaves the flag untouched; per the accompanying commit message, resetting it
 * unconditionally led to repeated data after a restart in latest-offset mode.
 *
 * @param splits the splits being handed back; may be {@code null} or empty
 */
@Override
public void addSplits(Collection<MySqlSplit> splits) {
    if (!CollectionUtil.isNullOrEmpty(splits)) {
        // we don't store the split, but will re-create binlog split later
        isBinlogSplitAssigned = false;
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,15 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertTrue;

Expand All @@ -96,6 +102,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
private final UniqueDatabase customDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

Expand Down Expand Up @@ -188,6 +195,17 @@ public void testTaskManagerFailoverInBinlogPhase() throws Exception {
FailoverType.TM, FailoverPhase.BINLOG, new String[] {"customers", "customers_1"});
}

/**
 * Runs the parallel source in "latest-offset" startup mode and triggers a TaskManager failover
 * during the binlog phase, allowing a single immediate restart.
 */
@Test
public void testTaskManagerFailoverFromLatestOffset() throws Exception {
    final String[] capturedTables = {"customers", "customers_1"};
    testMySqlParallelSource(
            DEFAULT_PARALLELISM,
            "latest-offset",
            FailoverType.TM,
            FailoverPhase.BINLOG,
            capturedTables,
            RestartStrategies.fixedDelayRestart(1, 0));
}

@Test
public void testJobManagerFailoverInSnapshotPhase() throws Exception {
testMySqlParallelSource(
Expand All @@ -200,6 +218,17 @@ public void testJobManagerFailoverInBinlogPhase() throws Exception {
FailoverType.JM, FailoverPhase.BINLOG, new String[] {"customers", "customers_1"});
}

/**
 * Runs the parallel source in "latest-offset" startup mode and triggers a JobManager failover
 * during the binlog phase, allowing a single immediate restart.
 */
@Test
public void testJobManagerFailoverFromLatestOffset() throws Exception {
    final String[] capturedTables = {"customers", "customers_1"};
    testMySqlParallelSource(
            DEFAULT_PARALLELISM,
            "latest-offset",
            FailoverType.JM,
            FailoverPhase.BINLOG,
            capturedTables,
            RestartStrategies.fixedDelayRestart(1, 0));
}

@Test
public void testTaskManagerFailoverSingleParallelism() throws Exception {
testMySqlParallelSource(
Expand Down Expand Up @@ -330,6 +359,7 @@ public void testConsumingTableWithoutPrimaryKey() {
try {
testMySqlParallelSource(
1,
DEFAULT_SCAN_STARTUP_MODE,
FailoverType.NONE,
FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
Expand Down Expand Up @@ -415,6 +445,7 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception {
assertEqualsInAnyOrder(
Arrays.asList(expectedSnapshotData),
fetchRowData(iterator, expectedSnapshotData.length));
assertTrue(!hasNextData(iterator));
jobClient.cancel().get();
}

Expand Down Expand Up @@ -550,6 +581,7 @@ private void testMySqlParallelSource(
throws Exception {
testMySqlParallelSource(
parallelism,
DEFAULT_SCAN_STARTUP_MODE,
failoverType,
failoverPhase,
captureCustomerTables,
Expand All @@ -558,6 +590,7 @@ private void testMySqlParallelSource(

private void testMySqlParallelSource(
int parallelism,
String scanStartupMode,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables,
Expand Down Expand Up @@ -587,6 +620,7 @@ private void testMySqlParallelSource(
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
Expand All @@ -597,8 +631,28 @@ private void testMySqlParallelSource(
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
getTableNameRegex(captureCustomerTables),
scanStartupMode,
getServerId());
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");

// first step: check the snapshot data
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
}

// second step: check the binlog data
checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables);

tableResult.getJobClient().get().cancel().get();
}

private void checkSnapshotData(
TableResult tableResult,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
String[] snapshotForSingleTable =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
Expand All @@ -623,15 +677,15 @@ private void testMySqlParallelSource(
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();

List<String> expectedSnapshotData = new ArrayList<>();
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}

CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();

// trigger failover after some snapshot splits read finished
if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
triggerFailover(
Expand All @@ -640,15 +694,26 @@ private void testMySqlParallelSource(

assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
}

private void checkBinlogData(
TableResult tableResult,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
waitUntilJobRunning(tableResult);
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();

// second step: check the binlog data
for (String tableId : captureCustomerTables) {
makeFirstPartBinlogEvents(
getConnection(), customDatabase.getDatabaseName() + '.' + tableId);
}
if (failoverPhase == FailoverPhase.BINLOG) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
waitUntilJobRunning(tableResult);
}
for (String tableId : captureCustomerTables) {
makeSecondPartBinlogEvents(
Expand All @@ -660,8 +725,9 @@ private void testMySqlParallelSource(
expectedBinlogData.addAll(firstPartBinlogEvents);
expectedBinlogData.addAll(secondPartBinlogEvents);
}

assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
tableResult.getJobClient().get().cancel().get();
assertTrue(!hasNextData(iterator));
}

private static List<String> convertRowDataToRowString(List<RowData> rows) {
Expand Down Expand Up @@ -1140,6 +1206,27 @@ private static int sinkSize(String sinkName) {
}
}

/**
 * Blocks until the job behind {@code tableResult} reports the {@code RUNNING} status.
 *
 * <p>The status is polled every five seconds, and the first poll happens only after an initial
 * five-second pause. There is no upper bound on the number of polls.
 *
 * @param tableResult the result handle whose job client is queried for the job status
 * @throws InterruptedException if the thread is interrupted while sleeping or waiting
 * @throws ExecutionException if fetching the job status fails
 */
private void waitUntilJobRunning(TableResult tableResult)
        throws InterruptedException, ExecutionException {
    boolean running = false;
    while (!running) {
        // always pause before querying, including before the very first query
        Thread.sleep(5000L);
        running = tableResult.getJobClient().get().getJobStatus().get() == RUNNING;
    }
}

/**
 * Probes whether {@code iterator} offers another element within a three-second grace period.
 *
 * <p>{@code iterator.hasNext()} is evaluated on a dedicated single-thread executor so that a
 * call which blocks (no more data arriving) can be abandoned by the caller after the timeout.
 *
 * @param iterator the iterator to probe
 * @return the value of {@code iterator.hasNext()} if it completes in time, or {@code false} if
 *     the call is still pending after three seconds
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException if {@code iterator.hasNext()} itself throws
 */
private boolean hasNextData(final CloseableIterator<?> iterator)
        throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        // parameterized (not raw) so the Boolean result is type-checked
        FutureTask<Boolean> future = new FutureTask<>(iterator::hasNext);
        executor.execute(future);
        return future.get(3, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
        // nothing arrived within the grace period: treat as "no further data"
        return false;
    } finally {
        // shutdownNow() rather than shutdown(): a worker still blocked in hasNext() is
        // interrupted (best effort) instead of being left to linger after the timeout
        executor.shutdownNow();
    }
}

/**
* A {@link DebeziumDeserializationSchema} implementation which sleep given milliseconds after
* deserialize per record, this class is designed for test.
Expand Down

0 comments on commit 8191be6

Please sign in to comment.