
Commit

partial fix for #110, call Deserializer#configure and Serializer#configure (#673)

* partial fix for #110, call Deserializer#configure and Serializer#configure when KafkaConsumerProcessor sets the serde beans in Consumer and Producer configuration

* utilize the configs Map in the configure call, and add tests

* Update tests

* Remove wildcard imports

---------

Co-authored-by: Ed Mitchell <ed@wheel.com>
Co-authored-by: Guillermo Calvo <calvog@objectcomputing.com>
3 people authored Aug 17, 2023
1 parent 7fd8c8b commit 44d7b8f
Showing 3 changed files with 109 additions and 34 deletions.
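
In short: when a Deserializer or Serializer bean is assigned to the consumer or producer configuration, its configure(...) hook is now invoked with the configuration's Properties converted to a Map, together with the key/value flag. Below is a minimal sketch of that behaviour, assuming only the Kafka client API used in the diff; the wrapper class and method are illustrative, not part of this repository.

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.utils.Utils;

    class ConfigureSketch {
        // What the changed setters effectively do when a key deserializer bean is set.
        static Deserializer<String> configuredKeyDeserializer(Properties kafkaProperties) {
            StringDeserializer keyDeserializer = new StringDeserializer();
            // Utils.propsToMap turns the Properties into the Map<String, Object> that
            // Deserializer#configure expects; 'true' marks it as the key deserializer,
            // so settings such as "key.deserializer.encoding" are honoured.
            Map<String, Object> configs = Utils.propsToMap(kafkaProperties);
            keyDeserializer.configure(configs, true);
            return keyDeserializer;
        }
    }

The value-side setters pass false instead of true, and the producer configuration applies the same pattern to Serializer#configure, which is what the new StringDeserializer/StringSerializer encoding tests below verify.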
@@ -18,6 +18,8 @@
import org.apache.kafka.common.serialization.Deserializer;

import io.micronaut.core.annotation.Nullable;
import org.apache.kafka.common.utils.Utils;

import java.util.Optional;
import java.util.Properties;

@@ -58,6 +60,9 @@ public Optional<Deserializer<K>> getKeyDeserializer() {
* @param keyDeserializer The key deserializer
*/
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
    if (keyDeserializer != null) {
        keyDeserializer.configure(Utils.propsToMap(getConfig()), true);
    }
    this.keyDeserializer = keyDeserializer;
}

@@ -75,6 +80,9 @@ public Optional<Deserializer<V>> getValueDeserializer() {
* @param valueDeserializer The value deserializer
*/
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
    if (valueDeserializer != null) {
        valueDeserializer.configure(Utils.propsToMap(getConfig()), false);
    }
    this.valueDeserializer = valueDeserializer;
}
}
@@ -18,6 +18,8 @@
import org.apache.kafka.common.serialization.Serializer;

import io.micronaut.core.annotation.Nullable;
import org.apache.kafka.common.utils.Utils;

import java.util.Optional;
import java.util.Properties;

@@ -62,6 +64,9 @@ public Optional<Serializer<K>> getKeySerializer() {
* @param keySerializer The key serializer
*/
public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
    if (keySerializer != null) {
        keySerializer.configure(Utils.propsToMap(getConfig()), true);
    }
    this.keySerializer = keySerializer;
}

@@ -79,6 +84,9 @@ public Optional<Serializer<V>> getValueSerializer() {
* @param valueSerializer The value serializer
*/
public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
    if (valueSerializer != null) {
        valueSerializer.configure(Utils.propsToMap(getConfig()), false);
    }
    this.valueSerializer = valueSerializer;
}
}
@@ -9,13 +9,20 @@ import io.micronaut.context.env.EnvironmentPropertySource
import io.micronaut.context.env.MapPropertySource
import io.micronaut.context.exceptions.NoSuchBeanException
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import spock.lang.AutoCleanup
import spock.lang.Issue
import spock.lang.Specification

import java.nio.charset.StandardCharsets

import static io.micronaut.context.env.PropertySource.PropertyConvention.ENVIRONMENT_VARIABLE
import static org.apache.kafka.clients.consumer.ConsumerConfig.*

class KafkaConfigurationSpec extends Specification {

@@ -24,16 +31,16 @@ class KafkaConfigurationSpec extends Specification {
void "test default consumer configuration"() {
given:
applicationContext = ApplicationContext.builder().enableDefaultPropertySources(false)
.properties(("kafka." + KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka." + VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name)
.properties(("kafka." + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka." + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name)
.run(ApplicationContext.class);

when:
AbstractKafkaConsumerConfiguration config = applicationContext.getBean(AbstractKafkaConsumerConfiguration)
Properties props = config.getConfig()

then:
props[BOOTSTRAP_SERVERS_CONFIG] == AbstractKafkaConfiguration.DEFAULT_BOOTSTRAP_SERVERS
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == AbstractKafkaConfiguration.DEFAULT_BOOTSTRAP_SERVERS

when:
Consumer consumer = applicationContext.createBean(Consumer, config)
@@ -45,24 +52,76 @@ class KafkaConfigurationSpec extends Specification {
consumer.close()
}

void "test custom consumer deserializer"() {
given: "config with specific deserializer encodings"
applicationContext = ApplicationContext.run(
("kafka." + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka." + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ".encoding"): StandardCharsets.US_ASCII.name(),
("kafka." + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka." + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ".encoding"): StandardCharsets.ISO_8859_1.name(),
)

when: "custom deserializers are set"
AbstractKafkaConsumerConfiguration config = applicationContext.getBean(AbstractKafkaConsumerConfiguration)
config.setKeyDeserializer(new StringDeserializer())
config.setValueDeserializer(new StringDeserializer())

and: "a consumer is created"
KafkaConsumer consumer = applicationContext.createBean(Consumer, config)

then: "the new consumer's deserializers have the configured encoding"
consumer != null
(consumer.keyDeserializer as StringDeserializer).encoding == StandardCharsets.US_ASCII.name()
(consumer.valueDeserializer as StringDeserializer).encoding == StandardCharsets.ISO_8859_1.name()

cleanup:
consumer.close()
}

void "test custom producer serializer"() {
given: "config with specific serializer encodings"
applicationContext = ApplicationContext.run(
("kafka." + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG): StringSerializer.name,
("kafka." + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG + ".encoding"): StandardCharsets.US_ASCII.name(),
("kafka." + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG): StringSerializer.name,
("kafka." + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG + ".encoding"): StandardCharsets.ISO_8859_1.name(),
)

when: "custom serializers are set"
AbstractKafkaProducerConfiguration config = applicationContext.getBean(AbstractKafkaProducerConfiguration)
config.setKeySerializer(new StringSerializer())
config.setValueSerializer(new StringSerializer())

and: "a producer is created"
KafkaProducer producer = applicationContext.createBean(Producer, config)

then: "the new producer's serializers have the configured encoding"
producer != null
(producer.keySerializer as StringSerializer).encoding == StandardCharsets.US_ASCII.name()
(producer.valueSerializer as StringSerializer).encoding == StandardCharsets.ISO_8859_1.name()

cleanup:
producer.close()
}

void "test configure default properties"() {
given:
applicationContext = ApplicationContext.run(
('kafka.' + BOOTSTRAP_SERVERS_CONFIG): "localhost:1111",
('kafka.' + GROUP_ID_CONFIG): "mygroup",
('kafka.' + MAX_POLL_RECORDS_CONFIG): "100",
("kafka." + KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka." + VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name
('kafka.' + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG): "localhost:1111",
('kafka.' + ConsumerConfig.GROUP_ID_CONFIG): "mygroup",
('kafka.' + ConsumerConfig.MAX_POLL_RECORDS_CONFIG): "100",
("kafka." + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka." + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name
)

when:
AbstractKafkaConsumerConfiguration config = applicationContext.getBean(AbstractKafkaConsumerConfiguration)
Properties props = config.getConfig()

then:
props[BOOTSTRAP_SERVERS_CONFIG] == "localhost:1111"
props[GROUP_ID_CONFIG] == "mygroup"
props[MAX_POLL_RECORDS_CONFIG] == "100"
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == "localhost:1111"
props[ConsumerConfig.GROUP_ID_CONFIG] == "mygroup"
props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] == "100"

when:
Consumer consumer = applicationContext.createBean(Consumer, config)
@@ -77,23 +136,23 @@ class KafkaConfigurationSpec extends Specification {
void "test override consumer default properties"() {
given:
applicationContext = ApplicationContext.run(
('kafka.' + BOOTSTRAP_SERVERS_CONFIG): "localhost:1111",
('kafka.' + GROUP_ID_CONFIG): "mygroup",
('kafka.consumers.default.' + BOOTSTRAP_SERVERS_CONFIG): "localhost:2222",
('kafka.' + GROUP_ID_CONFIG): "mygroup",
('kafka.consumers.default.' + MAX_POLL_RECORDS_CONFIG): "100",
("kafka.consumers.default." + KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka.consumers.default." + VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name
('kafka.' + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG): "localhost:1111",
('kafka.' + ConsumerConfig.GROUP_ID_CONFIG): "mygroup",
('kafka.consumers.default.' + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG): "localhost:2222",
('kafka.' + ConsumerConfig.GROUP_ID_CONFIG): "mygroup",
('kafka.consumers.default.' + ConsumerConfig.MAX_POLL_RECORDS_CONFIG): "100",
("kafka.consumers.default." + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka.consumers.default." + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name
)

when:
KafkaConsumerConfiguration config = applicationContext.getBean(KafkaConsumerConfiguration)
Properties props = config.getConfig()

then:
props[BOOTSTRAP_SERVERS_CONFIG] == "localhost:2222"
props[GROUP_ID_CONFIG] == "mygroup"
props[MAX_POLL_RECORDS_CONFIG] == "100"
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == "localhost:2222"
props[ConsumerConfig.GROUP_ID_CONFIG] == "mygroup"
props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] == "100"

when:
Consumer consumer = applicationContext.createBean(Consumer, config)
@@ -108,19 +167,19 @@ class KafkaConfigurationSpec extends Specification {
void "test configure list fields default properties"() {
given:
applicationContext = ApplicationContext.run(
('kafka.' + BOOTSTRAP_SERVERS_CONFIG): ["localhost:1111", "localhost:1112"],
('kafka.' + GROUP_ID_CONFIG): "mygroup",
("kafka.consumers.default." + KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka.consumers.default." + VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name
('kafka.' + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG): ["localhost:1111", "localhost:1112"],
('kafka.' + ConsumerConfig.GROUP_ID_CONFIG): "mygroup",
("kafka.consumers.default." + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name,
("kafka.consumers.default." + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer.name
)

when:
AbstractKafkaConsumerConfiguration config = applicationContext.getBean(AbstractKafkaConsumerConfiguration)
Properties props = config.getConfig()

then:
props[BOOTSTRAP_SERVERS_CONFIG] == "localhost:1111,localhost:1112"
props[GROUP_ID_CONFIG] == "mygroup"
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == "localhost:1111,localhost:1112"
props[ConsumerConfig.GROUP_ID_CONFIG] == "mygroup"

when:
Consumer consumer = applicationContext.createBean(Consumer, config)
@@ -161,7 +220,7 @@ class KafkaConfigurationSpec extends Specification {
Properties props = config.config

then:
props[BOOTSTRAP_SERVERS_CONFIG] == 'localhost:1111,localhost:2222'
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == 'localhost:1111,localhost:2222'

when: 'only env source, list, expect 3333,4444'
applicationContext.close()
@@ -172,7 +231,7 @@ class KafkaConfigurationSpec extends Specification {
props = config.config

then:
props[BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333,localhost:4444'
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333,localhost:4444'

when: 'both sources, both list, expect 3333,4444'
applicationContext.close()
@@ -183,7 +242,7 @@ class KafkaConfigurationSpec extends Specification {
props = config.config

then:
props[BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333,localhost:4444'
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333,localhost:4444'

when: 'both sources, single yaml, list env, expect 3333,4444'

@@ -198,7 +257,7 @@ class KafkaConfigurationSpec extends Specification {
props = config.config

then:
props[BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333,localhost:4444'
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333,localhost:4444'

when: 'both sources, list yaml, single env, expect 3333'

@@ -213,7 +272,7 @@ class KafkaConfigurationSpec extends Specification {
props = config.config

then:
props[BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333'
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333'

when: 'both sources, both single, expect 3333'

@@ -228,7 +287,7 @@ class KafkaConfigurationSpec extends Specification {
props = config.config

then:
props[BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333'
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] == 'localhost:3333'
}

void "test disabled"() {

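As a closing usage note, a hedged sketch that mirrors the new tests (the application class and main method here are hypothetical, not part of this change): with ".encoding" settings under the "kafka." prefix, a string serde bean set on the configuration now picks up the requested charset.

    import io.micronaut.context.ApplicationContext;
    import java.util.Map;

    public class EncodingExample {
        public static void main(String[] args) {
            // Properties mirror the new spec: deserializer classes plus the ".encoding"
            // settings that Deserializer#configure now receives via the changed setters.
            try (ApplicationContext ctx = ApplicationContext.run(Map.of(
                    "kafka.key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.key.deserializer.encoding", "US-ASCII",
                    "kafka.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.value.deserializer.encoding", "ISO-8859-1"))) {
                // As in KafkaConfigurationSpec above: a StringDeserializer bean set on the
                // consumer configuration obtained from this context is configured with
                // these encodings when the Consumer bean is created.
            }
        }
    }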