Skip to content

Commit 54b3498

Browse files
committed
tweaks
1 parent 71a31a7 commit 54b3498

File tree

2 files changed

+10
-9
lines changed

2 files changed

+10
-9
lines changed

distributed/tests/test_worker_state_machine.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -475,9 +475,9 @@ async def test_forget_data_needed(c, s, a, b):
475475
popped from data_needed
476476
3. Task is forgotten
477477
4. Task is recreated from scratch and transitioned to fetch again
478-
5. BUG: at the moment of writing this test, data_needed.push silently did nothing,
479-
because it still contained the forgotten task, which is a different TaskState
480-
instance which will be no longer updated.
478+
5. BUG: at the moment of writing this test, adding to data_needed silently did
479+
nothing, because it still contained the forgotten task, which is a different
480+
TaskState instance which will be no longer updated.
481481
6. _ensure_communicating runs. It pops the forgotten task and discards it.
482482
7. We now have a task stuck in fetch state.
483483
"""

distributed/worker.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2998,13 +2998,14 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
29982998

29992999
ts = self.data_needed.pop()
30003000

3001-
if not ts.who_has:
3002-
recommendations[ts] = "missing"
3003-
continue
30043001
if self.validate:
30053002
assert ts.state == "fetch"
30063003
assert self.address not in ts.who_has
30073004

3005+
if not ts.who_has:
3006+
recommendations[ts] = "missing"
3007+
continue
3008+
30083009
workers = [
30093010
w
30103011
for w in ts.who_has
@@ -3315,7 +3316,7 @@ def done_event():
33153316
)
33163317
for d in has_what:
33173318
ts = self.tasks[d]
3318-
ts.who_has.discard(worker)
3319+
ts.who_has.remove(worker)
33193320
if not ts.who_has and ts.state in (
33203321
"fetch",
33213322
"flight",
@@ -4639,7 +4640,7 @@ async def _get_data():
46394640
cache_loads = LRU(maxsize=100)
46404641

46414642

4642-
def loads_function(bytes_object: bytes) -> Callable:
4643+
def loads_function(bytes_object):
46434644
"""Load a function from bytes, cache bytes"""
46444645
if len(bytes_object) < 100000:
46454646
try:
@@ -4691,7 +4692,7 @@ def execute_task(task):
46914692
_cache_lock = threading.Lock()
46924693

46934694

4694-
def dumps_function(func: Callable) -> bytes:
4695+
def dumps_function(func) -> bytes:
46954696
"""Dump a function to bytes, cache functions"""
46964697
try:
46974698
with _cache_lock:

0 commit comments

Comments
 (0)