[tune/core] Use Global State API for resources #3004
@@ -1310,9 +1310,19 @@ def cluster_resources(self):
         return dict(resources)
 
+    def _live_client_ids(self):
+        """Returns a set of client IDs corresponding to clients still alive."""
+        return {
+            client["ClientID"]
+            for client in self.client_table() if client["IsInsertion"]
+        }
+
     def available_resources(self):
         """Get the current available cluster resources.
 
+        This is different from `cluster_resources` in that this will return
+        idle (available) resources rather than total resources.
+
         Note that this information can grow stale as tasks start and finish.
 
         Returns:
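For orientation, here is a minimal usage sketch of the two calls the new docstring distinguishes. It assumes a running cluster and the `ray.global_state` API this PR touches; the resource numbers in the comments are purely illustrative.

```python
import ray

ray.init()

# Total resources registered by all live clients (nodes) in the cluster.
total = ray.global_state.cluster_resources()        # e.g. {"CPU": 8.0, "GPU": 1.0}

# Resources currently idle; this can grow stale as tasks start and finish.
available = ray.global_state.available_resources()  # e.g. {"CPU": 6.0, "GPU": 1.0}

print("CPUs in use:", total.get("CPU", 0) - available.get("CPU", 0))
```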
@@ -1364,6 +1374,7 @@ def available_resources(self):
                 if local_scheduler_id not in local_scheduler_ids:
                     del available_resources_by_id[local_scheduler_id]
         else:
+            # TODO(rliaw): Is this a fair assumption?
             # Assumes the number of Redis clients does not change
             subscribe_clients = [
                 redis_client.pubsub(ignore_subscribe_messages=True)

Review comments:

- Yes, this is a safe assumption.
- I'd remove this comment.
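The `subscribe_clients` setup in this hunk uses the standard redis-py pubsub API. A standalone sketch of that pattern follows; the host, port, and channel name are placeholders, not the actual GCS wiring.

```python
import redis

# Connect to a Redis shard and subscribe to a heartbeat channel
# (host, port, and channel name are placeholders).
redis_client = redis.StrictRedis(host="localhost", port=6379)
subscribe_client = redis_client.pubsub(ignore_subscribe_messages=True)
subscribe_client.subscribe("XRAY_HEARTBEAT")

# Poll for a single message instead of blocking indefinitely.
message = subscribe_client.get_message(timeout=1.0)
if message is not None:
    print("received %d bytes of heartbeat data" % len(message["data"]))
```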
@@ -1373,7 +1384,7 @@ def available_resources(self):
                 subscribe_client.subscribe(
                     ray.gcs_utils.XRAY_HEARTBEAT_CHANNEL)
 
-            client_ids = {client["ClientID"] for client in self.client_table()}
+            client_ids = self._live_client_ids()
 
             while set(available_resources_by_id.keys()) != client_ids:
                 for subscribe_client in subscribe_clients:
@@ -1403,10 +1414,7 @@ def available_resources(self):
                     available_resources_by_id[client_id] = dynamic_resources
 
                 # Update clients in cluster
-                client_ids = {
-                    client["ClientID"]
-                    for client in self.client_table()
-                }
+                client_ids = self._live_client_ids()
 
                 # Remove disconnected clients
                 for client_id in available_resources_by_id.keys():
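To make the `_live_client_ids()` change concrete, here is a small sketch of the filtering it performs, using a made-up `client_table()` payload (field names follow the diff above; the values are illustrative).

```python
# Hypothetical client_table() contents: one live client and one that has
# been removed from the cluster (IsInsertion is False for removed entries).
client_table = [
    {"ClientID": "abc123", "IsInsertion": True, "Resources": {"CPU": 4.0}},
    {"ClientID": "def456", "IsInsertion": False, "Resources": {"CPU": 4.0}},
]

# Same set comprehension as _live_client_ids(): keep only live clients.
live_ids = {c["ClientID"] for c in client_table if c["IsInsertion"]}
assert live_ids == {"abc123"}
```

Pruning `available_resources_by_id` against this live set keeps the while-loop above from waiting on heartbeats from a client that has already left the cluster.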
@@ -213,12 +213,13 @@ def _return_resources(self, resources):
         assert self._committed_resources.gpu >= 0
 
     def _update_avail_resources(self):
-        clients = ray.global_state.client_table()
         if ray.worker.global_worker.use_raylet:
-            # TODO(rliaw): Remove once raylet flag is swapped
-            num_cpus = sum(cl['Resources']['CPU'] for cl in clients)
-            num_gpus = sum(cl['Resources'].get('GPU', 0) for cl in clients)
+            resources = ray.global_state.cluster_resources()
+            num_cpus = resources["CPU"]
+            num_gpus = resources["GPU"]
         else:
+            clients = ray.global_state.client_table()
             local_schedulers = [
                 entry for client in clients.values() for entry in client
                 if (entry['ClientType'] == 'local_scheduler'

Review comments:

- You're using ...
- Oh, the Tune and core changes are sort of orthogonal after you pointed out I didn't need ...
- Oh, I see, but ... Also, ...
- The one case I can think of where ...
- If a node dies within a 10 second window of the call to ..., it probably makes sense to break out if it hasn't returned within e.g. 200ms and log a warning.
- Alternatively, we don't even need to call ...
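The break-out suggestion could look roughly like the sketch below. This is not code from the PR: `get_live_ids` and `get_next_report` are hypothetical stand-ins for the client-table lookup and the pubsub message handling, and the 200 ms deadline comes from the comment above.

```python
import logging
import time

logger = logging.getLogger(__name__)


def wait_for_heartbeats(get_live_ids, get_next_report, timeout_s=0.2):
    """Collect per-client resource reports until every live client has
    reported, giving up with a warning once the deadline passes."""
    deadline = time.time() + timeout_s
    available_by_id = {}
    while set(available_by_id) != get_live_ids():
        if time.time() > deadline:
            logger.warning("Timed out after %.0f ms; resource info may be "
                           "incomplete.", timeout_s * 1000)
            break
        report = get_next_report()  # e.g. one parsed heartbeat message
        if report is not None:
            client_id, resources = report
            available_by_id[client_id] = resources
    return available_by_id
```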
Review comments:

- Does this actually work? I thought you can get an IsInsertion and then a deletion.
- We removed that behavior in #2880.