Skip to content

Review Kafka auto-configuration and support for defining additional beans #40174

Open
@notusedusername

Description

@notusedusername

Summary

Over-defining the ConsumerFactory instance in our configuration as stated in the spring-kafka docs won't take effect, but the default instance will be created and used instead. With a minor modification in the bean definition, the intended behavior can be achieved, but it isn't straightforward to do so.

To work the way the documentation states some spring-boot-autoconfigure related changes are required, or if it is not possible, then the spring-kafka documentation should be modified accordingly.

Details

I'm working with Spring Boot-based microservices connected by Kafka. The message values are in JSON format, in the serialized form of a common DTO, let's call it Document.

  • the Kafka connection is handled by spring-kafka
  • there's a JsonDeserializer for the Document
  • we are defining a ConsumerFactory and applying our custom deserializer to it
@Bean
public JsonDeserializer<Document> deserializer() {
    return new DocumentMessageDeserializer();
}

@Bean
public ConsumerFactory<String, Document> consumerFactory(
        JsonDeserializer<Document> deserializer,
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    return new DefaultKafkaConsumerFactory<>(
            kafkaBinderConfigurationProperties.mergedConsumerConfiguration(),
            new StringDeserializer(),
            deserializer);
}

Expected behavior

Based on the spring-kafka documentation we should be able to override the default factory this way with our custom instance with the custom deserializer.

What happens instead

The service will get runtime errors about failed deserialization instead, as it deserializes the message with the default into a byte[] and then tries to cast it into a Document.

java.lang.RuntimeException: java.lang.ClassCastException: class [B cannot be cast to class com.example.dto.Document ([B is in module java.base of loader 'bootstrap'; com.example.dto.Document is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @59e84876)

On the other hand, if the property-based config is provided, then the service runs without issues.

spring:
  kafka:
    consumer:
      value-deserializer: com.example.serializers.DocumentMessageDeserializer

Based on these symptoms I'd say, that not our ConsumerFactory instance is used when creating the Consumer, but the default one. It will pick up the property-based configuration, but if the property is not provided then it will use the default deserializer. (Verified it by putting breakpoint into the KafkaListenerContainerFactory creation in the KafkaAnnotationDrivenConfiguration, and the injected factory is in fact null.)

Our programmatically configured @Bean should override the default ConsumerFactory (so we should be able to deserialize the messages without setting the property).

Reason

I saw that in the KafkaAnnotationDrivenConfiguration the ConsumerFactory is injected like this:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
	ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
	ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
	.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
}

The problem I found regarding this is that if we are defining a ConsumerFactory instance the way it is stated in the spring-kafka docs, it won't be injected here because the ConsumerFactory<String, Document> is not an instance of the ConsumerFactory<Object, Object>.

Workarounds

  • Using the property
    • if no other config overrides happen in the @Bean definition, then it would work as expected
  • Creating the whole ConcurrentKafkaListenerContainerFactory in our configuration
  • Creating the ConsumerFactory with wildcards like this:
 @Bean
public ConsumerFactory<?,?> consumerFactory(
        JsonDeserializer<Document> deserializer,
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    var config = new HashMap<>(kafkaBinderConfigurationProperties.mergedConsumerConfiguration());
    return new DefaultKafkaConsumerFactory<>(
            config,
            new StringDeserializer(),
            deserializer);
}

Proposed solution

IMO using wildcards when injecting the ConsumerFactory into the kafkaListenerContainerFactory would lead to complying with the kafka-spring documentation, the ConsumerFactory could be used with concrete type arguments instead of wildcards.

ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ObjectProvider<ConsumerFactory<?, ?>> kafkaConsumerFactory) {

If it is not possible for some reasons I'm not aware of, then the spring-kafka docs should be modified to contain the correct usage in the example.

Versions

  • spring-boot-autoconfigure:2.7.10
  • spring-kafka:2.8.11
    This specific part of the spring-boot-autoconfigure implementation is the same in the latest version too as I could see, so I think the same issue would persist with a version upgrade as well, but I didn't check that.

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions