Skip to content

Runtime error when a long async callback is triggered #962

@m2-farzan

Description

@m2-farzan

Bug report

Required Info:

  • Operating System:
    • Ubuntu 20.04
  • Installation type:
    • Binaries
  • Version or commit hash:
    • galactic 1.0.8-1focal.20220209.144341
  • DDS implementation:
    • Fast-RTPS
  • Client library (if applicable):
    • rclpy

Steps to reproduce issue

import asyncio
import logging
import rclpy
from std_msgs.msg import String

logging.getLogger('asyncio').setLevel(logging.DEBUG)

async def cb(msg):
    print('Received a message:')
    await asyncio.sleep(0.1)
    print(msg.data)

async def main():
    rclpy.init()
    node = rclpy.create_node('my_node')
    node.create_subscription(String, '/test', cb, 10)
    while True:
        rclpy.spin_once(node, timeout_sec=0.001)
        await asyncio.sleep(0.001)

asyncio.run(main())

Run the subscriber and then run:

ros2 topic pub -1 /test std_msgs/msg/String "data: hello"

Expected behavior

When published to /test, the node should print Received a message:, wait for 0.1 seconds, and then print the message data.

From #166 (comment):

Any callback can now be a coroutine function (async def). If the callback is a coroutine and it becomes blocked the executor will move on and resume executing it later. The details of this are handled in the class Task which is a class very similar to asyncio.Task. asyncio can't be used directly because it is not thread safe.

Actual behavior

Executing <Task pending name='Task-1' coro=<main() running at my_listener.py:19> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f2ed81ddb80>()] created at /usr/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /usr/lib/python3.8/asyncio/base_events.py:184] created at /usr/lib/python3.8/asyncio/base_events.py:595> took 0.199 seconds
Received a message:
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/__main__.py", line 45, in <module>
    cli.main()
  File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 444, in main
    run()
  File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 285, in run_file
    runpy.run_path(target_as_str, run_name=compat.force_str("__main__"))
  File "/usr/lib/python3.8/runpy.py", line 265, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/lib/python3.8/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "my_listener.py", line 21, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "my_listener.py", line 18, in main
    rclpy.spin_once(node, timeout_sec=0.001)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/__init__.py", line 176, in spin_once
    executor.spin_once(timeout_sec=timeout_sec)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 712, in spin_once
    raise handler.exception()
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/task.py", line 242, in __call__
    self._handler.send(None)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 418, in handler
    await call_coroutine(entity, arg)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 343, in _execute_subscription
    await await_or_execute(sub.callback, msg)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 104, in await_or_execute
    return await callback(*args)
  File "my_listener.py", line 10, in cb
    await asyncio.sleep(0.1)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 659, in sleep
    return await future
RuntimeError: await wasn't used with future

Additional information

Observation

Reducing the timeout in the asyncio.sleep line makes the issue disappear. In the example on provided, on my system, a timeout of ~0.002 seconds or smaller works fine.

Guess

This piece might play a role:

rclpy/rclpy/rclpy/task.py

Lines 234 to 256 in c76a6a1

self._executing = True
if inspect.iscoroutine(self._handler):
# Execute a coroutine
try:
self._handler.send(None)
except StopIteration as e:
# The coroutine finished; store the result
self._handler.close()
self.set_result(e.value)
self._complete_task()
except Exception as e:
self.set_exception(e)
self._complete_task()
else:
# Execute a normal function
try:
self.set_result(self._handler(*self._args, **self._kwargs))
except Exception as e:
self.set_exception(e)
self._complete_task()
self._executing = False

My theory is that self._handler.send(None) is non-blocking, causing the line self._executing = False to be executed before the async callback is finished (while self.done is still False). So this causes the function to be assumed as paused (kind of like a yielded generator), so the already-running coro object gets called again by this piece:

# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
if (not task.executing() and not task.done() and
(node is None or node in nodes_to_use)):
yielded_work = True
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))

This causes some kind of clash, hence RuntimeError: await wasn't used with future.

So as a quick test, I commented out the self._executing = False line. This caused the second call to be avoided, (so no runtime error anymore), but the callback didn't finish (the message wasn't printed out).

Another minimal example

rclpy practically does something like this:

import asyncio

async def cb():
    print('a')
    await asyncio.sleep(1)
    print('b')

async def main():
    _handler = cb()
    while True:
        _handler.send(None)

asyncio.run(main())

Which also throws RuntimeError: await wasn't used with future.

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions