-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Closed
Closed
Copy link
Labels
Description
MQTTjs Version
5.8.0
Broker
AWS
Environment
React Native (engine: Hermes)
Description
After releasing a new version of out react native app with the mqtt feature we started receiving crashes in Firebase crashlytics.

This is the only issue I found related to this problem
nodejs/readable-stream#207
Code
// mqtt.tsx
import React, { createContext, useContext } from 'react';
import mqtt from 'mqtt';
import useMqttConnection, { MqttError, MqttStatus } from 'hooks/useMqttConnection';
import { useAppSelector } from 'store';
interface MqttContextValue {
mqttClient: mqtt.MqttClient | null;
mqttStatus: MqttStatus;
mqttError: MqttError | null;
subscribeToTopic: (topic: string, ops?: mqtt.IClientSubscribeOptions) => void;
}
// @ts-ignore
const MqttContext = createContext<MqttContextValue>({ mqttClient: {} });
export const MqttProvider = ({ children }: React.PropsWithChildren) => {
const isAuthenticated = useAppSelector(state => state.auth.authenticated);
const { mqttClient, mqttStatus, mqttError, setMqttError, setMqttStatus } =
useMqttConnection(isAuthenticated);
const subscribeToTopic = (topic: string, ops: mqtt.IClientSubscribeOptions = { qos: 1 }) => {
if (!mqttClient) return;
mqttClient.subscribe(topic, ops, error => {
if (error) {
setMqttStatus('Error');
setMqttError({
type: 'MqttTopic',
msg: error.message,
});
}
});
};
return (
<MqttContext.Provider
value={{
mqttClient,
mqttStatus,
mqttError,
subscribeToTopic,
}}
>
{children}
</MqttContext.Provider>
);
};
export const useMqtt = () => useContext(MqttContext);// useMqttConnection.ts
import { useState, useEffect, useRef, useCallback } from 'react';
import mqtt, { Timer } from 'mqtt';
import BackgroundTimer from 'react-native-background-timer';
import { instance } from 'api';
export type MqttStatus = 'Connected' | 'Disconnected' | 'Offline' | 'Reconnecting' | 'Error';
export type MqttError = { type: string; msg: string };
interface WssDetails {
signedUrl: string;
clientId: string;
validitySeconds: number;
}
// dont directly assign methods to timer object otherwise this throws: Cannot set property 'NaN' of undefined
const timer: Timer = {
clear: id => BackgroundTimer.clearInterval(id),
// @ts-expect-error
set: (func, time) => BackgroundTimer.setInterval(func, time),
};
const getWssDetails = () => instance.get<WssDetails>('/app/wss');
function useMqttConnection(isAuthenticated: boolean) {
const [mqttStatus, setMqttStatus] = useState<MqttStatus>('Disconnected');
const [mqttError, setMqttError] = useState<MqttError | null>(null);
const [mqttClient, setMqttClient] = useState<mqtt.MqttClient | null>(null);
const [wssDetails, setWssDetails] = useState<WssDetails | null>(null);
const wssDetailsRef = useRef<WssDetails | null>(wssDetails);
const isFetchingWssDetails = useRef(false);
wssDetailsRef.current = wssDetails;
const hasWssDetails = !!wssDetailsRef.current;
const doMqttConnection = isAuthenticated;
const fetchWssDetails = useCallback(() => {
isFetchingWssDetails.current = true;
return getWssDetails()
.then(r => {
setWssDetails(r.data);
})
.catch(() => {
// in case we could not fetch initial details, retry every 25 seconds
setTimeout(() => {
if (!wssDetailsRef.current) {
fetchWssDetails();
}
}, 25000);
})
.finally(() => {
isFetchingWssDetails.current = false;
});
}, []);
useEffect(() => {
if (!doMqttConnection) return;
fetchWssDetails();
}, [doMqttConnection, fetchWssDetails]);
useEffect(() => {
if (!doMqttConnection || !hasWssDetails) return;
const transformWsUrl = (
url: string,
options: mqtt.IClientOptions,
currentClient: mqtt.MqttClient,
) => {
if (!isFetchingWssDetails.current) {
fetchWssDetails();
}
currentClient.options.clientId = wssDetailsRef.current!.clientId;
return wssDetailsRef.current!.signedUrl;
};
const client = mqtt
.connect(wssDetailsRef.current!.signedUrl, {
clientId: wssDetailsRef.current!.clientId,
reconnectPeriod: 5000,
queueQoSZero: true,
resubscribe: true,
clean: true,
keepalive: 60,
protocolVersion: 5,
properties: {
sessionExpiryInterval: 600,
},
timerVariant: timer,
transformWsUrl,
})
.on('connect', () => {
setMqttStatus('Connected');
})
.on('error', error => {
setMqttError({ type: 'MqttGeneral', msg: error.message });
})
.on('disconnect', () => {
setMqttStatus('Disconnected');
})
.on('offline', () => {
setMqttStatus('Offline');
})
.on('reconnect', () => {
setMqttStatus('Reconnecting');
})
.on('close', () => {
setMqttStatus('Disconnected');
});
setMqttClient(client);
return () => {
client.end();
};
}, [doMqttConnection, fetchWssDetails, hasWssDetails]);
return {
mqttClient,
mqttStatus,
mqttError,
setMqttStatus,
setMqttError,
};
}
export default useMqttConnection;Minimal Reproduction
Unfortunately we cannot reproduce this locally. We are only recording crashes in Firebase Crashlytics
Debug logs
Fatal Exception: com.facebook.react.common.JavascriptException: Error: stream.push() after EOF, js engine: hermes, stack:
anonymous@3372:233
c@3372:1426
S@3375:2499
anonymous@3375:3938
_@3478:2534
dispatchEvent@135:5649
anonymous@154:3328
value@52:779
value@49:935
value@37:3897
anonymous@37:693
value@37:2528
value@37:664
at com.facebook.react.modules.core.ExceptionsManagerModule.reportException(ExceptionsManagerModule.java:65)
at java.lang.reflect.Method.invoke(Method.java)
at com.facebook.react.bridge.JavaMethodWrapper.invoke(JavaMethodWrapper.java:372)
at com.facebook.react.bridge.JavaModuleWrapper.invoke(JavaModuleWrapper.java:146)
at com.facebook.jni.NativeRunnable.run(NativeRunnable.java)
at android.os.Handler.handleCallback(Handler.java:996)
at android.os.Handler.dispatchMessage(Handler.java:110)
at com.facebook.react.bridge.queue.MessageQueueThreadHandler.dispatchMessage(MessageQueueThreadHandler.java:27)
at android.os.Looper.loopOnce(Looper.java:210)
at android.os.Looper.loop(Looper.java:302)
at com.facebook.react.bridge.queue.MessageQueueThreadImpl$4.run(MessageQueueThreadImpl.java:233)
at java.lang.Thread.run(Thread.java:1012)