Using async changes thread for non-async callbacks which is unexpected #1939

Open
@sveinse

Enabling async on the Notifier in the can library changes the thread from which all callbacks are called. Without async, the notifier callbacks run in the can rx thread. With async, the callbacks are dispatched via loop.call_soon_threadsafe(), which causes them to run in the main asyncio thread instead. This changes the behavior of all callbacks, not just async ones, which is unexpected. For example, combining a non-async CAN protocol stack with an async protocol stack might create problems because the callback thread changes.

python-can/can/notifier.py

Lines 111 to 119 in 5d62394

def _rx_thread(self, bus: BusABC) -> None:
    # determine message handling callable early, not inside while loop
    if self._loop:
        handle_message: Callable[[Message], Any] = functools.partial(
            self._loop.call_soon_threadsafe,
            self._on_message_received,  # type: ignore[arg-type]
        )
    else:
        handle_message = self._on_message_received
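
To make the thread change concrete, here is a minimal stand-alone sketch using only asyncio and threading. It does not use python-can; the names demo_callback_threads, rx_thread and callback are made up for illustration, and the worker thread merely stands in for the Notifier rx thread.

```python
import asyncio
import threading
from typing import Dict


def demo_callback_threads() -> Dict[str, str]:
    """Record which thread a plain (non-async) callback runs on."""
    seen: Dict[str, str] = {}

    def callback(tag: str) -> None:
        seen[tag] = threading.current_thread().name

    async def main() -> None:
        loop = asyncio.get_running_loop()
        done = asyncio.Event()

        def rx_thread() -> None:
            # Direct call: the callback runs on this worker thread.
            callback("direct")
            # Scheduled call: the callback runs later on the loop thread.
            loop.call_soon_threadsafe(callback, "scheduled")
            # Queued after the callback, so it fires once the callback ran.
            loop.call_soon_threadsafe(done.set)

        worker = threading.Thread(target=rx_thread, name="rx-thread")
        worker.start()
        await done.wait()
        worker.join()
        seen["loop"] = threading.current_thread().name

    asyncio.run(main())
    return seen
```

The "direct" entry reports the worker thread, while the "scheduled" entry reports the thread running the event loop, which is the behavior change described above.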

I propose a fix that skips the call_soon_threadsafe() dispatch in Notifier._rx_thread() by introducing a setting, run_message_reception_in_eventloop. The user can then choose to opt out of running the regular non-async callbacks in the asyncio main thread. The default should be True so that the current behavior does not change.

def _rx_thread(self, bus: BusABC) -> None:
    # determine message handling callable early, not inside while loop
    if self._loop and self.run_message_reception_in_eventloop:              # <--- NEW
        handle_message: Callable[[Message], Any] = functools.partial(
            self._loop.call_soon_threadsafe,
            self._on_message_received,  # type: ignore[arg-type]
        )
    else:
        handle_message = self._on_message_received
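
The selection logic can be tried in isolation. The sketch below is a free function, not part of python-can; select_handler and its run_in_eventloop parameter are hypothetical names that mirror the proposed run_message_reception_in_eventloop setting.

```python
import asyncio
import functools
from typing import Any, Callable, Optional


def select_handler(
    loop: Optional[asyncio.AbstractEventLoop],
    on_message: Callable[[Any], Any],
    run_in_eventloop: bool = True,  # hypothetical opt-out flag
) -> Callable[[Any], Any]:
    """Pick how the rx thread hands a message to on_message."""
    if loop is not None and run_in_eventloop:
        # Current behavior: defer the call to the event loop thread.
        return functools.partial(loop.call_soon_threadsafe, on_message)
    # No loop, or opted out: call directly on the rx thread.
    return on_message
```

With the default of True the result matches today's dispatch, so only users who pass False would see callbacks stay on the rx thread.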

A change is also required in Notifier._on_message_received(). This function might be called from either thread, so scheduling the asyncio coroutine must be made thread-safe:

def _on_message_received(self, msg: Message) -> None:
    for callback in self.listeners:
        res = callback(msg)
        if res and self._loop and asyncio.iscoroutine(res):
            # Schedule the coroutine on the event loop; safe from either thread.
            # Note: this returns a concurrent.futures.Future, not an
            # asyncio.Task, so it does not fit a set[asyncio.Task].
            asyncio.run_coroutine_threadsafe(res, self._loop)
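
As a sanity check of the thread-safe scheduling, the following stdlib-only sketch submits a coroutine from a worker thread. The names async_listener, demo_threadsafe_schedule and rx_thread are invented for this example and are not python-can APIs.

```python
import asyncio
import concurrent.futures
import threading
from typing import List, Tuple


async def async_listener(msg: str) -> str:
    # Hypothetical async listener; reports the thread it executes on.
    return threading.current_thread().name


def demo_threadsafe_schedule() -> Tuple[bool, str]:
    results: List = []

    async def main() -> None:
        loop = asyncio.get_running_loop()

        def rx_thread() -> None:
            # Submit the coroutine to the loop from a foreign thread.
            fut = asyncio.run_coroutine_threadsafe(async_listener("msg"), loop)
            results.append(isinstance(fut, concurrent.futures.Future))
            # Blocking here is fine: this is not the loop thread.
            results.append(fut.result(timeout=5))

        worker = threading.Thread(target=rx_thread, name="rx-thread")
        worker.start()
        # Wait for the worker without blocking the event loop.
        await loop.run_in_executor(None, worker.join)

    asyncio.run(main())
    return results[0], results[1]
```

The coroutine runs on the event loop thread even though it was submitted from the worker, and the returned object is a concurrent.futures.Future rather than an asyncio.Task, which matters if the results are tracked in a typed collection.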

When looking at this, #1938 and #1912 should be addressed in the same round.
