diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6bf12b52095..54f50a67436 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
### Fixed
- Fixed retrieval of Docker host IP when running inside Docker. ([\#479](https://github.com/testcontainers/testcontainers-java/issues/479))
+### Changed
+- Added Kafka module ([\#546](https://github.com/testcontainers/testcontainers-java/pull/546))
+
## [1.5.1] - 2017-12-19
### Fixed
diff --git a/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java b/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java
index 756b88d05ab..0b86821d293 100644
--- a/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java
+++ b/core/src/main/java/org/testcontainers/containers/AmbassadorContainer.java
@@ -15,9 +15,12 @@
* An ambassador container is used as a TCP proxy, enabling any TCP port of another linked container to be exposed
* publicly, even if that container does not make the port public itself. The richnorth/ambassador:latest
* container is used (based on HAProxy).
+ *
+ * @deprecated use {@link SocatContainer}
*/
@EqualsAndHashCode(callSuper = false)
@Data
+@Deprecated
public class AmbassadorContainer> extends GenericContainer {
private final String otherContainerName;
diff --git a/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java b/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java
index d4061d9e499..b718351c86b 100644
--- a/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java
+++ b/core/src/main/java/org/testcontainers/containers/DockerComposeContainer.java
@@ -11,9 +11,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.junit.runner.Description;
-import org.rnorth.ducttape.ratelimits.RateLimiter;
-import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
-import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.profiler.Profiler;
@@ -27,8 +24,15 @@
import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;
import java.io.File;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -46,7 +50,6 @@ public class DockerComposeContainer> e
* Random identifier which will become part of spawned containers names, so we can shut them down
*/
private final String identifier;
- private final Map ambassadorContainers = new HashMap<>();
private final List composeFiles;
private final Set spawnedContainerIds = new HashSet<>();
private final Set spawnedNetworkIds = new HashSet<>();
@@ -56,6 +59,10 @@ public class DockerComposeContainer> e
private boolean pull = true;
private boolean tailChildContainers;
+ private final AtomicInteger nextAmbassadorPort = new AtomicInteger(2000);
+ private final Map> ambassadorPortMappings = new ConcurrentHashMap<>();
+ private final SocatContainer ambassadorContainer = new SocatContainer();
+
private static final Object MUTEX = new Object();
/**
@@ -64,12 +71,6 @@ public class DockerComposeContainer> e
*/
private Map env = new HashMap<>();
- private static final RateLimiter AMBASSADOR_CREATION_RATE_LIMITER = RateLimiterBuilder
- .newBuilder()
- .withRate(6, TimeUnit.MINUTES)
- .withConstantThroughput()
- .build();
-
@Deprecated
public DockerComposeContainer(File composeFile, String identifier) {
this(identifier, composeFile);
@@ -201,31 +202,9 @@ private List listChildContainers() {
}
private void startAmbassadorContainers(Profiler profiler) {
- for (final Map.Entry address : ambassadorContainers.entrySet()) {
-
- try {
- // Start any ambassador containers we need
- profiler.start("Ambassador container startup");
-
- final AmbassadorContainer ambassadorContainer = address.getValue();
- Unreliables.retryUntilSuccess(120, TimeUnit.SECONDS, () -> {
-
- AMBASSADOR_CREATION_RATE_LIMITER.doWhenReady(() -> {
- Profiler localProfiler = profiler.startNested("Ambassador container: " + ambassadorContainer.getContainerName());
-
- localProfiler.start("Start ambassador container");
-
- ambassadorContainer.start();
- });
-
- return null;
- });
- } catch (Exception e) {
- logger().warn("Exception during ambassador container startup!", e);
- } finally {
- profiler.stop().log();
- }
- }
+ profiler.start("Ambassador container startup");
+ ambassadorContainer.start();
+ profiler.stop().log();
}
private Logger logger() {
@@ -237,8 +216,8 @@ public void finished(Description description) {
synchronized (MUTEX) {
- // shut down all the ambassador containers
- ambassadorContainers.forEach((String address, AmbassadorContainer container) -> container.stop());
+ // shut down the ambassador container
+ ambassadorContainer.stop();
// Kill the services using docker-compose
try {
@@ -270,7 +249,7 @@ public SELF withExposedService(String serviceName, int servicePort) {
}
/*
- * For every service/port pair that needs to be exposed, we have to start an 'ambassador container'.
+ * For every service/port pair that needs to be exposed, we register a target on an 'ambassador container'.
*
* The ambassador container's role is to link (within the Docker network) to one of the
* compose services, and proxy TCP network I/O out to a port that the ambassador container
@@ -282,13 +261,12 @@ public SELF withExposedService(String serviceName, int servicePort) {
* {@link GenericContainer} should ensure that the ambassador container is on the same network
* as the rest of the compose environment.
*/
- AmbassadorContainer ambassadorContainer =
- new AmbassadorContainer<>(new FutureContainer(this.identifier + "_" + serviceName), serviceName, servicePort)
- .withEnv(env);
-
- // Ambassador containers will all be started together after docker compose has started
- ambassadorContainers.put(serviceName + ":" + servicePort, ambassadorContainer);
+ // Ambassador container will be started together after docker compose has started
+ int ambassadorPort = nextAmbassadorPort.getAndIncrement();
+ ambassadorPortMappings.computeIfAbsent(serviceName, __ -> new ConcurrentHashMap<>()).put(servicePort, ambassadorPort);
+ ambassadorContainer.withTarget(ambassadorPort, serviceName, servicePort);
+ ambassadorContainer.addLink(new FutureContainer(this.identifier + "_" + serviceName), serviceName);
return self();
}
@@ -307,7 +285,7 @@ public DockerComposeContainer withExposedService(String serviceName, int instanc
* @return a host IP address or hostname that can be used for accessing the service container.
*/
public String getServiceHost(String serviceName, Integer servicePort) {
- return ambassadorContainers.get(serviceName + ":" + servicePort).getContainerIpAddress();
+ return ambassadorContainer.getContainerIpAddress();
}
/**
@@ -321,7 +299,7 @@ public String getServiceHost(String serviceName, Integer servicePort) {
* @return a port that can be used for accessing the service container.
*/
public Integer getServicePort(String serviceName, Integer servicePort) {
- return ambassadorContainers.get(serviceName + ":" + servicePort).getMappedPort(servicePort);
+ return ambassadorContainer.getMappedPort(ambassadorPortMappings.get(serviceName).get(servicePort));
}
public SELF withScaledService(String serviceBaseName, int numInstances) {
diff --git a/core/src/main/java/org/testcontainers/containers/Network.java b/core/src/main/java/org/testcontainers/containers/Network.java
index dc848b02b8c..c2ea8a7b993 100644
--- a/core/src/main/java/org/testcontainers/containers/Network.java
+++ b/core/src/main/java/org/testcontainers/containers/Network.java
@@ -52,7 +52,7 @@ class NetworkImpl extends ExternalResource implements Network {
private final AtomicBoolean initialized = new AtomicBoolean();
@Override
- public String getId() {
+ public synchronized String getId() {
if (initialized.compareAndSet(false, true)) {
id = create();
}
diff --git a/core/src/main/java/org/testcontainers/containers/SocatContainer.java b/core/src/main/java/org/testcontainers/containers/SocatContainer.java
new file mode 100644
index 00000000000..5949a14490d
--- /dev/null
+++ b/core/src/main/java/org/testcontainers/containers/SocatContainer.java
@@ -0,0 +1,42 @@
+package org.testcontainers.containers;
+
+import org.testcontainers.utility.Base58;
+import org.testcontainers.utility.TestcontainersConfiguration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A socat container is used as a TCP proxy, enabling any TCP port of another container to be exposed
+ * publicly, even if that container does not make the port public itself.
+ */
+public class SocatContainer extends GenericContainer {
+
+ private final Map targets = new HashMap<>();
+
+ public SocatContainer() {
+ super(TestcontainersConfiguration.getInstance().getSocatContainerImage());
+ withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh"));
+ withCreateContainerCmdModifier(it -> it.withName("testcontainers-socat-" + Base58.randomString(8)));
+ }
+
+ public SocatContainer withTarget(int exposedPort, String host) {
+ return withTarget(exposedPort, host, exposedPort);
+ }
+
+ public SocatContainer withTarget(int exposedPort, String host, int internalPort) {
+ addExposedPort(exposedPort);
+ targets.put(exposedPort, String.format("%s:%s", host, internalPort));
+ return self();
+ }
+
+ @Override
+ protected void configure() {
+ withCommand("-c",
+ targets.entrySet().stream()
+ .map(entry -> "socat TCP-LISTEN:" + entry.getKey() + ",fork,reuseaddr TCP:" + entry.getValue())
+ .collect(Collectors.joining(" & "))
+ );
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java b/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java
index bb7acb0bc9e..890cd0d4279 100644
--- a/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java
+++ b/core/src/main/java/org/testcontainers/utility/TestcontainersConfiguration.java
@@ -30,6 +30,10 @@ public String getAmbassadorContainerImage() {
return (String) properties.getOrDefault("ambassador.container.image", "richnorth/ambassador:latest");
}
+ public String getSocatContainerImage() {
+ return (String) properties.getOrDefault("socat.container.image", "alpine/socat:latest");
+ }
+
public String getVncRecordedContainerImage() {
return (String) properties.getOrDefault("vncrecorder.container.image", "richnorth/vnc-recorder:latest");
}
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
new file mode 100644
index 00000000000..09a20d01fe0
--- /dev/null
+++ b/modules/kafka/pom.xml
@@ -0,0 +1,45 @@
+
+
+ 4.0.0
+
+
+ org.testcontainers
+ testcontainers-parent
+ 0-SNAPSHOT
+ ../../pom.xml
+
+
+ kafka
+ TestContainers :: Apache Kafka
+
+
+
+ ${project.groupId}
+ testcontainers
+ ${project.version}
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 1.0.0
+ test
+
+
+
+ org.assertj
+ assertj-core
+ 3.8.0
+ test
+
+
+
+ com.google.guava
+ guava
+ 23.0
+ test
+
+
+
+
\ No newline at end of file
diff --git a/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java
new file mode 100644
index 00000000000..228dc8a3399
--- /dev/null
+++ b/modules/kafka/src/main/java/org.testcontainers.containers/KafkaContainer.java
@@ -0,0 +1,82 @@
+package org.testcontainers.containers;
+
+import org.testcontainers.utility.Base58;
+
+import java.util.stream.Stream;
+
+public class KafkaContainer extends GenericContainer {
+
+ public static final int KAFKA_PORT = 9092;
+
+ public static final int ZOOKEEPER_PORT = 2181;
+
+ protected String externalZookeeperConnect = null;
+
+ protected SocatContainer proxy;
+
+ public KafkaContainer() {
+ this("4.0.0");
+ }
+
+ public KafkaContainer(String confluentPlatformVersion) {
+ super("confluentinc/cp-kafka:" + confluentPlatformVersion);
+
+ withNetwork(Network.newNetwork());
+ String networkAlias = "kafka-" + Base58.randomString(6);
+ withNetworkAliases(networkAlias);
+ withExposedPorts(KAFKA_PORT);
+
+ // Use two listeners with different names, it will force Kafka to communicate with itself via internal
+ // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
+ withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,BROKER://" + networkAlias + ":9093");
+ withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+ withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+
+ withEnv("KAFKA_BROKER_ID", "1");
+ withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
+ withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
+ withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
+ }
+
+ public KafkaContainer withEmbeddedZookeeper() {
+ externalZookeeperConnect = null;
+ return self();
+ }
+
+ public KafkaContainer withExternalZookeeper(String connectString) {
+ externalZookeeperConnect = connectString;
+ return self();
+ }
+
+ public String getBootstrapServers() {
+ return String.format("PLAINTEXT://%s:%s", proxy.getContainerIpAddress(), proxy.getFirstMappedPort());
+ }
+
+ @Override
+ public void start() {
+ String networkAlias = getNetworkAliases().get(0);
+ proxy = new SocatContainer()
+ .withNetwork(getNetwork())
+ .withTarget(9092, networkAlias)
+ .withTarget(2181, networkAlias);
+
+ proxy.start();
+ withEnv("KAFKA_ADVERTISED_LISTENERS", "BROKER://" + networkAlias + ":9093,PLAINTEXT://" + proxy.getContainerIpAddress() + ":" + proxy.getFirstMappedPort());
+
+ if (externalZookeeperConnect != null) {
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
+ } else {
+ addExposedPort(ZOOKEEPER_PORT);
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:2181");
+ withClasspathResourceMapping("tc-zookeeper.properties", "/zookeeper.properties", BindMode.READ_ONLY);
+ withCommand("sh", "-c", "zookeeper-server-start /zookeeper.properties & /etc/confluent/docker/run");
+ }
+
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ Stream.of(super::stop, proxy::stop).parallel().forEach(Runnable::run);
+ }
+}
\ No newline at end of file
diff --git a/modules/kafka/src/main/resources/tc-zookeeper.properties b/modules/kafka/src/main/resources/tc-zookeeper.properties
new file mode 100644
index 00000000000..f0cb437927d
--- /dev/null
+++ b/modules/kafka/src/main/resources/tc-zookeeper.properties
@@ -0,0 +1,3 @@
+clientPort=2181
+dataDir=/var/lib/zookeeper/data
+dataLogDir=/var/lib/zookeeper/log
\ No newline at end of file
diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
new file mode 100644
index 00000000000..f88cb1703ed
--- /dev/null
+++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java
@@ -0,0 +1,116 @@
+package org.testcontainers.containers;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Test;
+import org.rnorth.ducttape.unreliables.Unreliables;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.tuple;
+
+public class KafkaContainerTest {
+
+ @Test
+ public void testUsage() throws Exception {
+ try (KafkaContainer kafka = new KafkaContainer()) {
+ kafka.start();
+ testKafkaFunctionality(kafka.getBootstrapServers());
+ }
+ }
+
+ @Test
+ public void testExternalZookeeperWithKafkaNetwork() throws Exception {
+ try (
+ KafkaContainer kafka = new KafkaContainer()
+ .withExternalZookeeper("zookeeper:2181");
+
+ GenericContainer zookeeper = new GenericContainer("confluentinc/cp-zookeeper:4.0.0")
+ .withNetwork(kafka.getNetwork())
+ .withNetworkAliases("zookeeper")
+ .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
+ ) {
+ Stream.of(kafka, zookeeper).parallel().forEach(GenericContainer::start);
+
+ testKafkaFunctionality(kafka.getBootstrapServers());
+ }
+ }
+
+ @Test
+ public void testExternalZookeeperWithExternalNetwork() throws Exception {
+ try (
+ Network network = Network.newNetwork();
+
+ KafkaContainer kafka = new KafkaContainer()
+ .withNetwork(network)
+ .withExternalZookeeper("zookeeper:2181");
+
+ GenericContainer zookeeper = new GenericContainer("confluentinc/cp-zookeeper:4.0.0")
+ .withNetwork(network)
+ .withNetworkAliases("zookeeper")
+ .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
+ ) {
+ Stream.of(kafka, zookeeper).parallel().forEach(GenericContainer::start);
+
+ testKafkaFunctionality(kafka.getBootstrapServers());
+ }
+ }
+
+ protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
+ try (
+ KafkaProducer producer = new KafkaProducer<>(
+ ImmutableMap.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
+ ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
+ ),
+ new StringSerializer(),
+ new StringSerializer()
+ );
+
+ KafkaConsumer consumer = new KafkaConsumer<>(
+ ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
+ ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+ ),
+ new StringDeserializer(),
+ new StringDeserializer()
+ );
+ ) {
+ String topicName = "messages";
+ consumer.subscribe(Arrays.asList(topicName));
+
+ producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
+
+ Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
+ ConsumerRecords records = consumer.poll(100);
+
+ if (records.isEmpty()) {
+ return false;
+ }
+
+ assertThat(records)
+ .hasSize(1)
+ .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
+ .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
+
+ return true;
+ });
+
+ consumer.unsubscribe();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/modules/kafka/src/test/resources/logback-test.xml b/modules/kafka/src/test/resources/logback-test.xml
new file mode 100644
index 00000000000..7bd6a94d827
--- /dev/null
+++ b/modules/kafka/src/test/resources/logback-test.xml
@@ -0,0 +1,18 @@
+
+
+
+
+
+ %d{HH:mm:ss.SSS} %-5level %logger - %msg%n
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index bf158718386..c73552febcf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
modules/postgresql
modules/selenium
modules/nginx
+ modules/kafka
modules/jdbc-test