Skip to content

Commit 50d2911

Browse files
authored
Refactor missing-data command (#6332)
1 parent 3bedd8e commit 50d2911

File tree

5 files changed, with 54 additions and 50 deletions.

distributed/scheduler.py

Lines changed: 26 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -4683,53 +4683,48 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
46834683
self.send_all(client_msgs, worker_msgs)
46844684

46854685
def handle_missing_data(
4686-
self, key: str, errant_worker: str, stimulus_id: str, **kwargs
4686+
self, key: str, worker: str, errant_worker: str, stimulus_id: str
46874687
) -> None:
4688-
"""Signal that `errant_worker` does not hold `key`
4688+
"""Signal that `errant_worker` does not hold `key`.
46894689
4690-
This may either indicate that `errant_worker` is dead or that we may be
4691-
working with stale data and need to remove `key` from the workers
4692-
`has_what`.
4693-
4694-
If no replica of a task is available anymore, the task is transitioned
4695-
back to released and rescheduled, if possible.
4690+
This may either indicate that `errant_worker` is dead or that we may be working
4691+
with stale data and need to remove `key` from the workers `has_what`. If no
4692+
replica of a task is available anymore, the task is transitioned back to
4693+
released and rescheduled, if possible.
46964694
46974695
Parameters
46984696
----------
4699-
key : str, optional
4700-
Task key that could not be found, by default None
4701-
errant_worker : str, optional
4702-
Address of the worker supposed to hold a replica, by default None
4697+
key : str
4698+
Task key that could not be found
4699+
worker : str
4700+
Address of the worker informing the scheduler
4701+
errant_worker : str
4702+
Address of the worker supposed to hold a replica
47034703
"""
4704-
logger.debug("handle missing data key=%s worker=%s", key, errant_worker)
4704+
logger.debug(f"handle missing data {key=} {worker=} {errant_worker=}")
47054705
self.log_event(errant_worker, {"action": "missing-data", "key": key})
47064706

4707-
if key not in self.tasks:
4707+
ts = self.tasks.get(key)
4708+
ws = self.workers.get(errant_worker)
4709+
if not ts or not ws or ws not in ts.who_has:
47084710
return
47094711

4710-
ts: TaskState = self.tasks[key]
4711-
ws: WorkerState = self.workers.get(errant_worker)
4712-
4713-
if ws is not None and ws in ts.who_has:
4714-
self.remove_replica(ts, ws)
4712+
self.remove_replica(ts, ws)
47154713
if ts.state == "memory" and not ts.who_has:
47164714
if ts.run_spec:
47174715
self.transitions({key: "released"}, stimulus_id)
47184716
else:
47194717
self.transitions({key: "forgotten"}, stimulus_id)
47204718

4721-
def release_worker_data(self, key, worker, stimulus_id):
4722-
ws: WorkerState = self.workers.get(worker)
4723-
ts: TaskState = self.tasks.get(key)
4724-
if not ws or not ts:
4719+
def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:
4720+
ts = self.tasks.get(key)
4721+
ws = self.workers.get(worker)
4722+
if not ts or not ws or ws not in ts.who_has:
47254723
return
4726-
recommendations: dict = {}
4727-
if ws in ts.who_has:
4728-
self.remove_replica(ts, ws)
4729-
if not ts.who_has:
4730-
recommendations[ts.key] = "released"
4731-
if recommendations:
4732-
self.transitions(recommendations, stimulus_id)
4724+
4725+
self.remove_replica(ts, ws)
4726+
if not ts.who_has:
4727+
self.transitions({key: "released"}, stimulus_id)
47334728

47344729
def handle_long_running(self, key=None, worker=None, compute_duration=None):
47354730
"""A task has seceded from the thread pool
@@ -4907,7 +4902,7 @@ async def register_scheduler_plugin(self, plugin, name=None, idempotent=None):
49074902

49084903
self.add_plugin(plugin, name=name, idempotent=idempotent)
49094904

4910-
def worker_send(self, worker, msg):
4905+
def worker_send(self, worker: str, msg: dict[str, Any]) -> None:
49114906
"""Send message to worker
49124907
49134908
This also handles connection failures by adding a callback to remove

distributed/tests/test_worker.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2696,15 +2696,17 @@ async def test_gather_dep_exception_one_task_2(c, s, a, b):
26962696
See also test_gather_dep_exception_one_task
26972697
"""
26982698
# This test does not trigger the condition reliably but is a very easy case
2699-
# which should function correctly regardles
2699+
# which should function correctly regardless
27002700

27012701
fut1 = c.submit(inc, 1, workers=[a.address], key="f1")
27022702
fut2 = c.submit(inc, fut1, workers=[b.address], key="f2")
27032703

27042704
while fut1.key not in b.tasks or b.tasks[fut1.key].state == "flight":
27052705
await asyncio.sleep(0)
27062706

2707-
s.handle_missing_data(key="f1", errant_worker=a.address, stimulus_id="test")
2707+
s.handle_missing_data(
2708+
key="f1", worker=b.address, errant_worker=a.address, stimulus_id="test"
2709+
)
27082710

27092711
await fut2
27102712

distributed/tests/test_worker_state_machine.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -133,8 +133,8 @@ def test_sendmsg_to_dict():
133133
def test_merge_recs_instructions():
134134
x = TaskState("x")
135135
y = TaskState("y")
136-
instr1 = RescheduleMsg(key="foo", worker="a", stimulus_id="test")
137-
instr2 = RescheduleMsg(key="bar", worker="b", stimulus_id="test")
136+
instr1 = RescheduleMsg(key="foo", stimulus_id="test")
137+
instr2 = RescheduleMsg(key="bar", stimulus_id="test")
138138
assert merge_recs_instructions(
139139
({x: "memory"}, [instr1]),
140140
({y: "released"}, [instr2]),

distributed/worker.py

Lines changed: 11 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -120,6 +120,7 @@
120120
Instructions,
121121
InvalidTransition,
122122
LongRunningMsg,
123+
MissingDataMsg,
123124
Recs,
124125
RecsInstrs,
125126
ReleaseWorkerDataMsg,
@@ -2145,7 +2146,7 @@ def transition_long_running_rescheduled(
21452146
self, ts: TaskState, *, stimulus_id: str
21462147
) -> RecsInstrs:
21472148
recs: Recs = {ts: "released"}
2148-
smsg = RescheduleMsg(key=ts.key, worker=self.address, stimulus_id=stimulus_id)
2149+
smsg = RescheduleMsg(key=ts.key, stimulus_id=stimulus_id)
21492150
return recs, [smsg]
21502151

21512152
def transition_executing_rescheduled(
@@ -2158,11 +2159,7 @@ def transition_executing_rescheduled(
21582159
return merge_recs_instructions(
21592160
(
21602161
{ts: "released"},
2161-
[
2162-
RescheduleMsg(
2163-
key=ts.key, worker=self.address, stimulus_id=stimulus_id
2164-
)
2165-
],
2162+
[RescheduleMsg(key=ts.key, stimulus_id=stimulus_id)],
21662163
),
21672164
self._ensure_computing(),
21682165
)
@@ -3285,6 +3282,7 @@ async def gather_dep(
32853282
return None
32863283

32873284
recommendations: Recs = {}
3285+
instructions: Instructions = []
32883286
response = {}
32893287
to_gather_keys: set[str] = set()
32903288
cancelled_keys: set[str] = set()
@@ -3406,17 +3404,17 @@ def done_event():
34063404
ts.who_has.discard(worker)
34073405
self.has_what[worker].discard(ts.key)
34083406
self.log.append((d, "missing-dep", stimulus_id, time()))
3409-
self.batched_stream.send(
3410-
{
3411-
"op": "missing-data",
3412-
"errant_worker": worker,
3413-
"key": d,
3414-
"stimulus_id": stimulus_id,
3415-
}
3407+
instructions.append(
3408+
MissingDataMsg(
3409+
key=d,
3410+
errant_worker=worker,
3411+
stimulus_id=stimulus_id,
3412+
)
34163413
)
34173414
recommendations[ts] = "fetch"
34183415
del data, response
34193416
self.transitions(recommendations, stimulus_id=stimulus_id)
3417+
self._handle_instructions(instructions)
34203418

34213419
if refresh_who_has:
34223420
# All workers that hold known replicas of our tasks are busy.

distributed/worker_state_machine.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -346,14 +346,23 @@ class ReleaseWorkerDataMsg(SendMessageToScheduler):
346346
stimulus_id: str
347347

348348

349+
@dataclass
350+
class MissingDataMsg(SendMessageToScheduler):
351+
op = "missing-data"
352+
353+
__slots__ = ("key", "errant_worker", "stimulus_id")
354+
key: str
355+
errant_worker: str
356+
stimulus_id: str
357+
358+
349359
# Not to be confused with RescheduleEvent below or the distributed.Reschedule Exception
350360
@dataclass
351361
class RescheduleMsg(SendMessageToScheduler):
352362
op = "reschedule"
353363

354-
__slots__ = ("key", "worker", "stimulus_id")
364+
__slots__ = ("key", "stimulus_id")
355365
key: str
356-
worker: str
357366
stimulus_id: str
358367

359368

0 commit comments

Comments
 (0)