Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure resumed tasks are not accidentally forgotten #6217

Merged
merged 4 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 142 additions & 7 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,12 @@ def f(ev):
("f1", "ready", "executing", "executing", {}),
("free-keys", ("f1",)),
("f1", "executing", "released", "cancelled", {}),
("f1", "ensure-task-exists", "cancelled"),
("f1", "cancelled", "fetch", "cancelled", {"f1": ("resumed", "fetch")}),
("f1", "cancelled", "resumed", "resumed", {}),
("f1", "put-in-memory"),
("f1", "cancelled", "fetch", "resumed", {}),
("f1", "resumed", "memory", "memory", {"f2": "ready"}),
("free-keys", ("f1",)),
("f1", "release-key"),
("f1", "memory", "released", "released", {}),
("f1", "released", "forgotten", "forgotten", {}),
],
strict=True,
)


Expand Down Expand Up @@ -164,8 +159,9 @@ def blockable_compute(x, lock):
lock=block_compute,
workers=[a.address],
allow_other_workers=True,
key="fut1",
)
fut2 = c.submit(inc, fut1, workers=[b.address])
fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2")

await enter_get_data.wait()
await block_compute.acquire()
Expand Down Expand Up @@ -257,3 +253,142 @@ async def get_data(self, comm, *args, **kwargs):
a.block_get_data = False
# Everything should still be executing as usual after this
assert await c.submit(sum, c.map(inc, range(10))) == sum(map(inc, range(10)))


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_in_flight_lost_after_resumed(c, s, b):
block_get_data = asyncio.Lock()
in_get_data = asyncio.Event()

lock_executing = Lock()

def block_execution(lock):
with lock:
return

class BlockedGetData(Worker):
async def get_data(self, comm, *args, **kwargs):
in_get_data.set()
fjetter marked this conversation as resolved.
Show resolved Hide resolved
async with block_get_data:
return await super().get_data(comm, *args, **kwargs)

async with BlockedGetData(s.address, name="blocked-get-dataworker") as a:
fut1 = c.submit(
block_execution,
lock_executing,
fjetter marked this conversation as resolved.
Show resolved Hide resolved
workers=[a.address],
allow_other_workers=True,
key="fut1",
)
# Ensure fut1 is in memory but block any further execution afterwards to
# ensure we control when the recomputation happens
await fut1
await lock_executing.acquire()
fjetter marked this conversation as resolved.
Show resolved Hide resolved
in_get_data.clear()
fjetter marked this conversation as resolved.
Show resolved Hide resolved
await block_get_data.acquire()
fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2")

# This ensures that B already fetches the task, i.e. after this the task
# is guaranteed to be in flight
await in_get_data.wait()
assert fut1.key in b.tasks
assert b.tasks[fut1.key].state == "flight"

# It is removed, i.e. get_data is guaranteed to fail and f1 is scheduled
# to be recomputed on B
await s.remove_worker(a.address, "foo", close=False, safe=True)

while not b.tasks[fut1.key].state == "resumed":
await asyncio.sleep(0.01)

fut1.release()
fut2.release()

while not b.tasks[fut1.key].state == "cancelled":
await asyncio.sleep(0.01)

block_get_data.release()
while b.tasks:
await asyncio.sleep(0.01)

assert_story(
b.story(fut1.key),
expect=[
# The initial free-keys is rejected
("free-keys", (fut1.key,)),
(fut1.key, "resumed", "released", "cancelled", {}),
# After gather_dep receives the data, it tries to transition to memory but the task will release instead
(fut1.key, "cancelled", "memory", "released", {fut1.key: "forgotten"}),
],
)


@gen_cluster(client=True)
async def test_cancelled_error(c, s, a, b):
executing = Event()
lock_executing = Lock()
await lock_executing.acquire()

def block_execution(event, lock):
event.set()
with lock:
raise RuntimeError()

fut1 = c.submit(
block_execution,
executing,
lock_executing,
workers=[b.address],
allow_other_workers=True,
key="fut1",
)

await executing.wait()
assert b.tasks[fut1.key].state == "executing"
fut1.release()
while b.tasks[fut1.key].state == "executing":
await asyncio.sleep(0.01)
await lock_executing.release()
while b.tasks:
await asyncio.sleep(0.01)

assert_story(
b.story(fut1.key),
[
(fut1.key, "executing", "released", "cancelled", {}),
(fut1.key, "cancelled", "error", "error", {fut1.key: "released"}),
],
)


@gen_cluster(client=True, nthreads=[("", 1, {"resources": {"A": 1}})])
async def test_cancelled_error_with_ressources(c, s, a):
executing = Event()
lock_executing = Lock()
await lock_executing.acquire()

def block_execution(event, lock):
event.set()
with lock:
raise RuntimeError()

fut1 = c.submit(
block_execution,
executing,
lock_executing,
fjetter marked this conversation as resolved.
Show resolved Hide resolved
resources={"A": 1},
key="fut1",
)

await executing.wait()
fut2 = c.submit(inc, 1, resources={"A": 1}, key="fut2")

while fut2.key not in a.tasks:
await asyncio.sleep(0.01)

fut1.release()
while a.tasks[fut1.key].state == "executing":
await asyncio.sleep(0.01)
await lock_executing.release()

assert await fut2 == 2
10 changes: 9 additions & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1682,12 +1682,20 @@ async def test_closing_scheduler_closes_workers(s, a, b):
client=True, nthreads=[("127.0.0.1", 1)], worker_kwargs={"resources": {"A": 1}}
)
async def test_resources_reset_after_cancelled_task(c, s, w):
future = c.submit(sleep, 0.2, resources={"A": 1})
lock = Lock()

def block(lock):
with lock:
return

await lock.acquire()
future = c.submit(block, lock, resources={"A": 1})

while not w.executing_count:
await asyncio.sleep(0.01)

await future.cancel()
await lock.release()

while w.executing_count:
await asyncio.sleep(0.01)
Expand Down
Loading