feat(ray): Implement dynamic scale-in for RaySwordfishActor #5903
base: main
Conversation
This commit implements the dynamic scaling down (scale-in) functionality for RaySwordfishActor to release idle resources.

Key changes:
- Implement `retire_idle_ray_workers` in `RayWorkerManager` to identify and release idle workers.
- Add `pending_release_blacklist` to track retiring workers and prevent them from being reused or causing "worker died" errors (see the sketch after this list).
- Move scale-down cooldown logic to `RayWorkerManager` to prevent frequent scale-down operations.
- Optimize `retire_idle_ray_workers` to reduce lock contention by releasing the lock before performing Ray/Python operations.
- Update `try_autoscale` in `flotilla.py` to support empty resource requests, enabling Ray to scale down resources.
- Fix unit tests in `src/daft-distributed/src/scheduling/worker.rs` and ensure compatibility with the scheduler loop.

This addresses the issue where `udfActor` could not dynamically scale down and prevents "worker died" errors during graceful shutdown.
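The role of the blacklist can be illustrated with a minimal sketch. The struct and method names below are hypothetical simplifications, not the actual types in `worker_manager.rs`; they only show how a retiring node is filtered out during worker refresh so it is neither reused nor reported as a dead worker.

```rust
use std::collections::{HashMap, HashSet};

// Placeholder for the real worker handle type.
struct RaySwordfishWorkerHandle;

// Hypothetical, simplified view of the manager state; the real fields
// live in src/daft-distributed/src/python/ray/worker_manager.rs.
struct WorkerManagerState {
    ray_workers: HashMap<String, RaySwordfishWorkerHandle>,
    // Node IDs that are being retired: they must not be re-registered
    // on refresh, and their disappearance is not a "worker died" error.
    pending_release_blacklist: HashSet<String>,
}

impl WorkerManagerState {
    // When refreshing from Ray's node list, only accept nodes that are
    // neither already registered nor currently being released.
    fn should_register_node(&self, node_id: &str) -> bool {
        !self.ray_workers.contains_key(node_id)
            && !self.pending_release_blacklist.contains(node_id)
    }
}
```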
Greptile Summary
This PR implements dynamic scale-in functionality for `RaySwordfishActor`.
Key Changes:
Configuration:
Issues Found:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant S as SchedulerActor
participant WM as RayWorkerManager
participant W as RaySwordfishWorker
participant R as Ray
participant F as Flotilla
Note over S: Scheduler loop iteration
S->>WM: worker_snapshots()
WM->>WM: refresh_workers()
WM->>F: start_ray_workers(existing_ids)
F->>R: ray.nodes()
R-->>F: node list
F-->>WM: RaySwordfishWorker instances
WM-->>S: worker snapshots
S->>S: schedule_tasks()
S->>S: get_autoscaling_request()
alt Scale up needed
S->>WM: try_autoscale(bundles)
WM->>WM: Clear pending_release_blacklist
WM->>F: try_autoscale(bundles)
F->>R: request_resources(bundles)
else No scale up, check scale down
S->>S: Count idle workers
alt idle workers > min_survivor_workers
S->>WM: retire_idle_ray_workers(num_to_retire, false)
WM->>WM: Lock state mutex
WM->>WM: Identify idle workers (idle_duration >= threshold)
WM->>WM: Remove workers from ray_workers
WM->>WM: Add to pending_release_blacklist
WM->>WM: Unlock state mutex
WM->>W: release(py)
W->>W: Check active tasks == 0
W->>W: set_state(Releasing)
W->>R: shutdown()
W->>W: set_state(Released)
WM->>F: clear_autoscaling_requests()
F->>R: request_resources([])
end
end
Note over S: Job completion
S->>WM: retire_idle_ray_workers(all_workers, true)
WM->>F: clear_autoscaling_requests()
F->>R: request_resources([])
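The scheduler-side decision in the lower branch of the diagram can be sketched as follows. This is a hedged illustration, not the code in `scheduler_actor.rs`; names such as `min_survivor_workers`, `idle_threshold`, and the exact signature of `retire_idle_ray_workers` are assumptions based on the diagram and the PR description.

```rust
use std::time::Duration;

// Hypothetical worker snapshot used only for this sketch.
struct WorkerSnapshot {
    active_tasks: usize,
    idle_duration: Duration,
}

// Sketch of the scale-down branch of the scheduler loop: if no scale-up is
// requested, compute how many idle workers can be retired beyond the
// configured survivor floor.
fn num_workers_to_retire(
    snapshots: &[WorkerSnapshot],
    min_survivor_workers: usize,
    idle_threshold: Duration,
) -> usize {
    let idle_count = snapshots
        .iter()
        .filter(|w| w.active_tasks == 0 && w.idle_duration >= idle_threshold)
        .count();
    // Never retire below the survivor floor; the manager additionally
    // enforces its own cooldown before actually releasing anything.
    idle_count.saturating_sub(min_survivor_workers)
    // The caller would then invoke something like
    // worker_manager.retire_idle_ray_workers(num_to_retire, false).
}
```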
Additional Comments (3)
- src/daft-distributed/src/python/ray/worker_manager.rs, lines 287-340 — logic: Holding the mutex lock while calling `worker.release(py)` and other Python operations can cause significant lock contention. The state mutex is held from line 288 through line 340, during which Python GIL operations occur (lines 334-340). This unnecessarily blocks other operations such as `submit_tasks_to_workers`. Consider releasing the lock before the Python operations.
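The code suggestion attached to this comment was not captured here. A minimal sketch of the intended pattern, with assumed type and field names (the real code is `RayWorkerManager::retire_idle_ray_workers`), is: collect and remove the idle workers while holding the lock, drop the guard, then perform the slow Ray/Python work.

```rust
use std::sync::Mutex;

// Placeholder worker type for this sketch; the real type wraps a Ray actor handle.
struct Worker {
    id: String,
    active_tasks: usize,
}

struct Manager {
    state: Mutex<Vec<Worker>>,
}

impl Manager {
    fn retire_idle_workers(&self, num_to_retire: usize) {
        // Phase 1: mutate shared state under the lock, but do nothing slow.
        let to_release: Vec<Worker> = {
            let mut state = self.state.lock().unwrap();
            let mut picked = Vec::new();
            let mut i = 0;
            while picked.len() < num_to_retire && i < state.len() {
                if state[i].active_tasks == 0 {
                    picked.push(state.remove(i));
                } else {
                    i += 1;
                }
            }
            picked
        }; // lock guard dropped here

        // Phase 2: slow Ray/Python calls happen without holding the mutex,
        // so submit_tasks_to_workers and friends are not blocked. This is
        // roughly where a pyo3 Python::with_gil(...) / worker.release(py)
        // call would run in the real manager.
        for worker in to_release {
            let _ = worker.id;
        }
    }
}
```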
- src/daft-distributed/src/python/ray/worker.rs, lines 138-146 — style: The `release` method silently returns early if there are inflight tasks, without setting state or logging. This could lead to confusion during debugging. Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
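A small sketch of the suggested fix, assuming the crate uses the `tracing` facade for logging; the struct shape and field names below are placeholders, not the actual ones in `worker.rs`.

```rust
// Hypothetical, simplified worker; only the added log line in release()
// is the point of this sketch.
struct SwordfishWorkerSketch {
    id: String,
    active_task_count: usize,
}

impl SwordfishWorkerSketch {
    fn release(&mut self) {
        if self.active_task_count > 0 {
            // Previously this path returned silently; a debug log makes the
            // skipped release visible when troubleshooting scale-in.
            tracing::debug!(
                worker_id = %self.id,
                active_tasks = %self.active_task_count,
                "skipping release: worker still has inflight tasks"
            );
            return;
        }
        // ... set state to Releasing, shut the actor down, set Released ...
    }
}
```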
- src/daft-distributed/src/scheduling/scheduler/scheduler_actor.rs, lines 138-144 — style: Environment variable parsing with defaults lacks documentation. Consider adding comments explaining these configuration options and their defaults. Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
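A hedged sketch of documented environment-variable parsing with defaults; the variable names and default values here are illustrative placeholders, not the ones used in `scheduler_actor.rs`.

```rust
use std::env;
use std::time::Duration;

/// Reads an environment variable as whole seconds, falling back to a default
/// when the variable is unset or unparseable.
fn env_duration_secs(name: &str, default_secs: u64) -> Duration {
    let secs = env::var(name)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(default_secs);
    Duration::from_secs(secs)
}

fn main() {
    // DAFT_SCALE_DOWN_IDLE_SECS: how long a worker must stay idle before it
    // becomes a retirement candidate (placeholder name; default 60s here).
    let idle_threshold = env_duration_secs("DAFT_SCALE_DOWN_IDLE_SECS", 60);

    // DAFT_SCALE_DOWN_COOLDOWN_SECS: minimum gap between two scale-down
    // passes, to avoid thrashing the Ray autoscaler (placeholder; 120s here).
    let cooldown = env_duration_secs("DAFT_SCALE_DOWN_COOLDOWN_SECS", 120);

    println!("idle_threshold={idle_threshold:?}, cooldown={cooldown:?}");
}
```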
8 files reviewed, 3 comments
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #5903 +/- ##
==========================================
- Coverage 72.37% 72.30% -0.07%
==========================================
Files 965 965
Lines 125733 126029 +296
==========================================
+ Hits 90996 91130 +134
- Misses 34737 34899 +162