
Conversation

@g199209 (Contributor) commented Jan 30, 2026

This PR is NOT intended to be merged; it is just for discussion in #60600.

…s dies (e.g. killed or crashed)

Signed-off-by: mingfei <mingfei@mds-trading.com>
@gemini-code-assist bot (Contributor) left a comment


Code Review

This pull request significantly enhances the Ray Client's reconnection capabilities, especially by enabling the recreation of specific server processes upon their failure. The changes are extensive and well-thought-out, introducing more robust error handling and state management to make reconnections more seamless. My review focuses on improving code maintainability by suggesting refactoring of duplicated logic and removing an unused variable. Overall, this is a strong improvement to Ray Client's reliability.

Comment on lines +763 to +782
if client_id not in self.clients_last_seen:
    self.reconnect_grace_periods.pop(client_id, None)
    self.job_configs.pop(client_id, None)
    self.job_config_bytes.pop(client_id, None)
    self.ray_init_kwargs.pop(client_id, None)
    self.backend_needs_init.pop(client_id, None)
    return

current_last_seen = self.clients_last_seen[client_id]
if current_last_seen > start_time:
    return

if self.num_clients > 0:
    self.num_clients -= 1
del self.clients_last_seen[client_id]
self.reconnect_grace_periods.pop(client_id, None)
self.job_configs.pop(client_id, None)
self.job_config_bytes.pop(client_id, None)
self.ray_init_kwargs.pop(client_id, None)
self.backend_needs_init.pop(client_id, None)

medium

The client state cleanup logic is duplicated here and later in the finally block of the Datapath method. To improve maintainability and reduce code duplication, consider extracting this logic into a single helper method.

For example, you could introduce a method like _cleanup_client_state_locked:

def _cleanup_client_state_locked(self, client_id: str):
    """Helper to remove all state for a given client_id."""
    if client_id in self.clients_last_seen:
        if self.num_clients > 0:
            self.num_clients -= 1
        del self.clients_last_seen[client_id]
    self.reconnect_grace_periods.pop(client_id, None)
    self.job_configs.pop(client_id, None)
    self.job_config_bytes.pop(client_id, None)
    self.ray_init_kwargs.pop(client_id, None)
    self.backend_needs_init.pop(client_id, None)

Using this helper would simplify this block and the one in the finally clause.
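
As a rough illustration of the payoff (assuming the helper above is added exactly as written), the quoted cleanup block would reduce to:

if client_id not in self.clients_last_seen:
    # Nothing is tracked for this client anymore; just clear any leftovers.
    self._cleanup_client_state_locked(client_id)
    return

if self.clients_last_seen[client_id] > start_time:
    # The client reconnected after this cleanup was scheduled; keep its state.
    return

self._cleanup_client_state_locked(client_id)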

if channel is None:
    # If the server process is dead or the channel isn't becoming ready,
    # don't sit in a 30s timeout loop. Eagerly recreate the server.
    exit_code = server.poll() if server is not None else None

medium

The variable exit_code is assigned here but is not used anywhere in the subsequent code. It should be removed to improve code clarity.

Comment on lines +1061 to +1072
with self.clients_lock:
    # Cancel any scheduled cleanup and clean immediately.
    self._invalidate_cleanup_locked(client_id)
    if client_id in self.clients_last_seen:
        if self.num_clients > 0:
            self.num_clients -= 1
        del self.clients_last_seen[client_id]
    self.reconnect_grace_periods.pop(client_id, None)
    self.job_configs.pop(client_id, None)
    self.job_config_bytes.pop(client_id, None)
    self.ray_init_kwargs.pop(client_id, None)
    self.backend_needs_init.pop(client_id, None)

medium

This immediate cleanup logic is also present in the _run_cleanup method. As mentioned in another comment, extracting this into a shared helper method would reduce code duplication and improve maintainability.
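
For example, the quoted block would become the following, again assuming the _cleanup_client_state_locked helper sketched in the earlier comment:

with self.clients_lock:
    # Cancel any scheduled cleanup and clean immediately.
    self._invalidate_cleanup_locked(client_id)
    self._cleanup_client_state_locked(client_id)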

@cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 4 potential issues.

if channel is None:
    # If the server process is dead or the channel isn't becoming ready,
    # don't sit in a 30s timeout loop. Eagerly recreate the server.
    exit_code = server.poll() if server is not None else None

Unused variable suggests incomplete reconnection logic

Low Severity

The variable exit_code is computed but never used. Comparing with similar code at lines 440-448 in get_channel, where exit_code is checked with if exit_code is not None: to decide whether to eagerly remove a dead server, this assignment appears to be dead code. The comment at lines 880-881 mentions checking "if the server process is dead," but the actual check was never implemented.
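
A minimal sketch of what actually using the result could look like; the module-level logger and the in-scope client_id are assumptions based on surrounding context, not part of this diff:

exit_code = server.poll() if server is not None else None
if exit_code is not None:
    # The specific server has already exited; record it and let the eager
    # recreation below proceed without waiting on the channel.
    logger.warning(
        "Specific server for client %s exited with code %s; recreating it.",
        client_id,
        exit_code,
    )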


# But check if we still have job_config - if so, allow recovery (race condition)
if job_config is not None:
    # Restore clients_last_seen entry for this reconnection
    self.clients_last_seen[client_id] = start_time

Session recovery path causes incorrect client count

Medium Severity

When recovering a session where clients_last_seen was cleaned but job_config still exists (the "race condition" mentioned in the comment), the code restores clients_last_seen[client_id] but does not increment num_clients. When this recovered session later ends, the cleanup code at lines 1064-1067 will decrement num_clients because it sees the entry in clients_last_seen. This causes num_clients to drift toward zero or become incorrect.
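
A small sketch of the kind of fix implied here, keeping the counter in step with the restored entry (names taken from the quoted diff):

if job_config is not None:
    # Restore clients_last_seen entry for this reconnection
    self.clients_last_seen[client_id] = start_time
    # Re-count this client so the eventual cleanup's decrement balances out.
    self.num_clients += 1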


raise RuntimeError(
    "Cannot reconnect: server was removed and job config "
    "is not available. Please reconnect as a new client."
)

RuntimeErrors bypass finally cleanup due to incorrect placement

High Severity

Several RuntimeError exceptions are raised in the reconnection path (lines 854, 870, 884, 898, 913, 923, 943) with comments stating they should "go through exception handling so cleanup can happen properly in finally block." However, these exceptions are raised BEFORE the try block starts at line 977, so the finally block at line 1054 will never execute. This leaves clients_last_seen entries orphaned since cleanup was already invalidated at line 817.
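
One possible shape for a fix, sketched against the quoted code and assuming the _cleanup_client_state_locked helper suggested earlier in this thread: run the cleanup explicitly before any raise that happens outside the try block.

with self.clients_lock:
    self._cleanup_client_state_locked(client_id)  # hypothetical helper
raise RuntimeError(
    "Cannot reconnect: server was removed and job config "
    "is not available. Please reconnect as a new client."
)

Alternatively, the pre-flight checks could be moved inside the try block so the existing finally clause covers them.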



    raise
if deadline and time.monotonic() > deadline:
    raise
time.sleep(0.5)

Missing shutdown check causes infinite loop in get

High Severity

The new ConnectionError exception handler retries indefinitely when _in_shutdown is True and no timeout is specified. When the connection fails permanently, _get_object_iterator immediately raises ConnectionError("Client is shutting down.") because _in_shutdown is True. This exception is caught, and since _reconnect_enabled remains True and deadline is None, the loop continues forever with 0.5s sleeps. A check for self._in_shutdown is missing.
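
A hedged sketch of the missing check, written against the quoted fragment; the surrounding except ConnectionError: clause and the attribute names are taken from this description rather than from the visible diff:

except ConnectionError:
    if self._in_shutdown or not self._reconnect_enabled:
        # Shutting down or reconnects disabled: surface the error rather
        # than spinning in the retry loop forever.
        raise
    if deadline and time.monotonic() > deadline:
        raise
    time.sleep(0.5)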


@ray-gardener bot added the question, do-not-merge, core, and community-contribution labels on Jan 30, 2026