Merged
python/ray/experimental/state.py (18 changes: 13 additions & 5 deletions)
@@ -1310,9 +1310,19 @@ def cluster_resources(self):

         return dict(resources)

+    def _live_client_ids(self):
+        """Returns a set of client IDs corresponding to clients still alive."""
Contributor: Does this actually work? I thought you could get an insertion and then a deletion.

Contributor (Author): We removed that behavior in #2880.
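(A tiny illustration, not from the PR, with invented table entries: after #2880 each client has a single entry whose IsInsertion flag records whether it is still alive, so the filter in the new method keeps exactly the live clients.)

    # Hypothetical client_table() output: one entry per client; a dead
    # client's entry has IsInsertion == False.
    table = [
        {"ClientID": "aaa", "IsInsertion": True},   # alive
        {"ClientID": "bbb", "IsInsertion": False},  # dead
    ]
    live = {c["ClientID"] for c in table if c["IsInsertion"]}
    assert live == {"aaa"}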

+        return {
+            client["ClientID"]
+            for client in self.client_table() if client["IsInsertion"]
+        }

     def available_resources(self):
         """Get the current available cluster resources.

         This is different from `cluster_resources` in that this will return
         idle (available) resources rather than total resources.

         Note that this information can grow stale as tasks start and finish.

         Returns:
@@ -1364,6 +1374,7 @@ def available_resources(self):
                 if local_scheduler_id not in local_scheduler_ids:
                     del available_resources_by_id[local_scheduler_id]
         else:
+            # TODO(rliaw): Is this a fair assumption?
Collaborator: Yes, this is a safe assumption. self.redis_clients has one client per shard, and the number of shards doesn't change.

Collaborator: I'd remove this comment.

+            # Assumes the number of Redis clients does not change
             subscribe_clients = [
                 redis_client.pubsub(ignore_subscribe_messages=True)
@@ -1373,7 +1384,7 @@
                 subscribe_client.subscribe(
                     ray.gcs_utils.XRAY_HEARTBEAT_CHANNEL)

-            client_ids = {client["ClientID"] for client in self.client_table()}
+            client_ids = self._live_client_ids()

             while set(available_resources_by_id.keys()) != client_ids:
                 for subscribe_client in subscribe_clients:
@@ -1403,10 +1414,7 @@ def available_resources(self):
                         available_resources_by_id[client_id] = dynamic_resources

                 # Update clients in cluster
-                client_ids = {
-                    client["ClientID"]
-                    for client in self.client_table()
-                }
+                client_ids = self._live_client_ids()

                 # Remove disconnected clients
                 for client_id in available_resources_by_id.keys():
python/ray/tune/ray_trial_executor.py (7 changes: 4 additions & 3 deletions)
@@ -213,12 +213,13 @@ def _return_resources(self, resources):
         assert self._committed_resources.gpu >= 0

     def _update_avail_resources(self):
-        clients = ray.global_state.client_table()
         if ray.worker.global_worker.use_raylet:
             # TODO(rliaw): Remove once raylet flag is swapped
-            num_cpus = sum(cl['Resources']['CPU'] for cl in clients)
-            num_gpus = sum(cl['Resources'].get('GPU', 0) for cl in clients)
+            resources = ray.global_state.cluster_resources()
Collaborator: You're using cluster_resources, but you modified available_resources. Doesn't it make sense to make the same change in cluster_resources?

Contributor (Author): Oh, the Tune and core changes are sort of orthogonal after you pointed out that I didn't need available_resources. cluster_resources doesn't have the same problem that available_resources has.

Collaborator: Oh, I see, but cluster_resources does count resources from dead nodes and probably shouldn't, right?

Also, available_resources could still hang if one of the nodes dies at a very unfortunate time, right?

Contributor (Author): cluster_resources gets its list of resources from the client_table, which clears out the resources dict for a dead node.

The one case I can think of where available_resources might hang is if one of the Redis clients dies in the middle... are there others?
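(To spell out the first point, a minimal sketch, not Ray code, with an assumed table shape: because a dead node's resources dict has been cleared, a straight sum over the client table never counts the dead node.)

    from collections import defaultdict

    def sum_cluster_resources(client_table):
        totals = defaultdict(float)
        for client in client_table:
            # A dead node's "Resources" dict has been cleared to {},
            # so it contributes nothing to the totals.
            for name, quantity in client.get("Resources", {}).items():
                totals[name] += quantity
        return dict(totals)

    table = [
        {"ClientID": "aaa", "Resources": {"CPU": 8.0, "GPU": 1.0}},
        {"ClientID": "bbb", "Resources": {}},  # dead node
    ]
    assert sum_cluster_resources(table) == {"CPU": 8.0, "GPU": 1.0}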

Collaborator: If a node dies within a 10 second window of the call to client_table(), it won't have been marked as dead yet in the client table, so the condition while set(available_resources_by_id.keys()) != client_ids: may never be met, and we'll hang there.

It probably makes sense to break out if it hasn't returned within e.g. 200ms and log a warning.
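(A minimal sketch of that escape hatch, not part of this PR: bound the wait loop with a deadline and log a warning instead of spinning forever. wait_for_message() is a hypothetical stand-in for one non-blocking pass over the subscribe clients, returning (client_id, resources) or None.)

    import logging
    import time

    logger = logging.getLogger(__name__)

    def collect_available_resources(wait_for_message, client_ids, timeout=0.2):
        # client_ids is the set of client IDs we expect heartbeats from.
        available_resources_by_id = {}
        deadline = time.monotonic() + timeout
        while set(available_resources_by_id) != client_ids:
            if time.monotonic() > deadline:
                missing = client_ids - set(available_resources_by_id)
                logger.warning(
                    "Timed out waiting for heartbeats from %s", missing)
                break
            message = wait_for_message()
            if message is not None:
                client_id, resources = message
                available_resources_by_id[client_id] = resources
        return available_resources_by_id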

Collaborator: Alternatively, we don't even need to call client_table(); we can just listen for e.g. 200ms and then return the info from whatever heartbeats we collected.
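(A sketch of that alternative under the same assumptions: no target set at all, just a fixed listening window.)

    import time

    def collect_heartbeats(wait_for_message, window=0.2):
        # wait_for_message() is the same hypothetical non-blocking poll.
        available_resources_by_id = {}
        end = time.monotonic() + window
        while time.monotonic() < end:
            message = wait_for_message()
            if message is not None:
                client_id, resources = message
                available_resources_by_id[client_id] = resources
        return available_resources_by_id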

+            num_cpus = resources["CPU"]
+            num_gpus = resources["GPU"]
         else:
+            clients = ray.global_state.client_table()
             local_schedulers = [
                 entry for client in clients.values() for entry in client
                 if (entry['ClientType'] == 'local_scheduler'