
Commit 84bf4d0

Jzjsnow (姜卓君) authored and committed
[FLINK-36701][cdc-runtime] Add steps to get and emit schemaManager's latest evolvedSchema when DataSinkWriterOperator handles FlushEvent.
1 parent ddb5f00 commit 84bf4d0

File tree

11 files changed: +729 −51 lines changed


flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java

Lines changed: 24 additions & 1 deletion
@@ -17,25 +17,46 @@
 
 package org.apache.flink.cdc.common.event;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
  * An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
  * start flushing.
  */
 public class FlushEvent implements Event {
+    /** The schema changes from which table. */
+    private final List<TableId> tableIds;
 
     /** Which subTask ID this FlushEvent was initiated from. */
    private final int sourceSubTaskId;
 
+    /** Flag indicating whether the FlushEvent is sent before a create table event. */
+    private final Boolean isForCreateTableEvent;
+
     public FlushEvent(int sourceSubTaskId) {
+        this(sourceSubTaskId, Collections.emptyList(), false);
+    }
+
+    public FlushEvent(int sourceSubTaskId, List<TableId> tableIds, boolean isForCreateTableEvent) {
+        this.tableIds = tableIds;
         this.sourceSubTaskId = sourceSubTaskId;
+        this.isForCreateTableEvent = isForCreateTableEvent;
+    }
+
+    public List<TableId> getTableIds() {
+        return tableIds;
     }
 
     public int getSourceSubTaskId() {
         return sourceSubTaskId;
     }
 
+    public Boolean getIsForCreateTableEvent() {
+        return isForCreateTableEvent;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -45,7 +66,9 @@ public boolean equals(Object o) {
             return false;
         }
         FlushEvent that = (FlushEvent) o;
-        return sourceSubTaskId == that.sourceSubTaskId;
+        return sourceSubTaskId == that.sourceSubTaskId
+                && Objects.equals(tableIds, that.tableIds)
+                && Objects.equals(isForCreateTableEvent, that.isForCreateTableEvent);
     }
 
     @Override
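
Side note: the new constructor and accessors are easy to exercise in isolation. A minimal sketch (the class name is hypothetical; `TableId.tableId(...)` is assumed to be the factory from flink-cdc-common):

import java.util.Collections;

import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;

public class FlushEventSketch {
    public static void main(String[] args) {
        // Legacy constructor: delegates to the new one with an empty table
        // list and isForCreateTableEvent = false.
        FlushEvent legacy = new FlushEvent(0);

        // New constructor: carries the routed sink tables plus a flag that
        // marks flushes sent ahead of a CreateTableEvent.
        FlushEvent flush =
                new FlushEvent(
                        0,
                        Collections.singletonList(TableId.tableId("app_db", "orders")),
                        false);

        System.out.println(legacy.getTableIds());              // []
        System.out.println(flush.getTableIds());               // [app_db.orders]
        System.out.println(flush.getIsForCreateTableEvent());  // false
    }
}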

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java

Lines changed: 11 additions & 2 deletions
@@ -22,6 +22,7 @@
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -150,6 +151,7 @@ public void processElement(StreamRecord<PartitioningEvent> streamRecord) throws
 
             // Then, notify this information to the coordinator
             requestSchemaChange(
+                    tableId,
                     new SchemaChangeRequest(sourcePartition, subTaskId, schemaChangeEvent));
             schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(1);
         } else if (event instanceof DataChangeEvent) {
@@ -188,9 +190,16 @@ public void processElement(StreamRecord<PartitioningEvent> streamRecord) throws
         }
     }
 
-    private void requestSchemaChange(SchemaChangeRequest schemaChangeRequest) {
+    private void requestSchemaChange(
+            TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
         LOG.info("{}> Sent FlushEvent to downstream...", subTaskId);
-        output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
+        output.collect(
+                new StreamRecord<>(
+                        new FlushEvent(
+                                subTaskId,
+                                tableIdRouter.route(sourceTableId),
+                                schemaChangeRequest.getSchemaChangeEvent().getType()
+                                        == SchemaChangeEventType.CREATE_TABLE)));
 
         LOG.info("{}> Sending evolve request...", subTaskId);
         SchemaChangeResponse response = sendRequestToCoordinator(schemaChangeRequest);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java

Lines changed: 8 additions & 1 deletion
@@ -23,6 +23,7 @@
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
@@ -164,8 +165,14 @@ private void handleSchemaChangeEvent(SchemaChangeEvent originalEvent) throws Exc
         schemaOperatorMetrics.increaseSchemaChangeEvents(1);
 
         // First, send FlushEvent or it might be blocked later
+        List<TableId> sinkTables = router.route(tableId);
         LOG.info("{}> Sending the FlushEvent.", subTaskId);
-        output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
+        output.collect(
+                new StreamRecord<>(
+                        new FlushEvent(
+                                subTaskId,
+                                sinkTables,
+                                originalEvent.getType() == SchemaChangeEventType.CREATE_TABLE)));
 
         // Then, queue to request schema change to SchemaCoordinator.
         SchemaChangeResponse response = requestSchemaChange(tableId, originalEvent);
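
Both SchemaOperator variants now follow the same pattern: route the source table to its sink tables, and flag the flush when the triggering event is a CREATE TABLE. A minimal sketch of that decision (buildFlushEvent is a hypothetical helper; the Schema/DataTypes builder calls are assumed from flink-cdc-common):

import java.util.Collections;
import java.util.List;

import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

public class FlushEventFactorySketch {
    // Hypothetical helper mirroring both operators: pair the routed sink
    // tables with a flag that is true only for CREATE TABLE events.
    static FlushEvent buildFlushEvent(
            int subTaskId, List<TableId> sinkTables, SchemaChangeEvent event) {
        return new FlushEvent(
                subTaskId,
                sinkTables,
                event.getType() == SchemaChangeEventType.CREATE_TABLE);
    }

    public static void main(String[] args) {
        TableId sourceTable = TableId.tableId("app_db", "orders");
        SchemaChangeEvent createTable =
                new CreateTableEvent(
                        sourceTable,
                        Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build());

        // With no route rules, a table routes to itself.
        FlushEvent flush =
                buildFlushEvent(0, Collections.singletonList(sourceTable), createTable);
        System.out.println(flush.getIsForCreateTableEvent()); // true
    }
}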

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java

Lines changed: 14 additions & 0 deletions
@@ -112,6 +112,20 @@ public void processElement(StreamRecord<Event> element) throws Exception {
     // ----------------------------- Helper functions -------------------------------
     private void handleFlushEvent(FlushEvent event) throws Exception {
         userFunction.finish();
+        if (!event.getIsForCreateTableEvent()) {
+            event.getTableIds().stream()
+                    .filter(tableId -> !processedTableIds.contains(tableId))
+                    .forEach(
+                            tableId -> {
+                                LOG.info("Table {} has not been processed", tableId);
+                                try {
+                                    emitLatestSchema(tableId);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                                processedTableIds.add(tableId);
+                            });
+        }
         schemaEvolutionClient.notifyFlushSuccess(
                 getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
     }
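
The same filter/forEach block is added to DataSinkWriterOperator below. A standalone sketch of the idea, with processedTableIds as plain local state and emitLatestSchema stubbed out (in the real operators it is an existing method that, per the commit title, gets and emits the schema manager's latest evolved schema):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.cdc.common.event.TableId;

public class UnseenTableFlushSketch {
    // Stub standing in for the operator's emitLatestSchema(tableId).
    private static void emitLatestSchema(TableId tableId) throws Exception {
        System.out.println("emit latest evolved schema for " + tableId);
    }

    public static void main(String[] args) {
        // Hypothetical state mirroring the operator's processedTableIds.
        Set<TableId> processedTableIds = new HashSet<>();
        processedTableIds.add(TableId.tableId("db", "seen"));

        // Tables carried by the FlushEvent (event.getTableIds() in the diff).
        List<TableId> flushTables =
                Arrays.asList(TableId.tableId("db", "seen"), TableId.tableId("db", "unseen"));

        // Same shape as the diff: only tables this writer has never handled
        // get their latest schema emitted before the flush is acknowledged.
        flushTables.stream()
                .filter(tableId -> !processedTableIds.contains(tableId))
                .forEach(
                        tableId -> {
                            try {
                                emitLatestSchema(tableId);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                            processedTableIds.add(tableId);
                        });
        // Prints only: emit latest evolved schema for db.unseen
    }
}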

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java

Lines changed: 14 additions & 0 deletions
@@ -198,6 +198,20 @@ public void endInput() throws Exception {
 
     private void handleFlushEvent(FlushEvent event) throws Exception {
         copySinkWriter.flush(false);
+        if (!event.getIsForCreateTableEvent()) {
+            event.getTableIds().stream()
+                    .filter(tableId -> !processedTableIds.contains(tableId))
+                    .forEach(
+                            tableId -> {
+                                LOG.info("Table {} has not been processed", tableId);
+                                try {
+                                    emitLatestSchema(tableId);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                                processedTableIds.add(tableId);
+                            });
+        }
         schemaEvolutionClient.notifyFlushSuccess(
                 getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
     }

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java

Lines changed: 16 additions & 3 deletions
@@ -24,7 +24,10 @@
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.runtime.serializer.BooleanSerializer;
 import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
+import org.apache.flink.cdc.runtime.serializer.ListSerializer;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
@@ -42,11 +45,13 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {
 
     private final SchemaChangeEventSerializer schemaChangeEventSerializer =
             SchemaChangeEventSerializer.INSTANCE;
-    private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+    private final ListSerializer<TableId> listSerializer =
+            new ListSerializer<>(TableIdSerializer.INSTANCE);
     private final EnumSerializer<EventClass> enumSerializer =
             new EnumSerializer<>(EventClass.class);
     private final TypeSerializer<DataChangeEvent> dataChangeEventSerializer =
             DataChangeEventSerializer.INSTANCE;
+    private final BooleanSerializer booleanSerializer = BooleanSerializer.INSTANCE;
 
     @Override
     public boolean isImmutableType() {
@@ -62,7 +67,10 @@ public Event createInstance() {
     public Event copy(Event from) {
         if (from instanceof FlushEvent) {
             FlushEvent flushEvent = (FlushEvent) from;
-            return new FlushEvent(((FlushEvent) from).getSourceSubTaskId());
+            return new FlushEvent(
+                    flushEvent.getSourceSubTaskId(),
+                    listSerializer.copy(((FlushEvent) from).getTableIds()),
+                    booleanSerializer.copy(flushEvent.getIsForCreateTableEvent()));
         } else if (from instanceof SchemaChangeEvent) {
             return schemaChangeEventSerializer.copy((SchemaChangeEvent) from);
         } else if (from instanceof DataChangeEvent) {
@@ -86,6 +94,8 @@ public void serialize(Event record, DataOutputView target) throws IOException {
         if (record instanceof FlushEvent) {
             enumSerializer.serialize(EventClass.FLUSH_EVENT, target);
             target.writeInt(((FlushEvent) record).getSourceSubTaskId());
+            listSerializer.serialize(((FlushEvent) record).getTableIds(), target);
+            booleanSerializer.serialize(((FlushEvent) record).getIsForCreateTableEvent(), target);
         } else if (record instanceof SchemaChangeEvent) {
             enumSerializer.serialize(EventClass.SCHEME_CHANGE_EVENT, target);
             schemaChangeEventSerializer.serialize((SchemaChangeEvent) record, target);
@@ -102,7 +112,10 @@ public Event deserialize(DataInputView source) throws IOException {
         EventClass eventClass = enumSerializer.deserialize(source);
         switch (eventClass) {
             case FLUSH_EVENT:
-                return new FlushEvent(source.readInt());
+                return new FlushEvent(
+                        source.readInt(),
+                        listSerializer.deserialize(source),
+                        booleanSerializer.deserialize(source));
             case DATA_CHANGE_EVENT:
                 return dataChangeEventSerializer.deserialize(source);
             case SCHEME_CHANGE_EVENT:
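
The extended wire format (subtask id, then the table id list, then the flag) can be checked with a round trip through flink-core's in-memory data views. A minimal sketch, assuming EventSerializer.INSTANCE is exposed as in Flink's other TypeSerializerSingleton subclasses:

import java.util.Collections;

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class FlushEventRoundTripSketch {
    public static void main(String[] args) throws Exception {
        FlushEvent original =
                new FlushEvent(
                        0,
                        Collections.singletonList(TableId.tableId("app_db", "orders")),
                        true);

        // Serialize into a growable in-memory buffer.
        DataOutputSerializer out = new DataOutputSerializer(64);
        EventSerializer.INSTANCE.serialize(original, out);

        // Deserialize and compare; equals() now covers the two new fields,
        // so a lossy round trip would be caught here.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        Event restored = EventSerializer.INSTANCE.deserialize(in);
        System.out.println(original.equals(restored)); // true
    }
}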

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java

Lines changed: 8 additions & 8 deletions
@@ -181,16 +181,16 @@ void testLenientSchemaEvolution() throws Exception {
                                 }))
                 .map(StreamRecord::getValue)
                 .containsExactly(
-                        new FlushEvent(0),
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), true),
                         createTableEvent,
                         genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"),
-                        new FlushEvent(0),
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), false),
                         addColumnEventAtLast,
                         genInsert(TABLE_ID, "ISFSB", 2, "Bob", 31.415926f, "Bye-bye", false),
-                        new FlushEvent(0),
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), false),
                         appendRenamedColumnAtLast,
                         genInsert(TABLE_ID, "ISFSBS", 3, "Cicada", 123.456f, null, true, "Ok"),
-                        new FlushEvent(0),
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), false),
                         alterColumnTypeEventWithBackfill,
                         genInsert(
                                 TABLE_ID,
@@ -201,11 +201,11 @@
                                 null,
                                 false,
                                 "Nah"),
-                        new FlushEvent(0),
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), false),
                         genInsert(TABLE_ID, "ISDSBS", 5, "Eve", 1.414, null, true, null),
-                        new FlushEvent(0),
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), false),
                         genInsert(TABLE_ID, "ISDSBS", 6, "Ferris", 0.001, null, false, null),
-                        new FlushEvent(0));
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), false));
     }
 
     @Test
@@ -308,7 +308,7 @@ void testIgnoreSchemaEvolution() throws Exception {
                                 }))
                 .map(StreamRecord::getValue)
                 .containsExactly(
-                        new FlushEvent(0),
+                        new FlushEvent(0, Collections.singletonList(TABLE_ID), true),
                         createTableEvent,
                         genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"),
                         genInsert(TABLE_ID, "ISFS", 2, "Bob", 31.415926f, "Bye-bye"),
