Skip to content

Commit 09c7b25

Browse files
committed
[FLINK-36771][cdc-base][mysql] Fix UT trigger error: Invalid assigner status {} [NEWLY_ADDED_ASSIGNING_FINISHED]
1 parent 25f3a74 commit 09c7b25

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,18 @@ protected void syncWithReaders(int[] subtaskIds, Throwable t) {
275275
private void requestStreamSplitUpdateIfNeed() {
276276
if (!isStreamSplitUpdateRequestAlreadySent
277277
&& isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
278-
isStreamSplitUpdateRequestAlreadySent = true;
279278
// If enumerator knows which reader is assigned stream split, just send to this reader,
280279
// nor sends to all registered readers.
281280
if (streamSplitTaskId != null) {
281+
isStreamSplitUpdateRequestAlreadySent = true;
282282
LOG.info(
283283
"The enumerator requests subtask {} to update the stream split after newly added table.",
284284
streamSplitTaskId);
285285
context.sendEventToSourceReader(
286286
streamSplitTaskId, new StreamSplitUpdateRequestEvent());
287287
} else {
288288
for (int reader : getRegisteredReader()) {
289+
isStreamSplitUpdateRequestAlreadySent = true;
289290
LOG.info(
290291
"The enumerator requests subtask {} to update the stream split after newly added table.",
291292
reader);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ private void syncWithReaders(int[] subtaskIds, Throwable t) {
277277
private void requestBinlogSplitUpdateIfNeed() {
278278
if (!isBinlogSplitUpdateRequestAlreadySent
279279
&& isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
280-
isBinlogSplitUpdateRequestAlreadySent = true;
281280
for (int subtaskId : getRegisteredReader()) {
281+
isBinlogSplitUpdateRequestAlreadySent = true;
282282
LOG.info(
283283
"The enumerator requests subtask {} to update the binlog split after newly added table.",
284284
subtaskId);

0 commit comments

Comments
 (0)