The Mqttv5PahoMessageHandler.publish() fails with a Connect already in progress (32110) under high concurrency #9597

Closed
@artembilan

Description

When the input channel of an endpoint with a Mqttv5PahoMessageHandler is an ExecutorChannel and many messages are sent concurrently, the following exception can be thrown:

2024-10-25 16:41:31.164 [][] ERROR o.s.i.handler.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutboundHandler' for component 'mqttOutboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/rms/config/MqttConfig.class]'; from source: 'com.rms.config.MqttConfig.mqttOutboundHandler(org.eclipse.paho.mqttv5.client.MqttConnectionOptions)'], failedMessage=GenericMessage [payload= MQTT MSG 510 from Spring Boot!, headers={replyChannel=nullChannel, errorChannel=, id=c38c5f3b-d4fb-56ac-6abb-f24e3c4d11d6, mqtt_topic=test/, timestamp=1729888891103}]
	at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:283)
	at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.handleMessageInternal(Mqttv5PahoMessageHandler.java:222)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
	at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
	at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
	at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
Caused by: Connect already in progress (32110)
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:734)
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:715)
	at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:269)
	... 9 more

The code:

if (!this.mqttClient.isConnected()) {
	this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
}

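has to be guarded with a Lock to prevent concurrent connect() calls. The isConnected() check and the connect() call are not atomic, so several threads can see a disconnected client at the same time and each attempt to connect.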
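A minimal sketch of the guard described above, using double-checked locking around the connect step. StubMqttClient and GuardedPublisher are hypothetical stand-ins so the example runs without Paho on the classpath; they are not the real MqttAsyncClient or Mqttv5PahoMessageHandler, and the actual fix in the framework may look different.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the MQTT client (NOT the Paho API).
// It only tracks the connection state and counts connect() calls.
class StubMqttClient {

    private volatile boolean connected;

    final AtomicInteger connectCalls = new AtomicInteger();

    boolean isConnected() {
        return this.connected;
    }

    void connect() {
        this.connectCalls.incrementAndGet();
        this.connected = true;
    }
}

// Hypothetical publisher showing only the locking pattern.
class GuardedPublisher {

    private final Lock connectLock = new ReentrantLock();

    private final StubMqttClient mqttClient;

    GuardedPublisher(StubMqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }

    void publish(String payload) {
        connectIfNecessary();
        // A real handler would publish the payload here; omitted in this sketch.
    }

    private void connectIfNecessary() {
        if (this.mqttClient.isConnected()) {
            return; // fast path: no locking once connected
        }
        this.connectLock.lock();
        try {
            // Re-check under the lock: another thread may have connected
            // while this one was waiting.
            if (!this.mqttClient.isConnected()) {
                this.mqttClient.connect();
            }
        }
        finally {
            this.connectLock.unlock();
        }
    }
}
```

With this shape, many threads calling publish() on a disconnected client result in a single connect() call; the others wait on the lock and then skip the connect because the re-check sees the client is already connected.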

Until this is fixed, the workaround is to add a RequestHandlerRetryAdvice to this endpoint, so that a publish failing this way is retried.
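One possible way to wire that workaround with the Java DSL is sketched below, assuming Spring Integration 6.x. The bean names mqttOutboundFlow and mqttOutboundHandler are taken from the log above; the MqttRetryConfig class, the mqttRetryAdvice bean and the "mqttOutboundChannel" channel name are assumptions for illustration. The advice is used with its default retry settings.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;

@Configuration
public class MqttRetryConfig {

    // Default advice: retries the handler call with the default retry settings.
    @Bean
    public RequestHandlerRetryAdvice mqttRetryAdvice() {
        return new RequestHandlerRetryAdvice();
    }

    @Bean
    public IntegrationFlow mqttOutboundFlow(Mqttv5PahoMessageHandler mqttOutboundHandler,
            RequestHandlerRetryAdvice mqttRetryAdvice) {

        return IntegrationFlow.from("mqttOutboundChannel") // hypothetical channel name
                // The advice is applied at the endpoint level, around the handler call.
                .handle(mqttOutboundHandler, e -> e.advice(mqttRetryAdvice))
                .get();
    }
}
```

To match the scenario in this issue, the "mqttOutboundChannel" bean would have to be declared as an ExecutorChannel; without an explicit bean the DSL creates a DirectChannel under that name.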
