Description
While trying to reproduce #7063, I came across a different error, this one with queueing enabled.
The reproducer below is NOT minimal; it can likely be simplified quite a bit.
```python
import asyncio

from distributed import Event, wait
from distributed.core import Status
from distributed.utils_test import (
    BlockedGatherDep,
    gen_cluster,
    inc,
    wait_for_state,
)


@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.worker-saturation": 1.5},
)
async def test_steal_rootish_while_retiring(c, s, a):
    """https://github.com/dask/distributed/issues/7063

    Note that this applies both to tasks that raise Reschedule and to work stealing.
    """
    ev = Event()

    # Put a task in memory on a, which will be retired, and prevent b from
    # acquiring a replica. This will cause a to be stuck in closing_gracefully
    # state until we set b.block_gather_dep.
    m = c.submit(inc, 1, key="m", workers=[a.address])
    await wait(m)

    async with BlockedGatherDep(s.address, nthreads=1) as b:
        # Large number of tasks to make sure they're rootish
        futures = c.map(
            lambda i, ev: ev.wait(),
            range(10),
            ev=ev,
            key=[f"x-{i}" for i in range(10)],
        )
        while a.state.executing_count != 1 or b.state.executing_count != 1:
            await asyncio.sleep(0.01)

        assert s.is_rootish(s.tasks[futures[0].key])

        retire_task = asyncio.create_task(c.retire_workers([a.address]))
        # Wait until AMM sends AcquireReplicasEvent to b to move m away
        await b.in_gather_dep.wait()
        assert s.workers[a.address].status == Status.closing_gracefully

        # Steal any of the tasks on a
        steal_key = next(iter(a.state.executing)).key
        s.reschedule(steal_key, stimulus_id="steal")
        await ev.set()

        # The stolen task can now complete on the other worker
        await wait_for_state(steal_key, "memory", b)
        await wait_for_state(steal_key, "memory", s)

        # Let graceful retirement of a complete. This in turn reschedules
        # whatever tasks were still processing on a to b.
        b.block_gather_dep.set()
        await retire_task
        await wait(futures)
```
The test is green; however, I see this in the log:
File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 5284, in handle_task_finished
r: tuple = self.stimulus_task_finished(
File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 4649, in stimulus_task_finished
r: tuple = self._transition(
File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 1813, in _transition
assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:45929', 'nbytes': 28, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x04bool\x94\x93\x94.', 'typename': 'bool', 'metadata': {}, 'thread': 139862053221952, 'startstops': ({'action': 'compute', 'start': 1666802403.9580944, 'stop': 1666802403.9590282},), 'status': 'OK'}, 'queued', 'memory')
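For context, the assertion fires in the scheduler's generic transition dispatch: TaskFinishedMsg carries extra keyword arguments (worker, nbytes, type, ...), and there is a dedicated handler for processing->memory that consumes them, but none for queued->memory. Below is a paraphrased sketch of that dispatch, assuming my reading of `distributed/scheduler.py` around the asserting line is right (exact names and structure vary by version); it is not the verbatim source:

```python
# Paraphrased sketch of Scheduler._transition's dispatch; simplified,
# names and structure only approximate the real code in distributed/scheduler.py.
func = self._TRANSITIONS_TABLE.get((start, finish))
if func is not None:
    # A dedicated handler (e.g. processing->memory) knows how to consume
    # the extra kwargs carried by TaskFinishedMsg.
    recommendations, client_msgs, worker_msgs = func(key, stimulus_id, *args, **kwargs)
elif "released" not in (start, finish):
    # No handler for queued->memory: fall back to routing the transition
    # through "released". This path cannot forward extra arguments, hence
    # the assertion that trips in the log above.
    assert not args and not kwargs, (args, kwargs, start, finish)
    ...
```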
What is happening:
- steal_key is processing on a
- steal_key is rescheduled, which causes the scheduler to send a free-keys message to a and to put the task back in the queue
- before the free-keys message can reach a, steal_key finishes on a
- steal_key transitions to memory on a, sending a TaskFinishedMsg to the scheduler
- the scheduler therefore attempts a queued->memory transition which, I suspect, is otherwise untested

This is timing-sensitive: if free-keys reached a before the task finished, steal_key would be cancelled and would transition to forgotten, without any messaging to the scheduler when it eventually completes. A toy model of the race is sketched below.
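To make the ordering concrete, here is a self-contained toy model of the race using plain asyncio queues in place of the real comms; all names here are invented for illustration and nothing is distributed's actual API:

```python
import asyncio


async def main() -> None:
    # Toy one-way channels between "scheduler" and "worker"; purely
    # illustrative stand-ins for distributed's comm layer.
    to_worker: asyncio.Queue = asyncio.Queue()
    to_scheduler: asyncio.Queue = asyncio.Queue()
    task_state = "processing"  # the scheduler's view of steal_key

    async def worker() -> None:
        # steal_key finishes and its completion report goes on the wire
        # *before* the worker sees the pending free-keys message.
        await to_scheduler.put(("task-finished", "steal_key"))
        msg = await to_worker.get()
        print(f"worker: got {msg} after the task had already finished")

    async def scheduler() -> None:
        nonlocal task_state
        # reschedule(): free the task on the worker and queue it again.
        task_state = "queued"
        await to_worker.put(("free-keys", "steal_key"))
        # The stale completion report now arrives:
        op, key = await to_scheduler.get()
        print(f"scheduler: {op} for {key} while its state is {task_state!r}")
        # At this point distributed would attempt queued->memory and
        # trip the assertion shown in the traceback above.

    await asyncio.gather(worker(), scheduler())


asyncio.run(main())
```

The only point of the model is the ordering: the completion report is already in flight when the scheduler moves the task back to queued, so the stale report is delivered against a task the scheduler believes is queued.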