Skip to content

Commit cb1b232

Browse files
authored
[minor][tests] Fix test testDanglingDroppingTableDuringBinlogMode due to imprecise timestamp startup
This closes #3580
1 parent 0e9a176 commit cb1b232

File tree

2 files changed

+108
-12
lines changed

2 files changed

+108
-12
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.flink.cdc.common.types.DataType;
3939
import org.apache.flink.cdc.common.types.DataTypes;
4040
import org.apache.flink.cdc.common.types.RowType;
41+
import org.apache.flink.cdc.common.utils.Preconditions;
4142
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
4243
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
4344
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -57,6 +58,7 @@
5758
import org.testcontainers.lifecycle.Startables;
5859

5960
import java.sql.Connection;
61+
import java.sql.ResultSet;
6062
import java.sql.SQLException;
6163
import java.sql.Statement;
6264
import java.util.ArrayList;
@@ -689,6 +691,78 @@ public void testSchemaChangeEvents() throws Exception {
689691
actual.stream().map(Object::toString).collect(Collectors.toList()));
690692
}
691693

694+
@Test
695+
public void testDanglingDropTableEventInBinlog() throws Exception {
696+
env.setParallelism(1);
697+
inventoryDatabase.createAndInitialize();
698+
699+
// Create a new table for later deletion
700+
try (Connection connection = inventoryDatabase.getJdbcConnection();
701+
Statement statement = connection.createStatement()) {
702+
statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
703+
}
704+
705+
String logFileName = null;
706+
Long logPosition = null;
707+
708+
try (Connection connection = inventoryDatabase.getJdbcConnection();
709+
Statement statement = connection.createStatement()) {
710+
ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
711+
while (rs.next()) {
712+
logFileName = rs.getString("Log_name");
713+
logPosition = rs.getLong("File_size");
714+
}
715+
}
716+
717+
// We start reading binlog from the tail of current position and file to avoid reading
718+
// previous events. The next DDL event (DROP TABLE) will push binlog position forward.
719+
Preconditions.checkNotNull(logFileName, "Log file name must not be null");
720+
Preconditions.checkNotNull(logPosition, "Log position name must not be null");
721+
LOG.info("Trying to restore from {} @ {}...", logFileName, logPosition);
722+
723+
try (Connection connection = inventoryDatabase.getJdbcConnection();
724+
Statement statement = connection.createStatement()) {
725+
statement.execute("DROP TABLE live_fast;");
726+
}
727+
728+
MySqlSourceConfigFactory configFactory =
729+
new MySqlSourceConfigFactory()
730+
.hostname(MYSQL8_CONTAINER.getHost())
731+
.port(MYSQL8_CONTAINER.getDatabasePort())
732+
.username(TEST_USER)
733+
.password(TEST_PASSWORD)
734+
.databaseList(inventoryDatabase.getDatabaseName())
735+
.tableList(inventoryDatabase.getDatabaseName() + ".*")
736+
.startupOptions(StartupOptions.specificOffset(logFileName, logPosition))
737+
.serverId(getServerId(env.getParallelism()))
738+
.serverTimeZone("UTC")
739+
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
740+
741+
FlinkSourceProvider sourceProvider =
742+
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
743+
CloseableIterator<Event> events =
744+
env.fromSource(
745+
sourceProvider.getSource(),
746+
WatermarkStrategy.noWatermarks(),
747+
MySqlDataSourceFactory.IDENTIFIER,
748+
new EventTypeInfo())
749+
.executeAndCollect();
750+
Thread.sleep(5_000);
751+
752+
List<Event> expectedEvents =
753+
new ArrayList<>(
754+
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
755+
756+
expectedEvents.add(
757+
new DropTableEvent(
758+
TableId.tableId(inventoryDatabase.getDatabaseName(), "live_fast")));
759+
760+
List<Event> actual = fetchResults(events, expectedEvents.size());
761+
assertEqualsInAnyOrder(
762+
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
763+
actual.stream().map(Object::toString).collect(Collectors.toList()));
764+
}
765+
692766
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
693767
return new CreateTableEvent(
694768
tableId,

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.pipeline.tests;
1919

2020
import org.apache.flink.cdc.common.test.utils.TestUtils;
21+
import org.apache.flink.cdc.common.utils.Preconditions;
2122
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
2223
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
2324
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
@@ -36,6 +37,7 @@
3637
import java.nio.file.Path;
3738
import java.sql.Connection;
3839
import java.sql.DriverManager;
40+
import java.sql.ResultSet;
3941
import java.sql.SQLException;
4042
import java.sql.Statement;
4143
import java.time.Duration;
@@ -333,16 +335,34 @@ public void testSchemaChangeEvents() throws Exception {
333335
}
334336

335337
@Test
336-
public void testDroppingTable() throws Exception {
337-
Thread.sleep(5000);
338-
LOG.info("Sleep 5 seconds to distinguish initial DDL events with dropping table events...");
339-
long ddlTimestamp = System.currentTimeMillis();
340-
Thread.sleep(5000);
341-
LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
338+
public void testDanglingDropTableEventInBinlog() throws Exception {
339+
// Create a new table for later deletion
340+
try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
341+
Statement statement = connection.createStatement()) {
342+
statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
343+
}
344+
345+
String logFileName = null;
346+
Long logPosition = null;
347+
348+
try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
349+
Statement statement = connection.createStatement()) {
350+
ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
351+
while (rs.next()) {
352+
logFileName = rs.getString("Log_name");
353+
logPosition = rs.getLong("File_size");
354+
}
355+
}
356+
357+
// We start reading binlog from the tail of current position and file to avoid reading
358+
// previous events. The next DDL event (DROP TABLE) will push binlog position forward.
359+
Preconditions.checkNotNull(logFileName, "Log file name must not be null");
360+
Preconditions.checkNotNull(logPosition, "Log position name must not be null");
361+
LOG.info("Trying to restore from {} @ {}...", logFileName, logPosition);
342362

343363
try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
344364
Statement statement = connection.createStatement()) {
345-
statement.execute("DROP TABLE products;");
365+
statement.execute("DROP TABLE live_fast;");
346366
}
347367

348368
String pipelineJob =
@@ -356,8 +376,9 @@ public void testDroppingTable() throws Exception {
356376
+ " tables: %s.\\.*\n"
357377
+ " server-id: 5400-5404\n"
358378
+ " server-time-zone: UTC\n"
359-
+ " scan.startup.mode: timestamp\n"
360-
+ " scan.startup.timestamp-millis: %d\n"
379+
+ " scan.startup.mode: specific-offset\n"
380+
+ " scan.startup.specific-offset.file: %s\n"
381+
+ " scan.startup.specific-offset.pos: %d\n"
361382
+ " scan.binlog.newly-added-table.enabled: true\n"
362383
+ "\n"
363384
+ "sink:\n"
@@ -370,7 +391,8 @@ public void testDroppingTable() throws Exception {
370391
MYSQL_TEST_USER,
371392
MYSQL_TEST_PASSWORD,
372393
mysqlInventoryDatabase.getDatabaseName(),
373-
ddlTimestamp,
394+
logFileName,
395+
logPosition,
374396
parallelism);
375397
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
376398
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
@@ -380,13 +402,13 @@ public void testDroppingTable() throws Exception {
380402
LOG.info("Pipeline job is running");
381403
waitUntilSpecificEvent(
382404
String.format(
383-
"Table %s.products received SchemaChangeEvent DropTableEvent{tableId=%s.products} and start to be blocked.",
405+
"Table %s.live_fast received SchemaChangeEvent DropTableEvent{tableId=%s.live_fast} and start to be blocked.",
384406
mysqlInventoryDatabase.getDatabaseName(),
385407
mysqlInventoryDatabase.getDatabaseName()));
386408

387409
waitUntilSpecificEvent(
388410
String.format(
389-
"Schema change event DropTableEvent{tableId=%s.products} has been handled in another subTask already.",
411+
"Schema change event DropTableEvent{tableId=%s.live_fast} has been handled in another subTask already.",
390412
mysqlInventoryDatabase.getDatabaseName()));
391413
}
392414

0 commit comments

Comments
 (0)