Ray Client reconnection fails when the specific server process dies #60601
base: master
Conversation
…s dies (e.g. killed or crashed) Signed-off-by: mingfei <mingfei@mds-trading.com>
Code Review
This pull request significantly enhances the Ray Client's reconnection capabilities, especially by enabling the recreation of specific server processes upon their failure. The changes are extensive and well-thought-out, introducing more robust error handling and state management to make reconnections more seamless. My review focuses on improving code maintainability by suggesting refactoring of duplicated logic and removing an unused variable. Overall, this is a strong improvement to Ray Client's reliability.
```python
if client_id not in self.clients_last_seen:
    self.reconnect_grace_periods.pop(client_id, None)
    self.job_configs.pop(client_id, None)
    self.job_config_bytes.pop(client_id, None)
    self.ray_init_kwargs.pop(client_id, None)
    self.backend_needs_init.pop(client_id, None)
    return

current_last_seen = self.clients_last_seen[client_id]
if current_last_seen > start_time:
    return

if self.num_clients > 0:
    self.num_clients -= 1
del self.clients_last_seen[client_id]
self.reconnect_grace_periods.pop(client_id, None)
self.job_configs.pop(client_id, None)
self.job_config_bytes.pop(client_id, None)
self.ray_init_kwargs.pop(client_id, None)
self.backend_needs_init.pop(client_id, None)
```
The client state cleanup logic is duplicated here and later in the finally block of the Datapath method. To improve maintainability and reduce code duplication, consider extracting this logic into a single helper method.
For example, you could introduce a method like `_cleanup_client_state_locked`:

```python
def _cleanup_client_state_locked(self, client_id: str):
    """Helper to remove all state for a given client_id."""
    if client_id in self.clients_last_seen:
        if self.num_clients > 0:
            self.num_clients -= 1
        del self.clients_last_seen[client_id]
    self.reconnect_grace_periods.pop(client_id, None)
    self.job_configs.pop(client_id, None)
    self.job_config_bytes.pop(client_id, None)
    self.ray_init_kwargs.pop(client_id, None)
    self.backend_needs_init.pop(client_id, None)
```

Using this helper would simplify this block and the one in the finally clause.
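For illustration, each of the two cleanup sites would then collapse to a single call (a sketch; assumes `self.clients_lock` is already held by the caller, as the `_locked` suffix implies):

```python
# In the scheduled-cleanup path and in Datapath's finally block alike,
# with self.clients_lock already held:
self._cleanup_client_state_locked(client_id)
```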
```python
if channel is None:
    # If the server process is dead or the channel isn't becoming ready,
    # don't sit in a 30s timeout loop. Eagerly recreate the server.
    exit_code = server.poll() if server is not None else None
```
```python
with self.clients_lock:
    # Cancel any scheduled cleanup and clean immediately.
    self._invalidate_cleanup_locked(client_id)
    if client_id in self.clients_last_seen:
        if self.num_clients > 0:
            self.num_clients -= 1
        del self.clients_last_seen[client_id]
    self.reconnect_grace_periods.pop(client_id, None)
    self.job_configs.pop(client_id, None)
    self.job_config_bytes.pop(client_id, None)
    self.ray_init_kwargs.pop(client_id, None)
    self.backend_needs_init.pop(client_id, None)
```
Cursor Bugbot has reviewed your changes and found 4 potential issues.
```python
if channel is None:
    # If the server process is dead or the channel isn't becoming ready,
    # don't sit in a 30s timeout loop. Eagerly recreate the server.
    exit_code = server.poll() if server is not None else None
```
Unused variable suggests incomplete reconnection logic
Low Severity
The variable exit_code is computed but never used. Comparing with similar code at lines 440-448 in get_channel, where exit_code is checked with if exit_code is not None: to decide whether to eagerly remove a dead server, this assignment appears to be dead code. The comment at lines 880-881 mentions checking "if the server process is dead," but the actual check was never implemented.
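For comparison, a minimal sketch of what completing the check might look like, mirroring the `get_channel` pattern cited above; `_remove_server_for_client` is a hypothetical stand-in for whatever eager-removal routine the proxier actually uses:

```python
exit_code = server.poll() if server is not None else None
if exit_code is not None:
    # The server process has died; remove it eagerly so the reconnect
    # path can recreate it instead of waiting out the 30s channel timeout.
    logger.warning(
        f"Server exited with code {exit_code}; removing it so "
        "reconnection can recreate it."
    )
    self._remove_server_for_client(client_id)  # hypothetical helper
```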
```python
# But check if we still have job_config - if so, allow recovery (race condition)
if job_config is not None:
    # Restore clients_last_seen entry for this reconnection
    self.clients_last_seen[client_id] = start_time
```
Session recovery path causes incorrect client count
Medium Severity
When recovering a session where clients_last_seen was cleaned but job_config still exists (the "race condition" mentioned in the comment), the code restores clients_last_seen[client_id] but does not increment num_clients. When this recovered session later ends, the cleanup code at lines 1064-1067 will decrement num_clients because it sees the entry in clients_last_seen. This causes num_clients to drift toward zero or become incorrect.
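A sketch of one way to keep the counter balanced: re-increment `num_clients` whenever the `clients_last_seen` entry is restored, so the decrement performed by the eventual cleanup cancels out (assumes both updates happen under `clients_lock`):

```python
if job_config is not None:
    # Restore the entry AND re-increment num_clients, so the decrement
    # in the later cleanup path leaves the count unchanged.
    self.clients_last_seen[client_id] = start_time
    self.num_clients += 1
```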
```python
raise RuntimeError(
    "Cannot reconnect: server was removed and job config "
    "is not available. Please reconnect as a new client."
)
```
RuntimeErrors bypass finally cleanup due to incorrect placement
High Severity
Several RuntimeError exceptions are raised in the reconnection path (lines 854, 870, 884, 898, 913, 923, 943) with comments stating they should "go through exception handling so cleanup can happen properly in finally block." However, these exceptions are raised BEFORE the try block starts at line 977, so the finally block at line 1054 will never execute. This leaves clients_last_seen entries orphaned since cleanup was already invalidated at line 817.
Additional Locations (1)
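Since these raises happen before the try block is entered, one option (a sketch, reusing the `_cleanup_client_state_locked` helper suggested earlier in this review) is to clean up explicitly before propagating:

```python
with self.clients_lock:
    # The try/finally has not started yet, so its finally block cannot
    # run; release this client's state here before raising.
    self._cleanup_client_state_locked(client_id)
raise RuntimeError(
    "Cannot reconnect: server was removed and job config "
    "is not available. Please reconnect as a new client."
)
```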
```python
    raise
if deadline and time.monotonic() > deadline:
    raise
time.sleep(0.5)
```
Missing shutdown check causes infinite loop in get
High Severity
The new ConnectionError exception handler retries indefinitely when _in_shutdown is True and no timeout is specified. When the connection fails permanently, _get_object_iterator immediately raises ConnectionError("Client is shutting down.") because _in_shutdown is True. This exception is caught, and since _reconnect_enabled remains True and deadline is None, the loop continues forever with 0.5s sleeps. A check for self._in_shutdown is missing.
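A minimal sketch of the missing guard, placed at the top of the `ConnectionError` handler (the surrounding structure is inferred from the hunk quoted above):

```python
except ConnectionError:
    if self._in_shutdown:
        # The client is shutting down; no retry can ever succeed, so
        # propagate instead of spinning in the 0.5s sleep loop.
        raise
    if not self._reconnect_enabled:
        raise
    if deadline and time.monotonic() > deadline:
        raise
    time.sleep(0.5)
```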


This PR is NOT intended to be merged; it is just for discussion in #60600.