Commit

[Hotfix][Connector-v2][kafka] Fix the short interval of pull data settings and revise the format

MonsterChenzhuo committed Jun 2, 2023
1 parent 84be0f9 commit 2f7f052

Showing 9 changed files with 36 additions and 67 deletions.

File 1 of 9

@@ -65,11 +65,13 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import static org.awaitility.Awaitility.await;
 import static org.awaitility.Awaitility.given;
 
 @DisabledOnContainer(
         value = {},
-        type = {EngineType.SPARK})
+        type = {EngineType.SPARK},
+        disabledReason = "Spark engine will lose the row kind of record")
 public class CanalToKafkaIT extends TestSuiteBase implements TestResource {
 
     private static final Logger LOG = LoggerFactory.getLogger(CanalToKafkaIT.class);
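
The new disabledReason attribute means the skip now carries an explanation in test reports instead of appearing as a bare exclusion. What follows is a rough, hypothetical sketch of how such an annotation is typically honored; it is generic JUnit 5 machinery, not SeaTunnel's actual DisabledOnContainer implementation:

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.platform.commons.support.AnnotationSupport;

// Stand-in for the real annotation; the actual one also carries value() and type().
@Retention(RetentionPolicy.RUNTIME)
@interface DisabledOnContainer {
    String disabledReason() default "";
}

class DisabledOnContainerCondition implements ExecutionCondition {
    // JUnit surfaces the string passed to ConditionEvaluationResult.disabled(...)
    // next to the skipped test, which is what the new attribute feeds.
    @Override
    public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
        return AnnotationSupport.findAnnotation(context.getElement(), DisabledOnContainer.class)
                .map(a -> ConditionEvaluationResult.disabled(a.disabledReason()))
                .orElseGet(() -> ConditionEvaluationResult.enabled("annotation absent"));
    }
}
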
@@ -80,8 +82,6 @@ public class CanalToKafkaIT extends TestSuiteBase implements TestResource {
 
     private static final String CANAL_HOST = "canal_e2e";
 
-    private static final int CANAL_PORT = 11111;
-
     // ----------------------------------------------------------------------------
     // kafka
     private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";
@@ -98,7 +98,7 @@ public class CanalToKafkaIT extends TestSuiteBase implements TestResource {
     // mysql
     private static final String MYSQL_HOST = "mysql_e2e";
 
-    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer();
 
     private final UniqueDatabase inventoryDatabase =
             new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
@@ -124,18 +124,16 @@ public class CanalToKafkaIT extends TestSuiteBase implements TestResource {
                 Assertions.assertEquals(0, extraCommands.getExitCode());
             };
 
-    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
-        MySqlContainer mySqlContainer =
-                new MySqlContainer(version)
-                        .withConfigurationOverride("docker/server-gtids/my.cnf")
-                        .withSetupSQL("docker/setup.sql")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(MYSQL_HOST)
-                        .withDatabaseName("canal")
-                        .withUsername("st_user")
-                        .withPassword("seatunnel")
-                        .withLogConsumer(new Slf4jLogConsumer(LOG));
-        return mySqlContainer;
+    private static MySqlContainer createMySqlContainer() {
+        return new MySqlContainer(MySqlVersion.V8_0)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName("canal")
+                .withUsername("st_user")
+                .withPassword("seatunnel")
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
     }
 
     private void createCanalContainer() {
@@ -165,7 +163,7 @@ private void createKafkaContainer() {
                                 DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
     }
 
-    private void createPostgreSQLContainer() throws ClassNotFoundException {
+    private void createPostgreSQLContainer() {
         POSTGRESQL_CONTAINER =
                 new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
                         .withNetwork(NETWORK)
@@ -219,9 +217,9 @@ public void startUp() throws ClassNotFoundException, InterruptedException {
     @TestTemplate
     public void testKafkaSinkCanalFormat(TestContainer container)
             throws IOException, InterruptedException {
-        Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_kafka.conf");
+        Container.ExecResult execResult =
+                container.executeJob("/canalFormatIT/kafka_source_canal_to_kafka.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
-        ArrayList<Object> result = new ArrayList<>();
         List<String> expectedResult =
                 Arrays.asList(
                         "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
@@ -239,22 +237,27 @@ public void testKafkaSinkCanalFormat(TestContainer container)
                         "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
                         "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");
 
+        ArrayList<String> result = new ArrayList<>();
         ArrayList<String> topics = new ArrayList<>();
         topics.add(KAFKA_TOPIC);
         kafkaConsumer.subscribe(topics);
-        ConsumerRecords<String, String> consumerRecords =
-                kafkaConsumer.poll(Duration.ofSeconds(10000));
-        for (ConsumerRecord<String, String> record : consumerRecords) {
-            result.add(record.value());
-        }
-        Assertions.assertEquals(expectedResult, result);
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            ConsumerRecords<String, String> consumerRecords =
+                                    kafkaConsumer.poll(Duration.ofMillis(1000));
+                            for (ConsumerRecord<String, String> record : consumerRecords) {
+                                result.add(record.value());
+                            }
+                            Assertions.assertEquals(expectedResult, result);
+                        });
     }
 
     @TestTemplate
     public void testCanalFormatKafkaCdcToPgsql(TestContainer container)
             throws IOException, InterruptedException, SQLException {
         Container.ExecResult execResult =
-                container.executeJob("/kafkasource_canal_cdc_to_pgsql.conf");
+                container.executeJob("/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
         List<Object> actual = new ArrayList<>();
         try (Connection connection =
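
This hunk is the heart of the hotfix. The old test made a single kafkaConsumer.poll(Duration.ofSeconds(10000)) call, one blocking poll with an enormous timeout and exactly one assertion attempt; the new version polls in one-second slices and lets Awaitility retry the assertion for up to 60 seconds, accumulating records as they arrive. Below is a self-contained sketch of the same pattern; the class name, method name, and topic are illustrative placeholders, not taken from the commit:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

class KafkaPollPatternSketch {

    // Poll in short slices and keep whatever arrived so far; Awaitility re-runs
    // the lambda until the assertion passes or the overall deadline expires.
    static void assertTopicEventuallyContains(
            KafkaConsumer<String, String> consumer, List<String> expected) {
        consumer.subscribe(Collections.singletonList("some_topic")); // placeholder topic
        List<String> result = new ArrayList<>();
        await().atMost(60, TimeUnit.SECONDS)
                .untilAsserted(
                        () -> {
                            for (ConsumerRecord<String, String> record :
                                    consumer.poll(Duration.ofMillis(1000))) {
                                result.add(record.value());
                            }
                            assertEquals(expected, result);
                        });
    }
}

Note that the accumulator lives outside the lambda on purpose: each retry appends newly polled records rather than starting over, so data that arrives slowly can still satisfy the assertion before the deadline.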

File 2 of 9

@@ -137,7 +137,7 @@ public void tearDown() throws Exception {
 
     @TestTemplate
     public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
-        Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Container.ExecResult execResult = container.executeJob("/kafka_sink_fake_to_kafka.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
 
         String topicName = "test_topic";
@@ -153,7 +153,8 @@ public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
     @TestTemplate
     public void testTextFormatSinkKafka(TestContainer container)
             throws IOException, InterruptedException {
-        Container.ExecResult execResult = container.executeJob("/kafkaTextsink_fake_to_kafka.conf");
+        Container.ExecResult execResult =
+                container.executeJob("/textFormatIT/fake_source_to_text_sink_kafka.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
 
         String topicName = "test_text_topic";
@@ -201,7 +202,8 @@ public void testSourceKafkaTextToConsole(TestContainer container)
                 row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)),
                 0,
                 100);
-        Container.ExecResult execResult = container.executeJob("/kafkasource_text_to_console.conf");
+        Container.ExecResult execResult =
+                container.executeJob("/textFormatIT/kafka_source_text_to_console.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }

@@ -215,7 +217,8 @@ public void testSourceKafkaJsonToConsole(TestContainer container)
                         DEFAULT_FORMAT,
                         DEFAULT_FIELD_DELIMITER);
         generateTestData(row -> serializer.serializeRow(row), 0, 100);
-        Container.ExecResult execResult = container.executeJob("/kafkasource_json_to_console.conf");
+        Container.ExecResult execResult =
+                container.executeJob("/jsonFormatIT/kafka_source_json_to_console.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }

File 3 of 9

@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
 
 env {
   execution.parallelism = 1
@@ -48,7 +45,6 @@ source {
   }
 }
 
-
 sink {
   Jdbc {
     driver = org.postgresql.Driver

File 4 of 9

@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
 
 env {
   execution.parallelism = 1

File 5 of 9

@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
 
 env {
   execution.parallelism = 1
@@ -57,12 +54,6 @@ source {
       }
     }
   }
-
-# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-# please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
 }
-
-transform {
-}
 
 sink {
@@ -93,6 +84,5 @@ sink {
         }
       ]
     }
-
   }
 }

File 6 of 9

@@ -56,9 +56,6 @@ source {
   }
 }
 
-transform {
-}
-
 sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"

File 7 of 9

@@ -56,9 +56,6 @@ source {
   }
 }
 
-transform {
-}
-
 sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"

File 8 of 9

@@ -56,9 +56,6 @@ source {
   }
 }
 
-transform {
-}
-
 sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"

File 9 of 9

@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
 
 env {
   execution.parallelism = 1
@@ -57,15 +54,8 @@ source {
       }
     }
     format = text
-    # The default field delimiter is ","
     field_delimiter = ","
   }
-
-# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-# please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
 }
-
-transform {
-}
 
 sink {
@@ -96,6 +86,5 @@ sink {
         }
      ]
    }
-
  }
 }
