Skip to content

Commit 0df63e2

Browse files
authored
[hotfix][cdc-runtime] Keep upstream pending requests in order to avoid checkpoint hanging & state inconsistency in timestamp startup mode
This closes #3576.
1 parent 2e938a9 commit 0df63e2

File tree

6 files changed

+203
-73
lines changed

6 files changed

+203
-73
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,64 @@ public void testSchemaChangeEvents() throws Exception {
332332
"DropTableEvent{tableId=%s.products}");
333333
}
334334

335+
@Test
336+
public void testDroppingTable() throws Exception {
337+
Thread.sleep(5000);
338+
LOG.info("Sleep 5 seconds to distinguish initial DDL events with dropping table events...");
339+
long ddlTimestamp = System.currentTimeMillis();
340+
Thread.sleep(5000);
341+
LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
342+
343+
try (Connection connection = mysqlInventoryDatabase.getJdbcConnection();
344+
Statement statement = connection.createStatement()) {
345+
statement.execute("DROP TABLE products;");
346+
}
347+
348+
String pipelineJob =
349+
String.format(
350+
"source:\n"
351+
+ " type: mysql\n"
352+
+ " hostname: %s\n"
353+
+ " port: 3306\n"
354+
+ " username: %s\n"
355+
+ " password: %s\n"
356+
+ " tables: %s.\\.*\n"
357+
+ " server-id: 5400-5404\n"
358+
+ " server-time-zone: UTC\n"
359+
+ " scan.startup.mode: timestamp\n"
360+
+ " scan.startup.timestamp-millis: %d\n"
361+
+ " scan.binlog.newly-added-table.enabled: true\n"
362+
+ "\n"
363+
+ "sink:\n"
364+
+ " type: values\n"
365+
+ "\n"
366+
+ "pipeline:\n"
367+
+ " parallelism: %d\n"
368+
+ " schema.change.behavior: evolve",
369+
INTER_CONTAINER_MYSQL_ALIAS,
370+
MYSQL_TEST_USER,
371+
MYSQL_TEST_PASSWORD,
372+
mysqlInventoryDatabase.getDatabaseName(),
373+
ddlTimestamp,
374+
parallelism);
375+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
376+
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
377+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
378+
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
379+
waitUntilJobRunning(Duration.ofSeconds(30));
380+
LOG.info("Pipeline job is running");
381+
waitUntilSpecificEvent(
382+
String.format(
383+
"Table %s.products received SchemaChangeEvent DropTableEvent{tableId=%s.products} and start to be blocked.",
384+
mysqlInventoryDatabase.getDatabaseName(),
385+
mysqlInventoryDatabase.getDatabaseName()));
386+
387+
waitUntilSpecificEvent(
388+
String.format(
389+
"Schema change event DropTableEvent{tableId=%s.products} has been handled in another subTask already.",
390+
mysqlInventoryDatabase.getDatabaseName()));
391+
}
392+
335393
private void validateResult(String... expectedEvents) throws Exception {
336394
String dbName = mysqlInventoryDatabase.getDatabaseName();
337395
for (String event : expectedEvents) {

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.data.RecordData;
2424
import org.apache.flink.cdc.common.data.StringData;
2525
import org.apache.flink.cdc.common.event.DataChangeEvent;
26+
import org.apache.flink.cdc.common.event.DropTableEvent;
2627
import org.apache.flink.cdc.common.event.Event;
2728
import org.apache.flink.cdc.common.event.FlushEvent;
2829
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
@@ -50,6 +51,7 @@
5051
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
5152
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
5253
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
54+
import org.apache.flink.runtime.state.StateSnapshotContext;
5355
import org.apache.flink.streaming.api.graph.StreamConfig;
5456
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
5557
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -242,7 +244,13 @@ private void processSchemaChangeEvents(SchemaChangeEvent event)
242244
tableId,
243245
event);
244246
handleSchemaChangeEvent(tableId, event);
245-
// Update caches
247+
248+
if (event instanceof DropTableEvent) {
249+
// Update caches unless event is a Drop table event. In that case, no schema will be
250+
// available / necessary.
251+
return;
252+
}
253+
246254
originalSchema.put(tableId, getLatestOriginalSchema(tableId));
247255
schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
248256

@@ -440,7 +448,8 @@ private SchemaChangeResponse requestSchemaChange(
440448
long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
441449
while (true) {
442450
SchemaChangeResponse response =
443-
sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
451+
sendRequestToCoordinator(
452+
new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
444453
if (response.isRegistryBusy()) {
445454
if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
446455
LOG.info(
@@ -609,4 +618,10 @@ public Object getFieldOrNull(RecordData recordData) {
609618
}
610619
}
611620
}
621+
622+
@Override
623+
public void snapshotState(StateSnapshotContext context) throws Exception {
624+
// Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
625+
// is guaranteed not to be mixed together.
626+
}
612627
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java

Lines changed: 108 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.concurrent.ConcurrentHashMap;
5656
import java.util.concurrent.ExecutorService;
5757
import java.util.concurrent.Executors;
58-
import java.util.concurrent.atomic.AtomicReference;
5958
import java.util.stream.Collectors;
6059
import java.util.stream.Stream;
6160

@@ -78,7 +77,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
7877
/**
7978
* Atomic flag indicating if current RequestHandler could accept more schema changes for now.
8079
*/
81-
private final AtomicReference<RequestStatus> schemaChangeStatus;
80+
private volatile RequestStatus schemaChangeStatus;
81+
82+
private final List<Integer> pendingSubTaskIds;
83+
private final Object schemaChangeRequestLock;
8284

8385
private volatile Throwable currentChangeException;
8486
private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
@@ -110,7 +112,10 @@ public SchemaRegistryRequestHandler(
110112
this.currentDerivedSchemaChangeEvents = new ArrayList<>();
111113
this.currentFinishedSchemaChanges = new ArrayList<>();
112114
this.currentIgnoredSchemaChanges = new ArrayList<>();
113-
this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
115+
116+
this.schemaChangeStatus = RequestStatus.IDLE;
117+
this.pendingSubTaskIds = new ArrayList<>();
118+
this.schemaChangeRequestLock = new Object();
114119
}
115120

116121
/**
@@ -120,67 +125,100 @@ public SchemaRegistryRequestHandler(
120125
*/
121126
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
122127
SchemaChangeRequest request) {
123-
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) {
124-
LOG.info(
125-
"Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.",
126-
request.getSchemaChangeEvent(),
127-
request.getTableId().toString());
128-
SchemaChangeEvent event = request.getSchemaChangeEvent();
129-
130-
// If this schema change event has been requested by another subTask, ignore it.
131-
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
132-
LOG.info("Event {} has been addressed before, ignoring it.", event);
133-
clearCurrentSchemaChangeRequest();
134-
Preconditions.checkState(
135-
schemaChangeStatus.compareAndSet(
136-
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
137-
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not "
138-
+ schemaChangeStatus.get());
128+
129+
// We use requester subTask ID as the pending ticket, because there will be at most 1 schema
130+
// change requests simultaneously from each subTask
131+
int requestSubTaskId = request.getSubTaskId();
132+
133+
synchronized (schemaChangeRequestLock) {
134+
// Make sure we handle the first request in the pending list to avoid out-of-order
135+
// waiting and blocks checkpointing mechanism.
136+
if (schemaChangeStatus == RequestStatus.IDLE) {
137+
if (pendingSubTaskIds.isEmpty()) {
138+
LOG.info(
139+
"Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",
140+
request.getSchemaChangeEvent(),
141+
request.getTableId().toString(),
142+
requestSubTaskId);
143+
} else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
144+
LOG.info(
145+
"Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",
146+
request.getSchemaChangeEvent(),
147+
request.getTableId().toString(),
148+
requestSubTaskId);
149+
pendingSubTaskIds.remove(0);
150+
} else {
151+
LOG.info(
152+
"Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",
153+
request.getSchemaChangeEvent(),
154+
request.getTableId().toString(),
155+
requestSubTaskId,
156+
pendingSubTaskIds);
157+
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
158+
pendingSubTaskIds.add(requestSubTaskId);
159+
}
160+
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
161+
}
162+
163+
SchemaChangeEvent event = request.getSchemaChangeEvent();
164+
165+
// If this schema change event has been requested by another subTask, ignore it.
166+
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
167+
LOG.info("Event {} has been addressed before, ignoring it.", event);
168+
clearCurrentSchemaChangeRequest();
169+
LOG.info(
170+
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
171+
request);
172+
return CompletableFuture.completedFuture(
173+
wrap(SchemaChangeResponse.duplicate()));
174+
}
175+
schemaManager.applyOriginalSchemaChange(event);
176+
List<SchemaChangeEvent> derivedSchemaChangeEvents =
177+
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
178+
179+
// If this schema change event is filtered out by LENIENT mode or merging table
180+
// route strategies, ignore it.
181+
if (derivedSchemaChangeEvents.isEmpty()) {
182+
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
183+
clearCurrentSchemaChangeRequest();
184+
LOG.info(
185+
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
186+
request);
187+
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
188+
}
189+
139190
LOG.info(
140-
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
141-
request);
142-
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
143-
}
144-
schemaManager.applyOriginalSchemaChange(event);
145-
List<SchemaChangeEvent> derivedSchemaChangeEvents =
146-
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
147-
148-
// If this schema change event is filtered out by LENIENT mode or merging table route
149-
// strategies, ignore it.
150-
if (derivedSchemaChangeEvents.isEmpty()) {
151-
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
152-
clearCurrentSchemaChangeRequest();
153-
Preconditions.checkState(
154-
schemaChangeStatus.compareAndSet(
155-
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
156-
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not "
157-
+ schemaChangeStatus.get());
191+
"SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
192+
// This request has been accepted.
193+
schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
194+
195+
// Backfill pre-schema info for sink applying
196+
derivedSchemaChangeEvents.forEach(
197+
e -> {
198+
if (e instanceof SchemaChangeEventWithPreSchema) {
199+
SchemaChangeEventWithPreSchema pe =
200+
(SchemaChangeEventWithPreSchema) e;
201+
if (!pe.hasPreSchema()) {
202+
schemaManager
203+
.getLatestEvolvedSchema(pe.tableId())
204+
.ifPresent(pe::fillPreSchema);
205+
}
206+
}
207+
});
208+
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
209+
return CompletableFuture.completedFuture(
210+
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
211+
} else {
158212
LOG.info(
159-
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
160-
request);
161-
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
213+
"Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
214+
request,
215+
requestSubTaskId,
216+
pendingSubTaskIds);
217+
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
218+
pendingSubTaskIds.add(requestSubTaskId);
219+
}
220+
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
162221
}
163-
164-
// Backfill pre-schema info for sink applying
165-
derivedSchemaChangeEvents.forEach(
166-
e -> {
167-
if (e instanceof SchemaChangeEventWithPreSchema) {
168-
SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e;
169-
if (!pe.hasPreSchema()) {
170-
schemaManager
171-
.getLatestEvolvedSchema(pe.tableId())
172-
.ifPresent(pe::fillPreSchema);
173-
}
174-
}
175-
});
176-
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
177-
return CompletableFuture.completedFuture(
178-
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
179-
} else {
180-
LOG.info(
181-
"Schema Registry is busy processing a schema change request, could not handle request {} for now.",
182-
request);
183-
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
184222
}
185223
}
186224

@@ -227,9 +265,10 @@ private void applySchemaChange(
227265
}
228266
}
229267
Preconditions.checkState(
230-
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED),
268+
schemaChangeStatus == RequestStatus.APPLYING,
231269
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
232-
+ schemaChangeStatus.get());
270+
+ schemaChangeStatus);
271+
schemaChangeStatus = RequestStatus.FINISHED;
233272
LOG.info(
234273
"SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
235274
currentDerivedSchemaChangeEvents);
@@ -262,10 +301,11 @@ public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
262301
}
263302
if (flushedSinkWriters.equals(activeSinkWriters)) {
264303
Preconditions.checkState(
265-
schemaChangeStatus.compareAndSet(
266-
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING),
304+
schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
267305
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
268306
+ schemaChangeStatus);
307+
308+
schemaChangeStatus = RequestStatus.APPLYING;
269309
LOG.info(
270310
"All sink subtask have flushed for table {}. Start to apply schema change.",
271311
tableId.toString());
@@ -276,9 +316,10 @@ public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
276316

277317
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
278318
Preconditions.checkState(
279-
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
319+
schemaChangeStatus != RequestStatus.IDLE,
280320
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
281-
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) {
321+
if (schemaChangeStatus == RequestStatus.FINISHED) {
322+
schemaChangeStatus = RequestStatus.IDLE;
282323
LOG.info(
283324
"SchemaChangeStatus switched from FINISHED to IDLE for request {}",
284325
currentDerivedSchemaChangeEvents);

0 commit comments

Comments
 (0)