Skip to content

[Bug]: JavascriptException: Error: stream.push() after EOF, js engine: hermes #1914

@sanduluca

Description

@sanduluca

MQTTjs Version

5.8.0

Broker

AWS

Environment

React Native (engine: Hermes)

Description

After releasing a new version of our React Native app with the MQTT feature, we started receiving crashes in Firebase Crashlytics.
image

This is the only issue I found related to this problem
nodejs/readable-stream#207

Code
// mqtt.tsx
import React, { createContext, useContext } from 'react';
import mqtt from 'mqtt';

import useMqttConnection, { MqttError, MqttStatus } from 'hooks/useMqttConnection';
import { useAppSelector } from 'store';

/** Shape of the value exposed through `MqttContext` to consumers of `useMqtt`. */
interface MqttContextValue {
	/** The MQTT client instance, or `null` when no client is available. */
	mqttClient: mqtt.MqttClient | null;
	/** Last connection status reported by the connection hook. */
	mqttStatus: MqttStatus;
	/** Last recorded error, or `null` when none has been recorded. */
	mqttError: MqttError | null;
	/** Subscribes the client to `topic`, optionally with custom subscribe options. */
	subscribeToTopic: (topic: string, ops?: mqtt.IClientSubscribeOptions) => void;
}

/**
 * React context carrying the MQTT client, its status and the subscribe helper.
 *
 * The default value (used only when a component reads the context outside of
 * an `MqttProvider`) is a complete, inert `MqttContextValue`: no client, a
 * `'Disconnected'` status, no error, and a no-op `subscribeToTopic`. This keeps
 * the default consistent with the declared type, so consumers that null-check
 * `mqttClient` or call `subscribeToTopic` do not crash on missing members.
 */
const MqttContext = createContext<MqttContextValue>({
	mqttClient: null,
	mqttStatus: 'Disconnected',
	mqttError: null,
	subscribeToTopic: () => {},
});

/**
 * Provides the MQTT client, connection status, last error and a subscribe
 * helper to the component tree below it.
 *
 * The connection is driven by `useMqttConnection`, which receives the
 * `auth.authenticated` flag read from the app store.
 *
 * Both the subscribe helper and the context value object are memoised so that
 * consumers of the context only re-render when one of the exposed values
 * actually changes, not on every render of the provider.
 */
export const MqttProvider = ({ children }: React.PropsWithChildren) => {
	const isAuthenticated = useAppSelector(state => state.auth.authenticated);

	const { mqttClient, mqttStatus, mqttError, setMqttError, setMqttStatus } =
		useMqttConnection(isAuthenticated);

	/**
	 * Subscribes the current client to `topic` (QoS 1 unless `ops` says otherwise).
	 * Does nothing when there is no client; a subscribe failure is reported
	 * through the status (`'Error'`) and error (`type: 'MqttTopic'`) state.
	 */
	const subscribeToTopic = React.useCallback(
		(topic: string, ops: mqtt.IClientSubscribeOptions = { qos: 1 }) => {
			if (!mqttClient) return;

			mqttClient.subscribe(topic, ops, error => {
				if (error) {
					setMqttStatus('Error');
					setMqttError({
						type: 'MqttTopic',
						msg: error.message,
					});
				}
			});
		},
		[mqttClient, setMqttError, setMqttStatus],
	);

	// Keep the value's identity stable between renders unless a member changed.
	const contextValue = React.useMemo<MqttContextValue>(
		() => ({
			mqttClient,
			mqttStatus,
			mqttError,
			subscribeToTopic,
		}),
		[mqttClient, mqttStatus, mqttError, subscribeToTopic],
	);

	return <MqttContext.Provider value={contextValue}>{children}</MqttContext.Provider>;
};

/** Hook returning the current `MqttContext` value. */
export const useMqtt = () => {
	const contextValue = useContext(MqttContext);
	return contextValue;
};
// useMqttConnection.ts
import { useState, useEffect, useRef, useCallback } from 'react';
import mqtt, { Timer } from 'mqtt';
import BackgroundTimer from 'react-native-background-timer';

import { instance } from 'api';

/** Connection status values published by the MQTT connection hook. */
export type MqttStatus = 'Connected' | 'Disconnected' | 'Offline' | 'Reconnecting' | 'Error';
/** Error record: `type` names the error category and `msg` carries the error message. */
export type MqttError = { type: string; msg: string };

/** Connection details returned by the `/app/wss` endpoint. */
interface WssDetails {
	/** URL the MQTT client connects to. */
	signedUrl: string;
	/** Client id the MQTT client connects with. */
	clientId: string;
	// NOTE(review): not read anywhere in this file; presumably the lifetime of
	// `signedUrl` in seconds — confirm against the API.
	validitySeconds: number;
}
// Timer implementation backed by react-native-background-timer; it is handed to
// the MQTT client through the `timerVariant` connect option.
// Don't assign BackgroundTimer's methods to the timer object directly (wrap them
// in functions as below), otherwise this throws: Cannot set property 'NaN' of undefined
// NOTE(review): presumably the bare methods lose their `this` binding — confirm.
const timer: Timer = {
	clear: id => BackgroundTimer.clearInterval(id),
	// @ts-expect-error
	set: (func, time) => BackgroundTimer.setInterval(func, time),
};

/** Requests the `WssDetails` from the `/app/wss` endpoint using the `instance` client from `api`. */
const getWssDetails = () => {
	return instance.get<WssDetails>('/app/wss');
};

/**
 * Manages the lifetime of an MQTT-over-WebSocket client.
 *
 * While `isAuthenticated` is true the hook fetches the connection details
 * (`WssDetails`), connects a client once they are available, and mirrors the
 * client's events into `mqttStatus` / `mqttError`. Every time the client builds
 * its WebSocket URL (`transformWsUrl`) a refresh of the details is triggered and
 * the most recently stored URL and client id are used.
 *
 * When `isAuthenticated` becomes false or the component unmounts, the pending
 * fetch-retry timer is cancelled, the client is ended and `mqttClient` is reset
 * to `null` so consumers never hold on to an ended client.
 *
 * @param isAuthenticated whether a connection should currently be maintained
 * @returns the client (or `null`), the current status and error, and the
 *          setters for status and error
 */
function useMqttConnection(isAuthenticated: boolean) {
	const [mqttStatus, setMqttStatus] = useState<MqttStatus>('Disconnected');
	const [mqttError, setMqttError] = useState<MqttError | null>(null);
	const [mqttClient, setMqttClient] = useState<mqtt.MqttClient | null>(null);
	const [wssDetails, setWssDetails] = useState<WssDetails | null>(null);
	// Mirror of `wssDetails` readable from callbacks that outlive a render.
	const wssDetailsRef = useRef<WssDetails | null>(wssDetails);
	const isFetchingWssDetails = useRef(false);
	// Handle of the pending fetch-retry timeout, if any, so it can be cancelled.
	const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
	// True only while a connection is wanted (authenticated and mounted).
	const isConnectionWantedRef = useRef(false);
	wssDetailsRef.current = wssDetails;
	const hasWssDetails = wssDetails !== null;
	const doMqttConnection = isAuthenticated;

	const fetchWssDetails = useCallback(() => {
		isFetchingWssDetails.current = true;
		return getWssDetails()
			.then(r => {
				// Ignore responses that arrive after logout/unmount.
				if (isConnectionWantedRef.current) {
					setWssDetails(r.data);
				}
			})
			.catch(() => {
				if (!isConnectionWantedRef.current) return;

				// Never keep more than one retry pending.
				if (retryTimeoutRef.current !== null) {
					clearTimeout(retryTimeoutRef.current);
				}
				// in case we could not fetch initial details, retry every 25 seconds
				retryTimeoutRef.current = setTimeout(() => {
					retryTimeoutRef.current = null;
					if (isConnectionWantedRef.current && !wssDetailsRef.current) {
						fetchWssDetails();
					}
				}, 25000);
			})
			.finally(() => {
				isFetchingWssDetails.current = false;
			});
	}, []);

	useEffect(() => {
		if (!doMqttConnection) return;

		isConnectionWantedRef.current = true;
		fetchWssDetails();

		return () => {
			// Stop retrying once the connection is no longer wanted.
			isConnectionWantedRef.current = false;
			if (retryTimeoutRef.current !== null) {
				clearTimeout(retryTimeoutRef.current);
				retryTimeoutRef.current = null;
			}
		};
	}, [doMqttConnection, fetchWssDetails]);

	useEffect(() => {
		// Capture the details once so the rest of the effect needs no non-null assertions.
		const initialDetails = wssDetailsRef.current;
		if (!doMqttConnection || !hasWssDetails || !initialDetails) return;

		// Called by the client when it builds its WebSocket URL: kick off a refresh
		// of the details and connect with the latest ones currently stored.
		const transformWsUrl = (
			url: string,
			options: mqtt.IClientOptions,
			currentClient: mqtt.MqttClient,
		) => {
			if (!isFetchingWssDetails.current) {
				fetchWssDetails();
			}

			const latestDetails = wssDetailsRef.current ?? initialDetails;
			currentClient.options.clientId = latestDetails.clientId;
			return latestDetails.signedUrl;
		};

		const client = mqtt
			.connect(initialDetails.signedUrl, {
				clientId: initialDetails.clientId,
				reconnectPeriod: 5000,
				queueQoSZero: true,
				resubscribe: true,
				clean: true,
				keepalive: 60,
				protocolVersion: 5,
				properties: {
					sessionExpiryInterval: 600,
				},
				timerVariant: timer,
				transformWsUrl,
			})
			.on('connect', () => {
				setMqttStatus('Connected');
			})
			.on('error', error => {
				setMqttError({ type: 'MqttGeneral', msg: error.message });
			})
			.on('disconnect', () => {
				setMqttStatus('Disconnected');
			})
			.on('offline', () => {
				setMqttStatus('Offline');
			})
			.on('reconnect', () => {
				setMqttStatus('Reconnecting');
			})
			.on('close', () => {
				setMqttStatus('Disconnected');
			});

		setMqttClient(client);

		return () => {
			client.end();
			// Do not leave an ended client in state for consumers to call into.
			setMqttClient(null);
		};
	}, [doMqttConnection, fetchWssDetails, hasWssDetails]);

	return {
		mqttClient,
		mqttStatus,
		mqttError,
		setMqttStatus,
		setMqttError,
	};
}

export default useMqttConnection;

Minimal Reproduction

Unfortunately we cannot reproduce this locally. We are only recording crashes in Firebase Crashlytics

Debug logs

Fatal Exception: com.facebook.react.common.JavascriptException: Error: stream.push() after EOF, js engine: hermes, stack:
anonymous@3372:233
c@3372:1426
S@3375:2499
anonymous@3375:3938
_@3478:2534
dispatchEvent@135:5649
anonymous@154:3328
value@52:779
value@49:935
value@37:3897
anonymous@37:693
value@37:2528
value@37:664

       at com.facebook.react.modules.core.ExceptionsManagerModule.reportException(ExceptionsManagerModule.java:65)
       at java.lang.reflect.Method.invoke(Method.java)
       at com.facebook.react.bridge.JavaMethodWrapper.invoke(JavaMethodWrapper.java:372)
       at com.facebook.react.bridge.JavaModuleWrapper.invoke(JavaModuleWrapper.java:146)
       at com.facebook.jni.NativeRunnable.run(NativeRunnable.java)
       at android.os.Handler.handleCallback(Handler.java:996)
       at android.os.Handler.dispatchMessage(Handler.java:110)
       at com.facebook.react.bridge.queue.MessageQueueThreadHandler.dispatchMessage(MessageQueueThreadHandler.java:27)
       at android.os.Looper.loopOnce(Looper.java:210)
       at android.os.Looper.loop(Looper.java:302)
       at com.facebook.react.bridge.queue.MessageQueueThreadImpl$4.run(MessageQueueThreadImpl.java:233)
       at java.lang.Thread.run(Thread.java:1012)

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions