Skip to content

Commit bae9fcf

Browse files
debug
1 parent 8b1d7a2 commit bae9fcf

File tree

3 files changed

+16
-102
lines changed

3 files changed

+16
-102
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 12 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,18 @@ private void captureNewlyAddedTables() {
271271

272272
// case 2: there are new tables to add
273273
if (!newlyAddedTables.isEmpty()) {
274+
// if job is still in snapshot reading phase, directly add all newly added
275+
// tables
274276
LOG.info("Found newly added tables, start capture newly added tables process");
275277

278+
// add new tables
276279
remainingTables.addAll(newlyAddedTables);
277-
this.startAssignNewlyAddedTables();
280+
if (AssignerStatus.isAssigningFinished(assignerStatus)) {
281+
// start the newly added tables process under binlog reading phase
282+
LOG.info(
283+
"Found newly added tables, start capture newly added tables process under binlog reading phase");
284+
this.startAssignNewlyAddedTables();
285+
}
278286
}
279287
} catch (Exception e) {
280288
throw new FlinkRuntimeException(
@@ -297,32 +305,15 @@ private void startAsynchronouslySplit() {
297305
private void splitTable(TableId nextTable) {
298306
LOG.info("Start splitting table {} into chunks...", nextTable);
299307
long start = System.currentTimeMillis();
300-
long lastProgressTime = System.currentTimeMillis();
301308
int chunkNum = 0;
302309
boolean hasRecordSchema = false;
303310
// split the given table into chunks (snapshot splits)
304311
do {
305312
synchronized (lock) {
306313
List<MySqlSnapshotSplit> splits;
307314
try {
308-
// Add progress logging
309-
long now = System.currentTimeMillis();
310-
if (now - lastProgressTime > 60000L) { // Log every minute
311-
LOG.info(
312-
"Still splitting table {}, chunks generated so far: {}, elapsed time: {}ms",
313-
nextTable,
314-
chunkNum,
315-
now - start);
316-
lastProgressTime = now;
317-
}
318-
319315
splits = chunkSplitter.splitChunks(partition, nextTable);
320316
} catch (Exception e) {
321-
LOG.error(
322-
"Error when splitting chunks for table {}: {}",
323-
nextTable,
324-
e.getMessage(),
325-
e);
326317
throw new IllegalStateException(
327318
"Error when splitting chunks for " + nextTable, e);
328319
}
@@ -376,63 +367,13 @@ public Optional<MySqlSplit> getNext() {
376367
split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
377368
} else if (!remainingTables.isEmpty()) {
378369
try {
379-
// wait for the asynchronous split to complete with timeout
380-
final long timeout = 30000L; // 30 seconds timeout
381-
lock.wait(timeout);
382-
383-
// Check if we timed out and still have no splits
384-
if (remainingSplits.isEmpty() && !remainingTables.isEmpty()) {
385-
LOG.warn(
386-
"Timeout waiting for asynchronous split generation, remaining tables: {}",
387-
remainingTables);
388-
// Try to restart the splitting process
389-
startAsynchronouslySplit();
390-
// Wait again with shorter timeout
391-
lock.wait(5000L);
392-
393-
// After restart and wait, check again for splits
394-
if (!remainingSplits.isEmpty()) {
395-
// Splits are now available after restart
396-
Iterator<MySqlSchemalessSnapshotSplit> iterator =
397-
remainingSplits.iterator();
398-
MySqlSchemalessSnapshotSplit split = iterator.next();
399-
remainingSplits.remove(split);
400-
assignedSplits.put(split.splitId(), split);
401-
addAlreadyProcessedTablesIfNotExists(split.getTableId());
402-
return Optional.of(
403-
split.toMySqlSnapshotSplit(
404-
tableSchemas.get(split.getTableId())));
405-
} else if (!remainingTables.isEmpty()) {
406-
// Still no splits after restart, but we have tables - recurse normally
407-
return getNext();
408-
} else {
409-
// No tables left after restart
410-
closeExecutorService();
411-
return Optional.empty();
412-
}
413-
}
414-
// If we didn't timeout (normal case), check if splits are available now
415-
if (!remainingSplits.isEmpty()) {
416-
Iterator<MySqlSchemalessSnapshotSplit> iterator =
417-
remainingSplits.iterator();
418-
MySqlSchemalessSnapshotSplit split = iterator.next();
419-
remainingSplits.remove(split);
420-
assignedSplits.put(split.splitId(), split);
421-
addAlreadyProcessedTablesIfNotExists(split.getTableId());
422-
return Optional.of(
423-
split.toMySqlSnapshotSplit(tableSchemas.get(split.getTableId())));
424-
} else if (!remainingTables.isEmpty()) {
425-
// No splits yet but still have tables to process - recurse
426-
return getNext();
427-
} else {
428-
// No tables left
429-
closeExecutorService();
430-
return Optional.empty();
431-
}
370+
// wait for the asynchronous split to complete
371+
lock.wait();
432372
} catch (InterruptedException e) {
433373
throw new FlinkRuntimeException(
434374
"InterruptedException while waiting for asynchronously snapshot split");
435375
}
376+
return getNext();
436377
} else {
437378
closeExecutorService();
438379
return Optional.empty();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ public void close() throws IOException {
202202

203203
private void assignSplits() {
204204
final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
205-
boolean hasAssignmentFailure = false;
206205

207206
while (awaitingReader.hasNext()) {
208207
int nextAwaiting = awaitingReader.next();
@@ -230,26 +229,12 @@ private void assignSplits() {
230229
}
231230
awaitingReader.remove();
232231
LOG.info("The enumerator assigns split {} to subtask {}", mySqlSplit, nextAwaiting);
233-
hasAssignmentFailure = false; // Reset on successful assignment
234232
} else {
235-
LOG.warn(
236-
"The enumerator has no more splits to assign to subtask {} by now. This may indicate a split generation timeout or all splits are exhausted.",
237-
nextAwaiting);
238-
// Don't break immediately - try other readers first
239-
hasAssignmentFailure = true;
233+
// there is no available splits by now, skip assigning
240234
requestBinlogSplitUpdateIfNeed();
235+
break;
241236
}
242237
}
243-
244-
// If we had assignment failures and there are still readers waiting,
245-
// schedule another assignment attempt to handle potential async split generation
246-
if (hasAssignmentFailure && !readersAwaitingSplit.isEmpty()) {
247-
LOG.info(
248-
"Some readers couldn't get splits due to temporary unavailability. Scheduling retry...");
249-
// The async check interval will trigger another assignSplits() call
250-
}
251-
252-
LOG.info("The enumerator finishes this round of assignment.");
253238
}
254239

255240
private boolean shouldCloseIdleReader(int nextAwaiting) {

pom.xml

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ limitations under the License.
5959
</scm>
6060

6161
<properties>
62-
<revision>3.6-SNAPSHOT</revision>
62+
<revision>3.5-SNAPSHOT</revision>
6363
<scala.binary.version>2.12</scala.binary.version>
6464
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
6565
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
6666
<flink.forkCount>1</flink.forkCount>
6767
<flink.reuseForks>true</flink.reuseForks>
6868

6969
<!-- dependencies versions -->
70-
<flink.version>1.20.2</flink.version>
70+
<flink.version>1.20.1</flink.version>
7171
<flink.major.version>1.20</flink.major.version>
7272
<flink.shaded.version>17.0</flink.shaded.version>
7373
<debezium.version>1.9.8.Final</debezium.version>
@@ -421,8 +421,6 @@ limitations under the License.
421421
</licenseFamilies>
422422
<excludes>
423423
<!-- Additional files like .gitignore etc.-->
424-
<!-- Release files -->
425-
<exclude>**/release/**</exclude>
426424
<exclude>**/README.md</exclude>
427425
<exclude>**/.*/**</exclude>
428426
<!-- Generated content -->
@@ -678,16 +676,6 @@ limitations under the License.
678676
<target>${maven.compiler.target}</target>
679677
</configuration>
680678
</plugin>
681-
682-
<!-- Allow modification of pom.xml properties using Maven commands. -->
683-
<plugin>
684-
<groupId>org.codehaus.mojo</groupId>
685-
<artifactId>versions-maven-plugin</artifactId>
686-
<version>2.5</version>
687-
<configuration>
688-
<generateBackupPoms>false</generateBackupPoms>
689-
</configuration>
690-
</plugin>
691679
</plugins>
692680
</build>
693681

0 commit comments

Comments
 (0)