-
Notifications
You must be signed in to change notification settings - Fork 251
Description
Bug report
Required Info:
- Operating System:
- Ubuntu 20.04
- Installation type:
- Binaries
- Version or commit hash:
- galactic 1.0.8-1focal.20220209.144341
- DDS implementation:
- Fast-RTPS
- Client library (if applicable):
- rclpy
Steps to reproduce issue
import asyncio
import logging
import rclpy
from std_msgs.msg import String
logging.getLogger('asyncio').setLevel(logging.DEBUG)
async def cb(msg):
print('Received a message:')
await asyncio.sleep(0.1)
print(msg.data)
async def main():
rclpy.init()
node = rclpy.create_node('my_node')
node.create_subscription(String, '/test', cb, 10)
while True:
rclpy.spin_once(node, timeout_sec=0.001)
await asyncio.sleep(0.001)
asyncio.run(main())
Run the subscriber and then run:
ros2 topic pub -1 /test std_msgs/msg/String "data: hello"
Expected behavior
When published to /test, the node should print Received a message:
, wait for 0.1 seconds, and then print the message data.
From #166 (comment):
Any callback can now be a coroutine function (async def). If the callback is a coroutine and it becomes blocked the executor will move on and resume executing it later. The details of this are handled in the class Task which is a class very similar to asyncio.Task. asyncio can't be used directly because it is not thread safe.
Actual behavior
Executing <Task pending name='Task-1' coro=<main() running at my_listener.py:19> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f2ed81ddb80>()] created at /usr/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /usr/lib/python3.8/asyncio/base_events.py:184] created at /usr/lib/python3.8/asyncio/base_events.py:595> took 0.199 seconds
Received a message:
Traceback (most recent call last):
File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/__main__.py", line 45, in <module>
cli.main()
File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 444, in main
run()
File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 285, in run_file
runpy.run_path(target_as_str, run_name=compat.force_str("__main__"))
File "/usr/lib/python3.8/runpy.py", line 265, in run_path
return _run_module_code(code, init_globals, run_name,
File "/usr/lib/python3.8/runpy.py", line 97, in _run_module_code
_run_code(code, mod_globals, init_globals,
File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "my_listener.py", line 21, in <module>
asyncio.run(main())
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "my_listener.py", line 18, in main
rclpy.spin_once(node, timeout_sec=0.001)
File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/__init__.py", line 176, in spin_once
executor.spin_once(timeout_sec=timeout_sec)
File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 712, in spin_once
raise handler.exception()
File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/task.py", line 242, in __call__
self._handler.send(None)
File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 418, in handler
await call_coroutine(entity, arg)
File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 343, in _execute_subscription
await await_or_execute(sub.callback, msg)
File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 104, in await_or_execute
return await callback(*args)
File "my_listener.py", line 10, in cb
await asyncio.sleep(0.1)
File "/usr/lib/python3.8/asyncio/tasks.py", line 659, in sleep
return await future
RuntimeError: await wasn't used with future
Additional information
Observation
Reducing the timeout in the asyncio.sleep line makes the issue disappear. In the example on provided, on my system, a timeout of ~0.002 seconds or smaller works fine.
Guess
This piece might play a role:
Lines 234 to 256 in c76a6a1
self._executing = True | |
if inspect.iscoroutine(self._handler): | |
# Execute a coroutine | |
try: | |
self._handler.send(None) | |
except StopIteration as e: | |
# The coroutine finished; store the result | |
self._handler.close() | |
self.set_result(e.value) | |
self._complete_task() | |
except Exception as e: | |
self.set_exception(e) | |
self._complete_task() | |
else: | |
# Execute a normal function | |
try: | |
self.set_result(self._handler(*self._args, **self._kwargs)) | |
except Exception as e: | |
self.set_exception(e) | |
self._complete_task() | |
self._executing = False |
My theory is that self._handler.send(None)
is non-blocking, causing the line self._executing = False
to be executed before the async callback is finished (while self.done
is still False
). So this causes the function to be assumed as paused (kind of like a yielded generator), so the already-running coro object gets called again by this piece:
rclpy/rclpy/rclpy/executors.py
Lines 470 to 482 in c76a6a1
# Yield tasks in-progress before waiting for new work | |
tasks = None | |
with self._tasks_lock: | |
tasks = list(self._tasks) | |
if tasks: | |
for task, entity, node in reversed(tasks): | |
if (not task.executing() and not task.done() and | |
(node is None or node in nodes_to_use)): | |
yielded_work = True | |
yield task, entity, node | |
with self._tasks_lock: | |
# Get rid of any tasks that are done | |
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks)) |
This causes some kind of clash, hence RuntimeError: await wasn't used with future
.
So as a quick test, I commented out the self._executing = False
line. This caused the second call to be avoided, (so no runtime error anymore), but the callback didn't finish (the message wasn't printed out).
Another minimal example
rclpy practically does something like this:
import asyncio
async def cb():
print('a')
await asyncio.sleep(1)
print('b')
async def main():
_handler = cb()
while True:
_handler.send(None)
asyncio.run(main())
Which also throws RuntimeError: await wasn't used with future
.