Description
Describe the issue
Given an application that runs as two instances and that:
- uses the Kafka binder
- declares a pollable message source
A call to the actuator endpoint /actuator/bindings takes around 100 s when it is made to the instance that has no assigned partitions.
To Reproduce
The behavior can be reproduced with the attached application. To simulate two instances of the same application, it declares two pollable sources in the same consumer group, both bound to the same topic.
A Kafka broker is required to start the application.
To reproduce:
- Start the attached application.
- Call the endpoint http://localhost:8080/actuator/bindings
Expected behavior
The call to the endpoint should return promptly instead of taking around 100 s.
Version of the framework
Spring Boot: 3.0.4
spring-cloud-dependencies: 2022.0.1
Additional context
The issue is caused by the class org.springframework.integration.kafka.inbound.KafkaMessageSource, which is synchronized and is used both for polling and for retrieving the binding state requested by the bindings endpoint.
On the one hand, it defines the following timeouts:
this.pollTimeout = Duration.ofMillis(consumerProperties.getPollTimeout());
this.assignTimeout = Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic
Note that the default pollTimeout is 5 seconds, so the assignTimeout is 100 s (20 × 5 s).
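The arithmetic above can be checked with a small sketch that mirrors the quoted expression. The value of MIN_ASSIGN_TIMEOUT_MS (2000 ms) is an assumption about the KafkaMessageSource constant and should be verified against the version in use; with the default 5 s poll timeout it does not affect the result, because 20 × 5 s is far larger.

```java
import java.time.Duration;

// Sketch of the timeout derivation quoted from KafkaMessageSource.
final class AssignTimeoutCalc {

    // ASSUMPTION: value of KafkaMessageSource.MIN_ASSIGN_TIMEOUT, in milliseconds.
    static final long MIN_ASSIGN_TIMEOUT_MS = 2_000L;

    // Mirrors: Math.max(pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)
    static Duration assignTimeout(Duration pollTimeout) {
        return Duration.ofMillis(Math.max(pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT_MS));
    }

    public static void main(String[] args) {
        Duration pollTimeout = Duration.ofSeconds(5); // default poll timeout
        // 5 000 ms * 20 = 100 000 ms, printed in ISO-8601 form as PT1M40S
        System.out.println(assignTimeout(pollTimeout));
    }
}
```

The floor only matters for very small poll timeouts, for example 50 ms, where 20 × 50 ms = 1 s is raised to the assumed 2 s minimum.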
They are then used in the pollRecord method:
ConsumerRecords<K, V> records = this.consumer
        .poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
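Note that the assignTimeout is used whenever there are no assignedPartitions. On the instance that has no partitions assigned, every poll can therefore block for up to 100 s inside the synchronized code of KafkaMessageSource.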
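On the other hand, when the endpoint is called, the isRunning method of the same class is invoked, and it is also synchronized. It therefore has to wait until the poll in progress returns, which can take the full assignTimeout.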
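The contention described above can be reproduced in isolation with a minimal sketch. SynchronizedSource is a hypothetical stand-in, not the real KafkaMessageSource: its poll method sleeps while holding the object's monitor, the way a long consumer poll would on an instance without assigned partitions, and isRunning is synchronized on the same monitor. A 500 ms timeout stands in for the 100 s assignTimeout to keep the demo short.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a source whose poll and state methods share one monitor.
final class SynchronizedSource {
    private final Duration pollTimeout;
    private boolean running = true;

    SynchronizedSource(Duration pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    // Simulates a poll that waits for the whole timeout while holding the lock.
    synchronized void poll(CountDownLatch lockHeld) throws InterruptedException {
        lockHeld.countDown(); // signal that this thread now owns the monitor
        Thread.sleep(pollTimeout.toMillis());
    }

    // Like the state check made for /actuator/bindings: blocks until poll() releases the lock.
    synchronized boolean isRunning() {
        return running;
    }

    // Returns how long isRunning() was blocked by an in-progress poll, in milliseconds.
    static long measureIsRunningLatency(Duration pollTimeout) {
        SynchronizedSource source = new SynchronizedSource(pollTimeout);
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                source.poll(lockHeld);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        try {
            lockHeld.await(); // make sure the poller holds the monitor first
            long start = System.nanoTime();
            source.isRunning();
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            poller.join();
            return elapsedMs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("isRunning() blocked for ~"
                + measureIsRunningLatency(Duration.ofMillis(500)) + " ms");
    }
}
```

The state read is trivial, yet its latency is dictated entirely by the poll timeout, which matches the roughly 100 s response time observed on the idle instance.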
To avoid this synchronization point, the running and stopped states could perhaps be managed in the DefaultBinding class, as is already done for the "paused" state. That way, polling and retrieving the state would no longer contend for the same lock.
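The proposal above can be illustrated with a hedged sketch. StateTrackingBinding is hypothetical and does not reproduce DefaultBinding's actual code: it keeps running/stopped in an AtomicBoolean that is updated on start and stop, while the long poll holds a separate lock standing in for the source's monitor. Reading the state then returns immediately even while a poll is blocked.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical binding that tracks its own lifecycle state outside the polling lock.
final class StateTrackingBinding {
    private final Object pollLock = new Object(); // stands in for the source's monitor
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Duration pollTimeout;

    StateTrackingBinding(Duration pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    void start() { running.set(true); }

    void stop() { running.set(false); }

    // Lock-free read, analogous to how the paused flag is reported.
    boolean isRunning() { return running.get(); }

    // Simulated long poll that holds the source's lock for the whole timeout.
    void poll(CountDownLatch lockHeld) throws InterruptedException {
        synchronized (pollLock) {
            lockHeld.countDown();
            Thread.sleep(pollTimeout.toMillis());
        }
    }

    // Returns how long isRunning() took while a poll was in progress, in milliseconds.
    static long measureIsRunningLatency(Duration pollTimeout) {
        StateTrackingBinding binding = new StateTrackingBinding(pollTimeout);
        binding.start();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                binding.poll(lockHeld);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        try {
            lockHeld.await(); // the poller now holds pollLock
            long start = System.nanoTime();
            boolean state = binding.isRunning();
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            poller.join();
            if (!state) {
                throw new IllegalStateException("binding should report running");
            }
            return elapsedMs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("isRunning() took ~"
                + measureIsRunningLatency(Duration.ofMillis(500)) + " ms");
    }
}
```

The trade-off of this design is that the flag is only accurate if every start and stop goes through the binding; if the underlying source can change state on its own, the two could drift apart.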