-
Notifications
You must be signed in to change notification settings - Fork 556
Open
Description
Current Behavior
As of now, `AsyncMachine` handles the `CancelledError`s that are raised when tasks that are still running get cancelled because a new event is triggered.
Question
Do you consider this useful and convenient, or would you prefer to handle cancellation events yourself, for instance via `try`/`except` or `on_exception`?
Example
from transitions.extensions import AsyncMachine
import asyncio
class Model:
    """Example model whose ``doing_task_*`` coroutines serve as callbacks.

    Every callback ends by awaiting ``self.done()``. That method is not
    defined in this class; it has to be attached to the instance from outside
    (in this example presumably by the state machine, which declares a
    ``done`` trigger for this model).
    """

    def __init__(self) -> None:
        # Values appended by the task callbacks, in the order they got there.
        self.data = []

    async def doing_task_1(self) -> None:
        """Sleep for two seconds, append ``1`` to ``data``, then await ``done()``."""
        print("Doing task 1...")
        # Long enough for a later trigger to arrive while this is pending.
        await asyncio.sleep(2)
        self.data.append(1)
        await self.done()

    async def doing_task_2(self) -> None:
        """Sleep for two seconds, append ``2`` to ``data``, then await ``done()``."""
        print("Doing task 2...")
        await asyncio.sleep(2)
        self.data.append(2)
        await self.done()

    async def doing_task_3(self) -> None:
        """Append ``3`` to ``data`` right away (no sleep), then await ``done()``."""
        print("Doing task 3...")
        self.data.append(3)
        await self.done()
# The two states the machine is configured with.
states = [
    "idle",
    "busy",
]

# Each ``do_task_N`` trigger moves the model to "busy" and names the matching
# ``doing_task_N`` coroutine as its ``after`` callback; ``done`` leads from
# "busy" back to "idle". NOTE(review): "*" is presumably the library's
# wildcard for "any source state" -- confirm against the transitions docs.
transitions = [
    {
        "trigger": "do_task_1",
        "source": "*",
        "dest": "busy",
        "after": "doing_task_1",
    },
    {
        "trigger": "do_task_2",
        "source": "*",
        "dest": "busy",
        "after": "doing_task_2",
    },
    {
        "trigger": "do_task_3",
        "source": "*",
        "dest": "busy",
        "after": "doing_task_3",
    },
    {
        "trigger": "done",
        "source": "busy",
        "dest": "idle",
    },
]

model = Model()
m = AsyncMachine(
    model=model,
    states=states,
    transitions=transitions,
    initial="idle",
)
async def main():
    """Fire three triggers 0.1 s apart and check the resulting model state.

    Each trigger is scheduled as its own task so that the next one can be
    started while the previous callback is still pending. The final asserts
    encode the expectation of this example: only the third task gets to
    append to ``model.data`` and the model ends up in the ``idle`` state.

    Raises:
        AssertionError: if the model is not idle or ``data`` is not ``[3]``.
    """
    # Keep strong references to the scheduled tasks: the event loop itself
    # only holds weak references, so a task whose handle is dropped may be
    # garbage-collected before it has finished.
    tasks = []

    tasks.append(asyncio.create_task(model.do_task_1()))  # won't finish
    await asyncio.sleep(0.1)
    tasks.append(asyncio.create_task(model.do_task_2()))  # won't finish
    await asyncio.sleep(0.1)
    tasks.append(asyncio.create_task(model.do_task_3()))  # does finish
    await asyncio.sleep(0.1)

    assert model.is_idle()
    assert model.data == [3]
# Entry point of the example: run main() to completion; a failed assert in
# main() propagates out of this call.
asyncio.run(main())