Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriber does not receive any new messages after reconnect. #149

Open
sivaNbalusu5 opened this issue Jul 6, 2023 · 0 comments
Open

Subscriber does not receive any new messages after reconnect. #149

sivaNbalusu5 opened this issue Jul 6, 2023 · 0 comments

Comments

@sivaNbalusu5
Copy link

Hi,

I have a subscriber script working fine if at all there are no disconnects from the broker, but if the Connection is lost and reconnected after few seconds, It wont receive any new updates pushed to the topics its subscribed to

[2023-03-10 19:14:08,266] amqtt.client {client.py:505} WARNING - Disconnected from broker
[2023-03-10 19:14:08,268] amqtt.client {client.py:517} DEBUG - Auto-reconnecting
[2023-03-10 19:14:08,268] amqtt.client {client.py:228} DEBUG - Reconnecting with session parameters: Session(clientId=amqtt/sLBFm2:l;9FR:kFv, state=disconnected)
[2023-03-10 19:14:09,269] amqtt.client {client.py:235} DEBUG - Reconnect attempt 1 ...
[2023-03-10 19:14:09,281] amqtt.client {client.py:490} WARNING - MQTT connection failed: ConnectionRefusedError(61, "Connect call failed ('127.0.0.1', 8883)")
[2023-03-10 19:14:09,281] transitions.core {core.py:430} DEBUG - Executed machine preparation callbacks before conditions.
[2023-03-10 19:14:09,281] transitions.core {core.py:263} DEBUG - Initiating transition from state disconnected to state disconnected...
[2023-03-10 19:14:09,282] transitions.core {core.py:267} DEBUG - Executed callbacks before conditions.
[2023-03-10 19:14:09,282] transitions.core {core.py:273} DEBUG - Executed callback before transition.
[2023-03-10 19:14:09,282] transitions.core {core.py:133} DEBUG - Exiting state disconnected. Processing callbacks...
[2023-03-10 19:14:09,282] transitions.core {core.py:135} INFO - Finished processing state disconnected exit callbacks.
[2023-03-10 19:14:09,282] transitions.core {core.py:127} DEBUG - Entering state disconnected. Processing callbacks...
[2023-03-10 19:14:09,282] transitions.core {core.py:129} INFO - Finished processing state disconnected enter callbacks.
[2023-03-10 19:14:09,282] transitions.core {core.py:279} DEBUG - Executed callback after transition.
[2023-03-10 19:14:09,282] transitions.core {core.py:447} DEBUG - Executed machine finalize callbacks
[2023-03-10 19:14:09,282] amqtt.client {client.py:240} WARNING - Reconnection attempt failed: ConnectException(ConnectionRefusedError(61, "Connect call failed ('127.0.0.1', 8883)"))
[2023-03-10 19:14:09,282] amqtt.client {client.py:248} DEBUG - Waiting 2 second before next attempt
[2023-03-10 19:14:11,284] amqtt.client {client.py:235} DEBUG - Reconnect attempt 2 ...
[2023-03-10 19:14:11,303] amqtt.client.plugins {manager.py:153} DEBUG - Plugins len(_fired_events)=1
[2023-03-10 19:14:11,304] amqtt.client.plugins.packet_logger_plugin {logging.py:45} DEBUG - amqtt/sLBFm2:l;9FR:kFv -out-> ConnectPacket(ts=2023-03-10 19:14:11.303740, fixed=MQTTFixedHeader(length=34, flags=0x0), variable=ConnectVariableHeader(proto_name=MQTT, proto_level=4, flags=0x2, keepalive=59), payload=ConnectVariableHeader(client_id=amqtt/sLBFm2:l;9FR:kFv, will_topic=None, will_message=None, username=None, password=None))
[2023-03-10 19:14:11,314] amqtt.client.plugins {manager.py:153} DEBUG - Plugins len(_fired_events)=1
[2023-03-10 19:14:11,314] amqtt.client.plugins.packet_logger_plugin {logging.py:34} DEBUG - amqtt/sLBFm2:l;9FR:kFv <-in-- ConnackPacket(ts=2023-03-10 19:14:11.314284, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
[2023-03-10 19:14:11,314] amqtt.mqtt.protocol.handler {handler.py:404} DEBUG - amqtt/sLBFm2:l;9FR:kFv Starting reader coro
[2023-03-10 19:14:11,314] amqtt.mqtt.protocol.handler {handler.py:137} DEBUG - Handler tasks started
[2023-03-10 19:14:11,314] amqtt.mqtt.protocol.handler {handler.py:174} DEBUG - Begin messages delivery retries
[2023-03-10 19:14:11,314] amqtt.mqtt.protocol.handler {handler.py:190} DEBUG - End messages delivery retries
[2023-03-10 19:14:11,314] amqtt.mqtt.protocol.handler {handler.py:139} DEBUG - Handler ready
[2023-03-10 19:14:11,314] transitions.core {core.py:430} DEBUG - Executed machine preparation callbacks before conditions.
[2023-03-10 19:14:11,314] transitions.core {core.py:263} DEBUG - Initiating transition from state disconnected to state connected...
[2023-03-10 19:14:11,314] transitions.core {core.py:267} DEBUG - Executed callbacks before conditions.
[2023-03-10 19:14:11,314] transitions.core {core.py:273} DEBUG - Executed callback before transition.
[2023-03-10 19:14:11,314] transitions.core {core.py:133} DEBUG - Exiting state disconnected. Processing callbacks...
[2023-03-10 19:14:11,314] transitions.core {core.py:135} INFO - Finished processing state disconnected exit callbacks.
[2023-03-10 19:14:11,314] transitions.core {core.py:127} DEBUG - Entering state connected. Processing callbacks...
[2023-03-10 19:14:11,315] transitions.core {core.py:129} INFO - Finished processing state connected enter callbacks.
[2023-03-10 19:14:11,315] transitions.core {core.py:279} DEBUG - Executed callback after transition.
[2023-03-10 19:14:11,315] transitions.core {core.py:447} DEBUG - Executed machine finalize callbacks
[2023-03-10 19:14:11,315] amqtt.client {client.py:470} DEBUG - connected to localhost:8883
[2023-03-10 19:14:11,315] amqtt.client {client.py:502} DEBUG - Watch broker disconnection
[2023-03-10 19:15:10,316] amqtt.mqtt.protocol.handler {client_handler.py:88} DEBUG - Scheduling Ping
[2023-03-10 19:15:10,318] amqtt.client.plugins {manager.py:153} DEBUG - Plugins len(_fired_events)=1
[2023-03-10 19:15:10,318] amqtt.client.plugins.packet_logger_plugin {logging.py:45} DEBUG - amqtt/sLBFm2:l;9FR:kFv -out-> PingReqPacket(ts=2023-03-10 19:15:10.317854, fixed=MQTTFixedHeader(length=0, flags=0x0), variable=None, payload=None)

[2023-03-10 19:15:10,318] amqtt.mqtt.protocol.handler {handler.py:494} DEBUG - amqtt/sLBFm2:l;9FR:kFv Input stream read timeout

here is the example is used:

import asyncio

from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_1, QOS_2

logger = logging.getLogger(name)
config = {
            'keep_alive': 60,
            'ping_delay': 1,
            'auto_reconnect': True,
            'reconnect_max_interval': 10,
            'reconnect_retries': 100,
        }


async def uptime_coro():
	C = MQTTClient(config=config)
	await C.connect('mqtts://localhost:8883',cafile="cert_with_ca.pem", cleansession=True)
	await C.subscribe([(str('sa/feedback_data'), QOS_1)])
	while True:
		try:
			message = await C.deliver_message()
			packet = message.publish_packet
			print( str(packet.payload.data))
		except ClientException as ce:
			logger.error("Client exception: %s" % ce)


if name == 'main':
	formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
	logging.basicConfig(level=logging.DEBUG, format=formatter)
	asyncio.get_event_loop().run_until_complete(uptime_coro())```

_Originally posted by @sivaNbalusu5 in https://github.com/Yakifo/amqtt/discussions/146_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant