27 changes: 16 additions & 11 deletions distributed/client.py
@@ -1298,7 +1298,7 @@
try:
return self.cluster.dashboard_link
except AttributeError:
scheduler, info = self._get_scheduler_info()
scheduler, info = self._get_scheduler_info(n_workers=0)

Codecov / codecov/patch: added line distributed/client.py#L1301 was not covered by tests.

Copilot AI commented on Apr 16, 2025:

Using n_workers=0 in this context results in an empty workers list. If the intention is to provide a default view for diagnostics, consider using a value such as -1 (to fetch all) or the same default limit (e.g. 5) used elsewhere.

Suggested change:
- scheduler, info = self._get_scheduler_info(n_workers=0)
+ scheduler, info = self._get_scheduler_info(n_workers=-1)

Member Author replied:

That's intentional since the workers info is not even accessed here.

if scheduler is None:
return None
else:
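
To illustrate the point made in the thread above: dashboard_link only reads scheduler-level fields of the identity payload, so an identity fetched with n_workers=0 (empty workers mapping) is enough. A minimal sketch follows; the payload fields mirror Scheduler.identity() further down in this diff, while the host/port derivation and URL shape are assumptions for illustration only (the real format_dashboard_link uses a configurable template).

```python
# Hypothetical identity payload as fetched with n_workers=0: scheduler-level
# fields are present, the "workers" mapping is empty on purpose.
info = {
    "address": "tcp://127.0.0.1:8786",
    "services": {"dashboard": 8787},
    "workers": {},  # never read on the dashboard_link code path
}

# Assumed derivation, for illustration only: host from the scheduler address,
# port from the advertised dashboard service.
host = info["address"].split("://")[-1].rsplit(":", 1)[0]
port = info["services"]["dashboard"]
print(f"http://{host}:{port}/status")  # -> http://127.0.0.1:8787/status
```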
@@ -1312,20 +1312,20 @@

return format_dashboard_link(host, port)

def _get_scheduler_info(self):
def _get_scheduler_info(self, n_workers):
from distributed.scheduler import Scheduler

if (
self.cluster
and hasattr(self.cluster, "scheduler")
and isinstance(self.cluster.scheduler, Scheduler)
):
info = self.cluster.scheduler.identity()
info = self.cluster.scheduler.identity(n_workers=n_workers)

Codecov / codecov/patch: added line distributed/client.py#L1323 was not covered by tests.
scheduler = self.cluster.scheduler
elif (
self._loop_runner.is_started() and self.scheduler and not self.asynchronous
):
info = sync(self.loop, self.scheduler.identity)
info = sync(self.loop, self.scheduler.identity, n_workers=n_workers)

Codecov / codecov/patch: added line distributed/client.py#L1328 was not covered by tests.
scheduler = self.scheduler
else:
info = self._scheduler_identity
@@ -1368,7 +1368,7 @@
except PackageNotFoundError:
JUPYTERLAB = False

scheduler, info = self._get_scheduler_info()
scheduler, info = self._get_scheduler_info(n_workers=5)

Codecov / codecov/patch: added line distributed/client.py#L1371 was not covered by tests.

return get_template("client.html.j2").render(
id=self.id,
@@ -1585,18 +1585,20 @@

logger.debug("Started scheduling coroutines. Synchronized")

async def _update_scheduler_info(self):
async def _update_scheduler_info(self, n_workers=5):
if self.status not in ("running", "connecting") or self.scheduler is None:
return
try:
self._scheduler_identity = SchedulerInfo(await self.scheduler.identity())
self._scheduler_identity = SchedulerInfo(
await self.scheduler.identity(n_workers=n_workers)
)
except OSError:
logger.debug("Not able to query scheduler for identity")

async def _wait_for_workers(
self, n_workers: int, timeout: float | None = None
) -> None:
info = await self.scheduler.identity()
info = await self.scheduler.identity(n_workers=-1)
self._scheduler_identity = SchedulerInfo(info)
if timeout:
deadline = time() + parse_timedelta(timeout)
@@ -1619,7 +1621,7 @@
% (running_workers(info), n_workers, timeout)
)
await asyncio.sleep(0.1)
info = await self.scheduler.identity()
info = await self.scheduler.identity(n_workers=-1)
self._scheduler_identity = SchedulerInfo(info)

def wait_for_workers(self, n_workers: int, timeout: float | None = None) -> None:
@@ -4407,11 +4409,14 @@
else:
return state

def scheduler_info(self, **kwargs):
def scheduler_info(self, n_workers: int = 5, **kwargs: Any) -> SchedulerInfo:
"""Basic information about the workers in the cluster
Parameters
----------
n_workers: int
The number of workers for which to fetch information. To fetch all,
use -1.
**kwargs : dict
Optional keyword arguments for the remote function
Expand All @@ -4429,7 +4434,7 @@
'time-delay': 0.0061032772064208984}}}
"""
if not self.asynchronous:
self.sync(self._update_scheduler_info)
self.sync(self._update_scheduler_info, n_workers=n_workers)
return self._scheduler_identity

def dump_cluster_state(
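To make the new client-side knob concrete, here is a hedged usage sketch based on the docstring above and the test further down; it assumes a distributed version that includes this change, and the cluster sizes are illustrative.

```python
from distributed import Client, LocalCluster

if __name__ == "__main__":
    with LocalCluster(n_workers=8, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            info = client.scheduler_info(n_workers=1)   # ship only one worker entry
            assert len(info["workers"]) == 1
            full = client.scheduler_info(n_workers=-1)  # -1 means "all workers"
            assert len(full["workers"]) == 8
            # Scheduler-level aggregates are present regardless of truncation.
            print(full["n_workers"], full["total_threads"], full["total_memory"])
```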
15 changes: 13 additions & 2 deletions distributed/scheduler.py
@@ -1654,6 +1654,8 @@ class SchedulerState:
idle_task_count: set[WorkerState]
#: Workers that are fully utilized. May include non-running workers.
saturated: set[WorkerState]
#: Current total memory across all workers (sum over memory_limit)
total_memory: int
#: Current number of threads across all workers
total_nthreads: int
#: History of number of threads
@@ -1778,6 +1780,7 @@ def __init__(
self.task_groups = {}
self.task_prefixes = {}
self.task_metadata = {}
self.total_memory = 0
Member Author commented:

It's almost overkill to maintain additional state for this, but if we have to call identity often I want it to be constant time.

Member Author commented:

Also, I'm surprised we are not tracking this already. Seems like a useful metric for adaptive scaling or something like that 🤷

self.total_nthreads = 0
self.total_nthreads_history = [(time(), 0)]
self.queued = queued
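
The comments above argue for incremental bookkeeping so that identity() stays constant time. A standalone sketch of that pattern (toy class, not the scheduler's actual WorkerState handling):

```python
# Keep a running total that is updated on add/remove instead of summing
# memory_limit over all workers on every identity() call.
class MemoryTracker:
    def __init__(self) -> None:
        self.workers: dict[str, int] = {}  # address -> memory_limit
        self.total_memory = 0

    def add_worker(self, address: str, memory_limit: int) -> None:
        self.workers[address] = memory_limit
        self.total_memory += memory_limit  # O(1), no re-scan of all workers

    def remove_worker(self, address: str) -> None:
        self.total_memory -= self.workers.pop(address)

tracker = MemoryTracker()
tracker.add_worker("tcp://10.0.0.1:40000", 4_000_000_000)
tracker.add_worker("tcp://10.0.0.2:40000", 8_000_000_000)
assert tracker.total_memory == 12_000_000_000
tracker.remove_worker("tcp://10.0.0.1:40000")
assert tracker.total_memory == 8_000_000_000
```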
@@ -4075,16 +4078,22 @@ def _repr_html_(self) -> str:
tasks=self.tasks,
)

def identity(self) -> dict[str, Any]:
def identity(self, n_workers: int = -1) -> dict[str, Any]:
Member Author commented:

The default of -1 is to maintain backwards compatibility (no idea who is calling this in the wild). All callers in this code base set it explicitly.

"""Basic information about ourselves and our cluster"""
if n_workers == -1:
n_workers = len(self.workers)
d = {
"type": type(self).__name__,
"id": str(self.id),
"address": self.address,
"services": {key: v.port for (key, v) in self.services.items()},
"started": self.time_started,
"n_workers": len(self.workers),
"total_threads": self.total_nthreads,
"total_memory": self.total_memory,
"workers": {
worker.address: worker.identity() for worker in self.workers.values()
worker.address: worker.identity()
for worker in itertools.islice(self.workers.values(), n_workers)
},
}
return d
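
A small self-contained sketch of the truncation semantics introduced here, mirroring the islice call and the -1 normalization above (toy dictionaries, not real WorkerState objects):

```python
import itertools

def truncated_workers(workers: dict, n_workers: int = -1) -> dict:
    # -1 is normalized to "all workers" before slicing, as in identity() above.
    if n_workers == -1:
        n_workers = len(workers)
    return {
        addr: info
        for addr, info in itertools.islice(workers.items(), n_workers)
    }

workers = {f"tcp://10.0.0.{i}:8786": {"nthreads": 2} for i in range(10)}
assert len(truncated_workers(workers, 3)) == 3
assert len(truncated_workers(workers)) == 10    # backwards-compatible default
assert len(truncated_workers(workers, 0)) == 0  # what dashboard_link requests
```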
@@ -4535,6 +4544,7 @@ async def add_worker(
dh_addresses.add(address)
dh["nthreads"] += nthreads

self.total_memory += ws.memory_limit
self.total_nthreads += nthreads
self.total_nthreads_history.append((time(), self.total_nthreads))
self.aliases[name] = address
@@ -5446,6 +5456,7 @@ async def remove_worker(
dh_addresses: set = dh["addresses"]
dh_addresses.remove(address)
dh["nthreads"] -= ws.nthreads
self.total_memory -= ws.memory_limit
self.total_nthreads -= ws.nthreads
self.total_nthreads_history.append((time(), self.total_nthreads))
if not dh_addresses:
4 changes: 4 additions & 0 deletions distributed/tests/test_client.py
@@ -3923,6 +3923,10 @@ def test_scheduler_info(c):
assert isinstance(info, dict)
assert len(info["workers"]) == 2
assert isinstance(info["started"], float)
info = c.scheduler_info(n_workers=1)
assert len(info["workers"]) == 1
info = c.scheduler_info(n_workers=-1)
assert len(info["workers"]) == 2


def test_write_scheduler_file(c, loop):
3 changes: 3 additions & 0 deletions distributed/tests/test_scheduler.py
@@ -4301,6 +4301,9 @@ async def test_Scheduler__to_dict(c, s, a):
"extensions",
"services",
"started",
"n_workers",
"total_threads",
"total_memory",
"workers",
"status",
"thread_id",
6 changes: 3 additions & 3 deletions distributed/widgets/templates/scheduler_info.html.j2
@@ -10,23 +10,23 @@
<strong>Comm:</strong> {{ address }}
</td>
<td style="text-align: left;">
<strong>Workers:</strong> {{ workers | length }}
<strong>Workers:</strong> {{ n_workers }} {% if n_workers > workers | length %} (shown below: {{ workers | length }}) {% endif %}
</td>
</tr>
<tr>
<td style="text-align: left;">
<strong>Dashboard:</strong> <a href="{{ scheduler | format_dashboard_address }}" target="_blank">{{ scheduler | format_dashboard_address }}</a>
</td>
<td style="text-align: left;">
<strong>Total threads:</strong> {{ workers.values() | map(attribute='nthreads') | sum }}
<strong>Total threads:</strong> {{ total_threads }}
</td>
</tr>
<tr>
<td style="text-align: left;">
<strong>Started:</strong> {{ started | datetime_from_timestamp | format_time_ago }}
</td>
<td style="text-align: left;">
<strong>Total memory:</strong> {{ workers.values() | map(attribute='memory_limit') | sum | format_bytes }}
<strong>Total memory:</strong> {{ total_memory | format_bytes }}
</td>
</tr>
</table>
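
A hedged sketch of how the reworked "Workers" cell behaves when the identity payload was truncated; it re-renders just that expression with jinja2 and toy values.

```python
from jinja2 import Template

cell = Template(
    "Workers: {{ n_workers }}"
    "{% if n_workers > workers | length %} (shown below: {{ workers | length }}){% endif %}"
)
# Truncated payload: 100 workers exist, only 5 made it into the identity dict.
print(cell.render(n_workers=100, workers={f"w{i}": {} for i in range(5)}))
# -> Workers: 100 (shown below: 5)
print(cell.render(n_workers=2, workers={"w0": {}, "w1": {}}))
# -> Workers: 2
```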