Skip to content

Commit 6659f87

Browse files
committed
continu to refactor
1 parent 8992ab5 commit 6659f87

File tree

2 files changed

+328
-0
lines changed

2 files changed

+328
-0
lines changed
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
# -*- coding: utf-8 -*-
2+
3+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
4+
# InMemoryMqttWorker.py ---
5+
# --------------------------------
6+
# Copyright (c) 2025
7+
# L. CAPOCCHI (capocchi@univ-corse.fr)
8+
# SPE Lab - SISU Group - University of Corsica
9+
# --------------------------------
10+
# Version 1.0 last modified: 12/28/25
11+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
12+
#
13+
# GENERAL NOTES AND REMARKS:
14+
#
15+
# MQTT-specific implementation of InMemoryMessagingWorker
16+
#
17+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
18+
19+
import logging
20+
import time
21+
from typing import Dict, Any, Optional
22+
23+
from DEVSKernel.BrokerDEVS.Workers.InMemoryMessagingWorker import InMemoryMessagingWorker, MessageAdapter, MessageConsumer, MessageProducer
24+
from DEVSKernel.BrokerDEVS.logconfig import LOGGING_LEVEL
25+
26+
logger = logging.getLogger("DEVSKernel.BrokerDEVS.InMemoryMqttWorker")
27+
logger.setLevel(LOGGING_LEVEL)
28+
29+
try:
30+
import paho.mqtt.client as mqtt
31+
MQTT_AVAILABLE = True
32+
except Exception:
33+
MQTT_AVAILABLE = False
34+
logger.warning("paho-mqtt not available. Install with: pip install paho-mqtt")
35+
36+
37+
class MqttMessageConsumer:
38+
"""MQTT consumer wrapper providing consumer-like interface."""
39+
40+
def __init__(self, broker_address: str, port: int = 1883, **kwargs):
41+
"""
42+
Initialize MQTT consumer.
43+
44+
Args:
45+
broker_address: MQTT broker address
46+
port: MQTT broker port (default: 1883)
47+
**kwargs: Additional paho-mqtt client options (client_id, username, password, etc.)
48+
"""
49+
self.broker_address = broker_address
50+
self.port = port
51+
self.client = mqtt.Client()
52+
self.subscribed_topics = []
53+
self.last_message = None
54+
55+
# Set up callbacks
56+
self.client.on_connect = self._on_connect
57+
self.client.on_message = self._on_message
58+
59+
# Apply additional options
60+
if 'client_id' in kwargs:
61+
self.client._client_id = kwargs['client_id']
62+
if 'username' in kwargs and 'password' in kwargs:
63+
self.client.username_pw_set(kwargs['username'], kwargs['password'])
64+
65+
# Connect to broker
66+
self._connect()
67+
68+
def _on_connect(self, client, userdata, flags, rc):
69+
"""Callback for when client connects to broker."""
70+
if rc == 0:
71+
logger.debug("MQTT consumer connected successfully")
72+
else:
73+
logger.error("Failed to connect to MQTT broker, return code: %d", rc)
74+
75+
def _on_message(self, client, userdata, msg):
76+
"""Callback for when a PUBLISH message is received from the broker."""
77+
self.last_message = msg
78+
79+
def _connect(self) -> None:
80+
"""Connect to MQTT broker."""
81+
try:
82+
self.client.connect(self.broker_address, self.port, keepalive=60)
83+
self.client.loop_start()
84+
logger.debug("MQTT consumer connected to broker: %s:%d", self.broker_address, self.port)
85+
except Exception as e:
86+
logger.error("Failed to connect to MQTT broker: %s", e)
87+
88+
def subscribe(self, topics: list) -> None:
89+
"""
90+
Subscribe to topics.
91+
92+
Args:
93+
topics: List of topic names to subscribe to
94+
"""
95+
if isinstance(topics, str):
96+
topics = [topics]
97+
98+
for topic in topics:
99+
try:
100+
self.client.subscribe(topic, qos=1)
101+
self.subscribed_topics.append(topic)
102+
logger.debug("Subscribed to topic: %s", topic)
103+
except Exception as e:
104+
logger.error("Failed to subscribe to topic %s: %s", topic, e)
105+
106+
def poll(self, timeout: float = 1.0) -> Optional[Any]:
107+
"""
108+
Poll for messages.
109+
110+
Args:
111+
timeout: Timeout in seconds
112+
113+
Returns:
114+
Message if available, None otherwise
115+
"""
116+
if self.last_message is not None:
117+
msg = self.last_message
118+
self.last_message = None
119+
return msg
120+
121+
# Wait for a message
122+
time.sleep(min(timeout, 0.1))
123+
return self.last_message
124+
125+
def close(self) -> None:
126+
"""Close the consumer connection."""
127+
try:
128+
self.client.loop_stop()
129+
self.client.disconnect()
130+
logger.debug("MQTT consumer closed")
131+
except Exception as e:
132+
logger.error("Error closing MQTT consumer: %s", e)
133+
134+
135+
class MqttMessageProducer:
136+
"""MQTT producer wrapper providing producer-like interface."""
137+
138+
def __init__(self, broker_address: str, port: int = 1883, **kwargs):
139+
"""
140+
Initialize MQTT producer.
141+
142+
Args:
143+
broker_address: MQTT broker address
144+
port: MQTT broker port (default: 1883)
145+
**kwargs: Additional paho-mqtt client options (client_id, username, password, etc.)
146+
"""
147+
self.broker_address = broker_address
148+
self.port = port
149+
self.client = mqtt.Client()
150+
151+
# Set up callbacks
152+
self.client.on_connect = self._on_connect
153+
154+
# Apply additional options
155+
if 'client_id' in kwargs:
156+
self.client._client_id = kwargs['client_id']
157+
if 'username' in kwargs and 'password' in kwargs:
158+
self.client.username_pw_set(kwargs['username'], kwargs['password'])
159+
160+
# Connect to broker
161+
self._connect()
162+
163+
def _on_connect(self, client, userdata, flags, rc):
164+
"""Callback for when client connects to broker."""
165+
if rc == 0:
166+
logger.debug("MQTT producer connected successfully")
167+
else:
168+
logger.error("Failed to connect to MQTT broker, return code: %d", rc)
169+
170+
def _connect(self) -> None:
171+
"""Connect to MQTT broker."""
172+
try:
173+
self.client.connect(self.broker_address, self.port, keepalive=60)
174+
self.client.loop_start()
175+
logger.debug("MQTT producer connected to broker: %s:%d", self.broker_address, self.port)
176+
except Exception as e:
177+
logger.error("Failed to connect to MQTT broker: %s", e)
178+
179+
def produce(self, topic: str, value: bytes, **kwargs) -> None:
180+
"""
181+
Produce a message to a topic.
182+
183+
Args:
184+
topic: Topic name
185+
value: Message payload
186+
**kwargs: Additional options (ignored for MQTT)
187+
"""
188+
try:
189+
self.client.publish(topic, payload=value, qos=1)
190+
logger.debug("Published message to topic: %s (size=%d bytes)", topic, len(value))
191+
except Exception as e:
192+
logger.error("Failed to publish message: %s", e)
193+
194+
def flush(self, timeout: Optional[float] = None) -> None:
195+
"""Flush pending messages (no-op for MQTT)."""
196+
pass
197+
198+
def close(self) -> None:
199+
"""Close the producer connection."""
200+
try:
201+
self.client.loop_stop()
202+
self.client.disconnect()
203+
logger.debug("MQTT producer closed")
204+
except Exception as e:
205+
logger.error("Error closing MQTT producer: %s", e)
206+
207+
208+
class MqttMessageAdapter(MessageAdapter):
209+
"""MQTT-specific implementation of MessageAdapter"""
210+
211+
def __init__(self, broker_address: str = "localhost", port: int = 1883, **kwargs):
212+
"""
213+
Initialize MQTT adapter.
214+
215+
Args:
216+
broker_address: MQTT broker address
217+
port: MQTT broker port
218+
**kwargs: Additional MQTT options
219+
"""
220+
self.broker_address = broker_address
221+
self.port = port
222+
self.mqtt_options = kwargs
223+
224+
def create_consumer(self, config: Dict[str, Any]) -> MessageConsumer:
225+
"""Create an MQTT consumer"""
226+
# Extract broker info from config if provided, otherwise use adapter defaults
227+
broker = config.get('broker_address', self.broker_address)
228+
port = config.get('port', self.port)
229+
230+
# Extract MQTT options
231+
mqtt_opts = {k: v for k, v in config.items() if k not in ['broker_address', 'port']}
232+
mqtt_opts.update(self.mqtt_options)
233+
234+
return MqttMessageConsumer(broker, port, **mqtt_opts)
235+
236+
def create_producer(self, config: Dict[str, Any]) -> MessageProducer:
237+
"""Create an MQTT producer"""
238+
# Extract broker info from config if provided, otherwise use adapter defaults
239+
broker = config.get('broker_address', self.broker_address)
240+
port = config.get('port', self.port)
241+
242+
# Extract MQTT options
243+
mqtt_opts = {k: v for k, v in config.items() if k not in ['broker_address', 'port']}
244+
mqtt_opts.update(self.mqtt_options)
245+
246+
return MqttMessageProducer(broker, port, **mqtt_opts)
247+
248+
def extract_message_value(self, message: Any) -> bytes:
249+
"""Extract value from MQTT message"""
250+
return message.payload
251+
252+
def has_error(self, message: Any) -> bool:
253+
"""MQTT messages don't have errors in the same way (this is a no-op)"""
254+
return False
255+
256+
def get_topic(self, message: Any) -> str:
257+
"""Get topic from MQTT message"""
258+
return message.topic
259+
260+
261+
class InMemoryMqttWorker(InMemoryMessagingWorker):
262+
"""
263+
MQTT-specific worker thread that manages one atomic model in memory.
264+
This is a concrete implementation of InMemoryMessagingWorker for MQTT.
265+
"""
266+
267+
def __init__(
268+
self,
269+
model_name: str,
270+
aDEVS,
271+
broker_host: str = "localhost",
272+
broker_port: int = 1883,
273+
in_topic: str = None,
274+
out_topic: str = None,
275+
client_id: str = None,
276+
username: str = None,
277+
password: str = None
278+
):
279+
"""
280+
Initialize the MQTT worker.
281+
282+
Args:
283+
model_name: Name of the DEVS model
284+
aDEVS: The atomic DEVS model instance
285+
broker_host: MQTT broker hostname (default: localhost)
286+
broker_port: MQTT broker port (default: 1883)
287+
in_topic: Topic to receive messages from coordinator
288+
out_topic: Topic to send messages to coordinator
289+
client_id: MQTT client ID (default: auto-generated)
290+
username: MQTT username for authentication (optional)
291+
password: MQTT password for authentication (optional)
292+
"""
293+
if client_id is None:
294+
client_id = f"worker-{model_name}-{aDEVS.myID}-{int(time.time() * 1000)}"
295+
296+
consumer_config = {
297+
"broker_address": broker_host,
298+
"port": broker_port,
299+
"client_id": client_id,
300+
}
301+
302+
producer_config = {
303+
"broker_address": broker_host,
304+
"port": broker_port,
305+
"client_id": f"{client_id}-producer",
306+
}
307+
308+
# Add authentication if provided
309+
if username is not None:
310+
consumer_config["username"] = username
311+
producer_config["username"] = username
312+
if password is not None:
313+
consumer_config["password"] = password
314+
producer_config["password"] = password
315+
316+
adapter = MqttMessageAdapter(broker_host, broker_port)
317+
318+
super().__init__(
319+
model_name=model_name,
320+
aDEVS=aDEVS,
321+
adapter=adapter,
322+
consumer_config=consumer_config,
323+
producer_config=producer_config,
324+
in_topic=in_topic,
325+
out_topic=out_topic
326+
)

devsimpy/DEVSKernel/BrokerDEVS/Workers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
from DEVSKernel.BrokerDEVS.Workers.InMemoryMessagingWorker import InMemoryMessagingWorker
1616
from DEVSKernel.BrokerDEVS.Workers.InMemoryKafkaWorker import InMemoryKafkaWorker
17+
from DEVSKernel.BrokerDEVS.Workers.InMemoryMqttWorker import InMemoryMqttWorker
1718
from DEVSKernel.BrokerDEVS.Workers.InMemoryBrokerWorker import InMemoryBrokerWorker
1819
from DEVSKernel.BrokerDEVS.Workers.BrokerMS4MeWorker import BrokerMS4MeWorker
1920

2021
__all__ = [
2122
'InMemoryMessagingWorker',
2223
'InMemoryKafkaWorker',
24+
'InMemoryMqttWorker',
2325
'InMemoryBrokerWorker',
2426
'BrokerMS4MeWorker',
2527
]

0 commit comments

Comments
 (0)