Skip to content

Commit 4fc6319

Browse files
committed
debug
1 parent dfb4d41 commit 4fc6319

File tree

2 files changed

+87
-5
lines changed

2 files changed

+87
-5
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,15 +305,32 @@ private void startAsynchronouslySplit() {
305305
private void splitTable(TableId nextTable) {
306306
LOG.info("Start splitting table {} into chunks...", nextTable);
307307
long start = System.currentTimeMillis();
308+
long lastProgressTime = System.currentTimeMillis();
308309
int chunkNum = 0;
309310
boolean hasRecordSchema = false;
310311
// split the given table into chunks (snapshot splits)
311312
do {
312313
synchronized (lock) {
313314
List<MySqlSnapshotSplit> splits;
314315
try {
316+
// Add progress logging
317+
long now = System.currentTimeMillis();
318+
if (now - lastProgressTime > 60000L) { // Log every minute
319+
LOG.info(
320+
"Still splitting table {}, chunks generated so far: {}, elapsed time: {}ms",
321+
nextTable,
322+
chunkNum,
323+
now - start);
324+
lastProgressTime = now;
325+
}
326+
315327
splits = chunkSplitter.splitChunks(partition, nextTable);
316328
} catch (Exception e) {
329+
LOG.error(
330+
"Error when splitting chunks for table {}: {}",
331+
nextTable,
332+
e.getMessage(),
333+
e);
317334
throw new IllegalStateException(
318335
"Error when splitting chunks for " + nextTable, e);
319336
}
@@ -367,13 +384,63 @@ public Optional<MySqlSplit> getNext() {
367384
split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
368385
} else if (!remainingTables.isEmpty()) {
369386
try {
370-
// wait for the asynchronous split to complete
371-
lock.wait();
387+
// wait for the asynchronous split to complete with timeout
388+
final long timeout = 30000L; // 30 seconds timeout
389+
lock.wait(timeout);
390+
391+
// Check if we timed out and still have no splits
392+
if (remainingSplits.isEmpty() && !remainingTables.isEmpty()) {
393+
LOG.warn(
394+
"Timeout waiting for asynchronous split generation, remaining tables: {}",
395+
remainingTables);
396+
// Try to restart the splitting process
397+
startAsynchronouslySplit();
398+
// Wait again with shorter timeout
399+
lock.wait(5000L);
400+
401+
// After restart and wait, check again for splits
402+
if (!remainingSplits.isEmpty()) {
403+
// Splits are now available after restart
404+
Iterator<MySqlSchemalessSnapshotSplit> iterator =
405+
remainingSplits.iterator();
406+
MySqlSchemalessSnapshotSplit split = iterator.next();
407+
remainingSplits.remove(split);
408+
assignedSplits.put(split.splitId(), split);
409+
addAlreadyProcessedTablesIfNotExists(split.getTableId());
410+
return Optional.of(
411+
split.toMySqlSnapshotSplit(
412+
tableSchemas.get(split.getTableId())));
413+
} else if (!remainingTables.isEmpty()) {
414+
// Still no splits after restart, but we have tables - recurse normally
415+
return getNext();
416+
} else {
417+
// No tables left after restart
418+
closeExecutorService();
419+
return Optional.empty();
420+
}
421+
}
422+
// If we didn't timeout (normal case), check if splits are available now
423+
if (!remainingSplits.isEmpty()) {
424+
Iterator<MySqlSchemalessSnapshotSplit> iterator =
425+
remainingSplits.iterator();
426+
MySqlSchemalessSnapshotSplit split = iterator.next();
427+
remainingSplits.remove(split);
428+
assignedSplits.put(split.splitId(), split);
429+
addAlreadyProcessedTablesIfNotExists(split.getTableId());
430+
return Optional.of(
431+
split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
432+
} else if (!remainingTables.isEmpty()) {
433+
// No splits yet but still have tables to process - recurse
434+
return getNext();
435+
} else {
436+
// No tables left
437+
closeExecutorService();
438+
return Optional.empty();
439+
}
372440
} catch (InterruptedException e) {
373441
throw new FlinkRuntimeException(
374442
"InterruptedException while waiting for asynchronously snapshot split");
375443
}
376-
return getNext();
377444
} else {
378445
closeExecutorService();
379446
return Optional.empty();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ public void close() throws IOException {
202202

203203
private void assignSplits() {
204204
final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
205+
boolean hasAssignmentFailure = false;
205206

206207
while (awaitingReader.hasNext()) {
207208
int nextAwaiting = awaitingReader.next();
@@ -230,12 +231,26 @@ private void assignSplits() {
230231
}
231232
awaitingReader.remove();
232233
LOG.info("The enumerator assigns split {} to subtask {}", mySqlSplit, nextAwaiting);
234+
hasAssignmentFailure = false; // Reset on successful assignment
233235
} else {
234-
// there is no available splits by now, skip assigning
236+
LOG.warn(
237+
"The enumerator has no more splits to assign to subtask {} by now. This may indicate a split generation timeout or all splits are exhausted.",
238+
nextAwaiting);
239+
// Don't break immediately - try other readers first
240+
hasAssignmentFailure = true;
235241
requestBinlogSplitUpdateIfNeed();
236-
break;
237242
}
238243
}
244+
245+
// If we had assignment failures and there are still readers waiting,
246+
// schedule another assignment attempt to handle potential async split generation
247+
if (hasAssignmentFailure && !readersAwaitingSplit.isEmpty()) {
248+
LOG.info(
249+
"Some readers couldn't get splits due to temporary unavailability. Scheduling retry...");
250+
// The async check interval will trigger another assignSplits() call
251+
}
252+
253+
LOG.info("The enumerator finishes this round of assignment.");
239254
}
240255

241256
private boolean shouldCloseIdleReader(int nextAwaiting) {

0 commit comments

Comments
 (0)