
Commit d2944ef

transition_queued_memory
1 parent 621994e commit d2944ef

2 files changed: +113, -6 lines

distributed/scheduler.py

Lines changed: 51 additions & 6 deletions
@@ -2325,6 +2325,51 @@ def transition_waiting_memory(
                 pdb.set_trace()
             raise
 
+    def transition_queued_memory(
+        self,
+        key: str,
+        stimulus_id: str,
+        *,
+        nbytes: int | None = None,
+        type: bytes | None = None,
+        typename: str | None = None,
+        worker: str,
+        **kwargs: Any,
+    ):
+        try:
+            ws: WorkerState = self.workers[worker]
+            ts: TaskState = self.tasks[key]
+            recommendations: dict = {}
+            client_msgs: dict = {}
+            worker_msgs: dict = {}
+
+            if self.validate:
+                assert not ts.processing_on
+                assert ts.state == "queued"
+
+            if nbytes is not None:
+                ts.set_nbytes(nbytes)
+
+            self.queued.remove(ts)
+            self.check_idle_saturated(ws)
+
+            _add_to_memory(
+                self, ts, ws, recommendations, client_msgs, type=type, typename=typename
+            )
+
+            if self.validate:
+                assert not ts.processing_on
+                assert ts.who_has
+
+            return recommendations, client_msgs, worker_msgs
+        except Exception as e:
+            logger.exception(e)
+            if LOG_PDB:
+                import pdb
+
+                pdb.set_trace()
+            raise
+
     def transition_processing_memory(
         self,
         key: str,
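
As an aside for readers new to the scheduler internals: every transition_* handler, including the new one above, returns a (recommendations, client_msgs, worker_msgs) triple that the transition engine then applies. A minimal sketch of that shape, with made-up keys and message containers (none of the values below come from this commit):

    from typing import Any

    # Illustrative handler skeleton: the return triple mirrors what
    # transition_queued_memory hands back above.
    def toy_handler(key: str) -> tuple[dict, dict, dict]:
        recommendations: dict[str, str] = {}  # task key -> recommended next state
        client_msgs: dict[str, list[dict[str, Any]]] = {}  # client -> messages to send
        worker_msgs: dict[str, list[dict[str, Any]]] = {}  # worker address -> messages
        # A task arriving in memory might, for example, unblock a dependent:
        recommendations["y-1"] = "processing"
        return recommendations, client_msgs, worker_msgs

    recs, cmsgs, wmsgs = toy_handler("x-0")
    assert recs == {"y-1": "processing"}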
@@ -2646,7 +2691,7 @@ def transition_processing_released(self, key: str, stimulus_id: str):
                 }
             ]
 
-            _propagage_released(self, ts, recommendations)
+            _propagate_released(self, ts, recommendations)
             return recommendations, {}, worker_msgs
         except Exception as e:
             logger.exception(e)
@@ -2870,7 +2915,7 @@ def transition_queued_released(self, key, stimulus_id):
 
             self.queued.remove(ts)
 
-            _propagage_released(self, ts, recommendations)
+            _propagate_released(self, ts, recommendations)
             return recommendations, client_msgs, worker_msgs
         except Exception as e:
             logger.exception(e)
@@ -3016,6 +3061,7 @@ def transition_released_forgotten(self, key, stimulus_id):
         ("waiting", "no-worker"): transition_waiting_no_worker,
         ("waiting", "queued"): transition_waiting_queued,
         ("waiting", "memory"): transition_waiting_memory,
+        ("queued", "memory"): transition_queued_memory,
         ("queued", "released"): transition_queued_released,
         ("queued", "processing"): transition_queued_processing,
         ("processing", "released"): transition_processing_released,
@@ -7951,7 +7997,7 @@ def _add_to_memory(
     )
 
 
-def _propagage_released(
+def _propagate_released(
     state: SchedulerState,
     ts: TaskState,
     recommendations: Recs,
@@ -8305,10 +8351,9 @@ def heartbeat_interval(n: int) -> float:
 
 
 def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int:
-    "Number of tasks that can be sent to this worker without oversaturating it"
+    """Number of tasks that can be sent to this worker without oversaturating it"""
     assert not math.isinf(saturation_factor)
-    nthreads = ws.nthreads
-    return max(math.ceil(saturation_factor * nthreads), 1) - (
+    return max(math.ceil(saturation_factor * ws.nthreads), 1) - (
         len(ws.processing) - len(ws.long_running)
     )
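
A worked reading of that formula, using the settings from the new test (worker-saturation 1.5, one thread per worker): each worker gets max(ceil(1.5 * 1), 1) = 2 slots, minus the tasks it is already processing, with long-running tasks not counting against the budget. A standalone sketch of the same arithmetic, not the real function (which takes a WorkerState):

    import math

    def task_slots_available(
        nthreads: int, processing: int, long_running: int, saturation_factor: float
    ) -> int:
        # Same arithmetic as the refactored return statement above.
        return max(math.ceil(saturation_factor * nthreads), 1) - (
            processing - long_running
        )

    # worker-saturation 1.5 with 1 thread: 2 slots when empty...
    assert task_slots_available(1, 0, 0, 1.5) == 2
    # ...and 0 once both slots are taken, which is why x-4 stays queued
    # in the test below (2 workers x 2 slots < 5 tasks).
    assert task_slots_available(1, 2, 0, 1.5) == 0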

distributed/tests/test_steal.py

Lines changed: 62 additions & 0 deletions
@@ -1870,3 +1870,65 @@ async def test_trivial_workload_should_not_cause_work_stealing(c, s, *workers):
     await c.gather(futs)
     events = s.events["stealing"]
     assert len(events) == 0
+
+
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)] * 2,
+    config={
+        "distributed.scheduler.worker-saturation": 1.5,
+        "distributed.scheduler.work-stealing": False,
+    },
+    timeout=3,
+)
+async def test_transition_queued_memory(c, s, a, b):
+    """https://github.com/dask/distributed/issues/7200"""
+    ev0 = Event()
+    ev1 = Event()
+    run_once_lock = Lock()
+    sched_id = id(s)
+
+    async def f0(ev: Event, run_once_lock: Lock) -> None:
+        await ev.wait()
+        if await run_once_lock.acquire(blocking=False):
+            s = next(si for si in Scheduler._instances if id(si) == sched_id)
+            # The task is async, so it runs in the same thread as the scheduler
+            s.reschedule("x-0", stimulus_id="steal")
+
+    # You need at least 5 tasks for the group to be rootish
+    futures = [
+        c.submit(f0, ev0, run_once_lock, key="x-0"),
+        c.submit(lambda ev: ev.wait(), ev1, key="x-1"),
+        c.submit(lambda ev: ev.wait(), ev1, key="x-2"),
+        c.submit(lambda ev: ev.wait(), ev1, key="x-3"),
+        c.submit(lambda ev: ev.wait(), ev1, key="x-4"),
+    ]
+    while a.state.executing_count != 1 or b.state.executing_count != 1:
+        await asyncio.sleep(0.01)
+    assert s.tasks["x-0"].state == "processing"
+    assert s.tasks["x-4"].state == "queued"
+
+    retire_ws = s.tasks["x-0"].processing_on
+    await s.retire_workers([retire_ws.address], close=False, remove=False)
+    assert retire_ws.status == Status.closing_gracefully
+
+    # Reschedule x-0.
+    # Since the worker where it's processing is not running, it can only be rescheduled
+    # on the other worker, which however is full. So x-0 transitions to queued.
+    # The reschedule also sends a {op: free-tasks} message to the worker running x-0,
+    # which would normally transition the task to cancelled on the worker and prevent
+    # a message when the task finishes. However, the task finishes before such message
+    # can arrive, so the task transitions to memory on the worker and sends back a
+    # {op: task-finished} message to the scheduler.
+    # This triggers a queued->memory transition.
+    await ev0.set()
+
+    while not any(ev[:3] == ("x-0", "queued", "memory") for ev in s.story("x-0")):
+        await asyncio.sleep(0.01)
+
+    await ev1.set()
+    # Send tasks stuck on the worker stuck in closing_gracefully state back to the
+    # scheduler
+    await s.retire_workers([retire_ws.address])
+
+    await c.gather(futures)
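
The polling loop near the end of the test leans on the scheduler's transition log: Scheduler.story(key) yields transition tuples whose first three fields are (key, start, finish). A toy illustration of the same check against a hand-written log (the field layout beyond the first three entries is an assumption here):

    # Hand-written stand-in for s.story("x-0"); only ev[:3] matters to the test.
    story = [
        ("x-0", "released", "waiting", "update-graph", 1.0),
        ("x-0", "waiting", "processing", "update-graph", 1.1),
        ("x-0", "processing", "queued", "steal", 1.2),
        ("x-0", "queued", "memory", "task-finished", 1.3),
    ]
    assert any(ev[:3] == ("x-0", "queued", "memory") for ev in story)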
