2121import org .apache .flink .api .common .typeutils .TypeSerializer ;
2222import org .apache .flink .cdc .common .utils .TestCaseUtils ;
2323import org .apache .flink .cdc .connectors .mysql .debezium .DebeziumUtils ;
24- import org .apache .flink .cdc .connectors .mysql .testutils .MySqlContainer ;
2524import org .apache .flink .cdc .connectors .mysql .testutils .TestTable ;
2625import org .apache .flink .cdc .connectors .mysql .testutils .TestTableSchemas ;
2726import org .apache .flink .cdc .connectors .mysql .testutils .UniqueDatabase ;
3837import org .apache .flink .streaming .api .operators .collect .CollectSinkOperator ;
3938import org .apache .flink .streaming .api .operators .collect .CollectSinkOperatorFactory ;
4039import org .apache .flink .streaming .api .operators .collect .CollectStreamSink ;
41- import org .apache .flink .test .junit5 .MiniClusterExtension ;
4240import org .apache .flink .util .FlinkRuntimeException ;
4341
4442import io .debezium .connector .mysql .MySqlConnection ;
45- import org .junit .jupiter .api .AfterAll ;
46- import org .junit .jupiter .api .AfterEach ;
47- import org .junit .jupiter .api .BeforeAll ;
48- import org .junit .jupiter .api .BeforeEach ;
49- import org .junit .jupiter .api .Test ;
50- import org .junit .jupiter .api .extension .RegisterExtension ;
43+ import org .junit .After ;
44+ import org .junit .Before ;
45+ import org .junit .Test ;
5146import org .junit .rules .TemporaryFolder ;
5247import org .slf4j .Logger ;
5348import org .slf4j .LoggerFactory ;
54- import org .testcontainers .containers .output .Slf4jLogConsumer ;
5549
5650import java .io .File ;
5751import java .lang .reflect .Field ;
7064import java .util .Map ;
7165import java .util .Objects ;
7266import java .util .Properties ;
67+ import java .util .Random ;
7368import java .util .UUID ;
7469
70+ import static org .apache .flink .cdc .connectors .mysql .source .MySqlSourceTestBase .DEFAULT_PARALLELISM ;
71+
7572/**
7673 * Integration tests for handling schema changes regard to renaming multiple tables within a single
7774 * statement.
7875 */
79- public class MySqlMultipleTablesRenamingITCase {
76+ public class MySqlMultipleTablesRenamingITCase extends MySqlSourceTestBase {
8077 private static final Logger LOG =
8178 LoggerFactory .getLogger (MySqlMultipleTablesRenamingITCase .class );
82- @ RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension ();
83-
84- @ SuppressWarnings ("unchecked" )
85- private static final MySqlContainer MYSQL_CONTAINER =
86- (MySqlContainer )
87- new MySqlContainer ()
88- .withConfigurationOverride (
89- buildMySqlConfigWithTimezone (
90- getResourceFolder (), getSystemTimeZone ()))
91- .withSetupSQL ("docker/setup.sql" )
92- .withDatabaseName ("flink-test" )
93- .withUsername ("flinkuser" )
94- .withPassword ("flinkpw" )
95- .withLogConsumer (new Slf4jLogConsumer (LOG ));
9679
9780 private final UniqueDatabase customDatabase =
9881 new UniqueDatabase (MYSQL_CONTAINER , "customer" , "mysqluser" , "mysqlpw" );
@@ -101,25 +84,15 @@ public class MySqlMultipleTablesRenamingITCase {
10184
10285 private MySqlConnection connection ;
10386
104- @ BeforeAll
105- public static void before () throws Exception {
106- MYSQL_CONTAINER .start ();
107- }
108-
109- @ AfterAll
110- public static void after () throws Exception {
111- MYSQL_CONTAINER .stop ();
112- }
113-
114- @ BeforeEach
115- void prepare () throws Exception {
87+ @ Before
88+ public void prepare () throws Exception {
11689 connection = getConnection ();
11790 customDatabase .createAndInitialize ();
11891 flushLogs ();
11992 }
12093
121- @ AfterEach
122- void tearDown () throws Exception {
94+ @ After
95+ public void tearDown () throws Exception {
12396 customDatabase .dropDatabase ();
12497 connection .close ();
12598 }
@@ -146,7 +119,7 @@ void tearDown() throws Exception {
146119 * during schema updates.
147120 */
148121 @ Test
149- void testRenameTablesWithinSingleStatement () throws Exception {
122+ public void testRenameTablesWithinSingleStatement () throws Exception {
150123 // Build Flink job
151124 StreamExecutionEnvironment env = getExecutionEnvironment ();
152125 MySqlSource <String > source = getSourceBuilder ().build ();
@@ -269,6 +242,8 @@ private MySqlSourceBuilder<String> getSourceBuilder() {
269242 .password (customDatabase .getPassword ())
270243 .databaseList (customDatabase .getDatabaseName ())
271244 .tableList (customers .getTableId ())
245+ .serverId (getServerId ())
246+ .serverTimeZone ("UTC" )
272247 .deserializer (new JsonDebeziumDeserializationSchema ());
273248 }
274249
@@ -392,4 +367,10 @@ private StreamExecutionEnvironment getExecutionEnvironment() {
392367 env .getCheckpointConfig ().setCheckpointingMode (CheckpointingMode .EXACTLY_ONCE );
393368 return env ;
394369 }
370+
371+ private String getServerId () {
372+ final Random random = new Random ();
373+ int serverId = random .nextInt (100 ) + 5400 ;
374+ return serverId + "-" + (serverId + DEFAULT_PARALLELISM );
375+ }
395376}
0 commit comments