Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl committed Jan 13, 2021
1 parent 4853aa9 commit 4539391
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 76 deletions.
6 changes: 3 additions & 3 deletions python/ray/autoscaler/_private/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pipes import quote

import ray
from ray.experimental.internal_kv import _internal_kv_get
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
import ray._private.services as services
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler._private.constants import \
Expand Down Expand Up @@ -129,13 +129,13 @@ def request_resources(num_cpus: Optional[int] = None,
"""
if not ray.is_initialized():
raise RuntimeError("Ray is not initialized yet")
r = _redis()
to_request = []
if num_cpus:
to_request += [{"CPU": 1}] * num_cpus
if bundles:
to_request += bundles
r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(to_request))
_internal_kv_put(AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
json.dumps(to_request))


def create_or_update_cluster(config_file: str,
Expand Down
87 changes: 16 additions & 71 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
from ray.ray_logging import setup_component_logger
from ray._raylet import GlobalStateAccessor
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized

import redis
_internal_kv_initialized, _internal_kv_get

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -80,8 +78,6 @@ class Monitor:
Attributes:
redis: A connection to the Redis server.
primary_subscribe_client: A pubsub client for the Redis server.
This is used to receive notifications about failed components.
"""

def __init__(self,
Expand All @@ -101,9 +97,6 @@ def __init__(self,
worker = ray.worker.global_worker
worker.redis_client = self.redis
worker.mode = 0
# Setup subscriptions to the primary Redis server and the Redis shards.
self.primary_subscribe_client = self.redis.pubsub(
ignore_subscribe_messages=True)
# Keep a mapping from raylet client ID to IP address to use
# for updating the load metrics.
self.raylet_id_to_ip_map = {}
Expand All @@ -122,27 +115,10 @@ def __init__(self,
def __del__(self):
"""Destruct the monitor object."""
# We close the pubsub client to avoid leaking file descriptors.
try:
primary_subscribe_client = self.primary_subscribe_client
except AttributeError:
primary_subscribe_client = None
if primary_subscribe_client is not None:
primary_subscribe_client.close()
if self.global_state_accessor is not None:
self.global_state_accessor.disconnect()
self.global_state_accessor = None

def subscribe(self, channel):
"""Subscribe to the given channel on the primary Redis shard.
Args:
channel (str): The channel to subscribe to.
Raises:
Exception: An exception is raised if the subscription fails.
"""
self.primary_subscribe_client.subscribe(channel)

def update_load_metrics(self):
"""Fetches resource usage data from GCS and updates load metrics."""

Expand Down Expand Up @@ -172,6 +148,19 @@ def update_load_metrics(self):
logger.warning(
f"Monitor: could not find ip for node {node_id}")

def update_resource_requests(self):
"""Fetches resource requests from the internal KV and updates load."""
if not _internal_kv_initialized():
return
data = _internal_kv_get(
ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
if data:
try:
resource_request = json.loads(data)
self.load_metrics.set_resource_requests(resource_request)
except Exception:
logger.exception("Error parsing resource requests")

def autoscaler_resource_request_handler(self, _, data):
"""Handle a notification of a resource request for the autoscaler.
Expand All @@ -186,41 +175,6 @@ def autoscaler_resource_request_handler(self, _, data):
resource_request = json.loads(data)
self.load_metrics.set_resource_requests(resource_request)

def process_messages(self, max_messages=10000):
"""Process all messages ready in the subscription channels.
This reads messages from the subscription channels and calls the
appropriate handlers until there are no messages left.
Args:
max_messages: The maximum number of messages to process before
returning.
"""
subscribe_clients = [self.primary_subscribe_client]
for subscribe_client in subscribe_clients:
for _ in range(max_messages):
message = None
try:
message = subscribe_client.get_message()
except redis.exceptions.ConnectionError:
pass
if message is None:
# Continue on to the next subscribe client.
break

# Parse the message.
channel = message["channel"]
data = message["data"]

if (channel ==
ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL):
message_handler = self.autoscaler_resource_request_handler
else:
assert False, "This code should be unreachable."

# Call the handler.
message_handler(channel, data)

def update_raylet_map(self, _append_port=False):
"""Updates internal raylet map.
Expand All @@ -240,18 +194,12 @@ def update_raylet_map(self, _append_port=False):
self.raylet_id_to_ip_map[node_id] = ip_address

def _run(self):
"""Run the monitor.
This function loops forever, checking for messages about dead database
clients and cleaning up state accordingly.
"""
"""Run the monitor loop."""

self.subscribe(ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)

# Handle messages from the subscription channels.
while True:
self.update_raylet_map()
self.update_load_metrics()
self.update_resource_requests()
status = {
"load_metrics_report": self.load_metrics.summary()._asdict()
}
Expand All @@ -268,9 +216,6 @@ def _run(self):
_internal_kv_put(
DEBUG_AUTOSCALING_STATUS, as_json, overwrite=True)

# Process a round of messages.
self.process_messages()

# Wait for a autoscaler update interval before processing the next
# round of messages.
time.sleep(AUTOSCALER_UPDATE_INTERVAL_S)
Expand Down
10 changes: 8 additions & 2 deletions python/ray/tests/test_multi_node_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import ray
import ray.ray_constants as ray_constants
from ray.autoscaler.sdk import request_resources
from ray.monitor import Monitor
from ray.cluster_utils import Cluster
from ray.test_utils import generate_system_config_map, SignalActor
Expand Down Expand Up @@ -68,16 +69,21 @@ def setup_monitor(address):
monitor = Monitor(
address, None, redis_password=ray_constants.REDIS_DEFAULT_PASSWORD)
monitor.update_raylet_map(_append_port=True)
monitor.subscribe(ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
return monitor


def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
request_resources(num_cpus=42)

while True:
monitor.update_load_metrics()
monitor.process_messages()
monitor.update_resource_requests()
resource_usage = monitor.load_metrics._get_resource_usage()

# Check resource request propagation.
req = monitor.load_metrics.resource_requests
assert req == [{"CPU": 1}] * 42, req

if "memory" in resource_usage[0]:
del resource_usage[0]["memory"]
if "object_store_memory" in resource_usage[1]:
Expand Down

0 comments on commit 4539391

Please sign in to comment.