Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc] Added pulsar database cdc synchronization action #2345

Merged
merged 13 commits into from
Nov 30, 2023
Prev Previous commit
Next Next commit
fix
  • Loading branch information
zhuangchong committed Nov 21, 2023
commit b259cbb677b69fef51c1c45a69dca1ce1fbd512f
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {

private PulsarAdmin admin;
private PulsarClient client;
protected List<String> topics = new ArrayList<>();

@RegisterExtension
public static final PulsarContainerExtension PULSAR_CONTAINER =
Expand Down Expand Up @@ -191,7 +192,6 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) {
}

private void deleteTopics() throws Exception {
List<String> topics = admin.topics().getList("public/default");
for (String topic : topics) {
String topicName = topicName(topic);
PartitionedTopicMetadata metadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testSchemaEvolutionMultiTopic() throws Exception {
final String topic3 = "schema_evolution_2";
boolean writeOne = false;
int fileCount = 3;
List<String> topics = Arrays.asList(topic1, topic2, topic3);
topics = Arrays.asList(topic1, topic2, topic3);
topics.forEach(topic -> createTopic(topic, 1));

// ---------- Write the Canal json into Pulsar -------------------
Expand All @@ -65,6 +65,7 @@ public void testSchemaEvolutionMultiTopic() throws Exception {
}

Map<String, String> pulsarConfig = getBasicPulsarConfig();
pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
pulsarConfig.put(TOPIC.key(), String.join(",", topics));
PulsarSyncDatabaseAction action =
Expand All @@ -82,7 +83,7 @@ public void testSchemaEvolutionOneTopic() throws Exception {
final String topic = "schema_evolution";
boolean writeOne = true;
int fileCount = 3;
List<String> topics = Collections.singletonList(topic);
topics = Collections.singletonList(topic);
topics.forEach(t -> createTopic(t, 1));

// ---------- Write the Canal json into Pulsar -------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Timeout;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -45,6 +46,7 @@ public void testSchemaEvolution() throws Exception {

private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
final String topic = "schema_evolution";
topics = Collections.singletonList(topic);
createTopic(topic, 1);
// ---------- Write the Canal json into Pulsar -------------------
sendMessages(
Expand Down
Loading