Conversation

@huleilei
Contributor

This commit implements the dynamic scaling down (scale-in) functionality for `RaySwordfishActor` to release idle resources.

Key changes:

  • Implement `retire_idle_ray_workers` in `RayWorkerManager` to identify and release idle workers.
  • Add `pending_release_blacklist` to track retiring workers and prevent them from being reused or causing "worker died" errors (a sketch of this mechanism follows this description).
  • Move scale-down cooldown logic to `RayWorkerManager` to prevent frequent scale-down operations.
  • Optimize `retire_idle_ray_workers` to reduce lock contention by releasing the lock before performing Ray/Python operations.
  • Update `try_autoscale` in `flotilla.py` to support empty resource requests, enabling Ray to scale down resources.
  • Fix unit tests in `src/daft-distributed/src/scheduling/worker.rs` and ensure compatibility with the scheduler loop.

This addresses the issue where `udfActor` could not dynamically scale down and prevents "worker died" errors during graceful shutdown.
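
For illustration, here is a minimal Rust sketch of how a pending-release blacklist with a TTL could look; the `PendingReleaseBlacklist` type, its fields, and `is_excluded` are placeholders for this example, not the actual definitions in `RayWorkerManager`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Illustrative worker identifier; the real type lives in daft-distributed.
type WorkerId = String;

/// Tracks workers that are being retired so they are neither handed new tasks
/// nor treated as unexpectedly dead ("worker died") when Ray removes them.
struct PendingReleaseBlacklist {
    ttl: Duration,
    entries: HashMap<WorkerId, Instant>,
}

impl PendingReleaseBlacklist {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Record a worker as pending release at the current time.
    fn add(&mut self, worker_id: WorkerId) {
        self.entries.insert(worker_id, Instant::now());
    }

    /// Prune entries older than the TTL, then report whether the worker is
    /// still excluded from scheduling and respawn bookkeeping.
    fn is_excluded(&mut self, worker_id: &WorkerId) -> bool {
        let ttl = self.ttl;
        self.entries.retain(|_, retired_at| retired_at.elapsed() < ttl);
        self.entries.contains_key(worker_id)
    }
}

fn main() {
    // Assumed TTL of 120s, matching the default described in this PR.
    let mut blacklist = PendingReleaseBlacklist::new(Duration::from_secs(120));
    blacklist.add("worker-node-1".to_string());
    assert!(blacklist.is_excluded(&"worker-node-1".to_string()));
}
```

Pruning expired entries on lookup keeps the structure self-cleaning without needing a background task.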

@greptile-apps
Contributor

greptile-apps bot commented Dec 31, 2025

Greptile Summary

This PR implements dynamic scale-in functionality for RaySwordfishActor to automatically release idle Ray workers and reduce resource consumption when cluster capacity is no longer needed.

Key Changes:

  • Added `retire_idle_ray_workers` method to `RayWorkerManager` that identifies idle workers (based on configurable idle threshold) and gracefully releases them
  • Introduced `pending_release_blacklist` mechanism to prevent Ray from immediately respawning workers that are being retired, with a configurable TTL (default 120s)
  • Added `ActorState` enum and idle duration tracking to `RaySwordfishWorker` for state management (a state-tracking sketch follows this list)
  • Integrated downscale logic into the scheduler loop to automatically retire idle workers while maintaining a minimum survivor count (default 1)
  • Updated `try_autoscale` to support empty resource requests and call `clear_autoscaling_requests()` to signal Ray that resources can be scaled down
  • Added comprehensive test coverage for the new functionality
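
As a rough illustration of the state tracking described above, the sketch below models an idle timer keyed off a lifecycle enum. The variant names and the `WorkerIdleTracker` helper are assumptions made for this example; the actual `ActorState` and `RaySwordfishWorker` fields in worker.rs may differ.

```rust
use std::time::{Duration, Instant};

/// Illustrative lifecycle states; only Active/Idle are exercised below.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ActorState {
    Active,
    Idle,
    Releasing,
    Released,
}

/// Hypothetical helper that tracks how long a worker has been idle.
struct WorkerIdleTracker {
    state: ActorState,
    idle_since: Option<Instant>,
}

impl WorkerIdleTracker {
    fn new() -> Self {
        Self { state: ActorState::Active, idle_since: None }
    }

    /// Called whenever the number of in-flight tasks changes.
    fn on_active_tasks(&mut self, active_tasks: usize) {
        if active_tasks == 0 {
            if self.state == ActorState::Active {
                self.state = ActorState::Idle;
                self.idle_since = Some(Instant::now());
            }
        } else {
            self.state = ActorState::Active;
            self.idle_since = None;
        }
    }

    /// How long the worker has been continuously idle, if it is idle at all.
    fn idle_duration(&self) -> Option<Duration> {
        self.idle_since.map(|t| t.elapsed())
    }

    /// A worker qualifies for retirement once idle past the threshold.
    fn is_retirable(&self, threshold: Duration) -> bool {
        self.idle_duration().map_or(false, |d| d >= threshold)
    }
}

fn main() {
    let mut tracker = WorkerIdleTracker::new();
    tracker.on_active_tasks(0); // the worker just finished its last task
    println!("retirable: {}", tracker.is_retirable(Duration::from_secs(60)));
}
```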

Configuration (a parsing sketch follows this list):

  • `DAFT_AUTOSCALING_DOWNSCALE_ENABLED`: Enable/disable downscaling (default: true)
  • `DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS`: Idle threshold before retirement (default: 60s)
  • `DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS`: Minimum workers to keep (default: 1)
  • `DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS`: Blacklist TTL (default: 120s)
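
A minimal sketch of how these variables and their defaults could be wired up, assuming plain `std::env` parsing with fallback values; the `env_or` helper and `DownscaleConfig` struct are illustrative, not the actual code in scheduler_actor.rs.

```rust
use std::env;
use std::time::Duration;

/// Read an environment variable, falling back to a default when it is unset
/// or fails to parse.
fn env_or<T: std::str::FromStr>(name: &str, default: T) -> T {
    env::var(name)
        .ok()
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}

/// Hypothetical bundle of downscaling settings with the defaults listed above.
struct DownscaleConfig {
    enabled: bool,
    idle_threshold: Duration,
    min_survivor_workers: usize,
    pending_release_ttl: Duration,
}

impl DownscaleConfig {
    fn from_env() -> Self {
        Self {
            enabled: env_or("DAFT_AUTOSCALING_DOWNSCALE_ENABLED", true),
            idle_threshold: Duration::from_secs(env_or(
                "DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS",
                60,
            )),
            min_survivor_workers: env_or("DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS", 1),
            pending_release_ttl: Duration::from_secs(env_or(
                "DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS",
                120,
            )),
        }
    }
}

fn main() {
    let config = DownscaleConfig::from_env();
    println!(
        "enabled: {}, idle threshold: {:?}, min survivors: {}, blacklist ttl: {:?}",
        config.enabled, config.idle_threshold, config.min_survivor_workers, config.pending_release_ttl
    );
}
```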

Issues Found:

  • Lock contention in `retire_idle_ray_workers`: The state mutex is held during Python GIL operations, which can block other critical operations like task submission

Confidence Score: 3/5

  • This PR implements important functionality but has a critical lock contention issue that could impact performance under load
  • The implementation is generally sound with good test coverage, but the mutex lock is held during Python operations in retire_idle_ray_workers, which violates the locking order principle mentioned in the custom rules and can cause significant performance degradation. The blacklist mechanism is well-designed, and the integration with the scheduler loop is clean. Once the lock contention issue is resolved, this would be safe to merge.
  • src/daft-distributed/src/python/ray/worker_manager.rs requires attention to fix the lock contention issue in retire_idle_ray_workers

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/daft-distributed/src/python/ray/worker_manager.rs | Implements `retire_idle_ray_workers` with the `pending_release_blacklist` mechanism; potential lock contention during worker release |
| src/daft-distributed/src/python/ray/worker.rs | Adds `ActorState` tracking and idle duration calculation; clean implementation with proper state transitions |
| src/daft-distributed/src/scheduling/scheduler/scheduler_actor.rs | Integrates downscale logic into the scheduler loop; reads configuration from environment variables without documenting their defaults in code comments |

Sequence Diagram

sequenceDiagram
    participant S as SchedulerActor
    participant WM as RayWorkerManager
    participant W as RaySwordfishWorker
    participant R as Ray
    participant F as Flotilla

    Note over S: Scheduler loop iteration
    S->>WM: worker_snapshots()
    WM->>WM: refresh_workers()
    WM->>F: start_ray_workers(existing_ids)
    F->>R: ray.nodes()
    R-->>F: node list
    F-->>WM: RaySwordfishWorker instances
    WM-->>S: worker snapshots

    S->>S: schedule_tasks()
    S->>S: get_autoscaling_request()
    
    alt Scale up needed
        S->>WM: try_autoscale(bundles)
        WM->>WM: Clear pending_release_blacklist
        WM->>F: try_autoscale(bundles)
        F->>R: request_resources(bundles)
    else No scale up, check scale down
        S->>S: Count idle workers
        alt idle workers > min_survivor_workers
            S->>WM: retire_idle_ray_workers(num_to_retire, false)
            WM->>WM: Lock state mutex
            WM->>WM: Identify idle workers (idle_duration >= threshold)
            WM->>WM: Remove workers from ray_workers
            WM->>WM: Add to pending_release_blacklist
            WM->>WM: Unlock state mutex
            WM->>W: release(py)
            W->>W: Check active tasks == 0
            W->>W: set_state(Releasing)
            W->>R: shutdown()
            W->>W: set_state(Released)
            WM->>F: clear_autoscaling_requests()
            F->>R: request_resources([])
        end
    end

    Note over S: Job completion
    S->>WM: retire_idle_ray_workers(all_workers, true)
    WM->>F: clear_autoscaling_requests()
    F->>R: request_resources([])

Contributor

@greptile-apps greptile-apps bot left a comment

Additional Comments (3)

  1. src/daft-distributed/src/python/ray/worker_manager.rs, line 287-340 (link)

    logic: Holding the mutex lock while calling worker.release(py) and Python operations can cause significant lock contention. The state mutex is held from line 288 through line 340, during which Python GIL operations occur (lines 334-340). This blocks other operations like submit_tasks_to_workers unnecessarily.

    Consider releasing the lock before performing the Python operations (a hedged sketch of this pattern appears after this comment list).

  2. src/daft-distributed/src/python/ray/worker.rs, line 138-146 (link)

    style: The `release` method silently returns early when there are in-flight tasks, without setting state or logging. This could lead to confusion during debugging.

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

  3. src/daft-distributed/src/scheduling/scheduler/scheduler_actor.rs, line 138-144 (link)

    style: Environment variable parsing with defaults lacks documentation. Consider adding comments explaining these configuration options and their defaults.

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
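
For reference, here is a hedged sketch of the lock-scoping pattern suggested in comment 1: retirable workers are identified and removed from the map while the mutex is held, and the potentially slow release calls happen only after the guard is dropped. The `ManagerState`, `Worker`, and `release_worker_via_python` names are placeholders, not the actual worker_manager.rs API.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type WorkerId = String;

/// Placeholder for the worker handle held by the manager.
struct Worker {
    id: WorkerId,
    idle: bool,
}

struct ManagerState {
    ray_workers: HashMap<WorkerId, Worker>,
}

/// Stand-in for the Python/GIL call that actually shuts the Ray actor down.
fn release_worker_via_python(worker: &Worker) {
    println!("releasing {}", worker.id);
}

/// Identify and remove retirable workers while holding the lock, but perform
/// the release calls only after the guard is dropped, so task submission and
/// other state access are not blocked in the meantime.
fn retire_idle_workers(state: &Mutex<ManagerState>, num_to_retire: usize) {
    let to_release: Vec<Worker> = {
        let mut guard = state.lock().unwrap();
        let ids: Vec<WorkerId> = guard
            .ray_workers
            .values()
            .filter(|w| w.idle)
            .take(num_to_retire)
            .map(|w| w.id.clone())
            .collect();
        ids.iter()
            .filter_map(|id| guard.ray_workers.remove(id))
            .collect()
    }; // the mutex guard is dropped here

    for worker in &to_release {
        release_worker_via_python(worker);
    }
}

fn main() {
    let state = Mutex::new(ManagerState {
        ray_workers: HashMap::from([
            ("w1".to_string(), Worker { id: "w1".to_string(), idle: true }),
            ("w2".to_string(), Worker { id: "w2".to_string(), idle: false }),
        ]),
    });
    retire_idle_workers(&state, 1);
}
```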

8 files reviewed, 3 comments


@codecov

codecov bot commented Dec 31, 2025

Codecov Report

❌ Patch coverage is 51.10410% with 155 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.30%. Comparing base (29ffd49) to head (e8b7527).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| .../daft-distributed/src/python/ray/worker_manager.rs | 0.00% | 109 Missing ⚠️ |
| src/daft-distributed/src/python/ray/worker.rs | 0.00% | 34 Missing ⚠️ |
| ...aft-distributed/src/scheduling/scheduler/linear.rs | 0.00% | 6 Missing ⚠️ |
| daft/runners/flotilla.py | 20.00% | 4 Missing ⚠️ |
| ...ibuted/src/scheduling/scheduler/scheduler_actor.rs | 95.83% | 1 Missing ⚠️ |
| src/daft-distributed/src/scheduling/worker.rs | 99.24% | 1 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #5903      +/-   ##
==========================================
- Coverage   72.37%   72.30%   -0.07%     
==========================================
  Files         965      965              
  Lines      125733   126029     +296     
==========================================
+ Hits        90996    91130     +134     
- Misses      34737    34899     +162     
| Files with missing lines | Coverage Δ |
| --- | --- |
| ...ft-distributed/src/scheduling/scheduler/default.rs | 88.99% <100.00%> (+0.02%) ⬆️ |
| ...c/daft-distributed/src/scheduling/scheduler/mod.rs | 88.04% <ø> (ø) |
| ...ibuted/src/scheduling/scheduler/scheduler_actor.rs | 90.25% <95.83%> (+0.30%) ⬆️ |
| src/daft-distributed/src/scheduling/worker.rs | 86.74% <99.24%> (+12.50%) ⬆️ |
| daft/runners/flotilla.py | 46.85% <20.00%> (-0.79%) ⬇️ |
| ...aft-distributed/src/scheduling/scheduler/linear.rs | 87.50% <0.00%> (-2.25%) ⬇️ |
| src/daft-distributed/src/python/ray/worker.rs | 0.00% <0.00%> (ø) |
| .../daft-distributed/src/python/ray/worker_manager.rs | 0.00% <0.00%> (ø) |

... and 6 files with indirect coverage changes

