Skip to content

Commit febd41d

Browse files
authored
Restart worker if it's unrecognized by scheduler (#6505)
1 parent fce6281 commit febd41d

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

distributed/tests/test_worker.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,6 +1808,28 @@ async def close(self):
18081808
assert not s.workers
18091809

18101810

1811+
@gen_cluster(
1812+
client=True,
1813+
nthreads=[("", 1)],
1814+
Worker=Nanny,
1815+
worker_kwargs={"heartbeat_interval": "1ms"},
1816+
)
1817+
async def test_heartbeat_missing_restarts(c, s: Scheduler, n: Nanny):
1818+
old_heartbeat_handler = s.handlers["heartbeat_worker"]
1819+
s.handlers["heartbeat_worker"] = lambda *args, **kwargs: {"status": "missing"}
1820+
1821+
assert n.process
1822+
await n.process.stopped.wait()
1823+
1824+
assert not s.workers
1825+
s.handlers["heartbeat_worker"] = old_heartbeat_handler
1826+
1827+
await n.process.running.wait()
1828+
assert n.status == Status.running
1829+
1830+
await c.wait_for_workers(1)
1831+
1832+
18111833
@gen_cluster(nthreads=[])
18121834
async def test_bad_local_directory(s):
18131835
try:

distributed/worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1257,7 +1257,8 @@ async def heartbeat(self):
12571257
logger.error(
12581258
f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
12591259
)
1260-
await self.close()
1260+
# Something is out of sync; have the nanny restart us if possible.
1261+
await self.close(nanny=False)
12611262
return
12621263

12631264
self.scheduler_delay = response["time"] - middle

0 commit comments

Comments
 (0)