Skip to content

On kotlin application, Spring kafka 3.2.0 doesn't take the message conversion logic, because of the wrong coroutine detection on MessagingMessageListenerAdapter #3277

Closed
@huisam

Description

@huisam

In what version(s) of Spring for Apache Kafka are you seeing this issue?

For example:

3.2.0

Between 3.1.4 and 3.2.0

Describe the bug

On the MessagingMessageListenerAdapter class there is a bug on the determineInferredType method.

boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType());
isNotConvertible |= isCoroutines;

if you determine the coroutine function, you should have to use

KotlinDetector.isSuspendingFunction(Method method)

not isKotlinType method

because on kotlin, @KafkaListener method parameter type is always kotlin type. So the conversionNeeded flag changes to false.
This occurs messageConverters not working normally.

To Reproduce

@KafkaListener(topics = [TEST_KAFKA_TOPIC], groupId = TEST_KAFKA_TOPIC_GROUP)
fun onMessage(message: TestMessage) {
    execute()
}

data class TestMessage(val key: String)

Expected behavior

consumes normally and execute the execute() method but it occurs MethodArgumentNotValidException exception

org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.company.test.spring.kafka.TestKafkaService.onMessage(com.company.test.spring.kafka.TestMessage): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] 

Sample

sample is on the issue

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions