[Core] Ray Client reconnection fails when the specific server process dies (e.g. killed or crashed) #60600

@g199209

What happened + What you expected to happen

Summary

When using Ray Client mode, if the Ray Client specific server process (the per-session backend process spawned by the proxier) is terminated unexpectedly, for example manually killed with kill -9, OOM-killed, or crashed, the client’s attempt to reconnect within the configured grace period often fails with:

  • grpc.StatusCode.NOT_FOUND
    "Attempted to reconnect a session that has already been cleaned up"

So reconnection is unreliable when the backend specific server dies, even though the design intends to allow reconnection within a grace period.


Why this is a problem

  1. User expectation
    Users enable a reconnect grace period (e.g. RAY_CLIENT_RECONNECT_GRACE_PERIOD; see the sketch after this list) expecting the client to recover when the connection or backend is temporarily unavailable. If the specific server dies (kill, crash, OOM), reconnection should succeed within that window. Today it often does not.

  2. Operational impact
    Any scenario that causes the specific server process to exit (maintenance, resource limits, bugs) forces clients to fail with NOT_FOUND instead of reconnecting transparently. That makes Ray Client less robust in real deployments.

  3. Inconsistency with design
    The proxier keeps session state (e.g. job_config, clients_last_seen) for the grace period specifically to allow reconnection. The current behavior indicates that cleanup timing, channel readiness handling, and/or backend re-initialization are not aligned with that goal.

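For context, the reconnect grace period in item 1 is configured through the RAY_CLIENT_RECONNECT_GRACE_PERIOD environment variable. Below is a minimal sketch of how a client typically opts in; the 30-second value and the address are placeholders, and depending on the deployment the variable may also need to be set on the server side:

import os

# Placeholder: allow reconnection attempts for up to 30 seconds after the
# data channel breaks.
os.environ["RAY_CLIENT_RECONNECT_GRACE_PERIOD"] = "30"

import ray  # imported after setting the env var so the client sees it

# Placeholder proxier address on the Ray head node.
ray.init(address="ray://ray-head:10001")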

When it happens (reproduction scenarios)

  • Manual kill of the specific server
    Connect with Ray Client, then kill the corresponding Ray Client specific server process (e.g. the one listening on the port shown in logs). The client detects the broken connection (e.g. after gRPC keepalive or stream error) and tries to reconnect; the request often fails with NOT_FOUND.

  • Specific server crash or OOM
    Same as above: the process exits, the client tries to reconnect within the grace period, and the same errors can occur.

  • No requirement for full cluster or proxier restart
    The issue is reproducible with only the specific server process dying; the Ray head node and the proxier can stay up.


Root causes (from debugging)

The following issues were identified during AI-assisted debugging and analysis:

  1. Proxier blocking the Datapath RPC
    The Datapath handler’s finally block used self.stopped.wait(timeout=cleanup_delay), blocking the gRPC stream for the whole reconnect grace period. The client could not observe the RpcError (and thus could not start reconnecting) until after that wait, making timely reconnection impossible (see the sketch after this list).

  2. Proxier waiting too long for channel readiness
    When the specific server was already dead, get_channel() waited for channel readiness with a long default timeout (e.g. 30s), delaying server recreation and reconnection.

  3. Client not retrying on transient connection errors
    During the short window when the proxier was recreating the backend, ray.get() did not retry on ConnectionError (mapped from gRPC UNAVAILABLE), so user code saw failures instead of transparent retries.

  4. Backend re-initialization after server recreation
    After the proxier recreated a new specific server, it still forwarded requests with reconnecting=True. The new server has no prior session, so it correctly rejected them (NOT_FOUND). The proxier did not send a fresh init (synthetic InitRequest) or force reconnecting=False for the new backend.

  5. Response cache / req_id handling for synthetic init
    When a synthetic InitRequest was added, using req_id=0 or req_id=-1 led to the specific server’s OrderedResponseCache treating the request as already acknowledged or out-of-range, triggering FAILED_PRECONDITION. The cache and init path need to be consistent (e.g. init not cached, and synthetic init using a reserved req_id that the cache ignores).

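To make root cause 1 concrete, the contrast below is a simplified, hypothetical sketch; the class and names are invented for illustration and are not the actual proxier code. The first method shows the blocking wait in the handler’s finally block, the second the non-blocking, scheduled cleanup the fix moves toward:

import threading

RECONNECT_GRACE_PERIOD_S = 30  # placeholder for the configured grace period


class SessionCleanup:
    """Hypothetical stand-in for the proxier's per-client session state."""

    def __init__(self):
        self.stopped = threading.Event()

    def cleanup_session(self, client_id: str) -> None:
        print(f"cleaning up session state for {client_id}")

    # Problematic pattern: called from the Datapath handler's finally block,
    # this wait holds the gRPC stream open for the whole grace period, so the
    # client cannot observe the RpcError and start reconnecting.
    def blocking_cleanup(self, client_id: str) -> None:
        self.stopped.wait(timeout=RECONNECT_GRACE_PERIOD_S)
        if not self.stopped.is_set():
            self.cleanup_session(client_id)

    # Direction of the fix: return immediately and let a background timer do
    # the delayed cleanup, so the stream terminates right away and the client
    # can reconnect within the grace period.
    def scheduled_cleanup(self, client_id: str) -> None:
        timer = threading.Timer(
            RECONNECT_GRACE_PERIOD_S, self.cleanup_session, args=(client_id,)
        )
        timer.daemon = True
        timer.start()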

Expected behavior

  • When the specific server process dies (kill, crash, OOM), the client should be able to reconnect within the configured reconnect grace period without the user seeing NOT_FOUND.
  • Session state (e.g. job_config) kept by the proxier for the grace period should be used to recreate the specific server and re-establish the session (including a fresh init toward the new backend), so that subsequent ray.get() and other calls succeed.

Proposed direction

A fix was developed by AI to address the above root causes. It is not claimed to be the optimal or canonical solution; the community may prefer a different design. The intent of this issue is to document the problem and when it occurs; the concrete solution will be proposed in #60601 for review.

In short, the proposed direction includes:

  • Proxier: Non-blocking cleanup (e.g. schedule delayed cleanup in a background thread so the Datapath RPC returns immediately and the client can see the RpcError and reconnect).
  • Proxier: Shorter channel readiness timeout when reconnecting, and eager removal of dead server entries so a new specific server can be created quickly.
  • Proxier: When a new specific server is created for a reconnecting client, send a synthetic InitRequest and do not forward reconnecting=True until the backend has completed init (e.g. track “backend needs init” and clear it on first init response).
  • Specific server: Ensure init responses are not cached in the ordered response cache, and use a reserved req_id for synthetic init so cache logic does not treat it as a duplicate or stale request.
  • Client: Retry ray.get() on ConnectionError when reconnection is enabled, so short-lived unavailability during backend recreation does not surface as a hard failure to the user (see the sketch below).

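The last bullet can also be illustrated at the user level with a hypothetical wrapper around ray.get(); the actual proposal is to retry inside the client itself, so the function name, retry window, and backoff below are placeholders:

import time

import ray


def get_with_retry(ref, retry_window_s: float = 30.0, backoff_s: float = 1.0):
    """Retry ray.get() on ConnectionError while the backend is being recreated."""
    deadline = time.monotonic() + retry_window_s
    while True:
        try:
            return ray.get(ref)
        except ConnectionError:
            # gRPC UNAVAILABLE is surfaced as ConnectionError while the data
            # channel is briefly down; keep retrying until the window expires.
            if time.monotonic() >= deadline:
                raise
            time.sleep(backoff_s)
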
Versions / Dependencies

  • Ray Client mode (client → proxier → specific server).
  • Ray master branch.

Reproduction script

#!/usr/bin/env python3
"""
Test script: Reproduce and verify Ray Client reconnection issue

Usage:
    python test_reconnect_issue.py

Test steps:
    1. Connect to Ray cluster (ray://ray-head:10001)
    2. Perform some operations to establish connection
    3. Prompt user to manually kill specific server process
    4. Wait for a period
    5. Attempt to reconnect (perform operations)
    6. Verify if "Attempted to reconnect a session that has already been cleaned up" error occurs
"""

import time
import sys

try:
    import ray
except ImportError:
    print("Error: Ray is not installed. Please install Ray first.")
    sys.exit(1)


def main():
    print("=" * 80)
    print("Ray Client Reconnection Issue Test")
    print("=" * 80)
    
    # Connect to Ray cluster
    ray_address = "ray://ray-head:10001"   # Change to your Ray cluster address
    print(f"\n[Step 1/4] Connecting to Ray cluster: {ray_address}")
    
    try:
        ray.init(address=ray_address, ignore_reinit_error=True)
        print("✓ Connection successful")
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        return False
    
    # Perform some operations to establish connection
    print("\n[Step 2/4] Establishing connection and performing operations...")
    try:
        @ray.remote
        def hello():
            return "Hello from Ray!"
        
        # Execute several tasks to ensure connection is established
        print("  Executing remote tasks...")
        futures = [hello.remote() for _ in range(3)]
        results = ray.get(futures)
        print(f"✓ Connection established. Task results: {results}")
        print(f"✓ Current Ray status: {ray.is_initialized()}")
    except Exception as e:
        print(f"✗ Failed to establish connection: {e}")
        ray.shutdown()
        return False
    
    # Prompt user to manually kill specific server
    print("\n" + "=" * 80)
    print("[Step 3/4] Please manually kill the specific server process")
    print("=" * 80)
    print("\nPlease perform the following steps:")
    print("  1. Find the specific server process:")
    print("     ps aux | grep 'ray.util.client.server' | grep 'specific-server'")
    print("  Or:")
    print("     ps aux | grep python | grep 'ray.util.client.server'")
    print("\n  2. Kill the process:")
    print("     kill -9 <PID>")
    print("\n  3. Verify the process has been killed:")
    print("     ps aux | grep <PID>  (should not find anything)")
    print("\nWaiting 60 seconds, then the script will automatically attempt to reconnect...")
    print("(You can also press Ctrl+C to start the test early)")
    print("\n" + "-" * 80)
    
    try:
        # Countdown
        for i in range(60, 0, -5):
            print(f"  Countdown: {i} seconds... (Press Ctrl+C to start early)", end='\r')
            time.sleep(5)
        print("\n")
    except KeyboardInterrupt:
        print("\n  Starting test early...")
    
    # Attempt to reconnect
    print("\n[Step 4/4] Attempting to reconnect...")
    print("  Executing remote task to trigger reconnection logic...")
    
    try:
        @ray.remote
        def test_reconnect():
            return "Reconnect test successful!"
        
        # Attempt to execute operation, this will trigger reconnection
        result = ray.get(test_reconnect.remote(), timeout=60)
        print(f"✓ Reconnection successful! Result: {result}")
        print("\n" + "=" * 80)
        print("✓ TEST PASSED: Reconnection works correctly (issue is fixed)")
        print("=" * 80)
        return True
        
    except Exception as e:
        error_msg = str(e)
        print(f"\n✗ Reconnection failed. Error message:")
        print(f"  {error_msg}")
        
        if "Attempted to reconnect a session that has already been cleaned up" in error_msg:
            print("\n" + "=" * 80)
            print("✗ TEST FAILED: Issue reproduced!")
            return False
        else:
            print("\n" + "=" * 80)
            print("⚠ TEST INCONCLUSIVE: Different error occurred")
            print("=" * 80)
            print(f"  This might be a different issue: {e}")
            print(f"  Error type: {type(e).__name__}")
            return None
    
    finally:
        try:
            ray.shutdown()
        except Exception:
            pass


if __name__ == "__main__":
    print("\nThis script tests the Ray Client reconnection race condition issue.")
    print("It will:")
    print("  1. Connect to Ray cluster (ray://ray-head:10001)")
    print("  2. Establish connection and perform some operations")
    print("  3. Prompt you to manually kill the specific server process")
    print("  4. Wait, then attempt to reconnect")
    print("  5. Verify if the expected error occurs\n")
    
    result = main()
    
    print("\n" + "=" * 80)
    print("Test completed!")
    print("=" * 80)
    
    if result is False:
        print("\nIssue reproduced, fix is necessary.")
        sys.exit(1)
    elif result is True:
        print("\nReconnection works correctly, issue may be fixed.")
        sys.exit(0)
    else:
        print("\nTest result is inconclusive, further investigation needed.")
        sys.exit(2)

Before the fix:

This script tests the Ray Client reconnection race condition issue.
It will:
  1. Connect to Ray cluster (ray://ray-head:10001)
  2. Establish connection and perform some operations
  3. Prompt you to manually kill the specific server process
  4. Wait, then attempt to reconnect
  5. Verify if the expected error occurs

================================================================================
Ray Client Reconnection Issue Test
================================================================================

[Step 1/4] Connecting to Ray cluster: ray://ray-head:10001
2026-01-30 11:50:35,481 INFO client_builder.py:241 -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error, log_to_driver
SIGTERM handler is not set because current thread is not the main thread.
✓ Connection successful

[Step 2/4] Establishing connection and performing operations...
  Executing remote tasks...
✓ Connection established. Task results: ['Hello from Ray!', 'Hello from Ray!', 'Hello from Ray!']
✓ Current Ray status: True

================================================================================
[Step 3/4] Please manually kill the specific server process
================================================================================

Please perform the following steps:
  1. Find the specific server process:
     ps aux | grep 'ray.util.client.server' | grep 'specific-server'
  Or:
     ps aux | grep python | grep 'ray.util.client.server'

  2. Kill the process:
     kill -9 <PID>

  3. Verify the process has been killed:
     ps aux | grep <PID>  (should not find anything)

Waiting 60 seconds, then the script will automatically attempt to reconnect...
(You can also press Ctrl+C to start the test early)

--------------------------------------------------------------------------------
^CCountdown: 50 seconds... (Press Ctrl+C to start early)
  Starting test early...

[Step 4/4] Attempting to reconnect...
  Executing remote task to trigger reconnection logic...
2026-01-30 11:51:19,395 ERROR dataclient.py:330 -- Unrecoverable error in data channel.

✗ Reconnection failed. Error message:
  Request can't be sent because the Ray client has already been disconnected due to an error. Last exception: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.NOT_FOUND
        details = "Attempted to reconnect a session that has already been cleaned up"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_status:5, grpc_message:"Attempted to reconnect a session that has already been cleaned up"}"
>

================================================================================
✗ TEST FAILED: Issue reproduced!

================================================================================
Test completed!
================================================================================

Issue reproduced, fix is necessary.

After the fix:

This script tests the Ray Client reconnection race condition issue.
It will:
  1. Connect to Ray cluster (ray://ray-head:10001)
  2. Establish connection and perform some operations
  3. Prompt you to manually kill the specific server process
  4. Wait, then attempt to reconnect
  5. Verify if the expected error occurs

================================================================================
Ray Client Reconnection Issue Test
================================================================================

[Step 1/4] Connecting to Ray cluster: ray://ray-head:10001
2026-01-30 13:27:15,182 INFO client_builder.py:248 -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error, log_to_driver
✓ Connection successful

[Step 2/4] Establishing connection and performing operations...
  Executing remote tasks...
✓ Connection established. Task results: ['Hello from Ray!', 'Hello from Ray!', 'Hello from Ray!']
✓ Current Ray status: True

================================================================================
[Step 3/4] Please manually kill the specific server process
================================================================================

Please perform the following steps:
  1. Find the specific server process:
     ps aux | grep 'ray.util.client.server' | grep 'specific-server'
  Or:
     ps aux | grep python | grep 'ray.util.client.server'

  2. Kill the process:
     kill -9 <PID>

  3. Verify the process has been killed:
     ps aux | grep <PID>  (should not find anything)

Waiting 60 seconds, then the script will automatically attempt to reconnect...
(You can also press Ctrl+C to start the test early)

--------------------------------------------------------------------------------
^CCountdown: 50 seconds... (Press Ctrl+C to start early)
  Starting test early...

[Step 4/4] Attempting to reconnect...
  Executing remote task to trigger reconnection logic...
✓ Reconnection successful! Result: Reconnect test successful!

================================================================================
✓ TEST PASSED: Reconnection works correctly (issue is fixed)
================================================================================

================================================================================
Test completed!
================================================================================

Reconnection works correctly, issue may be fixed.

Issue Severity

None

Labels: bug, community-backlog, core, stability, triage