Skip to content

Commit ab0bf9b

Browse files
committed
Resolve conflicts
1 parent e244cdf commit ab0bf9b

File tree

5 files changed

+54
-297
lines changed

5 files changed

+54
-297
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.junit.jupiter.api.BeforeAll;
3838
import org.junit.jupiter.api.BeforeEach;
3939
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.io.TempDir;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
4243
import org.testcontainers.containers.BindMode;
@@ -69,15 +70,10 @@
6970

7071
/** End-to-end tests for mysql cdc to Iceberg pipeline job. */
7172
public class MySqlToIcebergE2eITCase extends PipelineTestEnvironment {
72-
private static final Logger LOG = LoggerFactory.getLogger(MySqlToIcebergE2eITCase.class);
7373

74-
public static final Duration TESTCASE_TIMEOUT = Duration.ofMinutes(3);
74+
private static final Logger LOG = LoggerFactory.getLogger(MySqlToIcebergE2eITCase.class);
7575

76-
// ------------------------------------------------------------------------------------------
77-
// MySQL Variables (we always use MySQL as the data source for easier verifying)
78-
// ------------------------------------------------------------------------------------------
79-
protected static final String MYSQL_TEST_USER = "mysqluser";
80-
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
76+
@TempDir public Path temporaryFolder;
8177

8278
@org.testcontainers.junit.jupiter.Container
8379
public static final MySqlContainer MYSQL =
@@ -351,7 +347,7 @@ private void validateSinkResult(
351347
throws InterruptedException {
352348
runInContainerAsRoot(jobManager, "chmod", "0777", "-R", warehouse);
353349
LOG.info("Verifying Iceberg {}::{}::{} results...", warehouse, database, table);
354-
long deadline = System.currentTimeMillis() + TESTCASE_TIMEOUT.toMillis();
350+
long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
355351
List<String> results = Collections.emptyList();
356352
while (System.currentTimeMillis() < deadline) {
357353
try {
@@ -364,7 +360,7 @@ private void validateSinkResult(
364360
LOG.info(
365361
"Successfully verified {} records in {} seconds.",
366362
expected.size(),
367-
(System.currentTimeMillis() - deadline + TESTCASE_TIMEOUT.toMillis())
363+
(System.currentTimeMillis() - deadline + EVENT_WAITING_TIMEOUT.toMillis())
368364
/ 1000);
369365
return;
370366
} catch (Exception e) {

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -178,22 +178,11 @@ void testSyncWholeDatabaseInBatchMode() throws Exception {
178178
MYSQL_TEST_PASSWORD,
179179
mysqlInventoryDatabase.getDatabaseName(),
180180
parallelism);
181-
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
182-
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
183-
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
184-
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
181+
submitPipelineJob(pipelineJob);
185182
waitUntilJobRunning(Duration.ofSeconds(30));
186-
LOG.info("Pipeline job is running");
187-
waitUntilSpecificEvent(
188-
String.format(
189-
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
190-
mysqlInventoryDatabase.getDatabaseName()));
191-
waitUntilSpecificEvent(
192-
String.format(
193-
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
194-
mysqlInventoryDatabase.getDatabaseName()));
195183

196184
validateResult(
185+
dbNameFormatter,
197186
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
198187
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
199188
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
@@ -238,7 +227,6 @@ void testSchemaChangeEvents() throws Exception {
238227
parallelism);
239228
submitPipelineJob(pipelineJob);
240229
waitUntilJobRunning(Duration.ofSeconds(30));
241-
LOG.info("Pipeline job is running");
242230

243231
validateResult(
244232
dbNameFormatter,
@@ -258,8 +246,6 @@ void testSchemaChangeEvents() throws Exception {
258246
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
259247
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}");
260248

261-
LOG.info("Begin incremental reading stage.");
262-
// generate binlogs
263249
String mysqlJdbcUrl =
264250
String.format(
265251
"jdbc:mysql://%s:%s/%s",

0 commit comments

Comments
 (0)