Skip to content

Commit 8f15cd9

Browse files
committed
[hotfix] Fix job failure due to dangling DropTableEvent (without a prior CreateTableEvent)
1 parent cc82048 commit 8f15cd9

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,64 @@ public void testSchemaChangeEvents() throws Exception {
331331
"DropTableEvent{tableId=%s.products}");
332332
}
333333

334+
@Test
335+
public void testDroppingTable() throws Exception {
336+
Thread.sleep(5000);
337+
LOG.info("Sleep 5 seconds to distinguish initial DDL events with dropping table events...");
338+
long ddlTimestamp = System.currentTimeMillis();
339+
Thread.sleep(5000);
340+
LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
341+
342+
try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
343+
Statement statement = connection.createStatement()) {
344+
statement.execute("DROP TABLE products;");
345+
}
346+
347+
String pipelineJob =
348+
String.format(
349+
"source:\n"
350+
+ " type: mysql\n"
351+
+ " hostname: %s\n"
352+
+ " port: 3306\n"
353+
+ " username: %s\n"
354+
+ " password: %s\n"
355+
+ " tables: %s.\\.*\n"
356+
+ " server-id: 5400-5404\n"
357+
+ " server-time-zone: UTC\n"
358+
+ " scan.startup.mode: timestamp\n"
359+
+ " scan.startup.timestamp-millis: %d\n"
360+
+ " scan.binlog.newly-added-table.enabled: true\n"
361+
+ "\n"
362+
+ "sink:\n"
363+
+ " type: values\n"
364+
+ "\n"
365+
+ "pipeline:\n"
366+
+ " parallelism: %d\n"
367+
+ " schema.change.behavior: evolve",
368+
INTER_CONTAINER_MYSQL_ALIAS,
369+
MYSQL_TEST_USER,
370+
MYSQL_TEST_PASSWORD,
371+
mysqlInventoryDatabase.getDatabaseName(),
372+
ddlTimestamp,
373+
parallelism);
374+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
375+
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
376+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
377+
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
378+
waitUntilJobRunning(Duration.ofSeconds(30));
379+
LOG.info("Pipeline job is running");
380+
waitUntilSpecificEvent(
381+
String.format(
382+
"Table %s.products received SchemaChangeEvent DropTableEvent{tableId=%s.products} and start to be blocked.",
383+
mysqlInventoryDatabase.getDatabaseName(),
384+
mysqlInventoryDatabase.getDatabaseName()));
385+
386+
waitUntilSpecificEvent(
387+
String.format(
388+
"Schema change event DropTableEvent{tableId=%s.products} has been handled in another subTask already.",
389+
mysqlInventoryDatabase.getDatabaseName()));
390+
}
391+
334392
private void validateResult(String... expectedEvents) throws Exception {
335393
String dbName = mysqlInventoryDatabase.getDatabaseName();
336394
for (String event : expectedEvents) {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,13 @@ private void processSchemaChangeEvents(SchemaChangeEvent event)
244244
tableId,
245245
event);
246246
handleSchemaChangeEvent(tableId, event);
247-
// Update caches
247+
248+
if (event instanceof DropTableEvent) {
249+
// Update caches unless event is a Drop table event. In that case, no schema will be
250+
// available / necessary.
251+
return;
252+
}
253+
248254
originalSchema.put(tableId, getLatestOriginalSchema(tableId));
249255
schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
250256

0 commit comments

Comments
 (0)