
Conversation

@alisterburt

This PR adds missing support for grouped workers in SpecCluster._update_worker_status and fixes #9102

Initial support for grouped workers was added in #3013 by @mrocklin
This support is documented and implemented in other parts of SpecCluster, so I have assumed that improving it is the right path forward.

If a single entry in the spec will generate multiple dask workers then
please provide a `"group"` element to the spec, that includes the suffixes
that will be added to each name (this should be handled by your worker
class).
>>> cluster.worker_spec
{
0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", -2"]}
1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]}
}
These suffixes should correspond to the names used by the workers when
they deploy.
>>> [ws.name for ws in cluster.scheduler.workers.values()]
["0-0", "0-1", "0-2", "1-0", "1-1"]

Reproducer, using LocalCluster from dask-jobqueue as an implementation of SpecCluster with grouped workers (LocalCluster from distributed does not implement the grouped worker spec):

import time
from dask.distributed import Client, LocalCluster
from dask_jobqueue.local import LocalCluster as JobQueueLocalCluster

from distributed import as_completed

# work function: each task sleeps briefly, then returns
def job(x):
    time.sleep(1)
    return x + 1


if __name__ == "__main__":
    # cluster setup

    # this works, LocalCluster does not use grouped workers
    # cluster = LocalCluster(
    #     n_workers=2,
    #     threads_per_worker=1,
    #     processes=True,
    #     lifetime="10s",
    #     lifetime_stagger="3s"
    # )

    cluster = JobQueueLocalCluster(
        cores=1,
        processes=2,
        memory="2GiB",
        walltime='0:30:00',
        # the worker_extra_args below simulate graceful worker closure via a short lifetime,
        # c.f. https://jobqueue.dask.org/en/latest/clusters-advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
        worker_extra_args=["--lifetime", "10s", "--lifetime-stagger", "3s"],
    )
    client = Client(cluster)

    cluster.adapt(
        maximum=4,
        interval='100ms',
        wait_count=1
    )

    njobs = 1000
    futures = client.map(job, range(njobs))

    print("Running test...")
    for future in as_completed(futures):
        print("cluster.workers keys: ", list(cluster.workers.keys()))
        print("cluster.worker_spec keys: ", list(cluster.worker_spec.keys()))

Output before this PR, with print statements added to show what's going on (duplicate lines removed):

Running test...
target=4, len(self.plan)=0, len(self.observed)=0, len(self.requested)=0
target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4
...
target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4
cluster.workers keys:  ['LocalCluster-0', 'LocalCluster-1']
cluster.worker_spec keys:  ['LocalCluster-0', 'LocalCluster-1']
remove signal received for: LocalCluster-0-1
target=4, len(self.plan)=4, len(self.observed)=3, len(self.requested)=4
cluster.workers keys:  ['LocalCluster-0', 'LocalCluster-1']
cluster.worker_spec keys:  ['LocalCluster-0', 'LocalCluster-1']
remove signal received for: LocalCluster-1-0
target=4, len(self.plan)=4, len(self.observed)=2, len(self.requested)=4
remove signal received for: LocalCluster-0-0
target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
cluster.workers keys:  ['LocalCluster-0', 'LocalCluster-1']
cluster.worker_spec keys:  ['LocalCluster-0', 'LocalCluster-1']
remove signal received for: LocalCluster-1-1
target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
...
# never recovers

Output after this PR, with the same print statements:

Running test...
target=4, len(self.plan)=0, len(self.observed)=0, len(self.requested)=0
target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4
cluster.workers keys:  ['LocalCluster-1', 'LocalCluster-0']
cluster.worker_spec keys:  ['LocalCluster-0', 'LocalCluster-1']

remove signal received for: LocalCluster-0-1
target=4, len(self.plan)=4, len(self.observed)=3, len(self.requested)=4
cluster.workers keys:  ['LocalCluster-1', 'LocalCluster-0']
cluster.worker_spec keys:  ['LocalCluster-0', 'LocalCluster-1']

remove signal received for: LocalCluster-1-0
target=4, len(self.plan)=4, len(self.observed)=2, len(self.requested)=4
cluster.workers keys:  ['LocalCluster-1', 'LocalCluster-0']
cluster.worker_spec keys:  ['LocalCluster-0', 'LocalCluster-1']

remove signal received for: LocalCluster-0-0
target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
cluster.workers keys:  ['LocalCluster-1', 'LocalCluster-0']
cluster.worker_spec keys:  ['LocalCluster-0', 'LocalCluster-1']

remove signal received for: LocalCluster-1-1
target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4

# recovers!
target=4, len(self.plan)=2, len(self.observed)=0, len(self.requested)=2
target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=2, len(self.requested)=4
target=4, len(self.plan)=2, len(self.observed)=2, len(self.requested)=2
target=4, len(self.plan)=4, len(self.observed)=3, len(self.requested)=4
target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4

@github-actions
Contributor

github-actions bot commented Aug 26, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

27 files ±0   27 suites ±0   9h 34m 46s ⏱️ (-1h 28m 49s)
4 115 tests -1:   4 007 ✅ +7   104 💤 -7   4 ❌ ±0
51 555 runs +1 206:   49 367 ✅ +1 245   2 184 💤 -38   4 ❌ ±0

For more details on these failures, see this check.

Results for commit 7f0eb4c. Comparison against base commit 0f0adef.

This pull request removes 7 and adds 6 tests. Note that renamed tests count towards both.

Removed:
distributed.comm.tests.test_comms ‑ test_ucx_client_server
distributed.comm.tests.test_ucx
distributed.comm.tests.test_ucx_config
distributed.tests.test_nanny ‑ test_nanny_closed_by_keyboard_interrupt[tcp]
distributed.tests.test_nanny ‑ test_nanny_closed_by_keyboard_interrupt[ucx]
distributed.tests.test_worker ‑ test_protocol_from_scheduler_address[Nanny]
distributed.tests.test_worker ‑ test_protocol_from_scheduler_address[Worker]

Added:
distributed.deploy.tests.test_spec_cluster ‑ test_adaptive_grouped_workers
distributed.deploy.tests.test_spec_cluster ‑ test_unexpected_close_single_grouped_worker
distributed.deploy.tests.test_spec_cluster ‑ test_unexpected_close_whole_worker_group
distributed.http.scheduler.tests.test_scheduler_http ‑ test_retire_workers_with_tuple_keys
distributed.tests.test_nanny ‑ test_nanny_closed_by_keyboard_interrupt
distributed.tests.test_worker ‑ test_executor_inherit_threadname_from_worker

♻️ This comment has been updated with latest results.

Member

@jacobtomlinson left a comment


Thanks @alisterburt this looks great.

Could I ask you to add a small test along the lines of what you mention in the PR description to verify this PR works as expected?

@alisterburt
Author

@jacobtomlinson great suggestion, test added using SpecCluster and MultiWorker directly rather than LocalCluster from dask-jobqueue.

This test seems to work, but I had some problems with the test suite locally that I assume are flaky tests. Happy to help work through any issues this PR might have created if they crop up.

Member

@jacobtomlinson left a comment


CI is flaky, but it looks like test_unexpected_closed_worker is failing pretty consistently in CI here, so I think it's related to the changes in this PR. There's also at least one run where test_adaptive_grouped_workers is failing too.

Could you take another look?

@alisterburt
Author

alisterburt commented Oct 1, 2025

@jacobtomlinson sorry for delays - hoping to get to this soon

This PR definitely has some issues. I've been running it in 'production' workloads for a few weeks, and while most of the time it's fine, there are definitely some situations where the scheduler fails to recover, just as before this PR... need to dig some more, perhaps related to #9103

edit: adding a concrete observation now that I've added some detailed logging

End state when the cluster is hanging (SLURMCluster from dask-jobqueue in this case):

Adaptive target: 1080, plan: 1080, requested: 1080, observed: 0

  **Core fixes:**

  1. **scale() method now correctly handles grouped workers** (spec.py:603-631)
     - Parameter `n` now represents number of workers, not number of specs
     - For grouped workers: converts worker count to spec count internally (see the sketch after this list)
     - For memory/cores parameters: correctly handles that these represent specs
     - Fixes adaptive scaling to work properly with grouped workers

  2. **_update_worker_status() handles grouped worker removal** (spec.py:465-510)
     - Added defensive check for workers already removed from scheduler_info
     - When ANY worker from a group dies, entire spec is removed (HPC assumption)
     - Regular workers keep their spec for recreation

  3. **Helper methods for name conversion** (spec.py:343-390)
     - _spec_name_to_worker_names(): converts spec name to scheduler worker names
     - _worker_name_to_spec_name(): converts scheduler worker name to spec name
     - Ensures consistent handling throughout codebase

  4. **Fixed _correct_state_internal() grouped worker retirement** (spec.py:405-425)
     - Uses actual scheduler worker names instead of spec names
     - Properly retires grouped workers using helper methods

  5. **Fixed scale_down() to use helper methods** (spec.py:655-671)
     - Consistent name conversion for both regular and grouped workers
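
  As an illustration of item 1 above, a minimal sketch (not the code in this PR; specs_needed is a hypothetical name) of converting a requested worker count into a spec count when each spec entry launches len(group) workers:

  import math

  def specs_needed(n_workers, group_size):
      # each spec entry launches group_size dask workers, so scale(n_workers)
      # needs enough spec entries to cover at least n_workers workers
      return math.ceil(n_workers / group_size)

  # e.g. with group=["-0", "-1"] (two workers per spec entry):
  print(specs_needed(4, 2))  # 2 spec entries -> 4 workers
  print(specs_needed(5, 2))  # 3 spec entries -> 6 workers (rounds up)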

  **Tests added:**

  - test_unexpected_close_single_grouped_worker: Tests recovery when one worker
    from a group dies (graceful close)
  - test_unexpected_close_whole_worker_group: Tests recovery when entire group
    is pre-empted (simulates SLURM job kill)
  - test_adaptive_grouped_workers: Tests adaptive scaling maintains correct
    worker count with grouped workers

  **Semantic change:**

  The `scale(n)` method now consistently means "scale to n workers" rather than
  "scale to n specs". This aligns with how adaptive scaling and the plan/requested/
  observed properties work. Tests updated to match this semantic.
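
  A hedged usage sketch of the new semantics, assuming the `cluster` from the reproducer above (or any SpecCluster whose spec entries have a two-element "group", i.e. two dask workers per spec entry):

  cluster.scale(4)          # "scale to 4 workers" -> 2 spec entries internally
  cluster.adapt(maximum=4)  # adaptive targets are worker counts, so they now
                            # line up with the plan/requested/observed properties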

  Fixes grouped worker support comprehensively rather than applying point fixes.
@alisterburt
Author

This PR is a band-aid at best, and more changes are required to ensure grouped workers are properly supported and handled by SpecCluster. I'd rather do this properly, and that will benefit from starting clean.



Successfully merging this pull request may close these issues.

SpecCluster fails to remove grouped workers when they die, breaking adaptive scaling
