6 changes: 0 additions & 6 deletions .github/workflows/flink_cdc_ci.yml
@@ -86,9 +86,3 @@ jobs:
       java-version: "[8]"
       flink-version: "['1.19.2', '1.20.1']"
       module: source_e2e
-  migration_test:
-    name: Migration Tests
-    uses: ./.github/workflows/flink_cdc_migration_test_base.yml
-    with:
-      java-version: "[8]"
-      flink-version: "['1.19.2', '1.20.1']"
7 changes: 0 additions & 7 deletions .github/workflows/flink_cdc_ci_nightly.yml
@@ -79,10 +79,3 @@ jobs:
       java-version: "[11]"
       flink-version: "['1.19.2', '1.20.1']"
       module: source_e2e
-  migration_test:
-    if: github.repository == 'apache/flink-cdc'
-    name: Migration Tests
-    uses: ./.github/workflows/flink_cdc_migration_test_base.yml
-    with:
-      java-version: "[11]"
-      flink-version: "['1.19.2', '1.20.1']"
135 changes: 0 additions & 135 deletions .github/workflows/flink_cdc_migration_test_base.yml

This file was deleted.

@@ -136,8 +136,7 @@ private void startTest(String testSet) throws Exception {
                         + "pipeline:\n"
                         + "  parallelism: 4";
         Path maxcomputeCdcJar = TestUtils.getResource("maxcompute-cdc-pipeline-connector.jar");
-        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
-        submitPipelineJob(pipelineJob, maxcomputeCdcJar, valuesCdcJar);
+        submitPipelineJob(pipelineJob, maxcomputeCdcJar);
         waitUntilJobFinished(Duration.ofMinutes(10));
         LOG.info("Pipeline job is running");
     }
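Here (and in the Doris test below) `submitPipelineJob` drops jars that every pipeline test used to pass explicitly, which suggests the shared test environment now stages common artifacts itself. A minimal sketch of that assumed behavior, not the actual `PipelineTestEnvironment` source; `SHARED_JARS`, `getSharedResource`, and `submit` are hypothetical names:

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: assumes the base class bundles shared jars into every submission,
// so tests pass just their connector-specific jar.
abstract class PipelineEnvironmentSketch {

    // Jars presumed to be needed by every pipeline E2E test.
    private static final List<String> SHARED_JARS =
            Arrays.asList("values-cdc-pipeline-connector.jar", "mysql-driver.jar");

    public void submitPipelineJob(String pipelineJob, Path... extraJars) throws Exception {
        List<Path> jars = new ArrayList<>();
        for (String shared : SHARED_JARS) {
            jars.add(getSharedResource(shared)); // hypothetical lookup helper
        }
        jars.addAll(Arrays.asList(extraJars));
        submit(pipelineJob, jars); // hypothetical submission primitive
    }

    protected abstract Path getSharedResource(String name);

    protected abstract void submit(String pipelineJob, List<Path> jars) throws Exception;
}
```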
@@ -19,8 +19,6 @@

 import org.apache.flink.cdc.common.test.utils.TestUtils;
 import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;

@@ -32,6 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.lifecycle.Startables;

 import java.nio.file.Path;
@@ -53,30 +52,8 @@
 class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
     private static final Logger LOG = LoggerFactory.getLogger(MySqlToDorisE2eITCase.class);

-    // ------------------------------------------------------------------------------------------
-    // MySQL Variables (we always use MySQL as the data source for easier verifying)
-    // ------------------------------------------------------------------------------------------
-    protected static final String MYSQL_TEST_USER = "mysqluser";
-    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
-    protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-    public static final Duration DEFAULT_STARTUP_TIMEOUT = Duration.ofSeconds(240);
-    public static final Duration DEFAULT_RESULT_VERIFY_TIMEOUT = Duration.ofSeconds(30);
-
-    @org.testcontainers.junit.jupiter.Container
-    public static final MySqlContainer MYSQL =
-            (MySqlContainer)
-                    new MySqlContainer(
-                                    MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
-                            .withConfigurationOverride("docker/mysql/my.cnf")
-                            .withSetupSQL("docker/mysql/setup.sql")
-                            .withDatabaseName("flink-test")
-                            .withUsername("flinkuser")
-                            .withPassword("flinkpw")
-                            .withNetwork(NETWORK)
-                            .withNetworkAliases("mysql");
-
-    @org.testcontainers.junit.jupiter.Container
-    public static final DorisContainer DORIS =
+    @Container
+    protected static final DorisContainer DORIS =
             new DorisContainer(NETWORK).withNetworkAliases("doris");

     protected final UniqueDatabase mysqlInventoryDatabase =
@@ -96,13 +73,13 @@ public static void initializeContainers() {
         new LogMessageWaitStrategy()
                 .withRegEx(".*get heartbeat from FE.*")
                 .withTimes(1)
-                .withStartupTimeout(DEFAULT_STARTUP_TIMEOUT)
+                .withStartupTimeout(STARTUP_WAITING_TIMEOUT)
                 .waitUntilReady(DORIS);

         while (!checkBackendAvailability()) {
             try {
                 if (System.currentTimeMillis() - startWaitingTimestamp
-                        > DEFAULT_STARTUP_TIMEOUT.toMillis()) {
+                        > STARTUP_WAITING_TIMEOUT.toMillis()) {
                     throw new RuntimeException("Doris backend startup timed out.");
                 }
                 LOG.info("Waiting for backends to be available");
@@ -189,10 +166,8 @@ void testSyncWholeDatabase() throws Exception {
                         DORIS.getUsername(),
                         DORIS.getPassword(),
                         parallelism);
-        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
-        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
-        submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar);
+        submitPipelineJob(pipelineJob, dorisCdcConnector);
         waitUntilJobRunning(Duration.ofSeconds(30));
         LOG.info("Pipeline job is running");

@@ -1011,7 +986,7 @@ private void validateSinkResult(
                 "SELECT * FROM " + tableName,
                 columnCount,
                 expected,
-                DEFAULT_RESULT_VERIFY_TIMEOUT.toMillis(),
+                EVENT_WAITING_TIMEOUT.toMillis(),
                 true);
     }

@@ -1022,7 +997,7 @@ private void validateSinkSchema(String databaseName, String tableName, List<Stri
                 "DESCRIBE " + tableName,
                 5,
                 expected,
-                DEFAULT_RESULT_VERIFY_TIMEOUT.toMillis(),
+                EVENT_WAITING_TIMEOUT.toMillis(),
                 false);
     }

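Taken together, the deletions above move per-test fixtures into the shared base class: the MySQL container and the `DEFAULT_*` timeout constants disappear from the subclass, while the remaining code references `STARTUP_WAITING_TIMEOUT` and `EVENT_WAITING_TIMEOUT`, presumably inherited from `PipelineTestEnvironment`. A sketch of what that base class plausibly declares after this change; the timeout values are guesses, and the container setup simply mirrors the deleted lines:

```java
import java.time.Duration;

import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;

// Sketch only: assumed consolidation of fixtures shared by all pipeline E2E tests.
abstract class PipelineTestEnvironmentSketch {

    protected static final Network NETWORK = Network.newNetwork();

    // Shared waiting budgets referenced by the updated tests (values are guesses).
    public static final Duration STARTUP_WAITING_TIMEOUT = Duration.ofSeconds(240);
    public static final Duration EVENT_WAITING_TIMEOUT = Duration.ofSeconds(60);

    // The per-test MySQL container is presumed to live here now; the
    // configuration mirrors the block deleted from MySqlToDorisE2eITCase.
    @Container
    public static final MySqlContainer MYSQL =
            (MySqlContainer)
                    new MySqlContainer(MySqlVersion.V8_0)
                            .withConfigurationOverride("docker/mysql/my.cnf")
                            .withSetupSQL("docker/mysql/setup.sql")
                            .withDatabaseName("flink-test")
                            .withUsername("flinkuser")
                            .withPassword("flinkpw")
                            .withNetwork(NETWORK)
                            .withNetworkAliases("mysql");
}
```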
@@ -22,6 +22,7 @@
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.cdc.pipeline.tests.utils.TarballFetcher;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
@@ -37,6 +38,7 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.BindMode;
@@ -69,15 +71,10 @@

 /** End-to-end tests for mysql cdc to Iceberg pipeline job. */
 public class MySqlToIcebergE2eITCase extends PipelineTestEnvironment {
-    private static final Logger LOG = LoggerFactory.getLogger(MySqlToIcebergE2eITCase.class);
-
-    public static final Duration TESTCASE_TIMEOUT = Duration.ofMinutes(3);
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlToIcebergE2eITCase.class);

-    // ------------------------------------------------------------------------------------------
-    // MySQL Variables (we always use MySQL as the data source for easier verifying)
-    // ------------------------------------------------------------------------------------------
-    protected static final String MYSQL_TEST_USER = "mysqluser";
-    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    @TempDir public Path temporaryFolder;

     @org.testcontainers.junit.jupiter.Container
     public static final MySqlContainer MYSQL =
@@ -145,6 +142,9 @@ public void before() throws Exception {
         runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString());
         runInContainerAsRoot(taskManager, "chmod", "0777", "-R", warehouse);
         inventoryDatabase.createAndInitialize();
+
+        TarballFetcher.fetchLatest(jobManager);
+        LOG.info("CDC executables deployed.");
     }
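`TarballFetcher` is new in this PR, and only `fetchLatest(jobManager)` is visible in the diff; it presumably deploys the freshly built CDC distribution into the job manager container so individual tests no longer need to ship shared jars with each submission. A hypothetical sketch of that contract, with made-up paths and internals:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.MountableFile;

// Sketch only: the real TarballFetcher lives in
// org.apache.flink.cdc.pipeline.tests.utils and its internals are not shown here.
final class TarballFetcherSketch {

    private TarballFetcherSketch() {}

    /** Assumed behavior: stage the locally built CDC dist tarball in the container. */
    static void fetchLatest(GenericContainer<?> container) throws Exception {
        MountableFile tarball =
                MountableFile.forHostPath("target/flink-cdc-dist.tar.gz"); // hypothetical path
        container.copyFileToContainer(tarball, "/tmp/flink-cdc-dist.tar.gz");
        container.execInContainer("tar", "-xzf", "/tmp/flink-cdc-dist.tar.gz", "-C", "/opt");
    }
}
```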

@AfterEach
@@ -351,7 +351,7 @@ private void validateSinkResult(
             throws InterruptedException {
         runInContainerAsRoot(jobManager, "chmod", "0777", "-R", warehouse);
         LOG.info("Verifying Iceberg {}::{}::{} results...", warehouse, database, table);
-        long deadline = System.currentTimeMillis() + TESTCASE_TIMEOUT.toMillis();
+        long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
         List<String> results = Collections.emptyList();
         while (System.currentTimeMillis() < deadline) {
             try {
@@ -364,7 +364,7 @@
                 LOG.info(
                         "Successfully verified {} records in {} seconds.",
                         expected.size(),
-                        (System.currentTimeMillis() - deadline + TESTCASE_TIMEOUT.toMillis())
+                        (System.currentTimeMillis() - deadline + EVENT_WAITING_TIMEOUT.toMillis())
                                 / 1000);
                 return;
             } catch (Exception e) {