Skip to content

Commit 46086fb

Browse files
committed
[tests] Rewrite migration tests in Java
1 parent eda78ad commit 46086fb

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.pipeline.tests.migration;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
22+
23+
import org.assertj.core.api.Assertions;
24+
import org.junit.Test;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
/** E2e cases for stopping & restarting jobs. */
29+
public class CrossVersionMigrationITCase extends PipelineTestEnvironment {
30+
31+
private static final Logger LOG = LoggerFactory.getLogger(CrossVersionMigrationITCase.class);
32+
33+
@Test
34+
public void testGenericMigrationProcess() throws Exception {
35+
String content =
36+
"source:\n"
37+
+ " type: values\n"
38+
+ "\n"
39+
+ "sink:\n"
40+
+ " type: values\n"
41+
+ "\n"
42+
+ "pipeline:\n"
43+
+ " parallelism: 1";
44+
JobID jobID = submitPipelineJob(content);
45+
Assertions.assertThat(jobID).isNotNull();
46+
LOG.info("Submitted Job with ID: {} ", jobID);
47+
48+
validateResult(
49+
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
50+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
51+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
52+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
53+
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
54+
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
55+
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
56+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
57+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
58+
59+
String savepointPath = cancelJob(jobID);
60+
LOG.info("Stopped Job {} and saved savepoint to {}", jobID, savepointPath);
61+
}
62+
}

0 commit comments

Comments
 (0)