diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4e769ad420c..735b8bd9555 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1025,6 +1025,7 @@ def __init__( "missing-data": self.handle_missing_data, "long-running": self.handle_long_running, "reschedule": self.reschedule, + "keep-alive": lambda comm: None, } client_handlers = { diff --git a/distributed/worker.py b/distributed/worker.py index b32ab2e52a4..2c2c26b2339 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -792,6 +792,13 @@ async def _register_with_scheduler(self): self.batched_stream = BatchedSend(interval="2ms", loop=self.loop) self.batched_stream.start(comm) + pc = PeriodicCallback( + lambda: self.batched_stream.send({"op": "keep-alive"}), + 60000, + io_loop=self.io_loop, + ) + self.periodic_callbacks["keep-alive"] = pc + pc.start() self.periodic_callbacks["heartbeat"].start() self.loop.add_callback(self.handle_scheduler, comm)