Skip to content

Commit d75cc6c

Browse files
committed
chore(demo-kafka): fix org.apache.kafka.common.errors.TimeoutException
1 parent 7d516a9 commit d75cc6c

File tree

3 files changed

+27
-10
lines changed

3 files changed

+27
-10
lines changed

demo-kafka/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@
6565
<version>${testcontainers.version}</version>
6666
<scope>test</scope>
6767
</dependency>
68+
<dependency>
69+
<groupId>org.testcontainers</groupId>
70+
<artifactId>junit-jupiter</artifactId>
71+
<version>${testcontainers.version}</version>
72+
<scope>test</scope>
73+
</dependency>
6874
</dependencies>
6975

7076
</project>

demo-kafka/src/test/java/com/helltractor/demo/container/ConfluentKafkaContainerCluster.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.helltractor.demo.container;
22

33
import org.apache.kafka.common.Uuid;
4+
import org.apache.kafka.clients.NetworkClient;
45
import org.awaitility.Awaitility;
56
import org.testcontainers.containers.Container;
67
import org.testcontainers.containers.GenericContainer;
@@ -39,7 +40,7 @@ public ConfluentKafkaContainerCluster(String confluentPlatformVersion, int broke
3940

4041
String controllerQuorumVoters = IntStream
4142
.range(0, brokersNum)
42-
.mapToObj(brokerNum -> String.format("%d@broker-%d:9092", brokerNum, brokerNum))
43+
.mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
4344
.collect(Collectors.joining(","));
4445

4546
String clusterId = Uuid.randomUuid().toString();

demo-kafka/src/test/java/com/helltractor/demo/messaging/MessagingFactoryTest.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
import org.slf4j.LoggerFactory;
1313
import org.springframework.beans.factory.annotation.Autowired;
1414
import org.springframework.boot.test.context.SpringBootTest;
15+
import org.springframework.test.context.DynamicPropertyRegistry;
16+
import org.springframework.test.context.DynamicPropertySource;
17+
import org.testcontainers.junit.jupiter.Container;
18+
import org.testcontainers.junit.jupiter.Testcontainers;
1519

1620
import java.time.Duration;
1721
import java.util.List;
@@ -20,10 +24,14 @@
2024
import static org.junit.jupiter.api.Assertions.assertEquals;
2125

2226
@SpringBootTest(classes = KafkaApplication.class)
23-
public class MessagingFactoryTest {
27+
@Testcontainers
28+
public final class MessagingFactoryTest {
2429

2530
final static Logger logger = LoggerFactory.getLogger(MessagingFactoryTest.class);
2631

32+
@Container
33+
static ConfluentKafkaContainerCluster kafkaCluster = new ConfluentKafkaContainerCluster("7.4.0", 1, 1);
34+
2735
static class TestConsumer {
2836

2937
final AtomicInteger messageCount = new AtomicInteger();
@@ -42,31 +50,33 @@ int getTotalMessages() {
4250
}
4351
}
4452

53+
@DynamicPropertySource
54+
static void kafkaProperties(DynamicPropertyRegistry registry) {
55+
registry.add("spring.kafka.bootstrap-servers", kafkaCluster::getBootstrapServers);
56+
}
57+
4558
@Autowired
4659
MessagingFactory messagingFactory;
4760

48-
ConfluentKafkaContainerCluster cluster;
49-
5061
MessageProducer<TestMessage> processorOne;
5162
MessageProducer<TestMessage> processorTwo;
5263
MessageProducer<TestMessage> processorThree;
5364

5465
@BeforeEach
5566
void init() {
56-
cluster = new ConfluentKafkaContainerCluster("7.4.0", 3, 2);
57-
cluster.start();
67+
kafkaCluster.start();
5868
Awaitility.await()
5969
.atMost(Duration.ofMinutes(1))
6070
.pollInterval(Duration.ofSeconds(5))
61-
.until(() -> cluster.getBrokers().stream().allMatch(b -> b.isRunning()));
71+
.until(() -> kafkaCluster.getBrokers().stream().allMatch(b -> b.isRunning()));
6272
processorOne = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_ONE, TestMessage.class);
6373
processorTwo = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_TWO, TestMessage.class);
6474
processorThree = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_THREE, TestMessage.class);
6575
}
6676

6777
@AfterEach
6878
void destroy() {
69-
cluster.stop();
79+
kafkaCluster.stop();
7080
}
7181

7282
@Test
@@ -86,8 +96,8 @@ void test() throws InterruptedException {
8696
}
8797

8898
Awaitility.await()
89-
.atMost(Duration.ofSeconds(10))
90-
.until(() -> testConsumer.getTotalMessages() == 300);
99+
.atMost(Duration.ofSeconds(20))
100+
.until(() -> testConsumer.getTotalMessages() >= 300);
91101

92102
assertEquals(300, testConsumer.getTotalMessages());
93103
}

0 commit comments

Comments
 (0)