Skip to content

Add asyncio support [WIP] #359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 49 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
5900056
Add central typings to ease project navigation
sveinse Oct 11, 2021
b48c472
First working concept
sveinse Oct 12, 2021
1be9102
Migrate to vars access methods and deprecate attr
sveinse Oct 25, 2021
5631a03
Adding support for reading PDO map from OD
sveinse Oct 25, 2021
6c6367c
Updated README.rst
sveinse Nov 14, 2021
c9a69e9
Fix bugs
sveinse Nov 14, 2021
09fe0a3
Merge branch 'christiansandberg:master' into feature-asyncio
sveinse Nov 14, 2021
6aac78c
Added loop to connect()
sveinse Nov 14, 2021
dbaeb87
Refactor on async callbacks
sveinse Nov 29, 2021
215d585
Merge branch 'christiansandberg:master' into feature-asyncio
sveinse Nov 29, 2021
3b5f869
Merge pull request #1 from christiansandberg/master
sveinse Feb 5, 2022
afd9f5c
Added more support for async
sveinse Sep 12, 2022
0a0157d
Merge pull request #2 from christiansandberg/master
sveinse Sep 12, 2022
db01e4c
Implement async guarding to prevent accidental blocking IO
sveinse Nov 26, 2022
6715f33
Merge upstream 'master' into feature-asyncio
sveinse Nov 26, 2022
ea7dbe5
Minor formatting updates
sveinse Nov 26, 2022
95daae2
Merge branch 'christiansandberg:master' into feature-asyncio
sveinse Jan 10, 2023
2616f12
fix typo
Mar 8, 2023
4061f71
Handle timeout in aread_response
Mar 13, 2023
e6ce8f6
Merge pull request #3 from mrk-its/fix-typo
sveinse Mar 25, 2023
e664747
Merge pull request #4 from mrk-its/aread_response_timeout
sveinse Mar 25, 2023
8c74fdc
Merge pull request #5 from christiansandberg/master
sveinse Mar 25, 2023
56ed224
Annotation and fixes
sveinse Mar 27, 2023
30d695d
Merge 'master' into feature-asyncio
sveinse Apr 26, 2024
abbc2dc
Updated after merging in master
sveinse Apr 26, 2024
41e028d
Minor improvements
sveinse May 12, 2024
67420a1
Improvements
sveinse May 14, 2024
1f2a3f4
Minor housekeeping updates
sveinse May 17, 2024
9dd782e
Merge branch 'master' into feature-asyncio
sveinse May 17, 2024
d0160a5
Merge branch 'master' into feature-asyncio
sveinse May 18, 2024
fe08d89
Merge branch 'master' into feature-asyncio
sveinse May 18, 2024
aa292a6
Migrate SDO client to another thread which allow reuse of existing co…
sveinse May 18, 2024
fd3be01
Merge branch 'master' into feature-asyncio
sveinse Jun 14, 2024
6dca2e1
Merge master into feature-asyncio
sveinse Aug 31, 2024
59a7643
Minor fixes
sveinse Aug 31, 2024
bd749cd
Fixup the canopen connect mechanism for async
sveinse Feb 2, 2025
dba463a
Merge master into feature-asyncio
sveinse Feb 2, 2025
46f9b4a
Workaround for NMT Slave to avoid blocking IO
sveinse Feb 2, 2025
8260d7b
Merge branch 'canopen-python:master' into feature-asyncio
sveinse Apr 12, 2025
a5de223
Comments and notes update
sveinse May 2, 2025
fdb6414
Unittests for running async and non-async
sveinse May 2, 2025
edc0444
Refurbish async guard system
sveinse May 2, 2025
2204ef3
Merge branch 'master' into feature-asyncio
sveinse May 2, 2025
535f975
Update tests for async
sveinse May 2, 2025
c1e3659
Minor comment and typing updates
sveinse May 4, 2025
e3c84eb
Implement the framwork for sync back-end
sveinse May 4, 2025
3138176
Minor improvements
sveinse May 8, 2025
751f854
Added callback dispatcher
sveinse May 15, 2025
e9ef593
Merge 'master' into feature-asyncio
sveinse Jun 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 103 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CANopen for Python
==================
CANopen for Python, asyncio port
================================

A Python implementation of the CANopen_ standard.
The aim of the project is to support the most common parts of the CiA 301
Expand All @@ -8,6 +8,41 @@ automation tasks rather than a standard compliant master implementation.

The library supports Python 3.8 or newer.

This library is the asyncio port of CANopen. See below for code example.


Branch notes
------------
This branch is work in progress, where the intent is to concept test running
the backend callbacks and unchanged from the sync version. The sync-async
crossing is done via sync waiting via `asyncio.to_thread()` in each class
that needs it.

The goal was to simplify the impact of the async changes. Having an async
backend requires a lot of duplication of code.


Async status
------------

The remaining work for feature complete async implementation:

* Implement :code:`ABlockUploadStream`, :code:`ABlockDownloadStream` and
:code:`ATextIOWrapper` for async in :code:`SdoClient`

* Implement :code:`EcmyConsumer.wait()` for async

* Implement async in :code:`LssMaster``

* Async implementation of :code:`BaseNode402`

* Implement async variant of :code:`Network.add_node`. This will probably also
add need of async variant of :code:`input_from_node` in eds.py

* Update unittests for async

* Update documentation and examples


Features
--------
Expand Down Expand Up @@ -156,6 +191,72 @@ The :code:`n` is the PDO index (normally 1 to 4). The second form of access is f
network.disconnect()


Asyncio
-------

This library can be used with asyncio.

.. code-block:: python

import asyncio
import canopen
import can

async def my_node(network, nodeid, od):

# Create the node object and load the OD
node = network.add_node(nodeid, od)

# Read the PDOs from the remote
await node.tpdo.aread()
await node.rpdo.aread()

# Set the module state
node.nmt.set_state('OPERATIONAL')

# Set motor speed via SDO
await node.sdo['MotorSpeed'].aset_raw(2)

while True:

# Wait for TPDO 1
t = await node.tpdo[1].await_for_reception(1)
if not t:
continue

# Get the TPDO 1 value
rpm = node.tpdo[1]['MotorSpeed Actual'].get_raw()
print(f'SPEED on motor {nodeid}:', rpm)

# Sleep a little
await asyncio.sleep(0.2)

# Send RPDO 1 with some data
node.rpdo[1]['Some variable'].set_phys(42)
node.rpdo[1].transmit()

async def main():

# Start with creating a network representing one CAN bus
network = canopen.Network()

# Connect to the CAN bus
# Arguments are passed to python-can's can.Bus() constructor
# (see https://python-can.readthedocs.io/en/latest/bus.html).
# Note the loop parameter to enable asyncio operation
loop = asyncio.get_event_loop()
network.connect(interface='pcan', bitrate=1000000, loop=loop)

# Create two independent tasks for two nodes 51 and 52 which will run concurrently
task1 = asyncio.create_task(my_node(network, 51, '/path/to/object_dictionary.eds'))
task2 = asyncio.create_task(my_node(network, 52, '/path/to/object_dictionary.eds'))

# Wait for both to complete (which will never happen)
await asyncio.gather((task1, task2))

asyncio.run(main())


Debugging
---------

Expand Down
30 changes: 30 additions & 0 deletions canopen/async_guard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
""" Utils for async """
import functools
import logging
import threading
import traceback

# NOTE: Global, but needed to be able to use ensure_not_async() in
# decorator context.
_ASYNC_SENTINELS: dict[int, bool] = {}

logger = logging.getLogger(__name__)


def set_async_sentinel(enable: bool):
""" Register a function to validate if async is running """
_ASYNC_SENTINELS[threading.get_ident()] = enable


def ensure_not_async(fn):
""" Decorator that will ensure that the function is not called if async
is running.
"""
@functools.wraps(fn)
def async_guard_wrap(*args, **kwargs):
if _ASYNC_SENTINELS.get(threading.get_ident(), False):
st = "".join(traceback.format_stack())
logger.debug("Traceback:\n%s", st.rstrip())
raise RuntimeError(f"Calling a blocking function, {fn.__qualname__}() in {fn.__code__.co_filename}:{fn.__code__.co_firstlineno}, while running async")
return fn(*args, **kwargs)
return async_guard_wrap
34 changes: 28 additions & 6 deletions canopen/emcy.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations
import asyncio
import logging
import struct
import threading
import time
from typing import Callable, List, Optional

from canopen.async_guard import ensure_not_async
import canopen.network


Expand All @@ -17,16 +20,20 @@

def __init__(self):
#: Log of all received EMCYs for this node
self.log: List["EmcyError"] = []
self.log: List[EmcyError] = []
#: Only active EMCYs. Will be cleared on Error Reset
self.active: List["EmcyError"] = []
self.active: List[EmcyError] = []
self.callbacks = []
self.emcy_received = threading.Condition()
self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK

# @callback # NOTE: called from another thread
@ensure_not_async # NOTE: Safeguard for accidental async use
def on_emcy(self, can_id, data, timestamp):
code, register, data = EMCY_STRUCT.unpack(data)
entry = EmcyError(code, register, data, timestamp)

# NOTE: Blocking lock
with self.emcy_received:
if code & 0xFF00 == 0:
# Error reset
Expand All @@ -36,10 +43,10 @@
self.log.append(entry)
self.emcy_received.notify_all()

for callback in self.callbacks:
callback(entry)
# Call all registered callbacks
self.network.dispatch_callbacks(self.callbacks, entry)

def add_callback(self, callback: Callable[["EmcyError"], None]):
def add_callback(self, callback: Callable[[EmcyError], None]):
"""Get notified on EMCY messages from this node.

:param callback:
Expand All @@ -53,9 +60,10 @@
self.log = []
self.active = []

@ensure_not_async # NOTE: Safeguard for accidental async use
def wait(
self, emcy_code: Optional[int] = None, timeout: float = 10
) -> "EmcyError":
) -> EmcyError:
"""Wait for a new EMCY to arrive.

:param emcy_code: EMCY code to wait for
Expand All @@ -65,8 +73,10 @@
"""
end_time = time.time() + timeout
while True:
# NOTE: Blocking lock
with self.emcy_received:
prev_log_size = len(self.log)
# NOTE: Blocking call
self.emcy_received.wait(timeout)
if len(self.log) == prev_log_size:
# Resumed due to timeout
Expand All @@ -81,6 +91,18 @@
# This is the one we're interested in
return emcy

async def async_wait(
self, emcy_code: Optional[int] = None, timeout: float = 10
) -> EmcyError:
"""Wait for a new EMCY to arrive.

:param emcy_code: EMCY code to wait for
:param timeout: Max time in seconds to wait

:return: The EMCY exception object or None if timeout
"""
return await asyncio.to_thread(self.wait, emcy_code, timeout)

Check warning on line 104 in canopen/emcy.py

View check run for this annotation

Codecov / codecov/patch

canopen/emcy.py#L104

Added line #L104 was not covered by tests


class EmcyProducer:

Expand Down
19 changes: 19 additions & 0 deletions canopen/lss.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import logging
import queue
import struct
import time
from typing import Optional, TYPE_CHECKING

from canopen.async_guard import ensure_not_async
import canopen.network


Expand Down Expand Up @@ -241,6 +244,8 @@ def send_identify_non_configured_remote_slave(self):
message[0] = CS_IDENTIFY_NON_CONFIGURED_REMOTE_SLAVE
self.__send_command(message)

# FIXME: Make async implementation "afast_scan"
@ensure_not_async # NOTE: Safeguard for accidental async use
def fast_scan(self):
"""This command sends a series of fastscan message
to find unconfigured slave with lowest number of LSS idenities
Expand All @@ -257,6 +262,7 @@ def fast_scan(self):
lss_next = 0

if self.__send_fast_scan_message(lss_id[0], lss_bit_check, lss_sub, lss_next):
# NOTE: Blocking call
time.sleep(0.01)
while lss_sub < 4:
lss_bit_check = 32
Expand All @@ -266,12 +272,14 @@ def fast_scan(self):
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
lss_id[lss_sub] |= 1<<lss_bit_check

# NOTE: Blocking call
time.sleep(0.01)

lss_next = (lss_sub + 1) & 3
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
return False, None

# NOTE: Blocking call
time.sleep(0.01)

# Now the next 32 bits will be scanned
Expand All @@ -296,6 +304,8 @@ def __send_fast_scan_message(self, id_number, bit_checker, lss_sub, lss_next):

return False

# FIXME: Make async implementation "__asend_lss_address"
@ensure_not_async # NOTE: Safeguard for accidental async use
def __send_lss_address(self, req_cs, number):
message = bytearray(8)

Expand All @@ -304,6 +314,7 @@ def __send_lss_address(self, req_cs, number):
response = self.__send_command(message)
# some device needs these delays between messages
# because it can't handle messages arriving with no delay
# NOTE: Blocking call
time.sleep(0.2)

return response
Expand Down Expand Up @@ -359,6 +370,9 @@ def __send_configure(self, req_cs, value1=0, value2=0):
error_msg = f"LSS Error: {error_code}"
raise LssError(error_msg)

# FIXME: Make async implementation "__asend_command"

@ensure_not_async # NOTE: Safeguard for accidental async use
def __send_command(self, message):
"""Send a LSS operation code to the network

Expand All @@ -376,6 +390,7 @@ def __send_command(self, message):
response = None
if not self.responses.empty():
logger.info("There were unexpected messages in the queue")
# FIXME: Recreating the queue
self.responses = queue.Queue()

self.network.send_message(self.LSS_TX_COBID, message)
Expand All @@ -386,14 +401,18 @@ def __send_command(self, message):
# Wait for the slave to respond
# TODO check if the response is LSS response message
try:
# NOTE: Blocking call
response = self.responses.get(
block=True, timeout=self.RESPONSE_TIMEOUT)
except queue.Empty:
raise LssError("No LSS response received")

return response

# @callback # NOTE: called from another thread
@ensure_not_async # NOTE: Safeguard for accidental async use
def on_message_received(self, can_id, data, timestamp):
# NOTE: Blocking call
self.responses.put(bytes(data))


Expand Down
Loading
Loading