Skip to content

Commit 4f5f4ca

Browse files
committed
[FLINK-35791][kafka] address comments.
1 parent ceb37a5 commit 4f5f4ca

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void testSerialize() throws Exception {
8181
}));
8282
JsonNode expected =
8383
mapper.readTree(
84-
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
84+
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
8585
JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
8686
Assertions.assertEquals(expected, actual);
8787
DataChangeEvent insertEvent2 =
@@ -94,7 +94,7 @@ public void testSerialize() throws Exception {
9494
}));
9595
expected =
9696
mapper.readTree(
97-
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
97+
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
9898
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
9999
Assertions.assertEquals(expected, actual);
100100
DataChangeEvent deleteEvent =
@@ -107,7 +107,7 @@ public void testSerialize() throws Exception {
107107
}));
108108
expected =
109109
mapper.readTree(
110-
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
110+
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
111111
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
112112
Assertions.assertEquals(expected, actual);
113113
DataChangeEvent updateEvent =
@@ -125,7 +125,7 @@ public void testSerialize() throws Exception {
125125
}));
126126
expected =
127127
mapper.readTree(
128-
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
128+
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
129129
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
130130
Assertions.assertEquals(expected, actual);
131131
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -269,23 +269,23 @@ void testDebeziumJsonFormat() throws Exception {
269269
Arrays.asList(
270270
mapper.readTree(
271271
String.format(
272-
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
272+
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
273273
table1.getTableName())),
274274
mapper.readTree(
275275
String.format(
276-
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
276+
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
277277
table1.getTableName())),
278278
mapper.readTree(
279279
String.format(
280-
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
280+
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
281281
table1.getTableName())),
282282
mapper.readTree(
283283
String.format(
284-
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
284+
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
285285
table1.getTableName())),
286286
mapper.readTree(
287287
String.format(
288-
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
288+
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
289289
table1.getTableName())));
290290
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
291291
checkProducerLeak();
@@ -437,23 +437,23 @@ void testTopicAndHeaderOption() throws Exception {
437437
Arrays.asList(
438438
mapper.readTree(
439439
String.format(
440-
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
440+
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
441441
table1.getTableName())),
442442
mapper.readTree(
443443
String.format(
444-
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
444+
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
445445
table1.getTableName())),
446446
mapper.readTree(
447447
String.format(
448-
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
448+
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
449449
table1.getTableName())),
450450
mapper.readTree(
451451
String.format(
452-
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
452+
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
453453
table1.getTableName())),
454454
mapper.readTree(
455455
String.format(
456-
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
456+
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
457457
table1.getTableName())));
458458
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
459459
checkProducerLeak();

0 commit comments

Comments
 (0)