Skip to content

Commit 59e5457

Browse files
committed
Test deadlock
1 parent c2b28cf commit 59e5457

File tree

2 files changed

+64
-1
lines changed

2 files changed

+64
-1
lines changed

distributed/tests/test_worker_state_machine.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@
55

66
from distributed.protocol.serialize import Serialize
77
from distributed.utils import recursive_to_dict
8-
from distributed.utils_test import _LockedCommPool, assert_story, gen_cluster, inc
8+
from distributed.utils_test import (
9+
_LockedCommPool,
10+
assert_story,
11+
freeze_data_fetching,
12+
gen_cluster,
13+
inc,
14+
)
915
from distributed.worker_state_machine import (
1016
ExecuteFailureEvent,
1117
ExecuteSuccessEvent,
@@ -348,3 +354,31 @@ async def test_in_memory_while_in_flight(c, s, a, b):
348354
# Let the comm from b to a return the result
349355
event.set()
350356
assert await y == 4 # Data in flight from b has been discarded
357+
358+
359+
@gen_cluster(client=True)
async def test_forget_data_needed(c, s, a, b):
    """Regression test: a key that is forgotten while queued in data_needed and
    then recreated must not end up stuck in fetch state.

    Sequence being reproduced:

    1. A task transitions to fetch and is added to data_needed.
    2. _ensure_communicating runs, but the network is saturated, so the task is
       not popped from data_needed.
    3. The task is forgotten.
    4. The task is recreated from scratch and transitioned to fetch again.
    5. BUG (at the time this test was written): data_needed.push silently did
       nothing, because data_needed still contained the forgotten task, which is
       a different TaskState instance that will no longer be updated.
    6. _ensure_communicating runs. It pops the forgotten task and discards it.
    7. The recreated task is now stuck in fetch state.
    """
    x = c.submit(inc, 1, key="x", workers=[a.address])
    with freeze_data_fetching(b):
        # b needs x (held by a) to compute y, but cannot fetch it while frozen.
        y = c.submit(inc, x, key="y", workers=[b.address])
        await wait_for_state("x", "fetch", b)

        # Drop both keys, then wait for the scheduler and both workers to
        # forget every task while b is still frozen.
        x.release()
        y.release()
        while any((s.tasks, a.tasks, b.tasks)):
            await asyncio.sleep(0.01)

    # Recreate the same keys from scratch; y only completes if b is able to
    # fetch the new x.
    x = c.submit(inc, 2, key="x", workers=[a.address])
    y = c.submit(inc, x, key="y", workers=[b.address])
    result = await y
    assert result == 4

distributed/utils_test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2261,3 +2261,32 @@ def wait_for_log_line(
22612261
if match in line:
22622262
return line
22632263
i += 1
2264+
2265+
2266+
@contextmanager
def freeze_data_fetching(w: Worker, *, jump_start: bool = False):
    """Prevent any task from transitioning from fetch to flight on the worker while
    inside the context, simulating a situation where the worker's network comms are
    saturated.

    This is not the same as setting the worker to Status=paused, which would also
    inform the Scheduler and prevent further tasks from being enqueued on the worker.

    The worker's original ``total_out_connections`` and ``comm_threshold_bytes``
    are restored when the context exits, including when the body raises.

    Parameters
    ----------
    w: Worker
        The Worker on which tasks will not transition from fetch to flight
    jump_start: bool
        If False, tasks will remain in fetch state after exiting the context, until
        something else triggers ensure_communicating.
        If True, trigger ensure_communicating on exit; this simulates e.g. an unrelated
        worker moving out of in_flight_workers. This only happens when the body of
        the context completes without raising.
    """
    old_out_connections = w.total_out_connections
    old_comm_threshold = w.comm_threshold_bytes
    # Zero out both limits for the duration of the context.
    w.total_out_connections = 0
    w.comm_threshold_bytes = 0
    try:
        yield
    finally:
        # Always put the original limits back, so that a failure inside the
        # context does not leave the worker permanently unable to fetch data.
        w.total_out_connections = old_out_connections
        w.comm_threshold_bytes = old_comm_threshold
    if jump_start:
        # Flip the status to paused and straight back to running to trigger
        # ensure_communicating.
        w.status = Status.paused
        w.status = Status.running

0 commit comments

Comments
 (0)