Skip to content

Commit ab9069b

Browse files
committed
add test
1 parent 3091326 commit ab9069b

File tree

1 file changed

+202
-5
lines changed

1 file changed

+202
-5
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 202 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
332332
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
333333
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
334334
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
335-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=()}",
336-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
335+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 10, 1], after=[], op=DELETE, meta=()}",
336+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 20, ], after=[2, 20, x], op=UPDATE, meta=()}");
337337
}
338338

339339
@ParameterizedTest
@@ -399,8 +399,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
399399
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
400400
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
401401
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
402-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}",
403-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}");
402+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 11, 1], after=[], op=DELETE, meta=()}",
403+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 22, ], after=[2, 22, x], op=UPDATE, meta=()}");
404404
}
405405

406406
@Test
@@ -504,7 +504,7 @@ void testMergingWithRoute() throws Exception {
504504
// Table 1: +I[1, Alice, 18]
505505
// Table 1: +I[2, Bob, 20]
506506
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
507-
// Create table 2 [id, name, age]
507+
// Create table 2 [id, name, age, description]
508508
// Table 2: +I[3, Charlie, 15, student]
509509
// Table 2: +I[4, Donald, 25, student]
510510
// Table 2: -D[4, Donald, 25, student]
@@ -717,4 +717,201 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
717717
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
718718
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
719719
}
720+
721+
@Test
722+
void testTransformMergingWithRoute() throws Exception {
723+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
724+
725+
// Setup value source
726+
Configuration sourceConfig = new Configuration();
727+
sourceConfig.set(
728+
ValuesDataSourceOptions.EVENT_SET_ID,
729+
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
730+
731+
TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
732+
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
733+
Schema table1Schema =
734+
Schema.newBuilder()
735+
.physicalColumn("id", DataTypes.INT())
736+
.physicalColumn("name", DataTypes.STRING())
737+
.physicalColumn("age", DataTypes.INT())
738+
.primaryKey("id")
739+
.build();
740+
Schema table2Schema =
741+
Schema.newBuilder()
742+
.physicalColumn("id", DataTypes.BIGINT())
743+
.physicalColumn("name", DataTypes.VARCHAR(255))
744+
.physicalColumn("age", DataTypes.TINYINT())
745+
.physicalColumn("description", DataTypes.STRING())
746+
.primaryKey("id")
747+
.build();
748+
749+
// Create test dataset:
750+
// Create table 1 [id, name, age]
751+
// Table 1: +I[1, Alice, 18]
752+
// Table 1: +I[2, Bob, 20]
753+
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
754+
// Create table 2 [id, name, age, description]
755+
// Table 2: +I[3, Charlie, 15, student]
756+
// Table 2: +I[4, Donald, 25, student]
757+
// Table 2: -D[4, Donald, 25, student]
758+
// Add column for table 2: gender
759+
// Table 1: +I[5, Eliza, 24]
760+
// Table 2: +I[6, Frank, 30, student, male]
761+
List<Event> events = new ArrayList<>();
762+
BinaryRecordDataGenerator table1dataGenerator =
763+
new BinaryRecordDataGenerator(
764+
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
765+
BinaryRecordDataGenerator table2dataGenerator =
766+
new BinaryRecordDataGenerator(
767+
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
768+
events.add(new CreateTableEvent(myTable1, table1Schema));
769+
events.add(
770+
DataChangeEvent.insertEvent(
771+
myTable1,
772+
table1dataGenerator.generate(
773+
new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
774+
events.add(
775+
DataChangeEvent.insertEvent(
776+
myTable1,
777+
table1dataGenerator.generate(
778+
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
779+
events.add(
780+
DataChangeEvent.updateEvent(
781+
myTable1,
782+
table1dataGenerator.generate(
783+
new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
784+
table1dataGenerator.generate(
785+
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
786+
events.add(new CreateTableEvent(myTable2, table2Schema));
787+
events.add(
788+
DataChangeEvent.insertEvent(
789+
myTable2,
790+
table2dataGenerator.generate(
791+
new Object[] {
792+
3L,
793+
BinaryStringData.fromString("Charlie"),
794+
(byte) 15,
795+
BinaryStringData.fromString("student")
796+
})));
797+
events.add(
798+
DataChangeEvent.insertEvent(
799+
myTable2,
800+
table2dataGenerator.generate(
801+
new Object[] {
802+
4L,
803+
BinaryStringData.fromString("Donald"),
804+
(byte) 25,
805+
BinaryStringData.fromString("student")
806+
})));
807+
events.add(
808+
DataChangeEvent.deleteEvent(
809+
myTable2,
810+
table2dataGenerator.generate(
811+
new Object[] {
812+
4L,
813+
BinaryStringData.fromString("Donald"),
814+
(byte) 25,
815+
BinaryStringData.fromString("student")
816+
})));
817+
events.add(
818+
new AddColumnEvent(
819+
myTable2,
820+
Collections.singletonList(
821+
new AddColumnEvent.ColumnWithPosition(
822+
Column.physicalColumn("gender", DataTypes.STRING())))));
823+
events.add(
824+
DataChangeEvent.insertEvent(
825+
myTable1,
826+
table1dataGenerator.generate(
827+
new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
828+
events.add(
829+
DataChangeEvent.insertEvent(
830+
myTable2,
831+
new BinaryRecordDataGenerator(
832+
new DataType[] {
833+
DataTypes.BIGINT(),
834+
DataTypes.VARCHAR(255),
835+
DataTypes.TINYINT(),
836+
DataTypes.STRING(),
837+
DataTypes.STRING()
838+
})
839+
.generate(
840+
new Object[] {
841+
6L,
842+
BinaryStringData.fromString("Frank"),
843+
(byte) 30,
844+
BinaryStringData.fromString("student"),
845+
BinaryStringData.fromString("male")
846+
})));
847+
848+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
849+
850+
SourceDef sourceDef =
851+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
852+
853+
// Setup value sink
854+
Configuration sinkConfig = new Configuration();
855+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
856+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
857+
858+
// Setup transform
859+
List<TransformDef> transformDef =
860+
Collections.singletonList(
861+
new TransformDef(
862+
"default_namespace.default_schema.mytable[0-9]",
863+
"*,'last_name' as last_name",
864+
null,
865+
null,
866+
null,
867+
null,
868+
""));
869+
870+
// Setup route
871+
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
872+
List<RouteDef> routeDef =
873+
Collections.singletonList(
874+
new RouteDef(
875+
"default_namespace.default_schema.mytable[0-9]",
876+
mergedTable.toString(),
877+
null,
878+
null));
879+
880+
// Setup pipeline
881+
Configuration pipelineConfig = new Configuration();
882+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
883+
PipelineDef pipelineDef =
884+
new PipelineDef(sourceDef, sinkDef, routeDef, transformDef, pipelineConfig);
885+
886+
// Execute the pipeline
887+
PipelineExecution execution = composer.compose(pipelineDef);
888+
execution.execute();
889+
Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
890+
assertThat(mergedTableSchema)
891+
.isEqualTo(
892+
Schema.newBuilder()
893+
.physicalColumn("id", DataTypes.BIGINT())
894+
.physicalColumn("name", DataTypes.STRING())
895+
.physicalColumn("age", DataTypes.BIGINT())
896+
.physicalColumn("last_name", DataTypes.STRING())
897+
.physicalColumn("description", DataTypes.STRING())
898+
.physicalColumn("gender", DataTypes.STRING())
899+
.primaryKey("id")
900+
.build());
901+
String[] outputEvents = outCaptor.toString().trim().split("\n");
902+
assertThat(outputEvents)
903+
.containsExactly(
904+
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
905+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}",
906+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}",
907+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}",
908+
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}",
909+
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}",
910+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
911+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
912+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
913+
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}",
914+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}",
915+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
916+
}
720917
}

0 commit comments

Comments
 (0)