Skip to content

Commit 751f854

Browse files
committed
Added callback dispatcher
1 parent 3138176 commit 751f854

File tree

10 files changed

+252
-192
lines changed

10 files changed

+252
-192
lines changed

canopen/emcy.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from typing import Callable, List, Optional
88

99
from canopen.async_guard import ensure_not_async
10-
from canopen.utils import call_callbacks
1110
import canopen.network
1211

1312

@@ -26,6 +25,7 @@ def __init__(self):
2625
self.active: List[EmcyError] = []
2726
self.callbacks = []
2827
self.emcy_received = threading.Condition()
28+
self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK
2929

3030
# @callback # NOTE: called from another thread
3131
@ensure_not_async # NOTE: Safeguard for accidental async use
@@ -44,8 +44,7 @@ def on_emcy(self, can_id, data, timestamp):
4444
self.emcy_received.notify_all()
4545

4646
# Call all registered callbacks
47-
# FIXME: Add the nework loop to the callback
48-
call_callbacks(self.callbacks, None, entry)
47+
self.network.dispatch_callbacks(self.callbacks, entry)
4948

5049
def add_callback(self, callback: Callable[[EmcyError], None]):
5150
"""Get notified on EMCY messages from this node.

canopen/network.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(self, bus: Optional[can.BusABC] = None, notifier: Optional[can.Noti
4040
#: :meth:`canopen.Network.connect` is called
4141
self.bus: Optional[BusABC] = bus
4242
self.loop: Optional[asyncio.AbstractEventLoop] = loop
43+
self._tasks: set[asyncio.Task] = set()
4344
#: A :class:`~canopen.network.NodeScanner` for detecting nodes
4445
self.scanner = NodeScanner(self)
4546
#: List of :class:`can.Listener` objects.
@@ -119,6 +120,12 @@ def connect(self, *args, **kwargs) -> Network:
119120
self.bus = can.Bus(*args, **kwargs)
120121
logger.info("Connected to '%s'", self.bus.channel_info)
121122
if self.notifier is None:
123+
# Do not start a can notifier with the async loop. It changes the
124+
# behavior of the notifier callbacks. Instead of running the
125+
# callbacks from a separate thread, it runs the callbacks in the
126+
# same thread as the event loop where blocking calls are not allowed.
127+
# This library needs to support both async and sync, so we need to
128+
# use the notifier in a separate thread.
122129
self.notifier = can.Notifier(self.bus, [], self.NOTIFIER_CYCLE)
123130
for listener in self.listeners:
124131
self.notifier.add_listener(listener)
@@ -148,6 +155,15 @@ def __enter__(self):
148155
def __exit__(self, type, value, traceback):
149156
self.disconnect()
150157

158+
async def __aenter__(self):
159+
# FIXME: When TaskGroup are available, we should use them to manage the
160+
# tasks. The user must use the `async with` statement with the Network
161+
# to ensure its created.
162+
return self
163+
164+
async def __aexit__(self, type, value, traceback):
165+
self.disconnect()
166+
151167
# FIXME: Implement async "aadd_node"
152168

153169
def add_node(
@@ -264,11 +280,44 @@ def notify(self, can_id: int, data: bytearray, timestamp: float) -> None:
264280
Timestamp of the message, preferably as a Unix timestamp
265281
"""
266282
if can_id in self.subscribers:
267-
callbacks = self.subscribers[can_id]
268-
for callback in callbacks:
269-
callback(can_id, data, timestamp)
283+
self.dispatch_callbacks(self.subscribers[can_id], can_id, data, timestamp)
270284
self.scanner.on_message_received(can_id)
271285

286+
def on_error(self, exc: BaseException) -> None:
287+
"""This method is called to handle any exception in the callbacks."""
288+
289+
# Exceptions in any callbaks should not affect CAN processing
290+
logger.exception("Exception in callback: %s", exc_info=exc)
291+
292+
def dispatch_callbacks(self, callbacks: List[Callback], *args) -> None:
293+
"""Dispatch a list of callbacks with the given arguments.
294+
295+
:param callbacks:
296+
List of callbacks to call
297+
:param args:
298+
Arguments to pass to the callbacks
299+
"""
300+
def task_done(task: asyncio.Task) -> None:
301+
"""Callback to be called when a task is done."""
302+
self._tasks.discard(task)
303+
304+
# FIXME: This section should probably be migrated to a TaskGroup.
305+
# However, this is not available yet in Python 3.8 - 3.10.
306+
try:
307+
if (exc := task.exception()) is not None:
308+
self.on_error(exc)
309+
except (asyncio.CancelledError, asyncio.InvalidStateError) as exc:
310+
# Handle cancelled tasks and unfinished tasks gracefully
311+
self.on_error(exc)
312+
313+
# Run the callbacks
314+
for callback in callbacks:
315+
result = callback(*args)
316+
if result is not None and asyncio.iscoroutine(result):
317+
task = asyncio.create_task(result)
318+
self._tasks.add(task)
319+
task.add_done_callback(task_done)
320+
272321
def check(self) -> None:
273322
"""Check that no fatal error has occurred in the receiving thread.
274323
@@ -397,7 +446,7 @@ def on_message_received(self, msg):
397446
self.network.notify(msg.arbitration_id, msg.data, msg.timestamp)
398447
except Exception as e:
399448
# Exceptions in any callbaks should not affect CAN processing
400-
logger.error(str(e))
449+
self.network.on_error(e)
401450

402451
def stop(self) -> None:
403452
"""Override abstract base method to release any resources."""

canopen/nmt.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from typing import Callable, Optional, TYPE_CHECKING
77

88
from canopen.async_guard import ensure_not_async
9-
from canopen.utils import call_callbacks
109
import canopen.network
1110

1211
if TYPE_CHECKING:
@@ -144,7 +143,7 @@ def on_heartbeat(self, can_id, data, timestamp):
144143
self.state_update.notify_all()
145144

146145
# Call all registered callbacks
147-
call_callbacks(self._callbacks, self.network.loop, new_state)
146+
self.network.dispatch_callbacks(self._callbacks, new_state)
148147

149148
def send_command(self, code: int):
150149
"""Send an NMT command code to the node.

canopen/node/remote.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def associate_network(self, network: canopen.network.Network):
5959
self.tpdo.network = network
6060
self.rpdo.network = network
6161
self.nmt.network = network
62+
self.emcy.network = network
6263
for sdo in self.sdo_channels:
6364
network.subscribe(sdo.tx_cobid, sdo.on_response)
6465
network.subscribe(0x700 + self.id, self.nmt.on_heartbeat)
@@ -79,6 +80,7 @@ def remove_network(self) -> None:
7980
self.tpdo.network = canopen.network._UNINITIALIZED_NETWORK
8081
self.rpdo.network = canopen.network._UNINITIALIZED_NETWORK
8182
self.nmt.network = canopen.network._UNINITIALIZED_NETWORK
83+
self.emcy.network = canopen.network._UNINITIALIZED_NETWORK
8284

8385
def add_sdo(self, rx_cobid, tx_cobid):
8486
"""Add an additional SDO channel.

canopen/pdo/base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from canopen import variable
1414
from canopen.async_guard import ensure_not_async
1515
from canopen.sdo import SdoAbortedError
16-
from canopen.utils import call_callbacks
1716

1817
if TYPE_CHECKING:
1918
from canopen import LocalNode, RemoteNode
@@ -339,7 +338,7 @@ def on_message(self, can_id, data, timestamp):
339338
self.receive_condition.notify_all()
340339

341340
# Call all registered callbacks
342-
call_callbacks(self.callbacks, self.pdo_node.network.loop, self)
341+
self.pdo_node.network.dispatch_callbacks(self.callbacks, self)
343342

344343
def add_callback(self, callback: Callable[[PdoMap], None]) -> None:
345344
"""Add a callback which will be called on receive.

canopen/utils.py

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Additional utility functions for canopen."""
22

3-
import asyncio
4-
from typing import Optional, Union, Iterable, Callable
3+
from typing import Optional, Union
54

65

76
def pretty_index(index: Optional[Union[int, str]],
@@ -22,24 +21,3 @@ def pretty_index(index: Optional[Union[int, str]],
2221
sub_str = f"{sub!r}"
2322

2423
return ":".join(s for s in (index_str, sub_str) if s)
25-
26-
27-
def call_callbacks(callbacks: Iterable[Callable], loop: asyncio.AbstractEventLoop | None = None, *args, **kwargs) -> bool:
28-
"""Call a list of callbacks with the given arguments.
29-
30-
"""
31-
32-
def dispatch():
33-
for callback in callbacks:
34-
result = callback(*args, **kwargs)
35-
if result is not None and asyncio.iscoroutine(result):
36-
asyncio.create_task(result)
37-
38-
# If the loop is running, call the callbacks from the loop to minimize
39-
# blocking and multithreading issues.
40-
if loop is not None and loop.is_running():
41-
loop.call_soon_threadsafe(dispatch)
42-
return False
43-
else:
44-
dispatch()
45-
return True

0 commit comments

Comments
 (0)