Description
Using the can library with an async-enabled Notifier changes the thread from which all callbacks are called. Without async, the notifier callbacks run in the CAN rx thread. With async, the callbacks are scheduled via loop.call_soon_threadsafe(), which causes them to run in the main asyncio thread instead. This changes behavior for all callbacks, not just async callbacks, which is unexpected. E.g. using a non-async CAN protocol stack together with an async protocol stack might create problems due to the change of callback thread.
Lines 111 to 119 in 5d62394
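The thread switch described above can be reproduced with the standard library alone. The following is a minimal sketch, not python-can code: `demo_callback_threads`, `rx_thread` and the thread name `rx-thread` are made up for illustration. It records which thread runs the same plain callback when it is called directly and when it is handed to `loop.call_soon_threadsafe()`.

```python
import asyncio
import threading


def demo_callback_threads() -> dict[str, str]:
    """Record which thread runs a plain callback in each dispatch mode."""
    seen: dict[str, str] = {}

    async def main() -> None:
        loop = asyncio.get_running_loop()
        done = asyncio.Event()

        def callback(label: str) -> None:
            seen[label] = threading.current_thread().name

        def rx_thread() -> None:
            # Direct call: the callback runs in this receive thread.
            callback("direct")
            # Scheduled call: the callback runs later in the event loop thread.
            loop.call_soon_threadsafe(callback, "scheduled")
            # Scheduled callbacks run in FIFO order, so this fires last.
            loop.call_soon_threadsafe(done.set)

        worker = threading.Thread(target=rx_thread, name="rx-thread")
        worker.start()
        await done.wait()
        worker.join()

    asyncio.run(main())
    return seen


if __name__ == "__main__":
    print(demo_callback_threads())
```

The "direct" entry carries the name of the receive thread, while the "scheduled" entry carries the name of whichever thread runs the event loop.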
I propose a fix that makes the call_soon_threadsafe dispatch in Notifier._rx_thread() optional by introducing a setting run_message_reception_in_eventloop. The user can then choose to opt out of running the regular non-async callbacks in the asyncio main thread. The default should be True so we don't change the current behavior.
    def _rx_thread(self, bus: BusABC) -> None:
        # determine message handling callable early, not inside while loop
        if self._loop and self.run_message_reception_in_eventloop:  # <--- NEW
            handle_message: Callable[[Message], Any] = functools.partial(
                self._loop.call_soon_threadsafe,
                self._on_message_received,  # type: ignore[arg-type]
            )
        else:
            handle_message = self._on_message_received
A change is also required in Notifier._on_message_received(). This function might be called from either thread, so scheduling the asyncio coroutine must be made thread-safe:
    def _on_message_received(self, msg: Message) -> None:
        for callback in self.listeners:
            res = callback(msg)
            if res and self._loop and asyncio.iscoroutine(res):
                # Schedule coroutine
                # In __init__: self._tasks: set[asyncio.Task] = set()
                task = asyncio.run_coroutine_threadsafe(res, self._loop)
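To illustrate how the two proposed changes might interact, here is a self-contained sketch using a toy class. `MiniNotifier`, `dispatch` and `demo` are hypothetical names and are not part of python-can; only the flag name `run_message_reception_in_eventloop` is taken from the proposal above. With the flag set to False, the plain listener stays in the receive thread while the coroutine listener is still handed to the event loop.

```python
import asyncio
import threading
from typing import Any, Callable, Optional


class MiniNotifier:
    """Toy stand-in for a notifier (hypothetical, not the python-can class)."""

    def __init__(
        self,
        listeners: list[Callable[[Any], Any]],
        loop: Optional[asyncio.AbstractEventLoop] = None,
        run_message_reception_in_eventloop: bool = True,
    ) -> None:
        self.listeners = listeners
        self._loop = loop
        self.run_message_reception_in_eventloop = run_message_reception_in_eventloop

    def dispatch(self, msg: Any) -> None:
        """Called from the receive thread for every message."""
        if self._loop and self.run_message_reception_in_eventloop:
            self._loop.call_soon_threadsafe(self._on_message_received, msg)
        else:
            self._on_message_received(msg)

    def _on_message_received(self, msg: Any) -> None:
        for callback in self.listeners:
            res = callback(msg)
            if res and self._loop and asyncio.iscoroutine(res):
                # Safe to call from any thread, including the loop thread.
                asyncio.run_coroutine_threadsafe(res, self._loop)


def demo(run_in_loop: bool) -> dict[str, str]:
    """Return the thread name seen by a plain and a coroutine listener."""
    seen: dict[str, str] = {}

    async def main() -> None:
        loop = asyncio.get_running_loop()
        done = asyncio.Event()

        def sync_listener(msg: Any) -> None:
            seen["sync"] = threading.current_thread().name

        async def async_listener(msg: Any) -> None:
            seen["async"] = threading.current_thread().name
            done.set()

        notifier = MiniNotifier([sync_listener, async_listener], loop, run_in_loop)
        worker = threading.Thread(
            target=notifier.dispatch, args=("frame",), name="rx-thread"
        )
        worker.start()
        await done.wait()
        worker.join()

    asyncio.run(main())
    return seen


if __name__ == "__main__":
    print(demo(run_in_loop=False))
    print(demo(run_in_loop=True))
```

Note that asyncio.run_coroutine_threadsafe() returns a concurrent.futures.Future, not an asyncio.Task, so any bookkeeping container for the results would need to hold that type.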
When looking at this, #1938 and #1912 should be addressed in the same round.