Description
What happened + What you expected to happen
Summary
When using Ray Client mode, if the Ray Client specific server process (the per-session backend process spawned by the proxier) is terminated unexpectedly—for example, manually killed (kill -9), OOM-killed, or crashed—the client’s attempt to reconnect within the configured grace period often fails with:
grpc.StatusCode.NOT_FOUND
"Attempted to reconnect a session that has already been cleaned up"
So reconnection is unreliable when the backend specific server dies, even though the design intends to allow reconnection within a grace period.
Why this is a problem
- User expectation: Users enable a reconnect grace period (e.g. `RAY_CLIENT_RECONNECT_GRACE_PERIOD`) expecting the client to recover when the connection or backend is temporarily unavailable. If the specific server dies (kill, crash, OOM), reconnection should succeed within that window; today it often does not. (A minimal example of opting into the grace period follows this list.)
- Operational impact: Any scenario that causes the specific server process to exit (maintenance, resource limits, bugs) forces clients to fail with NOT_FOUND instead of reconnecting transparently. That makes Ray Client less robust in real deployments.
- Inconsistency with design: The proxier keeps session state (e.g. `job_config`, `clients_last_seen`) for the grace period specifically to allow reconnection. The current behavior indicates that cleanup timing, channel readiness handling, and/or backend re-initialization are not aligned with that goal.
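For reference, opting into the grace period on the client side might look like the following minimal sketch. It assumes the grace period is controlled by the `RAY_CLIENT_RECONNECT_GRACE_PERIOD` environment variable named above and that it must be set before connecting; the 60-second value and the address are placeholders.

import os

import ray

# Configure the reconnect grace period before connecting (value in seconds).
os.environ["RAY_CLIENT_RECONNECT_GRACE_PERIOD"] = "60"

# Connect through Ray Client; the address is a placeholder.
ray.init("ray://ray-head:10001")

# If the specific server dies now, calls made within the 60-second window are
# expected to reconnect transparently instead of failing with NOT_FOUND.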
When it happens (reproduction scenarios)
- Manual kill of the specific server: Connect with Ray Client, then kill the corresponding Ray Client specific server process (e.g. the one listening on the port shown in the logs). The client detects the broken connection (e.g. after a gRPC keepalive or stream error) and tries to reconnect; the request often fails with NOT_FOUND. (A small helper that scripts the kill step is sketched after this list.)
- Specific server crash or OOM: Same as above: the process exits, the client tries to reconnect within the grace period, and the same error occurs.
- No requirement for a full cluster or proxier restart: The issue is reproducible with only the specific server process dying; the Ray head node and the proxier can stay up.
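The manual ps/kill step from the first scenario can be scripted. The sketch below is a convenience helper, not part of Ray: it matches processes on the same "specific-server" command-line marker used by the ps command in the reproduction script, assumes pgrep is available, and must run on the node hosting the specific server.

import os
import signal
import subprocess

def kill_specific_servers() -> None:
    # Find Ray Client specific server processes by their command-line marker.
    out = subprocess.run(
        ["pgrep", "-f", "specific-server"],
        capture_output=True, text=True, check=False,
    )
    for pid in out.stdout.split():
        print(f"Killing specific server PID {pid}")
        os.kill(int(pid), signal.SIGKILL)  # equivalent to kill -9 <PID>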
Root causes (from debugging)
The following issues were identified during an AI-assisted analysis:
- Proxier blocking the Datapath RPC: The `Datapath` handler's `finally` block used `self.stopped.wait(timeout=cleanup_delay)`, blocking the gRPC stream for the whole reconnect grace period. The client could not observe the RpcError (and thus could not start reconnecting) until after that wait, making timely reconnection impossible. (See the sketch after this list.)
- Proxier waiting too long for channel readiness: When the specific server was already dead, `get_channel()` waited for channel readiness with a long default timeout (e.g. 30s), delaying server recreation and reconnection.
- Client not retrying on transient connection errors: During the short window when the proxier was recreating the backend, `ray.get()` did not retry on `ConnectionError` (mapped from gRPC `UNAVAILABLE`), so user code saw failures instead of transparent retries.
- Backend re-initialization after server recreation: After the proxier recreated a new specific server, it still forwarded requests with `reconnecting=True`. The new server has no prior session, so it correctly rejected them (NOT_FOUND). The proxier did not send a fresh init (a synthetic `InitRequest`) or force `reconnecting=False` for the new backend.
- Response cache / `req_id` handling for synthetic init: When a synthetic `InitRequest` was added, using `req_id=0` or `req_id=-1` led to the specific server's `OrderedResponseCache` treating the request as already acknowledged or out of range, triggering FAILED_PRECONDITION. The cache and the init path need to be consistent (e.g. init responses not cached, and the synthetic init using a reserved `req_id` that the cache ignores).
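To make the first root cause concrete, here is an illustrative sketch contrasting the blocking wait with a non-blocking delayed cleanup. This is not Ray's actual proxier code; the function names and signatures are simplified placeholders.

import threading

# Blocking pattern (first root cause): the finally block of the Datapath
# handler parks the gRPC stream for the entire reconnect grace period, so the
# client cannot observe the RpcError and start reconnecting in time.
def datapath_cleanup_blocking(stopped: threading.Event, cleanup_delay: float, cleanup):
    stopped.wait(timeout=cleanup_delay)  # holds the RPC open for the full delay
    cleanup()

# Non-blocking alternative (see "Proposed direction" below): return from the
# RPC immediately and schedule the delayed cleanup on a background timer,
# which can be cancelled if the client reconnects within the grace period.
def datapath_cleanup_nonblocking(cleanup_delay: float, cleanup) -> threading.Timer:
    timer = threading.Timer(cleanup_delay, cleanup)
    timer.daemon = True
    timer.start()
    return timer  # caller cancels with timer.cancel() on successful reconnect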
Expected behavior
- When the specific server process dies (kill, crash, OOM), the client should be able to reconnect within the configured reconnect grace period without the user seeing NOT_FOUND.
- Session state (e.g. `job_config`) kept by the proxier for the grace period should be used to recreate the specific server and re-establish the session (including a fresh init toward the new backend), so that subsequent `ray.get()` and other calls succeed. (A sketch of this per-client state follows this list.)
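As an illustration of the second point, the sketch below shows the kind of per-client state involved. These are hypothetical data structures with invented names, not Ray's actual proxier internals; the point is only that the retained `job_config` and last-seen timestamp are enough to decide whether a dead specific server can be recreated and re-initialized.

import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class SessionRecord:
    job_config: Any                        # captured from the original Init
    last_seen: float = field(default_factory=time.monotonic)

class SessionTable:
    def __init__(self, grace_period_s: float) -> None:
        self.grace_period_s = grace_period_s
        self._records: Dict[str, SessionRecord] = {}

    def record_for_reconnect(self, client_id: str) -> Optional[SessionRecord]:
        """Return the record if the client is still within the grace period."""
        rec = self._records.get(client_id)
        if rec and time.monotonic() - rec.last_seen < self.grace_period_s:
            # job_config can seed a fresh InitRequest toward the new backend.
            return rec
        return None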
Proposed direction
A fix was developed by AI to address the above root causes. It is not claimed to be the optimal or canonical solution; the community may prefer a different design. The intent of this issue is to document the problem and when it occurs; the concrete solution will be proposed in #60601 for review.
In short, the proposed direction includes:
- Proxier: Non-blocking cleanup (e.g. schedule delayed cleanup in a background thread so the Datapath RPC returns immediately and the client can see the RpcError and reconnect).
- Proxier: Shorter channel readiness timeout when reconnecting, and eager removal of dead server entries so a new specific server can be created quickly.
- Proxier: When a new specific server is created for a reconnecting client, send a synthetic `InitRequest` and do not forward `reconnecting=True` until the backend has completed init (e.g. track "backend needs init" and clear it on the first init response).
- Specific server: Ensure init responses are not cached in the ordered response cache, and use a reserved `req_id` for the synthetic init so cache logic does not treat it as a duplicate or stale request.
- Client: Retry `ray.get()` on `ConnectionError` when reconnection is enabled, so short-lived unavailability during backend recreation does not surface as a hard failure to the user. (A sketch of this retry wrapper follows this list.)
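A hedged sketch of the client-side retry idea from the last bullet is shown below. The deadline and sleep interval are illustrative values, not Ray defaults, and the wrapper name is invented for this example.

import time

import ray

def get_with_retry(ref, deadline_s: float = 30.0, interval_s: float = 1.0):
    # Retry ray.get() on ConnectionError (the client-side mapping of gRPC
    # UNAVAILABLE) while the proxier recreates the backend, instead of
    # surfacing a hard failure during the short recreation window.
    deadline = time.monotonic() + deadline_s
    while True:
        try:
            return ray.get(ref)
        except ConnectionError:
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval_s)  # backend may still be re-initializing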
Versions / Dependencies
- Ray Client mode (client → proxier → specific server).
- Master branch
Reproduction script
#!/usr/bin/env python3
"""
Test script: Reproduce and verify Ray Client reconnection issue

Usage:
    python test_reconnect_issue.py

Test steps:
1. Connect to Ray cluster (ray://ray-head:10001)
2. Perform some operations to establish connection
3. Prompt user to manually kill specific server process
4. Wait for a period
5. Attempt to reconnect (perform operations)
6. Verify if "Attempted to reconnect a session that has already been cleaned up" error occurs
"""
import time
import sys

try:
    import ray
except ImportError:
    print("Error: Ray is not installed. Please install Ray first.")
    sys.exit(1)


def main():
    print("=" * 80)
    print("Ray Client Reconnection Issue Test")
    print("=" * 80)

    # Connect to Ray cluster
    ray_address = "ray://ray-head:10001"  # Change to your Ray cluster address
    print(f"\n[Step 1/4] Connecting to Ray cluster: {ray_address}")
    try:
        ray.init(address=ray_address, ignore_reinit_error=True)
        print("✓ Connection successful")
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        return False

    # Perform some operations to establish connection
    print("\n[Step 2/4] Establishing connection and performing operations...")
    try:
        @ray.remote
        def hello():
            return "Hello from Ray!"

        # Execute several tasks to ensure connection is established
        print("  Executing remote tasks...")
        futures = [hello.remote() for _ in range(3)]
        results = ray.get(futures)
        print(f"✓ Connection established. Task results: {results}")
        print(f"✓ Current Ray status: {ray.is_initialized()}")
    except Exception as e:
        print(f"✗ Failed to establish connection: {e}")
        ray.shutdown()
        return False

    # Prompt user to manually kill specific server
    print("\n" + "=" * 80)
    print("[Step 3/4] Please manually kill the specific server process")
    print("=" * 80)
    print("\nPlease perform the following steps:")
    print("  1. Find the specific server process:")
    print("     ps aux | grep 'ray.util.client.server' | grep 'specific-server'")
    print("     Or:")
    print("     ps aux | grep python | grep 'ray.util.client.server'")
    print("\n  2. Kill the process:")
    print("     kill -9 <PID>")
    print("\n  3. Verify the process has been killed:")
    print("     ps aux | grep <PID>  (should not find anything)")
    print("\nWaiting 60 seconds, then the script will automatically attempt to reconnect...")
    print("(You can also press Ctrl+C to start the test early)")
    print("\n" + "-" * 80)

    try:
        # Countdown
        for i in range(60, 0, -5):
            print(f"  Countdown: {i} seconds... (Press Ctrl+C to start early)", end='\r')
            time.sleep(5)
        print("\n")
    except KeyboardInterrupt:
        print("\n  Starting test early...")

    # Attempt to reconnect
    print("\n[Step 4/4] Attempting to reconnect...")
    print("  Executing remote task to trigger reconnection logic...")
    try:
        @ray.remote
        def test_reconnect():
            return "Reconnect test successful!"

        # Attempt to execute operation, this will trigger reconnection
        result = ray.get(test_reconnect.remote(), timeout=60)
        print(f"✓ Reconnection successful! Result: {result}")
        print("\n" + "=" * 80)
        print("✓ TEST PASSED: Reconnection works correctly (issue is fixed)")
        print("=" * 80)
        return True
    except Exception as e:
        error_msg = str(e)
        print("\n✗ Reconnection failed. Error message:")
        print(f"  {error_msg}")
        if "Attempted to reconnect a session that has already been cleaned up" in error_msg:
            print("\n" + "=" * 80)
            print("✗ TEST FAILED: Issue reproduced!")
            return False
        else:
            print("\n" + "=" * 80)
            print("⚠ TEST INCONCLUSIVE: Different error occurred")
            print("=" * 80)
            print(f"  This might be a different issue: {e}")
            print(f"  Error type: {type(e).__name__}")
            return None
    finally:
        try:
            ray.shutdown()
        except Exception:
            pass


if __name__ == "__main__":
    print("\nThis script tests the Ray Client reconnection race condition issue.")
    print("It will:")
    print("  1. Connect to Ray cluster (ray://ray-head:10001)")
    print("  2. Establish connection and perform some operations")
    print("  3. Prompt you to manually kill the specific server process")
    print("  4. Wait, then attempt to reconnect")
    print("  5. Verify if the expected error occurs\n")

    result = main()

    print("\n" + "=" * 80)
    print("Test completed!")
    print("=" * 80)
    if result is False:
        print("\nIssue reproduced, fix is necessary.")
        sys.exit(1)
    elif result is True:
        print("\nReconnection works correctly, issue may be fixed.")
        sys.exit(0)
    else:
        print("\nTest result is inconclusive, further investigation needed.")
        sys.exit(2)

Before the fix:
This script tests the Ray Client reconnection race condition issue.
It will:
1. Connect to Ray cluster (ray://ray-head:10001)
2. Establish connection and perform some operations
3. Prompt you to manually kill the specific server process
4. Wait, then attempt to reconnect
5. Verify if the expected error occurs
================================================================================
Ray Client Reconnection Issue Test
================================================================================
[Step 1/4] Connecting to Ray cluster: ray://ray-head:10001
2026-01-30 11:50:35,481 INFO client_builder.py:241 -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error, log_to_driver
SIGTERM handler is not set because current thread is not the main thread.
✓ Connection successful
[Step 2/4] Establishing connection and performing operations...
Executing remote tasks...
✓ Connection established. Task results: ['Hello from Ray!', 'Hello from Ray!', 'Hello from Ray!']
✓ Current Ray status: True
================================================================================
[Step 3/4] Please manually kill the specific server process
================================================================================
Please perform the following steps:
1. Find the specific server process:
ps aux | grep 'ray.util.client.server' | grep 'specific-server'
Or:
ps aux | grep python | grep 'ray.util.client.server'
2. Kill the process:
kill -9 <PID>
3. Verify the process has been killed:
ps aux | grep <PID> (should not find anything)
Waiting 60 seconds, then the script will automatically attempt to reconnect...
(You can also press Ctrl+C to start the test early)
--------------------------------------------------------------------------------
^CCountdown: 50 seconds... (Press Ctrl+C to start early)
Starting test early...
[Step 4/4] Attempting to reconnect...
Executing remote task to trigger reconnection logic...
2026-01-30 11:51:19,395 ERROR dataclient.py:330 -- Unrecoverable error in data channel.
✗ Reconnection failed. Error message:
Request can't be sent because the Ray client has already been disconnected due to an error. Last exception: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.NOT_FOUND
details = "Attempted to reconnect a session that has already been cleaned up"
debug_error_string = "UNKNOWN:Error received from peer {grpc_status:5, grpc_message:"Attempted to reconnect a session that has already been cleaned up"}"
>
================================================================================
✗ TEST FAILED: Issue reproduced!
================================================================================
Test completed!
================================================================================
Issue reproduced, fix is necessary.
After the fix:
This script tests the Ray Client reconnection race condition issue.
It will:
1. Connect to Ray cluster (ray://ray-head:10001)
2. Establish connection and perform some operations
3. Prompt you to manually kill the specific server process
4. Wait, then attempt to reconnect
5. Verify if the expected error occurs
================================================================================
Ray Client Reconnection Issue Test
================================================================================
[Step 1/4] Connecting to Ray cluster: ray://ray-head:10001
2026-01-30 13:27:15,182 INFO client_builder.py:248 -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error, log_to_driver
✓ Connection successful
[Step 2/4] Establishing connection and performing operations...
Executing remote tasks...
✓ Connection established. Task results: ['Hello from Ray!', 'Hello from Ray!', 'Hello from Ray!']
✓ Current Ray status: True
================================================================================
[Step 3/4] Please manually kill the specific server process
================================================================================
Please perform the following steps:
1. Find the specific server process:
ps aux | grep 'ray.util.client.server' | grep 'specific-server'
Or:
ps aux | grep python | grep 'ray.util.client.server'
2. Kill the process:
kill -9 <PID>
3. Verify the process has been killed:
ps aux | grep <PID> (should not find anything)
Waiting 60 seconds, then the script will automatically attempt to reconnect...
(You can also press Ctrl+C to start the test early)
--------------------------------------------------------------------------------
^CCountdown: 50 seconds... (Press Ctrl+C to start early)
Starting test early...
[Step 4/4] Attempting to reconnect...
Executing remote task to trigger reconnection logic...
✓ Reconnection successful! Result: Reconnect test successful!
================================================================================
✓ TEST PASSED: Reconnection works correctly (issue is fixed)
================================================================================
================================================================================
Test completed!
================================================================================
Reconnection works correctly, issue may be fixed.
Issue Severity
None