### Summary
Overriding the `ConsumerFactory` instance in our configuration, as stated in the spring-kafka docs, won't take effect; the default instance is created and used instead. With a minor modification to the bean definition the intended behavior can be achieved, but it isn't straightforward. To work the way the documentation states, some spring-boot-autoconfigure changes are required, or, if that is not possible, the spring-kafka documentation should be modified accordingly.
### Details
I'm working with Spring Boot-based microservices connected by Kafka. The message values are in JSON format, the serialized form of a common DTO; let's call it `Document`.

- the Kafka connection is handled by spring-kafka
- there's a `JsonDeserializer` for the `Document`
- we are defining a `ConsumerFactory` and applying our custom deserializer to it:
```java
@Bean
public JsonDeserializer<Document> deserializer() {
    return new DocumentMessageDeserializer();
}

@Bean
public ConsumerFactory<String, Document> consumerFactory(
        JsonDeserializer<Document> deserializer,
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    return new DefaultKafkaConsumerFactory<>(
            kafkaBinderConfigurationProperties.mergedConsumerConfiguration(),
            new StringDeserializer(),
            deserializer);
}
```
### Expected behavior
Based on the spring-kafka documentation, we should be able to override the default factory this way, with our custom instance carrying the custom deserializer.
### What happens instead
The service gets runtime deserialization failures instead: the message is deserialized by the default deserializer into a `byte[]`, which is then cast to `Document`:

```
java.lang.RuntimeException: java.lang.ClassCastException: class [B cannot be cast to class com.example.dto.Document ([B is in module java.base of loader 'bootstrap'; com.example.dto.Document is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @59e84876)
```
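The failure mode can be reproduced in plain Java (no Spring; all names here are illustrative): the unchecked cast inside generic code is erased, so the `ClassCastException` only surfaces at the typed use site, exactly as in the stack trace above.

```java
// Sketch of the failure: the "deserializer" hands back a byte[], and the
// typed consumer code performs an unchecked cast to Document.
public class CastFailureDemo {

    static class Document {}

    @SuppressWarnings("unchecked")
    static <T> T value(Object raw) {
        // erased to (Object) raw, so no check happens here
        return (T) raw;
    }

    public static void main(String[] args) {
        Object raw = new byte[] {1, 2, 3}; // what the default deserializer produced
        try {
            // the compiler inserts a checkcast to Document at this assignment
            Document doc = CastFailureDemo.<Document>value(raw);
            System.out.println("unexpected: " + doc);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```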
On the other hand, if the property-based configuration is provided, the service runs without issues:

```yaml
spring:
  kafka:
    consumer:
      value-deserializer: com.example.serializers.DocumentMessageDeserializer
```
Based on these symptoms, I'd say that it is not our `ConsumerFactory` instance that is used when creating the `Consumer`, but the default one. The default factory picks up the property-based configuration, but if the property is not provided, it falls back to the default deserializer. (I verified this by putting a breakpoint into the `KafkaListenerContainerFactory` creation in `KafkaAnnotationDrivenConfiguration`: the injected factory is in fact null.)

Our programmatically configured `@Bean` should override the default `ConsumerFactory`, so we should be able to deserialize the messages without setting the property.
### Reason
I saw that in `KafkaAnnotationDrivenConfiguration` the `ConsumerFactory` is injected like this:
```java
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory
            .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
    return factory;
}
```
The problem I found is that if we define a `ConsumerFactory` instance the way the spring-kafka docs state, it won't be injected here, because a `ConsumerFactory<String, Document>` is not an instance of `ConsumerFactory<Object, Object>`.
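To illustrate the mismatch, here is a toy model in plain Java (this is not Spring's actual bean-resolution code; all names are made up). A factory registered under its concrete generic signature is simply not found when the lookup asks for the `<Object, Object>` signature, so the fallback default wins, mirroring what `ObjectProvider.getIfAvailable(...)` does above:

```java
import java.util.Map;
import java.util.Optional;

// Toy model of resolving a bean by its full generic signature.
public class GenericLookupDemo {

    interface Factory<K, V> {}
    static class Document {}

    public static void main(String[] args) {
        // our configuration registers the concretely typed factory
        Map<String, Factory<?, ?>> beans =
                Map.of("Factory<String, Document>", new Factory<String, Document>() {});

        // the autoconfiguration asks for the wider signature
        Optional<Factory<?, ?>> match =
                Optional.ofNullable(beans.get("Factory<Object, Object>"));

        // no match, so the fallback default is used instead
        System.out.println(match.isPresent() ? "custom factory" : "default factory");
    }
}
```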
### Workarounds

1. Using the property
   - if no other config overrides happen in the `@Bean` definition, then it works as expected
2. Creating the whole `ConcurrentKafkaListenerContainerFactory` in our configuration
3. Creating the `ConsumerFactory` with wildcards, like this:
```java
@Bean
public ConsumerFactory<?, ?> consumerFactory(
        JsonDeserializer<Document> deserializer,
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    var config = new HashMap<>(kafkaBinderConfigurationProperties.mergedConsumerConfiguration());
    return new DefaultKafkaConsumerFactory<>(
            config,
            new StringDeserializer(),
            deserializer);
}
```
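The second workaround (creating the whole `ConcurrentKafkaListenerContainerFactory` ourselves) could look roughly like the sketch below. The configuration class name is made up, and the sketch deliberately skips everything the configurer would normally apply (concurrency, error handlers, etc.); the key point is that the bean must be named `kafkaListenerContainerFactory` so the auto-configured one backs off via `@ConditionalOnMissingBean`:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; only a minimal sketch of workaround 2.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Document> kafkaListenerContainerFactory(
            ConsumerFactory<String, Document> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Document> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // our concretely typed factory with the custom deserializer is used directly
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
```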
### Proposed solution
IMO, using wildcards when injecting the `ConsumerFactory` into `kafkaListenerContainerFactory` would bring the autoconfiguration in line with the spring-kafka documentation: the `ConsumerFactory` bean could then be defined with concrete type arguments instead of wildcards.
```java
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<?, ?>> kafkaConsumerFactory) {
```
If that is not possible for some reason I'm not aware of, then the spring-kafka docs should be modified to show the correct usage in the example.
### Versions
- spring-boot-autoconfigure: 2.7.10
- spring-kafka: 2.8.11

This specific part of the spring-boot-autoconfigure implementation is the same in the latest version as far as I could see, so I think the same issue would persist after a version upgrade as well, but I didn't verify that.