From 864df2d50b0d5baf0ace24fba972444569d0e21e Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Sat, 6 Feb 2021 15:24:18 +0100 Subject: [PATCH 1/9] Force Jackson version to 2.8.8 (forward compatibility) (#3602) Co-authored-by: Richard North --- core/build.gradle | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/build.gradle b/core/build.gradle index ac174e8bc93..7613ed58ccc 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -157,6 +157,13 @@ if (!org.gradle.internal.os.OperatingSystem.current().isWindows()) { project.tasks.check.dependsOn(japicmp) } +configurations.all { + resolutionStrategy { + // use lower Jackson version + force 'com.fasterxml.jackson.core:jackson-databind:2.8.8' + } +} + dependencies { baseline 'org.testcontainers:testcontainers:1.15.1', { exclude group: "*", module: "*" @@ -175,10 +182,6 @@ dependencies { compile "com.github.docker-java:docker-java-api:3.2.7" - // use lower Jackson version - shaded ('com.fasterxml.jackson.core:jackson-databind:2.8.8') { - exclude(group: 'com.fasterxml.jackson.core', module: 'jackson-annotations') - } shaded ('com.github.docker-java:docker-java-core:3.2.7') { exclude(group: 'com.github.docker-java', module: 'docker-java-api') exclude(group: 'com.github.docker-java', module: 'docker-java-transport') From 7688d1bd808d4ce965cfd86f2053dd61a479f2ed Mon Sep 17 00:00:00 2001 From: Alexander Girke Date: Sat, 6 Feb 2021 15:49:21 +0100 Subject: [PATCH 2/9] Clarify usage of host port exporting (#3421) --- docs/features/networking.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/features/networking.md b/docs/features/networking.md index 2fe039b74ef..e7d8493483e 100644 --- a/docs/features/networking.md +++ b/docs/features/networking.md @@ -69,7 +69,7 @@ We need to tell Testcontainers to prepare to expose this port to containers: !!! warning - Note that the above command should be invoked _before_ containers are started. + Note that the above command should be invoked _before_ containers are started, but _after_ the server on the host was started. Having done so, we can now access this port from any containers that are launched. From a container's perspective, the hostname will be `host.testcontainers.internal` and the port will be the same value as `localServerPort`. From ed8386c4cc2e38cc766eea30de68f3d559639a8a Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Sat, 6 Feb 2021 16:23:10 +0100 Subject: [PATCH 3/9] couchbase: wait until all services are part of the config (#3003) This changeset adds a predicate to the wait strategy to make sure that it not only returns a 200, but also that every enabled service is actually already exposed in the config. Since we are polling the server during bootstrap here, not all of them might show up at the same time. Also, while not contributing to the fix we poll the terse bucket http config "b" instead of the verbose one "buckets" since it is a little more efficient on the server side and actually the config the client internally works with. fixes #2993 Co-authored-by: Sergei Egorov --- .../couchbase/CouchbaseContainer.java | 54 +++++++++++++++---- .../couchbase/CouchbaseService.java | 17 ++++-- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseContainer.java b/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseContainer.java index 11960304ee6..3671608d324 100644 --- a/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseContainer.java +++ b/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseContainer.java @@ -36,10 +36,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -181,7 +183,7 @@ protected void configure() { .map("healthy"::equals) .orElse(false); } catch (IOException e) { - logger().error("Unable to parse response {}", response, e); + logger().error("Unable to parse response: {}", response, e); return false; } }) @@ -254,15 +256,10 @@ private void renameNode() { private void initializeServices() { logger().debug("Initializing couchbase services on host: {}", enabledServices); - final String services = enabledServices.stream().map(s -> { - switch (s) { - case KV: return "kv"; - case QUERY: return "n1ql"; - case INDEX: return "index"; - case SEARCH: return "fts"; - default: throw new IllegalStateException("Unknown service!"); - } - }).collect(Collectors.joining(",")); + final String services = enabledServices + .stream() + .map(CouchbaseService::getIdentifier) + .collect(Collectors.joining(",")); @Cleanup Response response = doHttpRequest(MGMT_PORT, "/node/controller/setupServices", "POST", new FormBody.Builder() .add("services", services) @@ -363,10 +360,11 @@ private void createBuckets() { checkSuccessfulResponse(response, "Could not create bucket " + bucket.getName()); new HttpWaitStrategy() - .forPath("/pools/default/buckets/" + bucket.getName()) + .forPath("/pools/default/b/" + bucket.getName()) .forPort(MGMT_PORT) .withBasicCredentials(username, password) .forStatusCode(200) + .forResponsePredicate(new AllServicesEnabledPredicate()) .waitUntilReady(this); if (enabledServices.contains(CouchbaseService.QUERY)) { @@ -472,4 +470,38 @@ private Response doHttpRequest(final int port, final String path, final String m throw new RuntimeException("Could not perform request against couchbase HTTP endpoint ", ex); } } + + /** + * In addition to getting a 200, we need to make sure that all services we need are enabled and available on + * the bucket. + *

+ * Fixes the issue observed in https://github.com/testcontainers/testcontainers-java/issues/2993 + */ + private class AllServicesEnabledPredicate implements Predicate { + + @Override + public boolean test(final String rawConfig) { + try { + for (JsonNode node : MAPPER.readTree(rawConfig).at("/nodesExt")) { + for (CouchbaseService enabledService : enabledServices) { + boolean found = false; + Iterator fieldNames = node.get("services").fieldNames(); + while (fieldNames.hasNext()) { + if (fieldNames.next().startsWith(enabledService.getIdentifier())) { + found = true; + } + } + if (!found) { + logger().trace("Service {} not yet part of config, retrying.", enabledService); + return false; + } + } + } + return true; + } catch (IOException ex) { + logger().error("Unable to parse response: {}", rawConfig, ex); + return false; + } + } + } } diff --git a/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseService.java b/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseService.java index d284c660709..c60fc6b71d0 100644 --- a/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseService.java +++ b/modules/couchbase/src/main/java/org/testcontainers/couchbase/CouchbaseService.java @@ -24,21 +24,30 @@ public enum CouchbaseService { /** * Key-Value service. */ - KV, + KV("kv"), /** * Query (N1QL) service. */ - QUERY, + QUERY("n1ql"), /** * Search (FTS) service. */ - SEARCH, + SEARCH("fts"), /** * Indexing service (needed if QUERY is also used!). */ - INDEX + INDEX("index"); + private final String identifier; + + CouchbaseService(String identifier) { + this.identifier = identifier; + } + + String getIdentifier() { + return identifier; + } } From bf0e4836e0136bad3844abd5db24260e2451c8e3 Mon Sep 17 00:00:00 2001 From: Artjom Kalita Date: Sat, 6 Feb 2021 17:26:55 +0200 Subject: [PATCH 4/9] Improve logging for port listener (#3736) * Improve logging for port listener, additional clean up for string concatenation in loop Co-authored-by: akalita Co-authored-by: Sergei Egorov --- .../InternalCommandPortListeningCheck.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/testcontainers/containers/wait/internal/InternalCommandPortListeningCheck.java b/core/src/main/java/org/testcontainers/containers/wait/internal/InternalCommandPortListeningCheck.java index 9b4be81b4c4..cd601e6a131 100644 --- a/core/src/main/java/org/testcontainers/containers/wait/internal/InternalCommandPortListeningCheck.java +++ b/core/src/main/java/org/testcontainers/containers/wait/internal/InternalCommandPortListeningCheck.java @@ -24,23 +24,23 @@ public class InternalCommandPortListeningCheck implements java.util.concurrent.C @Override public Boolean call() { - String command = "true"; + StringBuilder command = new StringBuilder("true"); for (int internalPort : internalPorts) { - command += " && "; - command += " ("; - command += format("cat /proc/net/tcp* | awk '{print $2}' | grep -i ':0*%x'", internalPort); - command += " || "; - command += format("nc -vz -w 1 localhost %d", internalPort); - command += " || "; - command += format("/bin/bash -c ' Date: Sat, 6 Feb 2021 18:27:51 +0300 Subject: [PATCH 5/9] Added documentation for Bigtable Emulator container (#3708) --- docs/modules/gcloud.md | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/docs/modules/gcloud.md b/docs/modules/gcloud.md index 041129919ee..33d50ee634f 100644 --- a/docs/modules/gcloud.md +++ b/docs/modules/gcloud.md @@ -5,10 +5,11 @@ Testcontainers module for the Google Cloud Platform's [Cloud SDK](https://cloud.google.com/sdk/). -Currently, the module supports `Datastore`, `Firestore`, `Spanner`, and `Pub/Sub` emulators. In order to use it, you should use the following classes: +Currently, the module supports `Bigtable`, `Datastore`, `Firestore`, `Spanner`, and `Pub/Sub` emulators. In order to use it, you should use the following classes: Class | Container Image -|- +BigtableEmulatorContainer | [gcr.io/google.com/cloudsdktool/cloud-sdk:emulators](https://gcr.io/google.com/cloudsdktool/cloud-sdk) DatastoreEmulatorContainer | [gcr.io/google.com/cloudsdktool/cloud-sdk:emulators](https://gcr.io/google.com/cloudsdktool/cloud-sdk) FirestoreEmulatorContainer | [gcr.io/google.com/cloudsdktool/cloud-sdk:emulators](https://gcr.io/google.com/cloudsdktool/cloud-sdk) SpannerEmulatorContainer | [gcr.io/cloud-spanner-emulator/emulator](https://gcr.io/cloud-spanner-emulator/emulator) @@ -16,6 +17,26 @@ PubSubEmulatorContainer | [gcr.io/google.com/cloudsdktool/cloud-sdk:emulators](h ## Usage example +### Bigtable + +Start Bigtable Emulator during a test: + + +[Starting a Bigtable Emulator container](../../modules/gcloud/src/test/java/org/testcontainers/containers/BigtableEmulatorContainerTest.java) inside_block:emulatorContainer + + +Create a test Bigtable table in the Emulator: + + +[Create a test table](../../modules/gcloud/src/test/java/org/testcontainers/containers/BigtableEmulatorContainerTest.java) inside_block:createTable + + +Test against the Emulator: + + +[Testing with a Bigtable Emulator container](../../modules/gcloud/src/test/java/org/testcontainers/containers/BigtableEmulatorContainerTest.java) inside_block:testWithEmulatorContainer + + ### Datastore Start Datastore Emulator during a test: @@ -93,7 +114,7 @@ Start Pub/Sub Emulator during a test: [Starting a Pub/Sub Emulator container](../../modules/gcloud/src/test/java/org/testcontainers/containers/PubSubEmulatorContainerTest.java) inside_block:emulatorContainer -Create a test Pub/Sub topic the Emulator: +Create a test Pub/Sub topic in the Emulator: [Create a test topic](../../modules/gcloud/src/test/java/org/testcontainers/containers/PubSubEmulatorContainerTest.java) inside_block:createTopic From fd7412fe15070a2ccf70f99c8d7edf829501f8e2 Mon Sep 17 00:00:00 2001 From: Per Lundberg Date: Sat, 6 Feb 2021 17:31:13 +0200 Subject: [PATCH 6/9] GenericContainer: fix typo in Javadoc (#3684) --- .../java/org/testcontainers/containers/GenericContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/testcontainers/containers/GenericContainer.java b/core/src/main/java/org/testcontainers/containers/GenericContainer.java index 0a0962cd6c0..4536a70789c 100644 --- a/core/src/main/java/org/testcontainers/containers/GenericContainer.java +++ b/core/src/main/java/org/testcontainers/containers/GenericContainer.java @@ -1339,7 +1339,7 @@ public void copyFileFromContainer(String containerPath, String destinationPath) } /** - * Allow container startup to be attempted more than once if an error occurs. To be if containers are + * Allow container startup to be attempted more than once if an error occurs. To be used if containers are * 'flaky' but this flakiness is not something that should affect test outcomes. * * @param attempts number of attempts From 73da361f47d5c144665f7baf0b45919ec63a61c7 Mon Sep 17 00:00:00 2001 From: Alar Aule Date: Sat, 6 Feb 2021 17:34:18 +0200 Subject: [PATCH 7/9] PostGIS JDBC url sample version update (#3606) Latest Testcontainer versions are using new postgis/postgis images, which versioning is different (containing both PostgreSQL version - PostGIS version) Sample is from older mdillion/postgis versions, with copy-paste sample fails. --- docs/modules/databases/jdbc.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/databases/jdbc.md b/docs/modules/databases/jdbc.md index 65db9aaf07f..6853cf44c5c 100644 --- a/docs/modules/databases/jdbc.md +++ b/docs/modules/databases/jdbc.md @@ -37,7 +37,7 @@ Insert `tc:` after `jdbc:` as follows. Note that the hostname, port and database #### Using PostGIS -`jdbc:tc:postgis:9.6:///databasename` +`jdbc:tc:postgis:9.6-2.5:///databasename` #### Using Presto From 625ddc374cac2447cbbcb59c9d7e07c49ac30e99 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Sat, 6 Feb 2021 16:41:20 +0100 Subject: [PATCH 8/9] Kafka cluster example (#1984, #3758) Co-authored-by: Sean Glover --- examples/kafka-cluster/build.gradle | 17 +++ .../kafkacluster/KafkaContainerCluster.java | 102 ++++++++++++++++++ .../KafkaContainerClusterTest.java | 97 +++++++++++++++++ examples/settings.gradle | 1 + .../containers/KafkaContainer.java | 8 +- .../containers/KafkaContainerTest.java | 16 +++ 6 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 examples/kafka-cluster/build.gradle create mode 100644 examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java create mode 100644 examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java diff --git a/examples/kafka-cluster/build.gradle b/examples/kafka-cluster/build.gradle new file mode 100644 index 00000000000..db07c6859f3 --- /dev/null +++ b/examples/kafka-cluster/build.gradle @@ -0,0 +1,17 @@ +plugins { + id 'java' +} + +repositories { + jcenter() +} + +dependencies { + testCompileOnly "org.projectlombok:lombok:1.18.10" + testAnnotationProcessor "org.projectlombok:lombok:1.18.10" + testCompile 'org.testcontainers:kafka' + testCompile 'org.apache.kafka:kafka-clients:2.3.1' + testCompile 'org.assertj:assertj-core:3.14.0' + testCompile 'com.google.guava:guava:23.0' + testCompile 'org.slf4j:slf4j-simple:1.7.30' +} diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java new file mode 100644 index 00000000000..fd5b1a0a0b6 --- /dev/null +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java @@ -0,0 +1,102 @@ +package com.example.kafkacluster; + +import lombok.SneakyThrows; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Provides an easy way to launch a Kafka cluster with multiple brokers. + */ +public class KafkaContainerCluster implements Startable { + + private final int brokersNum; + private final Network network; + private final GenericContainer zookeeper; + private final Collection brokers; + + public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) { + if (brokersNum < 0) { + throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0"); + } + if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) { + throw new IllegalArgumentException("internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"); + } + + this.brokersNum = brokersNum; + this.network = Network.newNetwork(); + + this.zookeeper = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper").withTag(confluentPlatformVersion)) + .withNetwork(network) + .withNetworkAliases("zookeeper") + .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT)); + + this.brokers = IntStream + .range(0, this.brokersNum) + .mapToObj(brokerNum -> { + return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion)) + .withNetwork(this.network) + .withNetworkAliases("broker-" + brokerNum) + .dependsOn(this.zookeeper) + .withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT) + .withEnv("KAFKA_BROKER_ID", brokerNum + "") + .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "") + .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + ""); + }) + .collect(Collectors.toList()); + } + + public Collection getBrokers() { + return this.brokers; + } + + public String getBootstrapServers() { + return brokers.stream() + .map(KafkaContainer::getBootstrapServers) + .collect(Collectors.joining(",")); + } + + private Stream> allContainers() { + return Stream.concat( + this.brokers.stream(), + Stream.of(this.zookeeper) + ); + } + + @Override + @SneakyThrows + public void start() { + Stream startables = this.brokers.stream().map(Startable.class::cast); + Startables.deepStart(startables).get(60, SECONDS); + + Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> { + Container.ExecResult result = this.zookeeper.execInContainer( + "sh", "-c", + "zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1" + ); + String brokers = result.getStdout(); + + return brokers != null && brokers.split(",").length == this.brokersNum; + }); + } + + @Override + public void stop() { + allContainers().parallel().forEach(GenericContainer::stop); + } +} diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java new file mode 100644 index 00000000000..0756b24d58a --- /dev/null +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java @@ -0,0 +1,97 @@ +package com.example.kafkacluster; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Test; +import org.rnorth.ducttape.unreliables.Unreliables; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.tuple; + +public class KafkaContainerClusterTest { + + @Test + public void testKafkaContainerCluster() throws Exception { + try ( + KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 2) + ) { + cluster.start(); + String bootstrapServers = cluster.getBootstrapServers(); + + assertThat(cluster.getBrokers()).hasSize(3); + + testKafkaFunctionality(bootstrapServers, 3, 2); + } + } + + protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception { + try ( + AdminClient adminClient = AdminClient.create(ImmutableMap.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers + )); + + KafkaProducer producer = new KafkaProducer<>( + ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, + ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString() + ), + new StringSerializer(), + new StringSerializer() + ); + + KafkaConsumer consumer = new KafkaConsumer<>( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" + ), + new StringDeserializer(), + new StringDeserializer() + ); + ) { + String topicName = "messages"; + + Collection topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf)); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + consumer.subscribe(Collections.singletonList(topicName)); + + producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); + + Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + if (records.isEmpty()) { + return false; + } + + assertThat(records) + .hasSize(1) + .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) + .containsExactly(tuple(topicName, "testcontainers", "rulezzz")); + + return true; + }); + + consumer.unsubscribe(); + } + } + +} diff --git a/examples/settings.gradle b/examples/settings.gradle index 25da14a0931..ea988b4ada8 100644 --- a/examples/settings.gradle +++ b/examples/settings.gradle @@ -19,6 +19,7 @@ includeBuild '..' // explicit include to allow Dependabot to autodiscover subprojects include 'disque-job-queue' +include 'kafka-cluster' include 'linked-container' include 'mongodb-container' include 'redis-backed-cache' diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java index 12867239251..b1bc3d735f9 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java @@ -24,6 +24,8 @@ public class KafkaContainer extends GenericContainer { public static final int ZOOKEEPER_PORT = 2181; + private static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; + private static final int PORT_NOT_ASSIGNED = -1; protected String externalZookeeperConnect = null; @@ -60,8 +62,10 @@ public KafkaContainer(final DockerImageName dockerImageName) { withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); withEnv("KAFKA_BROKER_ID", "1"); - withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1"); - withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1"); + withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF); withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); } diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java index 263653822e6..d219aa17b13 100644 --- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -6,8 +6,12 @@ import com.google.common.collect.ImmutableMap; import java.time.Duration; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -110,7 +114,15 @@ public void testConfluentPlatformVersion6() throws Exception { } protected void testKafkaFunctionality(String bootstrapServers) throws Exception { + testKafkaFunctionality(bootstrapServers, 1, 1); + } + + protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception { try ( + AdminClient adminClient = AdminClient.create(ImmutableMap.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers + )); + KafkaProducer producer = new KafkaProducer<>( ImmutableMap.of( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, @@ -131,6 +143,10 @@ protected void testKafkaFunctionality(String bootstrapServers) throws Exception ); ) { String topicName = "messages-" + UUID.randomUUID(); + + Collection topics = singletonList(new NewTopic(topicName, partitions, (short) rf)); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + consumer.subscribe(singletonList(topicName)); producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); From 585876a0f9ce0e48067eea4f1c60aa99a22079ee Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 6 Feb 2021 15:42:24 +0000 Subject: [PATCH 9/9] Bump assertj-core from 3.18.1 to 3.19.0 in /modules/junit-jupiter Bumps [assertj-core](https://github.com/assertj/assertj-core) from 3.18.1 to 3.19.0. - [Release notes](https://github.com/assertj/assertj-core/releases) - [Commits](https://github.com/assertj/assertj-core/compare/assertj-core-3.18.1...assertj-core-3.19.0) Signed-off-by: dependabot[bot] --- modules/junit-jupiter/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/junit-jupiter/build.gradle b/modules/junit-jupiter/build.gradle index 12d4321d10f..bc6aa8b1555 100644 --- a/modules/junit-jupiter/build.gradle +++ b/modules/junit-jupiter/build.gradle @@ -12,7 +12,7 @@ dependencies { testCompile ('org.mockito:mockito-core:3.6.28') { exclude(module: 'hamcrest-core') } - testCompile 'org.assertj:assertj-core:3.18.1' + testCompile 'org.assertj:assertj-core:3.19.0' testCompile 'org.junit.jupiter:junit-jupiter-params:5.7.0' testRuntime 'org.postgresql:postgresql:42.2.18'