
Transition queued->memory causes AssertionError #7200

@crusaderky

Description


While trying to reproduce #7063, I came across a different error, this one with queueing enabled.
The reproducer below is NOT minimal; there is likely quite a bit of room for simplification.

import asyncio

from distributed import Event, wait
from distributed.core import Status
from distributed.utils_test import (
    BlockedGatherDep,
    gen_cluster,
    inc,
    wait_for_state,
)


@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.worker-saturation": 1.5},
)
async def test_steal_rootish_while_retiring(c, s, a):
    """https://github.com/dask/distributed/issues/7063

    Note that this applies to both tasks that raise Reschedule as well as work stealing.
    """
    ev = Event()

    # Put a task in memory on a, which will be retired, and prevent b from acquiring
    # a replica. This will cause a to be stuck in closing_gracefully state until we
    # set b.block_gather_dep.
    m = c.submit(inc, 1, key="m", workers=[a.address])
    await wait(m)

    async with BlockedGatherDep(s.address, nthreads=1) as b:
        # Large number of tasks to make sure they're rootish
        futures = c.map(
            lambda i, ev: ev.wait(), range(10), ev=ev, key=[f"x-{i}" for i in range(10)]
        )

        while a.state.executing_count != 1 or b.state.executing_count != 1:
            await asyncio.sleep(0.01)

        assert s.is_rootish(s.tasks[futures[0].key])

        retire_task = asyncio.create_task(c.retire_workers([a.address]))
        # Wait until AMM sends AcquireReplicasEvent to b to move away m
        await b.in_gather_dep.wait()
        assert s.workers[a.address].status == Status.closing_gracefully

        # Steal any of the tasks on a
        steal_key = next(iter(a.state.executing)).key
        s.reschedule(steal_key, stimulus_id="steal")
        await ev.set()

        # The stolen task can now complete on the other worker
        await wait_for_state(steal_key, "memory", b)
        await wait_for_state(steal_key, "memory", s)

        # Let graceful retirement of a complete.
        # This in turn reschedules whatever tasks were still processing on a to b.
        b.block_gather_dep.set()
        await retire_task
        await wait(futures)

The test is green; however, I see the following in the log:

  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 5284, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 4649, in stimulus_task_finished
    r: tuple = self._transition(
  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 1813, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:45929', 'nbytes': 28, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x04bool\x94\x93\x94.', 'typename': 'bool', 'metadata': {}, 'thread': 139862053221952, 'startstops': ({'action': 'compute', 'start': 1666802403.9580944, 'stop': 1666802403.9590282},), 'status': 'OK'}, 'queued', 'memory')
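Incidentally, the rejected kwargs are the payload of the worker's task-finished message. As a side check (not part of any fix), the type bytes in the log are just the pickled class of the task's result, i.e. the bool returned by ev.wait():

import pickle

# The `type` field from the AssertionError payload above: it unpickles to the
# builtins.bool class, matching typename='bool' and the ev.wait() result in
# the reproducer.
type_blob = (
    b"\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00"
    b"\x8c\x08builtins\x94\x8c\x04bool\x94\x93\x94."
)
assert pickle.loads(type_blob) is bool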

What is happening:

  1. steal_key is processing on a
  2. steal_key is rescheduled, which causes the scheduler to send a free-keys message to a and put the task back in the queue
  3. before the free-keys message can reach a, steal_key finishes on a
  4. steal_key transitions to memory on a, sending a TaskFinishedMsg to the scheduler
  5. on the scheduler, a queued->memory transition happens which, I suspect, is otherwise untested (see the sketch below)

This is timing-sensitive: if the free-keys message had reached a before the task ended, steal_key would have been cancelled and would transition to forgotten without any messaging when it ends.
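To illustrate step 5, here is a heavily simplified, hypothetical sketch of the dispatch logic that trips the assertion; it does not mirror the actual Scheduler._transition implementation. The point is that there is no dedicated handler for the (queued, memory) edge, so the generic fallback runs, and that fallback cannot consume the stimulus-specific kwargs attached to the worker's task-finished message:

# Toy transition dispatch; names and structure are illustrative only.
TRANSITIONS = {
    ("processing", "memory"): "record result, release dependencies",
    ("queued", "processing"): "pop from queue, send to a worker",
    # Note: no ("queued", "memory") entry; that is the edge hit in this issue.
}


def transition(start, finish, **kwargs):
    if (start, finish) in TRANSITIONS:
        # Dedicated handlers know how to consume the stimulus-specific
        # kwargs (worker, nbytes, type, ...).
        return TRANSITIONS[start, finish]
    # Generic fallback: route through "released" first. It cannot forward the
    # extra kwargs, so it insists there are none.
    assert not kwargs, (kwargs, start, finish)
    return "go through released first"


# The race described above: the scheduler has already moved steal_key back to
# "queued" when the worker reports it finished.
transition("queued", "memory", worker="tcp://127.0.0.1:45929", nbytes=28)
# -> AssertionError(({'worker': ..., 'nbytes': 28}, 'queued', 'memory'))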

Labels: bug, deadlock, scheduler
