add support for grouped workers in SpecCluster._update_worker_status
#9104
Conversation
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
27 files ±0   27 suites ±0   9h 34m 46s ⏱️ -1h 28m 49s
For more details on these failures, see this check.
Results for commit 7f0eb4c. ± Comparison against base commit 0f0adef.
This pull request removes 7 and adds 6 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
jacobtomlinson
left a comment
Thanks @alisterburt, this looks great.
Could I ask you to add a small test along the lines of what you mention in the PR description to verify this PR works as expected?
@jacobtomlinson great suggestion, test added with direct usage of … This test seems to work, but I had some problems with the test suite locally that I assume are flaky tests; happy to help work through any issues this PR might have created if they crop up.
jacobtomlinson
left a comment
CI is flaky, but test_unexpected_closed_worker is failing pretty consistently in CI here, so I think it's related to the changes in this PR. There's also at least one run where test_adaptive_grouped_workers is failing too.
Could you take another look?
@jacobtomlinson sorry for delays, hoping to get to this soon.
This PR definitely has some issues. I've been running it in 'production' workloads for a few weeks and, while most of the time it's fine, there are definitely some situations where the scheduler fails to recover as it did before this PR... need to dig some more, perhaps related to #9103
edit: adding a concrete observation now that I've added some detailed logging. End state when the cluster is hanging (SLURMCluster from dask-jobqueue in this case):
    Adaptive target: 1080, plan: 1080, requested: 1080, observed: 0
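For context, a minimal sketch of the kind of periodic logging that could produce the line above (my assumption, not the author's actual script; `adaptive` is the object returned by `cluster.adapt(...)`):

```python
# Hedged sketch: periodically record the adaptive target alongside the
# cluster's plan/requested/observed bookkeeping to spot a hang like
# "plan: 1080, requested: 1080, observed: 0".
import asyncio


async def log_adaptive_state(cluster, adaptive, interval=5):
    while True:
        # safe_target() applies the configured minimum/maximum to the
        # scheduler's recommendation (assumes distributed's Adaptive API)
        target = await adaptive.safe_target()
        print(
            f"Adaptive target: {target}, "
            f"plan: {len(cluster.plan)}, "
            f"requested: {len(cluster.requested)}, "
            f"observed: {len(cluster.observed)}"
        )
        await asyncio.sleep(interval)
```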
**Core fixes:**
1. **scale() method now correctly handles grouped workers** (spec.py:603-631)
- Parameter `n` now represents the number of workers, not the number of specs
- For grouped workers: converts the worker count to a spec count internally
- For the memory/cores parameters: correctly handles that these map to specs rather than individual workers
- Fixes adaptive scaling to work properly with grouped workers
2. **_update_worker_status() handles grouped worker removal** (spec.py:465-510)
- Added defensive check for workers already removed from scheduler_info
- When ANY worker from a group dies, the entire spec is removed (HPC assumption: the group corresponds to a single job allocation)
- Regular workers keep their spec for recreation
3. **Helper methods for name conversion** (spec.py:343-390)
- _spec_name_to_worker_names(): converts a spec name to the corresponding scheduler worker names
- _worker_name_to_spec_name(): converts a scheduler worker name back to its spec name
- Ensures consistent handling throughout the codebase (a sketch of this mapping follows the list below)
4. **Fixed _correct_state_internal() grouped worker retirement** (spec.py:405-425)
- Uses actual scheduler worker names instead of spec names
- Properly retires grouped workers using helper methods
5. **Fixed scale_down() to use helper methods** (spec.py:655-671)
- Consistent name conversion for both regular and grouped workers
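The helper names above come from this PR's commits; the sketch below only illustrates the underlying name mapping, assuming SpecCluster's documented convention that a spec with a `group` key registers scheduler workers named spec name plus suffix. It is not the PR's actual implementation.

```python
# Illustration of the spec-name <-> worker-name mapping for grouped specs.
# A spec like {"cls": ..., "options": ..., "group": ["-0", "-1"]} is expected
# to register scheduler workers named f"{spec_name}{suffix}".

def spec_name_to_worker_names(worker_spec, spec_name):
    """Expand a spec name into the scheduler worker names it should produce."""
    spec = worker_spec[spec_name]
    if "group" in spec:
        return {f"{spec_name}{suffix}" for suffix in spec["group"]}
    return {str(spec_name)}


def worker_name_to_spec_name(worker_spec, worker_name):
    """Map a scheduler worker name back to the spec entry that created it."""
    for spec_name, spec in worker_spec.items():
        if "group" in spec:
            if any(worker_name == f"{spec_name}{suffix}" for suffix in spec["group"]):
                return spec_name
        elif worker_name == str(spec_name):
            return spec_name
    return None


worker_spec = {"LocalCluster-0": {"cls": None, "options": {}, "group": ["-0", "-1"]}}
assert spec_name_to_worker_names(worker_spec, "LocalCluster-0") == {
    "LocalCluster-0-0",
    "LocalCluster-0-1",
}
assert worker_name_to_spec_name(worker_spec, "LocalCluster-0-1") == "LocalCluster-0"
```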
**Tests added:**
- test_unexpected_close_single_grouped_worker: Tests recovery when one worker
from a group dies (graceful close)
- test_unexpected_close_whole_worker_group: Tests recovery when entire group
is pre-empted (simulates SLURM job kill)
- test_adaptive_grouped_workers: Tests adaptive scaling maintains correct
worker count with grouped workers
**Semantic change:**
The `scale(n)` method now consistently means "scale to n workers" rather than
"scale to n specs". This aligns with how adaptive scaling and the plan/requested/
observed properties work. Tests updated to match this semantic.
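A minimal illustration of this counting rule (my sketch under the semantics described above, not the PR's exact code; `group_size` is the number of workers each spec launches, and I assume the count rounds up so at least n workers are provided):

```python
import math


def specs_for_workers(n_workers, group_size):
    """How many worker specs are needed so that scale(n_workers) yields at
    least n_workers, when each spec launches group_size workers."""
    if group_size <= 1:
        return n_workers  # ungrouped workers: one spec per worker
    return math.ceil(n_workers / group_size)


assert specs_for_workers(4, 2) == 2  # scale(4) with groups of 2 -> 2 specs
assert specs_for_workers(5, 2) == 3  # rounds up so the target is reachable
assert specs_for_workers(3, 1) == 3
```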
Fixes grouped worker support comprehensively rather than applying point fixes.
This PR is a bandaid at best, and more changes are required to ensure grouped workers are properly supported/handled by SpecCluster. I'd rather do this properly, and that will benefit from starting clean.
This PR adds missing support for grouped workers in `SpecCluster._update_worker_status` and fixes #9102. Initial support for grouped workers was added in #3013 by @mrocklin.
This support is documented and implemented in other parts of `SpecCluster`, so I have assumed that improved support is the right path forwards.
distributed/distributed/deploy/spec.py, lines 222 to 237 in 0f0adef
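For readers without the permalink handy, the referenced docstring describes worker specs carrying a `group` key; a minimal illustration of such a spec (the `MultiWorker` class here is a placeholder, not a real distributed class, and the option values are made up):

```python
# Illustrative grouped worker spec, following the SpecCluster docstring.
# Each entry launches several workers; the scheduler sees them under the
# spec name plus the group suffix (e.g. "LocalCluster-0-1").
class MultiWorker:
    """Placeholder for a worker class that starts several workers per spec."""


worker_spec = {
    "LocalCluster-0": {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]},
    "LocalCluster-1": {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]},
}
# Scheduler-side worker names implied by this spec:
#   LocalCluster-0-0, LocalCluster-0-1, LocalCluster-1-0, LocalCluster-1-1
```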
Repro using `LocalCluster` from `dask-jobqueue` as an implementation of `SpecCluster` with grouped workers (`LocalCluster` from `distributed` does not implement the grouped worker spec); a rough sketch of the repro script is included at the end of this description.
Output before this PR with print statements to show what's going on, duplicate lines removed:
    Running test...
    target=4, len(self.plan)=0, len(self.observed)=0, len(self.requested)=0
    target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4
    ...
    target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4
    cluster.workers keys: ['LocalCluster-0', 'LocalCluster-1']
    cluster.worker_spec keys: ['LocalCluster-0', 'LocalCluster-1']
    remove signal received for: LocalCluster-0-1
    target=4, len(self.plan)=4, len(self.observed)=3, len(self.requested)=4
    cluster.workers keys: ['LocalCluster-0', 'LocalCluster-1']
    cluster.worker_spec keys: ['LocalCluster-0', 'LocalCluster-1']
    remove signal received for: LocalCluster-1-0
    target=4, len(self.plan)=4, len(self.observed)=2, len(self.requested)=4
    remove signal received for: LocalCluster-0-0
    target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
    cluster.workers keys: ['LocalCluster-0', 'LocalCluster-1']
    cluster.worker_spec keys: ['LocalCluster-0', 'LocalCluster-1']
    remove signal received for: LocalCluster-1-1
    target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
    ...
    # never recovers
Output after this PR with the same print statements:
    Running test...
    target=4, len(self.plan)=0, len(self.observed)=0, len(self.requested)=0
    target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4
    cluster.workers keys: ['LocalCluster-1', 'LocalCluster-0']
    cluster.worker_spec keys: ['LocalCluster-0', 'LocalCluster-1']
    remove signal received for: LocalCluster-0-1
    target=4, len(self.plan)=4, len(self.observed)=3, len(self.requested)=4
    cluster.workers keys: ['LocalCluster-1', 'LocalCluster-0']
    cluster.worker_spec keys: ['LocalCluster-0', 'LocalCluster-1']
    remove signal received for: LocalCluster-1-0
    target=4, len(self.plan)=4, len(self.observed)=2, len(self.requested)=4
    cluster.workers keys: ['LocalCluster-1', 'LocalCluster-0']
    cluster.worker_spec keys: ['LocalCluster-0', 'LocalCluster-1']
    remove signal received for: LocalCluster-0-0
    target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
    cluster.workers keys: ['LocalCluster-1', 'LocalCluster-0']
    cluster.worker_spec keys: ['LocalCluster-0', 'LocalCluster-1']
    remove signal received for: LocalCluster-1-1
    target=4, len(self.plan)=4, len(self.observed)=0, len(self.requested)=4
    # recovers!
    target=4, len(self.plan)=2, len(self.observed)=0, len(self.requested)=2
    target=4, len(self.plan)=4, len(self.observed)=1, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=2, len(self.requested)=4
    target=4, len(self.plan)=2, len(self.observed)=2, len(self.requested)=2
    target=4, len(self.plan)=4, len(self.observed)=3, len(self.requested)=4
    target=4, len(self.plan)=4, len(self.observed)=4, len(self.requested)=4
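The repro script itself is not included above. A rough sketch of its shape follows; this is my reconstruction, not the author's actual code, and the `dask_jobqueue.local.LocalCluster` import path, the constructor arguments, and the use of `os._exit` to simulate an unexpected worker death are all assumptions.

```python
import asyncio
import contextlib
import os

from dask.distributed import Client
from dask_jobqueue.local import LocalCluster  # assumed path to the grouped-spec test cluster


async def main():
    # processes=2 is assumed to make each job/spec launch a group of two workers
    async with LocalCluster(
        cores=2, memory="1GB", processes=2, asynchronous=True
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            cluster.scale(4)  # aim for four workers; how scale() counts them is what this PR changes

            # Wait until all workers have registered with the scheduler
            while len(cluster.observed) < 4:
                await asyncio.sleep(0.5)

            # Abruptly kill one worker process; the RPC may not return cleanly
            victim_addr = next(iter(cluster.scheduler_info["workers"]))
            with contextlib.suppress(Exception):
                await client.run(lambda: os._exit(1), workers=[victim_addr])

            # Watch the cluster's bookkeeping to see whether it recovers
            for _ in range(30):
                print(
                    f"plan={len(cluster.plan)}, "
                    f"observed={len(cluster.observed)}, "
                    f"requested={len(cluster.requested)}"
                )
                await asyncio.sleep(1)


asyncio.run(main())
```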