2121import org .apache .flink .api .common .typeutils .TypeSerializer ;
2222import org .apache .flink .cdc .common .utils .TestCaseUtils ;
2323import org .apache .flink .cdc .connectors .mysql .debezium .DebeziumUtils ;
24- import org .apache .flink .cdc .connectors .mysql .testutils .MySqlContainer ;
2524import org .apache .flink .cdc .connectors .mysql .testutils .TestTable ;
2625import org .apache .flink .cdc .connectors .mysql .testutils .TestTableSchemas ;
2726import org .apache .flink .cdc .connectors .mysql .testutils .UniqueDatabase ;
3837import org .apache .flink .streaming .api .operators .collect .CollectSinkOperator ;
3938import org .apache .flink .streaming .api .operators .collect .CollectSinkOperatorFactory ;
4039import org .apache .flink .streaming .api .operators .collect .CollectStreamSink ;
41- import org .apache .flink .test .junit5 .MiniClusterExtension ;
42- import org .apache .flink .util .FlinkRuntimeException ;
4340
4441import io .debezium .connector .mysql .MySqlConnection ;
45- import org .junit .jupiter .api .AfterAll ;
46- import org .junit .jupiter .api .AfterEach ;
47- import org .junit .jupiter .api .BeforeAll ;
48- import org .junit .jupiter .api .BeforeEach ;
49- import org .junit .jupiter .api .Test ;
50- import org .junit .jupiter .api .extension .RegisterExtension ;
51- import org .junit .rules .TemporaryFolder ;
42+ import org .junit .After ;
43+ import org .junit .Before ;
44+ import org .junit .Test ;
5245import org .slf4j .Logger ;
5346import org .slf4j .LoggerFactory ;
54- import org .testcontainers .containers .output .Slf4jLogConsumer ;
5547
56- import java .io .File ;
5748import java .lang .reflect .Field ;
58- import java .nio .charset .StandardCharsets ;
5949import java .nio .file .Files ;
6050import java .nio .file .Path ;
61- import java .nio .file .Paths ;
62- import java .nio .file .StandardOpenOption ;
63- import java .time .ZoneId ;
6451import java .util .ArrayList ;
6552import java .util .Arrays ;
66- import java .util .Collections ;
6753import java .util .HashMap ;
6854import java .util .Iterator ;
6955import java .util .List ;
7056import java .util .Map ;
71- import java .util .Objects ;
7257import java .util .Properties ;
58+ import java .util .Random ;
7359import java .util .UUID ;
7460
7561/**
7662 * Integration tests for handling schema changes regard to renaming multiple tables within a single
7763 * statement.
7864 */
79- public class MySqlMultipleTablesRenamingITCase {
65+ public class MySqlMultipleTablesRenamingITCase extends MySqlSourceTestBase {
8066 private static final Logger LOG =
8167 LoggerFactory .getLogger (MySqlMultipleTablesRenamingITCase .class );
82- @ RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension ();
83-
84- @ SuppressWarnings ("unchecked" )
85- private static final MySqlContainer MYSQL_CONTAINER =
86- (MySqlContainer )
87- new MySqlContainer ()
88- .withConfigurationOverride (
89- buildMySqlConfigWithTimezone (
90- getResourceFolder (), getSystemTimeZone ()))
91- .withSetupSQL ("docker/setup.sql" )
92- .withDatabaseName ("flink-test" )
93- .withUsername ("flinkuser" )
94- .withPassword ("flinkpw" )
95- .withLogConsumer (new Slf4jLogConsumer (LOG ));
9668
9769 private final UniqueDatabase customDatabase =
9870 new UniqueDatabase (MYSQL_CONTAINER , "customer" , "mysqluser" , "mysqlpw" );
@@ -101,25 +73,15 @@ public class MySqlMultipleTablesRenamingITCase {
10173
10274 private MySqlConnection connection ;
10375
104- @ BeforeAll
105- public static void before () throws Exception {
106- MYSQL_CONTAINER .start ();
107- }
108-
109- @ AfterAll
110- public static void after () throws Exception {
111- MYSQL_CONTAINER .stop ();
112- }
113-
114- @ BeforeEach
115- void prepare () throws Exception {
76+ @ Before
77+ public void prepare () throws Exception {
11678 connection = getConnection ();
11779 customDatabase .createAndInitialize ();
11880 flushLogs ();
11981 }
12082
121- @ AfterEach
122- void tearDown () throws Exception {
83+ @ After
84+ public void tearDown () throws Exception {
12385 customDatabase .dropDatabase ();
12486 connection .close ();
12587 }
@@ -146,7 +108,7 @@ void tearDown() throws Exception {
146108 * during schema updates.
147109 */
148110 @ Test
149- void testRenameTablesWithinSingleStatement () throws Exception {
111+ public void testRenameTablesWithinSingleStatement () throws Exception {
150112 // Build Flink job
151113 StreamExecutionEnvironment env = getExecutionEnvironment ();
152114 MySqlSource <String > source = getSourceBuilder ().build ();
@@ -269,6 +231,8 @@ private MySqlSourceBuilder<String> getSourceBuilder() {
269231 .password (customDatabase .getPassword ())
270232 .databaseList (customDatabase .getDatabaseName ())
271233 .tableList (customers .getTableId ())
234+ .serverId (getServerId ())
235+ .serverTimeZone ("UTC" )
272236 .deserializer (new JsonDebeziumDeserializationSchema ());
273237 }
274238
@@ -323,50 +287,6 @@ private static List<String> fetchRow(Iterator<String> iter, int size) {
323287 return rows ;
324288 }
325289
326- private static String buildMySqlConfigWithTimezone (File resourceDirectory , String timezone ) {
327- try {
328- TemporaryFolder tempFolder = new TemporaryFolder (resourceDirectory );
329- tempFolder .create ();
330- File folder = tempFolder .newFolder (String .valueOf (UUID .randomUUID ()));
331- Path cnf = Files .createFile (Paths .get (folder .getPath (), "my.cnf" ));
332- String mysqldConf =
333- "[mysqld]\n "
334- + "binlog_format = row\n "
335- + "log_bin = mysql-bin\n "
336- + "server-id = 223344\n "
337- + "binlog_row_image = FULL\n "
338- + "gtid_mode = on\n "
339- + "enforce_gtid_consistency = on\n " ;
340- String timezoneConf = "default-time_zone = '" + timezone + "'\n " ;
341- Files .write (
342- cnf ,
343- Collections .singleton (mysqldConf + timezoneConf ),
344- StandardCharsets .UTF_8 ,
345- StandardOpenOption .APPEND );
346- return Paths .get (resourceDirectory .getAbsolutePath ()).relativize (cnf ).toString ();
347- } catch (Exception e ) {
348- throw new RuntimeException ("Failed to create my.cnf file." , e );
349- }
350- }
351-
352- private static File getResourceFolder () {
353- try {
354- return Paths .get (
355- Objects .requireNonNull (
356- SpecificStartingOffsetITCase .class
357- .getClassLoader ()
358- .getResource ("." ))
359- .toURI ())
360- .toFile ();
361- } catch (Exception e ) {
362- throw new FlinkRuntimeException ("Get Resource File Directory fail" );
363- }
364- }
365-
366- private static String getSystemTimeZone () {
367- return ZoneId .systemDefault ().toString ();
368- }
369-
370290 private void setupSavepoint (StreamExecutionEnvironment env , String savepointPath )
371291 throws Exception {
372292 // restore from savepoint
@@ -392,4 +312,10 @@ private StreamExecutionEnvironment getExecutionEnvironment() {
392312 env .getCheckpointConfig ().setCheckpointingMode (CheckpointingMode .EXACTLY_ONCE );
393313 return env ;
394314 }
315+
316+ private String getServerId () {
317+ final Random random = new Random ();
318+ int serverId = random .nextInt (100 ) + 5400 ;
319+ return serverId + "-" + (serverId + DEFAULT_PARALLELISM );
320+ }
395321}
0 commit comments