KafkaEmbedded.java
@@ -16,6 +16,8 @@

package org.springframework.kafka.test.rule;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.net.ServerSocket;
import java.util.ArrayList;
@@ -24,11 +26,16 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.net.ServerSocketFactory;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.junit.rules.ExternalResource;
@@ -369,4 +376,24 @@ public boolean isEmbedded() {
return true;
}

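/**
 * Subscribe the consumer to all the embedded topics and block (up to 30 seconds)
 * until the initial partition assignment completes.
 * @param consumer the consumer.
 * @throws Exception if interrupted while waiting for the assignment.
 */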
public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
final CountDownLatch consumerLatch = new CountDownLatch(1);
consumer.subscribe(Arrays.asList(this.topics), new ConsumerRebalanceListener() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumerLatch.countDown();
}

});
consumer.poll(0); // force assignment
assertThat(consumerLatch.await(30, TimeUnit.SECONDS))
.as("Failed to be assigned partitions from the embedded topics")
.isTrue();
}

}
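For reference, a minimal sketch of driving the new method from a test, assuming a `KafkaEmbedded` rule named `embeddedKafka`, a hypothetical group id `testGroup`, and the `<Integer, String>` consumer factory used elsewhere in this PR:

[source, java]
----
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
        new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
// Blocks until partitions from all embedded topics are assigned (30-second timeout).
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
----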
KafkaTestUtils.java
@@ -16,11 +16,20 @@

package org.springframework.kafka.test.utils;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.kafka.test.rule.KafkaEmbedded;

@@ -56,32 +65,64 @@ public static Map<String, Object> producerProps(KafkaEmbedded embeddedKafka) {
return senderProps(embeddedKafka.getBrokersAsString());
}

/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param brokers the bootstrapServers property.
* @param group the group id.
* @param autoCommit the auto commit.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param brokers the bootstrapServers property.
* @return the properties.
*/
public static Map<String, Object> senderProps(String brokers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) {
ConsumerRecords<K, V> received = getRecords(consumer);
assertThat(received.count()).as("Incorrect results returned: %d", received.count()).isEqualTo(1);
return received.records(topic).iterator().next();
}

/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
ConsumerRecords<K, V> received = consumer.poll(10000);
assertThat(received).as("null received from consumer.poll()").isNotNull();
return received;
}

}
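A hedged end-to-end sketch of the two property builders together with `getSingleRecord`, using the plain Kafka clients; the broker address comes from the embedded rule, and the topic name `"topic"` and group `"testGroup"` are placeholders:

[source, java]
----
Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps(embeddedKafka.getBrokersAsString(), "testGroup", "false");
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
// Force the initial assignment so the record sent below is not missed.
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
Producer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("topic", 0, 1, "foo"));
producer.flush();

ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
----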
KafkaTemplateTests.java
@@ -22,19 +22,15 @@
import static org.springframework.kafka.test.assertj.KafkaConditions.value;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.kafka.listener.ContainerTestUtils;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListenerAdapter;
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -59,38 +55,27 @@ public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
TEMPLATE_TOPIC);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setMessageListener(new MessageListener<Integer, String>() {
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
records.add(record);
}

});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.syncSend("foo");
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
assertThat(KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC)).has(value("foo"));
template.syncSend(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("bar"));
template.syncSend(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("baz"));
template.syncSend(TEMPLATE_TOPIC, 0, "qux");
received = records.poll(10, TimeUnit.SECONDS);
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
assertThat(received).has(key((Integer) null));
assertThat(received).has(partition(0));
assertThat(received).has(value("qux"));
@@ -99,18 +84,20 @@ public void onMessage(ConsumerRecord<Integer, String> record) {
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
.build());
received = records.poll(10, TimeUnit.SECONDS);
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("fiz"));
template.syncSend(MessageBuilder.withPayload("buz")
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
.build());
received = records.poll(10, TimeUnit.SECONDS);
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("buz"));

consumer.close();
}

@Test
src/reference/asciidoc/testing.adoc (42 additions, 0 deletions)
@@ -52,6 +52,48 @@ public KafkaEmbedded(int count, boolean controlledShutdown, String... topics) {
public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
----

The embedded Kafka class has a utility method allowing you to consume from all the topics it created:
Member: "has an utility"?
Or one more English trick? 😄

Contributor Author: Unfortunately, yes - when the word starts with a "yu" sound, it takes "a":

  • an umbrella
  • a unit

Member: Doh. I have always pronounced this word as [u]. Thank you!

Merging...

[source, java]
----
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
----

The `KafkaTestUtils` has some utility methods to fetch results from the consumer:

[source, java]
----
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
----

Usage:

[source, java]
----
...
template.syncSend(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
----
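When more than one record is expected, `getRecords` can be used directly; a small sketch (the topic name is a placeholder):

[source, java]
----
ConsumerRecords<Integer, String> records = KafkaTestUtils.getRecords(consumer);
for (ConsumerRecord<Integer, String> record : records.records("topic")) {
    // assert on each record for the topic
}
----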

==== Hamcrest Matchers

The `o.s.kafka.test.hamcrest.KafkaMatchers` provides the following matchers:
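A sketch of typical matcher usage, assuming matchers along the lines of `hasKey`, `hasValue`, and `hasPartition` (the full list is collapsed in this excerpt):

[source, java]
----
import static org.hamcrest.MatcherAssert.assertThat;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasPartition;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

...

assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
----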