Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqttv5PahoMessageDrivenChannelAdapter.addTopic(topic, qos) throws IndexOutOfBoundsException #8873

Closed
RXCheese opened this issue Jan 30, 2024 · 2 comments

Comments

@RXCheese
Copy link

RXCheese commented Jan 30, 2024

Version 6.2.1

pom dependency

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>6.2.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>6.2.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.2.1</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

bean

@Bean(name = "adapter")
public MessageProducerSupport mqttInbound( MqttConnectionOptions mqttConnectionOptions, String mqttClientId) {
    Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
            mqttConnectionOptions, mqttClientId.concat("_consumer"),
            mqttProperties.receiveTopicsName());
    adapter.connectComplete(true);
    adapter.setPayloadType(String.class);
    adapter.setManualAcks(false);
    adapter.setOutputChannel(inboundChannel);
    return adapter;
}

bean inject and use

@Resource
private Mqttv5PahoMessageDrivenChannelAdapter adapter;

@Override
public void subscribe(String topic) {
    adapter.addTopic(topic);
}

Describe the bug

Follow the code to run, adapter.addTopic(topic) will throw IndexOutOfBoundsException

Caused by: java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
	at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
	at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
	at java.base/java.util.Objects.checkIndex(Objects.java:359)
	at java.base/java.util.ArrayList.get(ArrayList.java:427)
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.subscribe(MqttAsyncClient.java:1276)
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.subscribe(MqttAsyncClient.java:1205)
	at org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter.addTopic(Mqttv5PahoMessageDrivenChannelAdapter.java:279)
	at org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter.addTopic(AbstractMqttMessageDrivenChannelAdapter.java:293)
	at com.uatair.airport.driver.mqtt.service.impl.MqttTopicServiceImpl.subscribe(MqttTopicServiceImpl.java:20)
	at com.uatair.airport.driver.mqtt.status.StatusHandler.productStatusTopo(StatusHandler.java:52)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
	at org.springframework.integration.handler.support.IntegrationInvocableHandlerMethod.doInvoke(IntegrationInvocableHandlerMethod.java:45)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1086)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:569)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:482)
	at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:360)
	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114)
	... 67 more

The error location according to IndexOutOfBoundsException in Mqttv5PahoMessageDrivenChannelAdapter at line 279

this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived).waitForCompletion(getCompletionTimeout());

@Override
public void addTopic(String topic, int qos) {
this.topicLock.lock();
try {
super.addTopic(topic, qos);
if (this.mqttClient != null && this.mqttClient.isConnected()) {
this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived)
.waitForCompletion(getCompletionTimeout());
}
}
catch (MqttException ex) {
throw new MessagingException("Failed to subscribe to topic " + topic, ex);
}
finally {
this.topicLock.unlock();
}
}

in AbstractMqttMessageDrivenChannelAdapter at line 293

/**
* Add a topic (or topics) to the subscribed list (qos=1).
* @param topics The topics.
* @throws MessagingException if the topics is already in the list.
* @since 4.1
*/
@ManagedOperation
public void addTopic(String... topics) {
validateTopics(topics);
this.topicLock.lock();
try {
for (String t : topics) {
addTopic(t, 1);
}
}
finally {
this.topicLock.unlock();
}
}

To Reproduce

as mentioned above

Expected behavior

adapter.addTopic(topic); success to subscribe the topic

@RXCheese RXCheese added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Jan 30, 2024
@artembilan artembilan added this to the 6.3.0-M1 milestone Jan 30, 2024
@artembilan
Copy link
Member

The bug is, essentially, in the MQTT Paho Client:

	public IMqttToken subscribe(MqttSubscription subscription, IMqttMessageListener messageListener)
			throws MqttException {
		return this.subscribe(new MqttSubscription[] { subscription }, null, null, messageListener,
				new MqttProperties());
	}

Where that new MqttProperties() indeed comes with an empty publishSubscriptionIdentifiers.

I suggest to raise an issue on their side: https://github.com/eclipse/paho.mqtt.java.

Meanwhile we will provide some workaround via calling:

	public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
			IMqttMessageListener messageListener, MqttProperties subscriptionProperties) throws MqttException;

API with null for those properties like we do for normal subscription on start:

this.mqttClient.subscribe(subscriptions, null, null, listeners, null)

artembilan added a commit that referenced this issue Jan 30, 2024
Fixes: #8873

The `mqttClient.subscribe()` API does not check if properties are provided and fails on the
`subscriptionProperties.getSubscriptionIdentifiers().get(0)` call with an `IndexOutOfBoundsException`

* Use another `mqttClient.subscribe()` API in the `Mqttv5PahoMessageDrivenChannelAdapter` where there is not such a check
* Ensure that `addTopic(NAME)` works as expected in the `Mqttv5BackToBackTests`

**Cherry-pick to `6.2.x` & `6.1.x`**

# Conflicts:
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
artembilan added a commit that referenced this issue Jan 30, 2024
Fixes: #8873

The `mqttClient.subscribe()` API does not check if properties are provided and fails on the
`subscriptionProperties.getSubscriptionIdentifiers().get(0)` call with an `IndexOutOfBoundsException`

* Use another `mqttClient.subscribe()` API in the `Mqttv5PahoMessageDrivenChannelAdapter` where there is not such a check
* Ensure that `addTopic(NAME)` works as expected in the `Mqttv5BackToBackTests`

**Cherry-pick to `6.2.x` & `6.1.x`**

# Conflicts:
#	spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
@RXCheese
Copy link
Author

I traced the bug in MQTT Paho Client .

https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java
#L1278

@Override
public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
	IMqttMessageListener messageListener, MqttProperties subscriptionProperties) throws MqttException {

int subId = 0;
try { 
	subId = subscriptionProperties.getSubscriptionIdentifiers().get(0);   <--- throw IndexOutOfBoundsException at this
} catch (IndexOutOfBoundsException e) {
	log.fine(CLASS_NAME, "subscribe", "No sub subscription property(s)");
}

And I found same problem in MQTT Paho Client issues https://github.com/eclipse/paho.mqtt.java/issues/984
But this repository doesn't seem to have been updated for a long time. Sad.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants