Create KafkaTemplate, KafkaMessageContainerListener for integration tests with kafka.
Utility library for Kafka integration tests. The Kafka containerized is detailed at the following url: https://github.com/ydespreaux/testcontainers
lib-testcontainers-kafka-utils | Spring Boot | testcontainers-kafka |
---|---|---|
1.2.0 | 2.1.x | 1.0.x |
1.1.0 | 2.0.x | 1.0.x |
1.0.0 | 1.5.x | 1.0.x |
<dependency>
<groupId>com.github.ydespreaux.testcontainers</groupId>
<artifactId>testcontainers-kafka-test</artifactId>
<version>1.2.0</version>
<scope>test</scope>
</dependency>
This factory makes it possible to create AVRO serializer / deserializer.
/**
* Create a avro serializer.
*
* @param isKey set if the serializer is applied to the key or message value
* @return
*/
public KafkaAvroSerializer createKafkaAvroSerializer(Boolean isKey);
/**
* Create a avro deserializer.
* @param isKey set if the serializer is applied to the key or message value
* @return
*/
public KafkaAvroDeserializer createKafkaAvroDeserializer(Boolean isKey);
/**
* Create a avro deserializer.
* @param isKey set if the serializer is applied to the key or message value
* @param specificAvroReader If true, tries to look up the SpecificRecord class
* @return
*/
public KafkaAvroDeserializer createKafkaAvroDeserializer(Boolean isKey, Boolean specificAvroReader);
Example:
@ClassRule
public static final ConfluentKafkaContainer kafkaContainerWithSchemaRegistry = new ConfluentKafkaContainer()
.withSchemaRegistry(true)
KafkaAvroSerializer keySerializer = new AvroSerializerFactory(kafkaContainerWithSchemaRegistry).createKafkaAvroSerializer(true);
KafkaAvroSerializer valueSerializer = new AvroSerializerFactory(kafkaContainerWithSchemaRegistry).createKafkaAvroSerializer(false);
This factory makes it possible to create KafkaTemplates.
/**
* Create a kafkaTemplate with additional properties.
*/
public <K, V> KafkaTemplate<K, V> createKafkaTemplate(Map<String, Object> additionalProperties);
/**
* Create a kafkatemplate with specific key serializer and value serializer.
*/
public <K, V> KafkaTemplate<K, V> createKafkaTemplate(String keySerializerClass, String valueSerializerClass);
/**
* Create a kafkatemplate with optional properties and a specific key serializer and value serializer.
*/
public <K, V> KafkaTemplate<K, V> createKafkaTemplate(Map<String, Object> additionalProperties, String keySerializerClass, String valueSerializerClass);
/**
* Create a kafkatemplate with specific key serializer and value serializer.
*/
public <K, V> KafkaTemplate<K, V> createKafkaTemplate(Serializer<K> keySerializer, Serializer<V> valueSerializer);
/**
* Create a kafkatemplate with optional properties and specific key serializer and value serializer
*/
public <K, V> KafkaTemplate<K, V> createKafkaTemplate(Map<String, Object> additionalProperties, Serializer<K> keySerializer, Serializer<V> valueSerializer);
Example
@ClassRule
public static final ConfluentKafkaContainer kafkaContainer = new ConfluentKafkaContainer();
KafkaTemplate<String, String> template =
new KafkaTemplateFactory(kafkaContainer)
.createKafkaTemplate("org.apache.kafka.common.serialization.StringSerializer", "org.apache.kafka.common.serialization.StringSerializer");
Format Avro:
@ClassRule
public static final ConfluentKafkaContainer kafkaContainerWithSchemaRegistry = new ConfluentKafkaContainer()
.withSchemaRegistry(true);
KafkaTemplate<String, Object> template =
new KafkaTemplateFactory(kafkaContainerWithSchemaRegistry)
.createKafkaTemplate(
new StringSerializer(),
new AvroSerializerFactory(kafkaContainerWithSchemaRegistry).createKafkaAvroSerializer(false));
This factory makes it possible to create listening containers.
public <K, V> KafkaMessageListenerContainer<K, V> createListenerContainer(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String group,
String topic,
int partitions,
MessageListener<?, ?> listener) throws Exception;
public <K, V> KafkaMessageListenerContainer<K, V> createListenerContainer(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String group,
String topic,
int partitions,
MessageListener<?, ?> listener,
Map<String, Object> optionalProperties) throws Exception;
public <K, V> KafkaMessageListenerContainer<K, V> createListenerContainer(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String group,
String[] topics,
int partitions,
MessageListener<?, ?> listener) throws Exception;
public <K, V> KafkaMessageListenerContainer<K, V> createListenerContainer(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String group,
String[] topics,
int partitions,
MessageListener<?, ?> listener,
Map<String, Object> optionalProperties) throws Exception;
public <K, V> KafkaMessageListenerContainer<K, V> createListenerContainer(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String group,
Pattern topicPattern,
int partitions,
MessageListener<?, ?> listener) throws Exception;
public <K, V> KafkaMessageListenerContainer<K, V> createListenerContainer(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String group,
Pattern topicPattern,
int partitions,
MessageListener<?, ?> listener,
Map<String, Object> optionalProperties) throws Exception;
public <K, V> KafkaMessageListenerContainer<K, V> createListenerContainer(Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String group,
int partitions,
MessageListener<?, ?> listener,
Map<String, Object> optionalProperties,
ContainerProperties containerProperties) throws Exception;
Example:
@ClassRule
public static final ConfluentKafkaContainer kafkaContainer = new ConfluentKafkaContainer();
BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainerFactory(kafkaContainer)
.createListenerContainer(
new StringDeserializer(),
new StringDeserializer(),
"an_group",
"my_topic",
1,
(MessageListener<String, String>) record -> records.add(record));
AVRO message:
@ClassRule
public static final ConfluentKafkaContainer kafkaContainerWithSchemaRegistry = new ConfluentKafkaContainer()
.withSchemaRegistry(true)
BlockingQueue<ConsumerRecord<String, WorkstationAvro>> records = new LinkedBlockingQueue<>();
KafkaMessageListenerContainer<String, Object> listenerContainer = new KafkaMessageListenerContainerFactory(kafkaContainerWithSchemaRegistry)
.createListenerContainer(
new StringDeserializer(),
new AvroSerializerFactory(kafkaContainerWithSchemaRegistry).createKafkaAvroDeserializer(false, true),
"an_group",
"my_topic_avro",
1,
(MessageListener<String, WorkstationAvro>) record -> records.add(record));