Unable to configure virtual thread based Executor #1103

Closed
@garfailed

Description

Spring Pulsar version: 1.2.4
Spring Boot version: 3.4.4
JDK version: 21

I tried to switch the threads on which the @PulsarListener methods run to virtual threads, but the thread name remained prefixed with EndpointContainer#0-0-C.

I'm unsure if I'm missing something, but I suspect the virtual thread setup doesn't work.

Why it doesn't seem to work

Firstly, the @PulsarListener annotated method doesn't log with the custom thread prefix I set.
When virtual threads are enabled at the Spring Boot level instead, it doesn't use that thread prefix either.

Secondly, I added the following code snippet to a @SpringBootTest:

final List<String> containerClassTypes = applicationContext
    .getBean(PulsarListenerContainerRegistry.class)
    .getAllListenerContainers()
    .stream()
    .map(container -> {
        if (container instanceof ConcurrentPulsarMessageListenerContainer listenerContainer) {
            return listenerContainer.getContainerProperties().getConsumerTaskExecutor().getClass().getSimpleName();
        } else {
            return "Not concurrent pulsar listener container.";
        }
    })
    .toList();

The list only contained SimpleAsyncTaskExecutor values.
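
A complementary check that doesn't depend on thread names is to ask the running thread whether it is virtual. The sketch below uses only the JDK 21 API (Thread.isVirtual and Thread.ofVirtual); the class and method names are made up for illustration, and in the real application the describeCurrentThread call would sit inside the @PulsarListener method.

```java
import java.util.concurrent.atomic.AtomicReference;

public class VirtualThreadCheck {

    /** Describes the calling thread: its name and whether it is a virtual thread. */
    static String describeCurrentThread() {
        Thread current = Thread.currentThread();
        return current.getName() + " virtual=" + current.isVirtual();
    }

    /** Runs the description on a named virtual thread and returns what it reported. */
    static String describeOnVirtualThread(String prefix) {
        AtomicReference<String> result = new AtomicReference<>();
        Thread thread = Thread.ofVirtual()
            .name(prefix, 0) // threads are named prefix + counter, starting at 0
            .start(() -> result.set(describeCurrentThread()));
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the virtual thread", e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        // Description of the thread that runs main.
        System.out.println(describeCurrentThread());
        // Description of a virtual thread created with the custom prefix.
        System.out.println(describeOnVirtualThread("pulsar-consumer-virtual-thread-"));
    }
}
```

Logging the same description from the listener method would show directly whether the listener runs on a virtual thread, regardless of how the executor names its threads.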

How I tried to switch to virtual threads

Attempt 1

I set the spring.threads.virtual.enabled property to true.

Attempt 2

I created a container factory customizer bean, as described in the documentation.

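import org.springframework.boot.autoconfigure.pulsar.PulsarContainerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.VirtualThreadTaskExecutor;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;

@Configuration
public class PulsarVirtualThreadConfig {

    // Sets a virtual thread executor on the listener container factory's properties.
    @Bean
    public PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<Object>> customPulsarVirtualThreadListenerCustomization() {
        return factory -> factory
            .getContainerProperties()
            .setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-virtual-thread-"));
    }

}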

Possible reason for the problem

After debugging the Spring Pulsar code for a while, it seems to me that at some point the container properties are deep-cloned manually, and the executor property isn't copied. As a result, the executor on the copy is always null, and the container always falls back to a new SimpleAsyncTaskExecutor.
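
To make the suspected failure mode concrete, here is a minimal, self-contained sketch. It is not Spring Pulsar's code: Props, copyWithoutExecutor, resolveExecutor and FALLBACK_EXECUTOR are hypothetical stand-ins. It only shows how a field-by-field copy that skips one field, combined with a null-check fallback, silently discards a custom executor.

```java
import java.util.concurrent.Executor;

public class LostExecutorSketch {

    /** Hypothetical stand-in for the container properties; not the Spring Pulsar class. */
    static class Props {
        String subscriptionName;
        Executor consumerTaskExecutor;
    }

    /** Hypothetical default used when no executor is configured. */
    static final Executor FALLBACK_EXECUTOR = Runnable::run;

    /** Manual field-by-field copy that forgets the executor (the suspected bug). */
    static Props copyWithoutExecutor(Props source) {
        Props copy = new Props();
        copy.subscriptionName = source.subscriptionName;
        // consumerTaskExecutor is not copied, so it stays null on the copy.
        return copy;
    }

    /** Null check at start-up: a missing executor is silently replaced by the default. */
    static Executor resolveExecutor(Props props) {
        return props.consumerTaskExecutor != null ? props.consumerTaskExecutor : FALLBACK_EXECUTOR;
    }

    public static void main(String[] args) {
        Props factoryProps = new Props();
        factoryProps.subscriptionName = "demo-subscription";
        // Custom executor that starts each task on a new virtual thread.
        factoryProps.consumerTaskExecutor = task -> Thread.ofVirtual().start(task);

        Props containerProps = copyWithoutExecutor(factoryProps);
        System.out.println("custom executor survived copy: "
            + (resolveExecutor(containerProps) == factoryProps.consumerTaskExecutor));
    }
}
```

In this sketch the executor set on the original properties is never seen by the code that starts the container, which would match the observation that the list contains only SimpleAsyncTaskExecutor.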

Debugging steps in the v1.2.5 codebase

  1. PulsarListenerEndpointRegistrar:82
    1. It seems the @PulsarListener registrations start here, called by Spring during initialization.
  2. PulsarListenerEndpointRegistrar:91
    1. The factory here contains the VirtualThreadTaskExecutor set using either method described earlier.
  3. ConcurrentPulsarListenerContainerFactory:122
    1. This is the last place where we can observe the factory properties containing the VirtualThreadTaskExecutor in the consumerTaskExecutor field.
    2. The PulsarContainerProperties object passed into the new instance of ConcurrentPulsarMessageListenerContainer does not have its consumerTaskExecutor field set.
  4. When the doStart method is called on ConcurrentPulsarMessageListenerContainer, it'll eventually call the configureChildContainer method. Here, at line 124, the consumerTaskExecutor is always null, and a new SimpleAsyncTaskExecutor will be created.
