Skip to content

Commit 5314014

Browse files
committed
debug
1 parent dfb4d41 commit 5314014

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,15 +305,32 @@ private void startAsynchronouslySplit() {
305305
private void splitTable(TableId nextTable) {
306306
LOG.info("Start splitting table {} into chunks...", nextTable);
307307
long start = System.currentTimeMillis();
308+
long lastProgressTime = System.currentTimeMillis();
308309
int chunkNum = 0;
309310
boolean hasRecordSchema = false;
310311
// split the given table into chunks (snapshot splits)
311312
do {
312313
synchronized (lock) {
313314
List<MySqlSnapshotSplit> splits;
314315
try {
316+
// Add progress logging
317+
long now = System.currentTimeMillis();
318+
if (now - lastProgressTime > 60000L) { // Log every minute
319+
LOG.info(
320+
"Still splitting table {}, chunks generated so far: {}, elapsed time: {}ms",
321+
nextTable,
322+
chunkNum,
323+
now - start);
324+
lastProgressTime = now;
325+
}
326+
315327
splits = chunkSplitter.splitChunks(partition, nextTable);
316328
} catch (Exception e) {
329+
LOG.error(
330+
"Error when splitting chunks for table {}: {}",
331+
nextTable,
332+
e.getMessage(),
333+
e);
317334
throw new IllegalStateException(
318335
"Error when splitting chunks for " + nextTable, e);
319336
}
@@ -367,8 +384,20 @@ public Optional<MySqlSplit> getNext() {
367384
split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
368385
} else if (!remainingTables.isEmpty()) {
369386
try {
370-
// wait for the asynchronous split to complete
371-
lock.wait();
387+
// wait for the asynchronous split to complete with timeout
388+
final long timeout = 30000L; // 30 seconds timeout
389+
lock.wait(timeout);
390+
391+
// Check if we're still waiting after timeout
392+
if (remainingSplits.isEmpty() && !remainingTables.isEmpty()) {
393+
LOG.warn(
394+
"Timeout waiting for asynchronous split generation, remaining tables: {}",
395+
remainingTables);
396+
// Try to restart the splitting process
397+
startAsynchronouslySplit();
398+
// Wait again with shorter timeout
399+
lock.wait(5000L);
400+
}
372401
} catch (InterruptedException e) {
373402
throw new FlinkRuntimeException(
374403
"InterruptedException while waiting for asynchronously snapshot split");

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ private void assignSplits() {
236236
break;
237237
}
238238
}
239+
LOG.info("The enumerator finishes this round of assignment.");
239240
}
240241

241242
private boolean shouldCloseIdleReader(int nextAwaiting) {

0 commit comments

Comments
 (0)