Description
Spring Pulsar version: 1.2.4
Spring Boot version: 3.4.4
JDK version is 21
I tried to switch the threads on which the @PulsarListener
methods run to virtual threads, but the thread name remained prefixed with EndpointContainer#0-0-C
.
I'm unsure if I'm missing something, but I suspect the virtual thread setup doesn't work.
Why it doesn't seem to work
Firstly, because the @PulsarListener
annotated method doesn't log using the custom thread prefix I set.
If the Spring Boot level virtual thread usage is configured, it doesn't use that thread prefix either.
Secondly, I added the following code snippet to a @SpringBootTest
:
final List<String> containerClassTypes = applicationContext
.getBean(PulsarListenerContainerRegistry.class)
.getAllListenerContainers()
.stream()
.map(container -> {
if (container instanceof ConcurrentPulsarMessageListenerContainer listenerContainer) {
return listenerContainer.getContainerProperties().getConsumerTaskExecutor().getClass().getSimpleName();
} else {
return "Not concurrent pulsar listener container.";
}
})
.toList();
The list only contained SimpleAsyncTaskExecutor
values.
How I tried to switch to virtual threads
Attempt 1
I set the spring.threads.virtual.enabled
property to true
.
Attempt 2
I created a container factory customizer bean, as described in the documentation.
@Configuration
public class PulsarVirtualThreadConfig {
@Bean
public PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<Object>> customPulsarVirtualThreadListenerCustomization() {
return factory -> factory
.getContainerProperties()
.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-virtual-thread-"));
}
}
Possible reason for the problem
After debugging the Spring Pulsar code for a while, it seems to me that at some point there is a manual deep clone of container properties, and the executor property isn't copied. Because of this it'll always be null
, and will always be set to a SimpleAsyncTaskExecutor
.
Debugging steps in the v1.2.5
codebase
PulsarListenerEndpointRegistrar:82
- It seems the
@PulsarListener
registrations start here, called by Spring during initialization.
- It seems the
PulsarListenerEndpointRegistrar:91
- The factory here contains the
VirtualThreadTaskExecutor
set using either method described earlier.
- The factory here contains the
ConcurrentPulsarListenerContainerFactory:122
- This is the last place where we can observe the factory properties containing the
VirtualThreadTaskExecutor
in theconsumerTaskExecutor
field. - The
PulsarContainerProperties
object passed into the new instance ofConcurrentPulsarMessageListenerContainer
does not have itsconsumerTaskExecutor
field set.
- This is the last place where we can observe the factory properties containing the
- When the
doStart
method is called onConcurrentPulsarMessageListenerContainer
, it'll eventually call theconfigureChildContainer
method. Here, at line 124, theconsumerTaskExecutor
is alwaysnull
, and a newSimpleAsyncTaskExecutor
will be created.