Skip to content

Commit

Permalink
Fix monitor.py bottleneck by removing excess Redis queries. (ray-project#1786)
Browse files Browse the repository at this point in the history

* Fix monitor.py bottleneck by removing excess Redis queries.

* Remove unnecessary default value.
  • Loading branch information
robertnishihara authored and pcmoritz committed Mar 27, 2018
1 parent 51fdbe3 commit de3cfa2
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ def __init__(self, redis_address, redis_port, autoscaling_config):
self.dead_local_schedulers = set()
self.live_plasma_managers = Counter()
self.dead_plasma_managers = set()
# Keep a mapping from local scheduler client ID to IP address to use
# for updating the load metrics.
self.local_scheduler_id_to_ip_map = dict()
self.load_metrics = LoadMetrics()
if autoscaling_config:
self.autoscaler = StandardAutoscaler(
Expand Down Expand Up @@ -268,22 +271,15 @@ def local_scheduler_info_handler(self, unused_channel, data):
static = message.StaticResources(i)
dynamic_resources[dyn.Key().decode("utf-8")] = dyn.Value()
static_resources[static.Key().decode("utf-8")] = static.Value()

# Update the load metrics for this local scheduler.
client_id = binascii.hexlify(message.DbClientId()).decode("utf-8")
clients = ray.global_state.client_table()
local_schedulers = [
entry for client in clients.values() for entry in client
if (entry["ClientType"] == "local_scheduler" and not
entry["Deleted"])
]
ip = None
for ls in local_schedulers:
if ls["DBClientID"] == client_id:
ip = ls["AuxAddress"].split(":")[0]
ip = self.local_scheduler_id_to_ip_map.get(client_id)
if ip:
self.load_metrics.update(ip, static_resources, dynamic_resources)
else:
print("Warning: could not find ip for client {} in {}".format(
client_id, local_schedulers))
print("Warning: could not find ip for client {}."
.format(client_id))

def plasma_manager_heartbeat_handler(self, unused_channel, data):
"""Handle a plasma manager heartbeat from Redis.
Expand Down Expand Up @@ -437,13 +433,17 @@ def driver_removed_handler(self, unused_channel, data):

self._clean_up_entries_for_driver(driver_id)

def process_messages(self):
def process_messages(self, max_messages=10000):
"""Process all messages ready in the subscription channels.
This reads messages from the subscription channels and calls the
appropriate handlers until there are no messages left.
Args:
max_messages: The maximum number of messages to process before
returning.
"""
while True:
for _ in range(max_messages):
message = self.subscribe_client.get_message()
if message is None:
return
Expand Down Expand Up @@ -515,6 +515,15 @@ def run(self):

# Handle messages from the subscription channels.
while True:
# Update the mapping from local scheduler client ID to IP address.
# This is only used to update the load metrics for the autoscaler.
local_schedulers = self.state.local_schedulers()
self.local_scheduler_id_to_ip_map = {}
for local_scheduler_info in local_schedulers:
client_id = local_scheduler_info["DBClientID"]
ip_address = local_scheduler_info["AuxAddress"].split(":")[0]
self.local_scheduler_id_to_ip_map[client_id] = ip_address

# Process autoscaling actions
if self.autoscaler:
self.autoscaler.update()
Expand Down Expand Up @@ -556,6 +565,10 @@ def run(self):
# messages.
time.sleep(ray._config.heartbeat_timeout_milliseconds() * 1e-3)

# TODO(rkn): This infinite loop should be inside of a try/except block,
# and if an exception is thrown we should push an error message to all
# drivers.


if __name__ == "__main__":
parser = argparse.ArgumentParser(description=("Parse Redis server for the "
Expand All @@ -575,9 +588,6 @@ def run(self):
redis_ip_address = get_ip_address(args.redis_address)
redis_port = get_port(args.redis_address)

# Initialize the global state.
ray.global_state._initialize_global_state(redis_ip_address, redis_port)

if args.autoscaling_config:
autoscaling_config = os.path.expanduser(args.autoscaling_config)
else:
Expand Down

0 comments on commit de3cfa2

Please sign in to comment.