Description
In the past months, we've seen occasional deadlock issues connected to the scheduler/worker state machine. To debug this, I wrote a small script which essentially fetches and serializes the entire state of a stuck cluster (see below). Think `Scheduler.story` + `Scheduler.get_logs` + much more, for everything.
The script is very crude and tries to get the information without properly serializing all objects. The emphasis was rather on having a fully JSON-serializable representation of the cluster, to allow for easier portability, formatting, etc.
I wanted to preserve this for posterity in case it helps anyone.
Further, I am wondering whether we want to have such functionality as a first-class citizen. Think of `Client.collect_cluster_state` (note: this may create several GBs worth of data). The client function would then try to do a better job than this script. Sometimes it would be sufficient to extend or modify the existing `identity` methods, but much more thoroughly, of course. If I were to implement this properly, I would likely start by attaching a `to_dict` method to all our classes which yields a JSON/YAML-serializable representation (including task state, worker state, etc.).
Thoughts? Would people consider this helpful?
Disclaimer
The script strips code, tracebacks, and exceptions from all output, so there should be no leak of intellectual property (i.e. code). However, I don't guarantee anything if there are sensitive logs, task keys, IP addresses, etc.
Last script update: 2021-09-29
from collections import deque
def _normalize(o, simple=False):
    """Convert ``o`` into a JSON-serializable structure of dicts, lists and strings.

    This is deliberately crude: anything that is not specially handled below
    is rendered with ``str``.

    Parameters
    ----------
    o
        Object to convert. Dicts are converted recursively, while sets, deques,
        tuples and lists become lists. Scheduler ``WorkerState`` and
        scheduler/worker ``TaskState`` objects get dedicated handling.
    simple
        If True, ``TaskState`` objects are rendered as ``str(o)`` instead of
        being expanded. Expanding a ``TaskState`` always recurses with
        ``simple=True``, because task states reference each other cyclically
        (dependencies/dependents) and full expansion would never terminate.

    Returns
    -------
    dict, list or str
        A structure that only contains dicts, lists and strings, except for
        the ``WorkerState.identity()`` payload, which is returned as-is.

    Raises
    ------
    TypeError
        Re-raised from inside dict conversion, after printing the key types of
        the offending dict as a debugging aid.
    """
    from distributed.scheduler import TaskState as TSScheduler
    from distributed.scheduler import WorkerState
    from distributed.worker import TaskState as TSWorker

    # Blacklist runspec since this includes serialized functions
    # and arguments which might include sensitive data
    blacklist_attributes = {
        "runspec",
        "_run_spec",
        "exception",
        "traceback",
        "_exception",
        "_traceback",
    }

    if isinstance(o, dict):
        try:
            res = {}
            for k, v in o.items():
                # A key only has to identify its entry, so always normalize it
                # in simple mode: a TaskState key then becomes its short str
                # instead of a full expansion that is unhashable and would be
                # stringified wholesale below.
                k = _normalize(k, simple=True)
                try:
                    hash(k)
                except TypeError:
                    k = str(k)
                res[k] = _normalize(v, simple=simple)
            return res
        except TypeError:
            # Debugging aid: show what kind of keys the failing dict had.
            print({k: type(k) for k in o})
            raise
    elif isinstance(o, (set, deque, tuple, list)):
        return [_normalize(el, simple=simple) for el in o]
    elif isinstance(o, WorkerState):
        # NOTE(review): identity() is returned without passing through
        # _normalize; presumably it is already JSON-friendly -- verify.
        res = o.identity()
        memory = o.memory  # read once so all fields come from one snapshot
        res["memory"] = {
            "managed": memory.managed,
            "managed_in_memory": memory.managed_in_memory,
            "managed_spilled": memory.managed_spilled,
            "unmanaged": memory.unmanaged,
            "unmanaged_recent": memory.unmanaged_recent,
        }
        return res
    elif isinstance(o, (TSScheduler, TSWorker)):
        if simple:
            # Due to cyclic references in the dependent/dependency graph,
            # expanding nested task states would cause infinite recursion.
            return str(o)
        if isinstance(o, TSScheduler):
            # Scheduler task states are read through their declared slots.
            base = {
                "type": str(type(o)),
                "repr": str(o),
            }
            base.update(
                {
                    s: _normalize(getattr(o, s), simple=True)
                    for s in TSScheduler.__slots__
                    if s not in blacklist_attributes
                }
            )
            return base
        # Worker task states are read through their instance __dict__.
        return _normalize(
            {k: v for k, v in o.__dict__.items() if k not in blacklist_attributes},
            simple=True,
        )
    else:
        return str(o)
def get_worker_info(dask_worker):
    """Return a JSON-serializable snapshot of one worker's state.

    Meant to be passed to ``client.run``, which supplies the worker object
    through the ``dask_worker`` argument. The collected mapping (scheduling
    queues, task table, logs, memory thresholds, dask config and transfer
    logs) is passed through ``_normalize`` before being returned.
    """
    import dask

    worker = dask_worker
    snapshot = {}
    # Scheduling / execution state
    snapshot["status"] = worker.status
    snapshot["ready"] = worker.ready
    snapshot["constrained"] = worker.constrained
    snapshot["long_running"] = worker.long_running
    snapshot["executing_count"] = worker.executing_count
    snapshot["in_flight_tasks"] = worker.in_flight_tasks
    snapshot["in_flight_workers"] = worker.in_flight_workers
    # Not every worker version defines ``paused``; record None in that case.
    snapshot["paused"] = getattr(worker, "paused", None)
    snapshot["log"] = worker.log
    snapshot["tasks"] = worker.tasks
    # Memory thresholds
    snapshot["memory_limit"] = worker.memory_limit
    snapshot["memory_target_fraction"] = worker.memory_target_fraction
    snapshot["memory_spill_fraction"] = worker.memory_spill_fraction
    snapshot["memory_pause_fraction"] = worker.memory_pause_fraction
    # Logs, configuration and data transfer history
    snapshot["logs"] = worker.get_logs()
    snapshot["config"] = dict(dask.config.config)
    snapshot["incoming_transfer_log"] = list(worker.incoming_transfer_log)
    snapshot["outgoing_transfer_log"] = list(worker.outgoing_transfer_log)
    return _normalize(snapshot)
def get_scheduler_info(dask_scheduler):
    """Return a JSON-serializable snapshot of the scheduler's state.

    Meant to be passed to ``client.run_on_scheduler``, which supplies the
    scheduler object through the ``dask_scheduler`` argument. When a
    ``"stealing"`` extension is registered, its bookkeeping is included under
    the ``"stealing"`` key. Everything is passed through ``_normalize``.
    """
    import dask

    snapshot = {
        "transition_log": dask_scheduler.transition_log,
        "log": dask_scheduler.log,
        "tasks": dask_scheduler.tasks,
        "workers": dask_scheduler.workers,
        "logs": dask_scheduler.get_logs(),
        "config": dict(dask.config.config),
        "events": dask_scheduler.events,
    }

    if "stealing" not in dask_scheduler.extensions:
        return _normalize(snapshot)

    stealing_ext = dask_scheduler.extensions["stealing"]
    stealing = {
        attr_name: getattr(stealing_ext, attr_name)
        for attr_name in ("stealable_all", "stealable", "key_stealable", "in_flight")
    }
    # Keyed by WorkerState objects; re-key by worker name to keep keys readable.
    stealing["in_flight_occupancy"] = {
        worker_state.name: occupancy
        for worker_state, occupancy in stealing_ext.in_flight_occupancy.items()
    }
    snapshot["stealing"] = stealing
    return _normalize(snapshot)
# Snapshot the state of every worker, then the state of the scheduler.
# NOTE(review): assumes `client` is a connected distributed.Client created
# earlier in the session (not visible here). Presumably `worker_info` is a
# dict keyed by worker address -- verify against the Client.run docs.
worker_info = client.run(get_worker_info)
scheduler_info = client.run_on_scheduler(get_scheduler_info)