From ff6d412ad9251284645bc8cbd5fb786b107e0782 Mon Sep 17 00:00:00 2001
From: Max Fitton
Date: Fri, 2 Oct 2020 17:58:44 -0700
Subject: [PATCH] [Dashboard] Add API support for the logical view and machine
 view in new backend (#11012)

* Add API support for the logical view and machine view, both of which lean
  on shared logic in dashboard/datacenter.py.

* Update dashboard/datacenter.py

Co-authored-by: fyrestone

* Update dashboard/modules/logical_view/logical_view_head.py

Co-authored-by: fyrestone

* Address PR comments

* lint

* Add dashboard tests to CI build

* Fix integration issues

* lint

Co-authored-by: Max Fitton
Co-authored-by: fyrestone
---
 .travis.yml                                   |  12 +-
 dashboard/datacenter.py                       | 121 ++++++++++++++++-
 dashboard/memory_utils.py                     |   4 +-
 dashboard/modules/logical_view/__init__.py    |   0
 .../modules/logical_view/logical_view_head.py |  58 ++++++++
 .../logical_view/test_logical_view_head.py    |  79 +++++++++++
 dashboard/modules/reporter/test_reporter.py   |   2 +-
 .../stats_collector/stats_collector_consts.py |   2 +
 .../stats_collector/stats_collector_head.py   | 128 +++++++++++++++++-
 .../stats_collector/test_stats_collector.py   |  73 ++++++++++
 10 files changed, 460 insertions(+), 19 deletions(-)
 create mode 100644 dashboard/modules/logical_view/__init__.py
 create mode 100644 dashboard/modules/logical_view/logical_view_head.py
 create mode 100644 dashboard/modules/logical_view/test_logical_view_head.py

diff --git a/.travis.yml b/.travis.yml
index 6c0d899cf998b..ad05c641ca142 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,7 +28,7 @@ matrix:
         - RAY_CYTHON_EXAMPLES=1
         - RAY_USE_RANDOM_PORTS=1
       install:
-        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
+        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
       before_script:
         - . ./ci/travis/ci.sh build
@@ -40,7 +40,7 @@
         - RAY_CYTHON_EXAMPLES=1
         - RAY_USE_RANDOM_PORTS=1
       install:
-        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
+        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
       before_script:
         - . ./ci/travis/ci.sh build
       script:
@@ -55,7 +55,7 @@
        - RAY_CYTHON_EXAMPLES=1
         - RAY_USE_RANDOM_PORTS=1
       install:
-        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
+        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
       before_script:
         - . ./ci/travis/ci.sh build
       script:
@@ -84,7 +84,7 @@
         - RAY_CYTHON_EXAMPLES=1
         - RAY_USE_RANDOM_PORTS=1
       install:
-        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
+        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
       before_script:
         - . ./ci/travis/ci.sh build
@@ -97,7 +97,7 @@
         - RAY_CYTHON_EXAMPLES=1
         - RAY_USE_RANDOM_PORTS=1
       install:
-        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
+        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
       before_script:
         - . ./ci/travis/ci.sh build
       script:
@@ -113,7 +113,7 @@
         - RAY_CYTHON_EXAMPLES=1
         - RAY_USE_RANDOM_PORTS=1
       install:
-        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED
+        - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
       before_script:
         - . ./ci/travis/ci.sh build
       script:
diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py
index c7a0fdc67c9fa..bb1624db415a0 100644
--- a/dashboard/datacenter.py
+++ b/dashboard/datacenter.py
@@ -1,6 +1,7 @@
 import logging
-
 import ray.new_dashboard.consts as dashboard_consts
+import ray.new_dashboard.memory_utils as memory_utils
+from ray.new_dashboard.actor_utils import actor_classname_from_task_spec
 from ray.new_dashboard.utils import Dict, Signal

 logger = logging.getLogger(__name__)
@@ -27,6 +28,12 @@ class DataSource:
     node_id_to_ip = Dict()
     # {node id hex(str): hostname(str)}
     node_id_to_hostname = Dict()
+    # {node ip (str): log entries by pid
+    # (dict from pid to list of latest log entries)}
+    ip_and_pid_to_logs = Dict()
+    # {node ip (str): error entries by pid
+    # (dict from pid to list of latest err entries)}
+    ip_and_pid_to_errors = Dict()


 class DataOrganizer:
@@ -54,12 +61,26 @@ async def purge():

     @classmethod
     async def get_node_actors(cls, node_id):
         node_stats = DataSource.node_stats.get(node_id, {})
-        node_worker_id_set = set()
+        worker_id_to_info = {}
         for worker_stats in node_stats.get("workersStats", []):
-            node_worker_id_set.add(worker_stats["workerId"])
+            worker_id_to_info[worker_stats["workerId"]] = worker_stats
+
         node_actors = {}
         for actor_id, actor_table_data in DataSource.actors.items():
-            if actor_table_data["address"]["workerId"] in node_worker_id_set:
+            if actor_table_data["address"]["workerId"] in worker_id_to_info:
+                worker_stats = worker_id_to_info[actor_table_data["address"][
+                    "workerId"]]
+
+                actor_constructor = worker_stats.get("coreWorkerStats", {})\
+                    .get("actorTitle", "Unknown actor constructor")
+
+                actor_table_data["actorConstructor"] = actor_constructor
+
+                actor_class = actor_classname_from_task_spec(
+                    actor_table_data.get("taskSpec", {}))
+
+                actor_table_data["actorClass"] = actor_class
+                actor_table_data.update(worker_stats["coreWorkerStats"])
                 node_actors[actor_id] = actor_table_data
         return node_actors
@@ -69,6 +90,18 @@ async def get_node_info(cls, node_id):
         node_stats = DataSource.node_stats.get(node_id, {})
         node = DataSource.nodes.get(node_id, {})

+        # Merge node log count information into the payload
+        log_info = DataSource.ip_and_pid_to_logs.get(node_physical_stats["ip"],
+                                                     {})
+        node_log_count = 0
+        for entries in log_info.values():
+            node_log_count += len(entries)
+        error_info = DataSource.ip_and_pid_to_errors.get(
+            node_physical_stats["ip"], {})
+        node_err_count = 0
+        for entries in error_info.values():
+            node_err_count += len(entries)
+
         # Merge coreWorkerStats (node stats) to workers (node physical stats)
         workers_stats = node_stats.pop("workersStats", {})
         pid_to_worker_stats = {}
@@ -88,15 +121,24 @@ async def get_node_info(cls, node_id):
             worker["coreWorkerStats"] = list(worker_stats.values())
             worker["language"] = pid_to_language.get(worker["pid"], "")
             worker["jobId"] = pid_to_job_id.get(worker["pid"], "ffff")
+            worker["logCount"] = len(log_info.get(str(worker["pid"]), []))
+            worker["errorCount"] = len(error_info.get(str(worker["pid"]), []))
+
+        ray_stats = _extract_view_data(
+            node_stats["viewData"],
+            {"object_store_used_memory", "object_store_available_memory"})

         node_info = node_physical_stats
-        # Merge node stats to node physical stats
+        # Merge node stats to node physical stats under raylet
         node_info["raylet"] = node_stats
+        node_info["raylet"].update(ray_stats)
+        # Merge GcsNodeInfo to node physical stats
         node_info["raylet"].update(node)
         # Merge actors to node physical stats
         node_info["actors"] = await cls.get_node_actors(node_id)
-
+        node_info["logCount"] = node_log_count
+        node_info["errorCount"] = node_err_count
         await GlobalSignals.node_info_fetched.send(node_info)
         return node_info
@@ -112,3 +154,70 @@ async def get_all_node_summary(cls):
             node_info["raylet"].pop("viewData", None)
             all_nodes_summary.append(node_info)
         return all_nodes_summary
+
+    @classmethod
+    async def get_all_node_details(cls):
+        node_details = []
+        for node_id in DataSource.nodes.keys():
+            node_details.append(await cls.get_node_info(node_id))
+        return node_details
+
+    @classmethod
+    async def get_all_actors(cls):
+        all_actors = {}
+        for node_id in DataSource.nodes.keys():
+            all_actors.update(await cls.get_node_actors(node_id))
+        return all_actors
+
+    @classmethod
+    async def get_actor_creation_tasks(cls):
+        infeasible_tasks = sum(
+            (node_stats.get("infeasibleTasks", [])
+             for node_stats in DataSource.node_stats.values()), [])
+        for task in infeasible_tasks:
+            task["actorClass"] = actor_classname_from_task_spec(task)
+            task["state"] = "INFEASIBLE"
+
+        resource_pending_tasks = sum(
+            (data.get("readyTasks", [])
+             for data in DataSource.node_stats.values()), [])
+        for task in resource_pending_tasks:
+            task["actorClass"] = actor_classname_from_task_spec(task)
+            task["state"] = "PENDING_RESOURCES"
+
+        results = {
+            task["actorCreationTaskSpec"]["actorId"]: task
+            for task in resource_pending_tasks + infeasible_tasks
+        }
+        return results
+
+    @classmethod
+    async def get_memory_table(cls,
+                               sort_by=memory_utils.SortingType.OBJECT_SIZE,
+                               group_by=memory_utils.GroupByType.STACK_TRACE):
+        all_worker_stats = []
+        for node_stats in DataSource.node_stats.values():
+            all_worker_stats.extend(node_stats.get("workersStats", []))
+        memory_information = memory_utils.construct_memory_table(
+            all_worker_stats, group_by=group_by, sort_by=sort_by)
+        return memory_information
+
+
+def _extract_view_data(views, data_keys):
+    view_data = {}
+    for view in views:
+        view_name = view["viewName"]
+        if view_name in data_keys:
+            if not view.get("measures"):
+                view_data[view_name] = 0
+                continue
+            measure = view["measures"][0]
+            if "doubleValue" in measure:
+                measure_value = measure["doubleValue"]
+            elif "intValue" in measure:
+                measure_value = measure["intValue"]
+            else:
+                measure_value = 0
+            view_data[view_name] = measure_value
+
+    return view_data
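For reference, a minimal sketch of what the _extract_view_data helper above produces. The views payload here is hypothetical; it only mirrors the shape of the raylet viewData that GetNodeStats returns after protobuf-to-dict conversion:

    # Hypothetical raylet viewData; values are made up for illustration.
    views = [
        {"viewName": "object_store_used_memory",
         "measures": [{"intValue": 1048576}]},
        {"viewName": "object_store_available_memory",
         "measures": [{"doubleValue": 7.0e9}]},
        {"viewName": "unrelated_view", "measures": []},  # ignored: not requested
    ]
    ray_stats = _extract_view_data(
        views, {"object_store_used_memory", "object_store_available_memory"})
    # -> {"object_store_used_memory": 1048576,
    #     "object_store_available_memory": 7000000000.0}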
diff --git a/dashboard/memory_utils.py b/dashboard/memory_utils.py
index 544aabce48044..68ba3f877bce2 100644
--- a/dashboard/memory_utils.py
+++ b/dashboard/memory_utils.py
@@ -269,10 +269,10 @@ def as_dict(self):
         }

     def get_entries(self) -> List[dict]:
-        return [entry.__dict__() for entry in self.table]
+        return [entry.as_dict() for entry in self.table]

     def __repr__(self):
-        return str(self.__dict__())
+        return str(self.as_dict())

     def __str__(self):
         return self.__repr__()
diff --git a/dashboard/modules/logical_view/__init__.py b/dashboard/modules/logical_view/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/dashboard/modules/logical_view/logical_view_head.py b/dashboard/modules/logical_view/logical_view_head.py
new file mode 100644
index 0000000000000..f8168f319b0db
--- /dev/null
+++ b/dashboard/modules/logical_view/logical_view_head.py
@@ -0,0 +1,58 @@
+import logging
+import aiohttp.web
+import ray.utils
+import ray.new_dashboard.utils as dashboard_utils
+import ray.new_dashboard.actor_utils as actor_utils
+from ray.new_dashboard.utils import rest_response
+from ray.new_dashboard.datacenter import DataOrganizer
+from ray.core.generated import core_worker_pb2
+from ray.core.generated import core_worker_pb2_grpc
+
+from grpc.experimental import aio as aiogrpc
+
+logger = logging.getLogger(__name__)
+routes = dashboard_utils.ClassMethodRouteTable
+
+
+class LogicalViewHead(dashboard_utils.DashboardHeadModule):
+    @routes.get("/logical/actor_groups")
+    async def get_actor_groups(self, req) -> aiohttp.web.Response:
+        actors = await DataOrganizer.get_all_actors()
+        actor_creation_tasks = await DataOrganizer.get_actor_creation_tasks()
+        # Actor creation tasks share a common interface with actors and
+        # are processed and shown in tandem in the logical view, so we
+        # merge them together before constructing the actor groups.
+        actors.update(actor_creation_tasks)
+        actor_groups = actor_utils.construct_actor_groups(actors)
+        return await rest_response(
+            success=True,
+            message="Fetched actor groups.",
+            actor_groups=actor_groups)
+
+    @routes.get("/logical/kill_actor")
+    async def kill_actor(self, req) -> aiohttp.web.Response:
+        try:
+            actor_id = req.query["actorId"]
+            ip_address = req.query["ipAddress"]
+            port = req.query["port"]
+        except KeyError:
+            return await rest_response(success=False, message="Bad Request")
+        try:
+            channel = aiogrpc.insecure_channel(f"{ip_address}:{port}")
+            stub = core_worker_pb2_grpc.CoreWorkerServiceStub(channel)
+
+            await stub.KillActor(
+                core_worker_pb2.KillActorRequest(
+                    intended_actor_id=ray.utils.hex_to_binary(actor_id)))
+
+        except aiogrpc.AioRpcError:
+            # The KillActor call always raises an AioRpcError: the worker
+            # is killed and closes its channel before replying, but the
+            # actor itself is deleted correctly.
+            pass
+
+        return await rest_response(
+            success=True, message=f"Killed actor with id {actor_id}")
+
+    async def run(self, server):
+        pass
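A rough sketch of how a client might exercise the two routes above; the dashboard address is a placeholder, and the result/msg/data envelope follows the rest_response convention exercised by the test below:

    import requests

    DASHBOARD = "http://127.0.0.1:8265"  # placeholder address

    # Actor groups are keyed by actor class name.
    resp = requests.get(f"{DASHBOARD}/logical/actor_groups").json()
    assert resp["result"] is True, resp["msg"]
    actor_groups = resp["data"]["actorGroups"]

    # Killing an actor needs its id plus the owning worker's address;
    # the placeholder values would come from an actor group entry.
    requests.get(
        f"{DASHBOARD}/logical/kill_actor",
        params={
            "actorId": "<actor id hex>",
            "ipAddress": "<worker node ip>",
            "port": "<core worker port>",
        })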
diff --git a/dashboard/modules/logical_view/test_logical_view_head.py b/dashboard/modules/logical_view/test_logical_view_head.py
new file mode 100644
index 0000000000000..f4118da51e9aa
--- /dev/null
+++ b/dashboard/modules/logical_view/test_logical_view_head.py
@@ -0,0 +1,79 @@
+import os
+import sys
+import logging
+import requests
+import time
+import traceback
+import pytest
+import ray
+from ray.new_dashboard.tests.conftest import *  # noqa
+from ray.test_utils import (
+    format_web_url,
+    wait_until_server_available,
+)
+
+os.environ["RAY_USE_NEW_DASHBOARD"] = "1"
+
+logger = logging.getLogger(__name__)
+
+
+def test_actor_groups(ray_start_with_dashboard):
+    @ray.remote
+    class Foo:
+        def __init__(self, num):
+            self.num = num
+
+        def do_task(self):
+            return self.num
+
+    @ray.remote(num_gpus=1)
+    class InfeasibleActor:
+        pass
+
+    foo_actors = [Foo.remote(4), Foo.remote(5)]
+    infeasible_actor = InfeasibleActor.remote()  # noqa
+    results = [actor.do_task.remote() for actor in foo_actors]  # noqa
+    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"])
+            is True)
+    webui_url = ray_start_with_dashboard["webui_url"]
+    webui_url = format_web_url(webui_url)
+
+    timeout_seconds = 5
+    start_time = time.time()
+    last_ex = None
+    while True:
+        time.sleep(1)
+        try:
+            response = requests.get(webui_url + "/logical/actor_groups")
+            response.raise_for_status()
+            actor_groups_resp = response.json()
+            assert actor_groups_resp["result"] is True, actor_groups_resp[
+                "msg"]
+            actor_groups = actor_groups_resp["data"]["actorGroups"]
+            assert "Foo" in actor_groups
+            summary = actor_groups["Foo"]["summary"]
+            # 2 __init__ tasks and 2 do_task tasks
+            assert summary["numExecutedTasks"] == 4
+            assert summary["stateToCount"]["ALIVE"] == 2
+
+            entries = actor_groups["Foo"]["entries"]
+            assert len(entries) == 2
+            assert "InfeasibleActor" in actor_groups
+
+            entries = actor_groups["InfeasibleActor"]["entries"]
+            assert "requiredResources" in entries[0]
+            assert "GPU" in entries[0]["requiredResources"]
+            break
+        except Exception as ex:
+            last_ex = ex
+        finally:
+            if time.time() > start_time + timeout_seconds:
+                ex_stack = traceback.format_exception(
+                    type(last_ex), last_ex,
+                    last_ex.__traceback__) if last_ex else []
+                ex_stack = "".join(ex_stack)
+                raise Exception(f"Timed out while testing, {ex_stack}")
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/dashboard/modules/reporter/test_reporter.py b/dashboard/modules/reporter/test_reporter.py
index 099d0fdeacca6..915524ee98dfd 100644
--- a/dashboard/modules/reporter/test_reporter.py
+++ b/dashboard/modules/reporter/test_reporter.py
@@ -38,7 +38,7 @@ def getpid(self):
     launch_profiling = None
     while True:
         # Sometimes some startup time is required
-        if time.time() - start_time > 10:
+        if time.time() - start_time > 15:
             raise RayTestTimeoutException(
                 "Timed out while collecting profiling stats, "
                 f"launch_profiling: {launch_profiling}")
diff --git a/dashboard/modules/stats_collector/stats_collector_consts.py b/dashboard/modules/stats_collector/stats_collector_consts.py
index a0d58d32602f1..55119cd75dfab 100644
--- a/dashboard/modules/stats_collector/stats_collector_consts.py
+++ b/dashboard/modules/stats_collector/stats_collector_consts.py
@@ -1,3 +1,5 @@
 NODE_STATS_UPDATE_INTERVAL_SECONDS = 1
 RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS = 1
 ACTOR_CHANNEL = "ACTOR"
+ERROR_INFO_UPDATE_INTERVAL_SECONDS = 5
+LOG_INFO_UPDATE_INTERVAL_SECONDS = 5
diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py
index c70f057a0d055..23f0ebde76338 100644
--- a/dashboard/modules/stats_collector/stats_collector_head.py
+++ b/dashboard/modules/stats_collector/stats_collector_head.py
@@ -1,6 +1,7 @@
 import asyncio
+import re
 import logging
-
+import json
 import aiohttp.web
 from aioredis.pubsub import Receiver
 from grpc.experimental import aio as aiogrpc
@@ -10,6 +11,7 @@
     as stats_collector_consts
 import ray.new_dashboard.utils as dashboard_utils
 from ray.new_dashboard.utils import async_loop_forever
+from ray.new_dashboard.memory_utils import GroupByType, SortingType
 from ray.core.generated import node_manager_pb2
 from ray.core.generated import node_manager_pb2_grpc
 from ray.core.generated import gcs_service_pb2
@@ -46,6 +48,7 @@ def __init__(self, dashboard_head):
         self._gcs_job_info_stub = None
         # ActorInfoGcsService
         self._gcs_actor_info_stub = None
+        self._collect_memory_info = False
         DataSource.nodes.signal.append(self._update_stubs)

     async def _update_stubs(self, change):
@@ -69,6 +72,13 @@ async def get_all_nodes(self, req) -> aiohttp.web.Response:
                 success=True,
                 message="Node summary fetched.",
                 summary=all_node_summary)
+        elif view == "details":
+            all_node_details = await DataOrganizer.get_all_node_details()
+            return await dashboard_utils.rest_response(
+                success=True,
+                message="All node details fetched.",
+                clients=all_node_details,
+            )
         elif view is not None and view.lower() == "hostNameList".lower():
             alive_hostnames = set()
             for node in DataSource.nodes.values():
@@ -87,7 +97,56 @@ async def get_node(self, req) -> aiohttp.web.Response:
         node_id = req.match_info.get("node_id")
         node_info = await DataOrganizer.get_node_info(node_id)
         return await dashboard_utils.rest_response(
-            success=True, message="Node detail fetched.", detail=node_info)
+            success=True, message="Node details fetched.", detail=node_info)
+
+    @routes.get("/memory/memory_table")
+    async def get_memory_table(self, req) -> aiohttp.web.Response:
+        group_by = req.query.get("group_by")
+        sort_by = req.query.get("sort_by")
+        kwargs = {}
+        if group_by:
+            kwargs["group_by"] = GroupByType(group_by)
+        if sort_by:
+            kwargs["sort_by"] = SortingType(sort_by)
+
+        memory_table = await DataOrganizer.get_memory_table(**kwargs)
+        return await dashboard_utils.rest_response(
+            success=True,
+            message="Fetched memory table.",
+            memory_table=memory_table.as_dict())
+
+    @routes.get("/memory/set_fetch")
+    async def set_fetch_memory_info(self, req) -> aiohttp.web.Response:
+        should_fetch = req.query["shouldFetch"]
+        if should_fetch == "true":
+            self._collect_memory_info = True
+        elif should_fetch == "false":
+            self._collect_memory_info = False
+        else:
+            return await dashboard_utils.rest_response(
+                success=False,
+                message=f"Unknown argument to set_fetch {should_fetch}")
+        return await dashboard_utils.rest_response(
+            success=True,
+            message=f"Successfully set fetching to {should_fetch}")
+
+    @routes.get("/node_logs")
+    async def get_logs(self, req) -> aiohttp.web.Response:
+        ip = req.query["ip"]
+        pid = req.query.get("pid")
+        node_logs = DataSource.ip_and_pid_to_logs[ip]
+        payload = node_logs.get(pid, []) if pid else node_logs
+        return await dashboard_utils.rest_response(
+            success=True, message="Fetched logs.", logs=payload)
+
+    @routes.get("/node_errors")
+    async def get_errors(self, req) -> aiohttp.web.Response:
+        ip = req.query["ip"]
+        pid = req.query.get("pid")
+        node_errors = DataSource.ip_and_pid_to_errors[ip]
+        filtered_errs = node_errors.get(pid, []) if pid else node_errors
+        return await dashboard_utils.rest_response(
+            success=True, message="Fetched errors.", errors=filtered_errs)

     async def _update_actors(self):
         # Subscribe actor channel.
@@ -144,12 +203,71 @@ async def _update_node_stats(self):
                 continue
             try:
                 reply = await stub.GetNodeStats(
-                    node_manager_pb2.GetNodeStatsRequest(), timeout=2)
+                    node_manager_pb2.GetNodeStatsRequest(
+                        include_memory_info=self._collect_memory_info),
+                    timeout=2)
                 reply_dict = node_stats_to_dict(reply)
                 DataSource.node_stats[node_id] = reply_dict
             except Exception:
                 logger.exception(f"Error updating node stats of {node_id}.")

+    async def _update_log_info(self):
+        aioredis_client = self._dashboard_head.aioredis_client
+        receiver = Receiver()
+
+        channel = receiver.channel(ray.gcs_utils.LOG_FILE_CHANNEL)
+        await aioredis_client.subscribe(channel)
+        logger.info("Subscribed to %s", channel)
+
+        async for sender, msg in receiver.iter():
+            try:
+                data = json.loads(ray.utils.decode(msg))
+                ip = data["ip"]
+                pid = str(data["pid"])
+                logs_for_ip = DataSource.ip_and_pid_to_logs.get(ip, {})
+                logs_for_pid = logs_for_ip.get(pid, [])
+                logs_for_pid.extend(data["lines"])
+                logs_for_ip[pid] = logs_for_pid
+                DataSource.ip_and_pid_to_logs[ip] = logs_for_ip
+                logger.info(f"Received a log for {ip} and {pid}")
+            except Exception:
+                logger.exception("Error receiving log info.")
+
+    async def _update_error_info(self):
+        aioredis_client = self._dashboard_head.aioredis_client
+        receiver = Receiver()
+
+        key = ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN
+        pattern = receiver.pattern(key)
+        await aioredis_client.psubscribe(pattern)
+        logger.info("Subscribed to %s", key)
+
+        async for sender, msg in receiver.iter():
+            try:
+                _, data = msg
+                pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(data)
+                error_data = ray.gcs_utils.ErrorTableData.FromString(
+                    pubsub_msg.data)
+                message = error_data.error_message
+                # Strip ANSI color codes, then parse the pid/ip prefix.
+                message = re.sub(r"\x1b\[\d+m", "", message)
+                match = re.search(r"\(pid=(\d+), ip=(.*?)\)", message)
+                if match:
+                    pid = match.group(1)
+                    ip = match.group(2)
+                    errs_for_ip = DataSource.ip_and_pid_to_errors.get(ip, {})
+                    pid_errors = errs_for_ip.get(pid, [])
+                    pid_errors.append({
+                        "message": message,
+                        "timestamp": error_data.timestamp,
+                        "type": error_data.type
+                    })
+                    errs_for_ip[pid] = pid_errors
+                    DataSource.ip_and_pid_to_errors[ip] = errs_for_ip
+                    logger.info(f"Received error entry for {ip} {pid}")
+            except Exception:
+                logger.exception("Error receiving error info.")
+
     async def run(self, server):
         gcs_channel = self._dashboard_head.aiogrpc_gcs_channel
         self._gcs_job_info_stub = \
@@ -157,4 +275,6 @@ async def run(self, server):
         self._gcs_actor_info_stub = \
             gcs_service_pb2_grpc.ActorInfoGcsServiceStub(gcs_channel)

-        await asyncio.gather(self._update_node_stats(), self._update_actors())
+        await asyncio.gather(self._update_node_stats(), self._update_actors(),
+                             self._update_log_info(),
+                             self._update_error_info())
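A sketch of the new stats-collector routes from a client's point of view; the address is a placeholder and the flow mirrors the tests below:

    import requests

    DASHBOARD = "http://127.0.0.1:8265"  # placeholder address

    # Memory info is not collected by default; opt in first, then poll.
    requests.get(f"{DASHBOARD}/memory/set_fetch",
                 params={"shouldFetch": "true"}).raise_for_status()
    mem = requests.get(f"{DASHBOARD}/memory/memory_table").json()
    summary = mem["data"]["memoryTable"]["summary"]

    # Detailed per-node payloads, including workers and log/error counts.
    details = requests.get(f"{DASHBOARD}/nodes",
                           params={"view": "details"}).json()

    # Logs and errors are grouped by node ip, optionally filtered by pid.
    logs = requests.get(f"{DASHBOARD}/node_logs",
                        params={"ip": "<node ip>", "pid": "<worker pid>"}).json()
    errors = requests.get(f"{DASHBOARD}/node_errors",
                          params={"ip": "<node ip>"}).json()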
diff --git a/dashboard/modules/stats_collector/test_stats_collector.py b/dashboard/modules/stats_collector/test_stats_collector.py
index 0c8305c83b4af..dea19997c7138 100644
--- a/dashboard/modules/stats_collector/test_stats_collector.py
+++ b/dashboard/modules/stats_collector/test_stats_collector.py
@@ -91,6 +91,79 @@ def getpid(self):
             raise Exception(f"Timed out while testing, {ex_stack}")

+
+def test_memory_table(ray_start_with_dashboard):
+    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
+
+    @ray.remote
+    class ActorWithObjs:
+        def __init__(self):
+            self.obj_ref = ray.put([1, 2, 3])
+
+        def get_obj(self):
+            return ray.get(self.obj_ref)
+
+    my_obj = ray.put([1, 2, 3] * 100)  # noqa
+    actors = [ActorWithObjs.remote() for _ in range(2)]  # noqa
+    results = ray.get([actor.get_obj.remote() for actor in actors])  # noqa
+    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
+    resp = requests.get(
+        webui_url + "/memory/set_fetch", params={"shouldFetch": "true"})
+    resp.raise_for_status()
+
+    def check_mem_table():
+        resp = requests.get(f"{webui_url}/memory/memory_table")
+        resp_data = resp.json()
+        if not resp_data["result"]:
+            return False
+        latest_memory_table = resp_data["data"]["memoryTable"]
+        summary = latest_memory_table["summary"]
+        try:
+            # 1 ref per handle and per object the actor has a ref to
+            assert summary["totalActorHandles"] == len(actors) * 2
+            # 1 ref for my_obj
+            assert summary["totalLocalRefCount"] == 1
+            return True
+        except AssertionError:
+            return False
+
+    wait_for_condition(check_mem_table, 10)
+
+
+def test_get_all_node_details(ray_start_with_dashboard):
+    assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]))
+
+    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
+
+    @ray.remote
+    class ActorWithObjs:
+        def __init__(self):
+            self.obj_ref = ray.put([1, 2, 3])
+
+        def get_obj(self):
+            return ray.get(self.obj_ref)
+
+    actors = [ActorWithObjs.remote() for _ in range(2)]  # noqa
+
+    def check_node_details():
+        resp = requests.get(f"{webui_url}/nodes?view=details")
+        resp_json = resp.json()
+        resp_data = resp_json["data"]
+        try:
+            clients = resp_data["clients"]
+            node = clients[0]
+            assert len(clients) == 1
+            assert len(node.get("actors")) == 2
+            # Worker information should be in the detailed payload
+            assert "workers" in node
+            assert "logCount" in node
+            assert len(node["workers"]) == 2
+            return True
+        except (AssertionError, KeyError, IndexError):
+            return False
+
+    wait_for_condition(check_node_details, 15)
+
+
 @pytest.mark.parametrize(
     "ray_start_cluster_head", [{
         "include_dashboard": True