Skip to content

Commit 48845e1

Browse files
committed
GH-61: Add (De)Serializer options for Factories
Fixes GH-61 (#61) Added setter injections for both (De)Serializers in Consumer and Producer
1 parent e857c24 commit 48845e1

File tree

4 files changed

+123
-11
lines changed

4 files changed

+123
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,55 @@
2222
import org.apache.kafka.clients.consumer.Consumer;
2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
2424
import org.apache.kafka.clients.consumer.KafkaConsumer;
25+
import org.apache.kafka.common.serialization.Deserializer;
2526

2627
/**
2728
* The {@link ConsumerFactory} implementation to produce a new {@link Consumer} instance
28-
* for provided {@link Map} {@code configs} on each {@link #createConsumer()}
29+
* for provided {@link Map} {@code configs} and optional {@link Deserializer} {@code keyDeserializer},
30+
* {@code valueDeserializer} implementations on each {@link #createConsumer()}
2931
* invocation.
3032
*
3133
* @param <K> the key type.
3234
* @param <V> the value type.
3335
*
3436
* @author Gary Russell
37+
* @author Murali Reddy
3538
*/
3639
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V> {
3740

3841
private final Map<String, Object> configs;
3942

43+
private Deserializer<K> keyDeserializer;
44+
45+
private Deserializer<V> valueDeserializer;
46+
4047
public DefaultKafkaConsumerFactory(Map<String, Object> configs) {
48+
this(configs, null, null);
49+
}
50+
51+
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
52+
Deserializer<K> keyDeserializer,
53+
Deserializer<V> valueDeserializer) {
4154
this.configs = new HashMap<>(configs);
55+
this.keyDeserializer = keyDeserializer;
56+
this.valueDeserializer = valueDeserializer;
57+
}
58+
59+
public void setKeyDeserializer(Deserializer<K> keyDeserializer) {
60+
this.keyDeserializer = keyDeserializer;
61+
}
62+
63+
public void setValueDeserializer(Deserializer<V> valueDeserializer) {
64+
this.valueDeserializer = valueDeserializer;
4265
}
4366

4467
@Override
4568
public Consumer<K, V> createConsumer() {
46-
return new KafkaConsumer<>(this.configs);
69+
return createKafkaConsumer();
70+
}
71+
72+
protected KafkaConsumer<K, V> createKafkaConsumer() {
73+
return new KafkaConsumer<K, V>(this.configs, this.keyDeserializer, this.valueDeserializer);
4774
}
4875

4976
@Override

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.commons.logging.Log;
2626
import org.apache.commons.logging.LogFactory;
27+
2728
import org.apache.kafka.clients.producer.Callback;
2829
import org.apache.kafka.clients.producer.KafkaProducer;
2930
import org.apache.kafka.clients.producer.Producer;
@@ -32,6 +33,7 @@
3233
import org.apache.kafka.common.Metric;
3334
import org.apache.kafka.common.MetricName;
3435
import org.apache.kafka.common.PartitionInfo;
36+
import org.apache.kafka.common.serialization.Serializer;
3537

3638
import org.springframework.beans.factory.DisposableBean;
3739
import org.springframework.context.Lifecycle;
@@ -40,6 +42,11 @@
4042
* The {@link ProducerFactory} implementation for the {@code singleton} shared {@link Producer}
4143
* instance.
4244
* <p>
45+
* This implementation will produce a new {@link Producer} instance
46+
* for provided {@link Map} {@code configs} and optional {@link Serializer} {@code keySerializer},
47+
* {@code valueSerializer} implementations on each {@link #createProducer()}
48+
* invocation.
49+
* <p>
4350
* The {@link Producer} instance is freed from the external {@link Producer#close()} invocation
4451
* with the internal wrapper. The real {@link Producer#close()} is called on the target
4552
* {@link Producer} during the {@link Lifecycle#stop()} or {@link DisposableBean#destroy()}.
@@ -48,6 +55,7 @@
4855
* @param <V> the value type.
4956
*
5057
* @author Gary Russell
58+
* @author Murali Reddy
5159
*/
5260
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
5361

@@ -57,10 +65,29 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
5765

5866
private volatile CloseSafeProducer<K, V> producer;
5967

68+
private Serializer<K> keySerializer;
69+
70+
private Serializer<V> valueSerializer;
71+
6072
private volatile boolean running;
6173

6274
public DefaultKafkaProducerFactory(Map<String, Object> configs) {
75+
this(configs, null, null);
76+
}
77+
78+
public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
79+
Serializer<V> valueSerializer) {
6380
this.configs = new HashMap<>(configs);
81+
this.keySerializer = keySerializer;
82+
this.valueSerializer = valueSerializer;
83+
}
84+
85+
public void setKeySerializer(Serializer<K> keySerializer) {
86+
this.keySerializer = keySerializer;
87+
}
88+
89+
public void setValueSerializer(Serializer<V> valueSerializer) {
90+
this.valueSerializer = valueSerializer;
6491
}
6592

6693
@Override
@@ -100,13 +127,17 @@ public Producer<K, V> createProducer() {
100127
if (this.producer == null) {
101128
synchronized (this) {
102129
if (this.producer == null) {
103-
this.producer = new CloseSafeProducer<K, V>(new KafkaProducer<K, V>(this.configs));
130+
this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
104131
}
105132
}
106133
}
107134
return this.producer;
108135
}
109136

137+
protected KafkaProducer<K, V> createKafkaProducer() {
138+
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
139+
}
140+
110141
private static class CloseSafeProducer<K, V> implements Producer<K, V> {
111142

112143
private final Producer<K, V> delegate;

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,15 @@ public PlatformTransactionManager transactionManager() {
187187

188188
@Bean
189189
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
190-
kafkaListenerContainerFactory() {
190+
kafkaListenerContainerFactory() {
191191
SimpleKafkaListenerContainerFactory<Integer, String> factory = new SimpleKafkaListenerContainerFactory<>();
192192
factory.setConsumerFactory(consumerFactory());
193193
return factory;
194194
}
195195

196196
@Bean
197197
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
198-
kafkaJsonListenerContainerFactory() {
198+
kafkaJsonListenerContainerFactory() {
199199
SimpleKafkaListenerContainerFactory<Integer, String> factory = new SimpleKafkaListenerContainerFactory<>();
200200
factory.setConsumerFactory(consumerFactory());
201201
factory.setMessageConverter(new StringJsonMessageConverter());
@@ -204,7 +204,7 @@ public PlatformTransactionManager transactionManager() {
204204

205205
@Bean
206206
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
207-
kafkaManualAckListenerContainerFactory() {
207+
kafkaManualAckListenerContainerFactory() {
208208
SimpleKafkaListenerContainerFactory<Integer, String> factory = new SimpleKafkaListenerContainerFactory<>();
209209
factory.setConsumerFactory(manualConsumerFactory());
210210
factory.setAckMode(AckMode.MANUAL_IMMEDIATE);
@@ -213,7 +213,7 @@ public PlatformTransactionManager transactionManager() {
213213

214214
@Bean
215215
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
216-
kafkaAutoStartFalseListenerContainerFactory() {
216+
kafkaAutoStartFalseListenerContainerFactory() {
217217
SimpleKafkaListenerContainerFactory<Integer, String> factory = new SimpleKafkaListenerContainerFactory<>();
218218
factory.setConsumerFactory(consumerFactory());
219219
factory.setAutoStartup(false);
@@ -222,7 +222,7 @@ public PlatformTransactionManager transactionManager() {
222222

223223
@Bean
224224
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
225-
kafkaRebalanceListenerContainerFactory() {
225+
kafkaRebalanceListenerContainerFactory() {
226226
SimpleKafkaListenerContainerFactory<Integer, String> factory = new SimpleKafkaListenerContainerFactory<>();
227227
factory.setConsumerFactory(consumerFactory());
228228
factory.setConsumerRebalanceListener(consumerRebalanceListener());
@@ -366,9 +366,9 @@ public void listen4(@Payload String foo, Acknowledgment ack) {
366366
}
367367

368368
@KafkaListener(id = "fiz", topicPartitions = {
369-
@TopicPartition(topic = "annotated5", partitions = {"0", "1"}),
370-
@TopicPartition(topic = "annotated6", partitions = {"0", "1"})
371-
})
369+
@TopicPartition(topic = "annotated5", partitions = { "0", "1" }),
370+
@TopicPartition(topic = "annotated6", partitions = { "0", "1" })
371+
})
372372
public void listen5(ConsumerRecord<?, ?> record) {
373373
this.record = record;
374374
this.latch5.countDown();

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,60 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
172172
logger.info("Stop auto");
173173
}
174174

175+
@Test
176+
public void testAutoCommitWithRebalanceListener() throws Exception {
177+
logger.info("Start auto");
178+
Map<String, Object> props = KafkaTestUtils.consumerProps("test10", "true", embeddedKafka);
179+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
180+
ConcurrentMessageListenerContainer<Integer, String> container =
181+
new ConcurrentMessageListenerContainer<>(cf, topic1);
182+
final CountDownLatch latch = new CountDownLatch(4);
183+
container.setMessageListener(new MessageListener<Integer, String>() {
184+
185+
@Override
186+
public void onMessage(ConsumerRecord<Integer, String> message) {
187+
logger.info("auto: " + message);
188+
latch.countDown();
189+
}
190+
});
191+
final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
192+
final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
193+
container.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
194+
195+
@Override
196+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
197+
logger.info("In test, partitions revoked:" + partitions);
198+
rebalancePartitionsRevokedLatch.countDown();
199+
}
200+
201+
@Override
202+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
203+
logger.info("In test, partitions assigned:" + partitions);
204+
rebalancePartitionsAssignedLatch.countDown();
205+
}
206+
207+
});
208+
209+
container.setConcurrency(2);
210+
container.setBeanName("testAuto");
211+
container.start();
212+
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
213+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
214+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
215+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
216+
template.setDefaultTopic(topic1);
217+
template.send(0, "foo");
218+
template.send(2, "bar");
219+
template.send(0, "baz");
220+
template.send(2, "qux");
221+
template.flush();
222+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
223+
assertThat(rebalancePartitionsAssignedLatch.await(60, TimeUnit.SECONDS)).isTrue();
224+
assertThat(rebalancePartitionsRevokedLatch.await(60, TimeUnit.SECONDS)).isTrue();
225+
container.stop();
226+
logger.info("Stop auto");
227+
}
228+
175229
@Test
176230
public void testAfterListenCommit() throws Exception {
177231
logger.info("Start manual");

0 commit comments

Comments
 (0)