Skip to content

Commit

Permalink
[Dashboard][event] Basic event module (ray-project#16985)
Browse files Browse the repository at this point in the history
* Basic event module

* Fix comments

* Set the SCAN_EVENT_DIR_INTERVAL_SECONDS default to 2

* Fix lint

* Fix lint

* Clean code

* Try to fix flaky test

* Fix test

* Disable event module by default

* Make monitor events task cancellable

* Fix error

Co-authored-by: 刘宝 <po.lb@antfin.com>
  • Loading branch information
fyrestone and 刘宝 authored Jul 14, 2021
1 parent ce6dfc9 commit f1faa79
Show file tree
Hide file tree
Showing 14 changed files with 721 additions and 3 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1956,6 +1956,7 @@ filegroup(
"//src/ray/protobuf:agent_manager_py_proto",
"//src/ray/protobuf:common_py_proto",
"//src/ray/protobuf:core_worker_py_proto",
"//src/ray/protobuf:event_py_proto",
"//src/ray/protobuf:gcs_py_proto",
"//src/ray/protobuf:gcs_service_py_proto",
"//src/ray/protobuf:job_agent_py_proto",
Expand Down
2 changes: 2 additions & 0 deletions dashboard/datacenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class DataSource:
job_actors = Dict()
# {worker id(str): core worker stats}
core_worker_stats = Dict()
# {job id hex(str): {event id(str): event dict}}
events = Dict()
# {node ip (str): log entries by pid
# (dict from pid to list of latest log entries)}
ip_and_pid_to_logs = Dict()
Expand Down
Empty file.
90 changes: 90 additions & 0 deletions dashboard/modules/event/event_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import asyncio
import logging
from typing import Union
from grpc.experimental import aio as aiogrpc

import ray.new_dashboard.utils as dashboard_utils
import ray.new_dashboard.consts as dashboard_consts
from ray.ray_constants import env_bool
from ray.new_dashboard.utils import async_loop_forever, create_task
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import monitor_events
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Alias of the dashboard's class-method route table.
# NOTE(review): not referenced by the visible EventAgent code; presumably kept
# for parity with other dashboard modules -- confirm before removing.
routes = dashboard_utils.ClassMethodRouteTable


@dashboard_utils.dashboard_module(
    enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
class EventAgent(dashboard_utils.DashboardAgentModule):
    """Agent-side event module.

    Hands the agent's ``events`` log directory to ``monitor_events``, buffers
    whatever that monitor delivers in a bounded asyncio queue, and reports the
    buffered batches to the dashboard through ``ReportEventService``.

    The module is only enabled when the environment variable named by
    ``event_consts.EVENT_MODULE_ENVIRONMENT_KEY`` is truthy.
    """

    def __init__(self, dashboard_agent):
        """Create the events directory and the bounded event cache.

        Args:
            dashboard_agent: The owning dashboard agent; its ``log_dir`` is
                the parent of the ``events`` directory.
        """
        super().__init__(dashboard_agent)
        log_dir = self._dashboard_agent.log_dir
        self._event_dir = os.path.join(log_dir, "events")
        os.makedirs(self._event_dir, exist_ok=True)
        # Both are populated in run().
        self._monitor: Union[asyncio.Task, None] = None
        self._stub: Union[event_pb2_grpc.ReportEventServiceStub, None] = None
        cache_size = event_consts.EVENT_AGENT_CACHE_SIZE
        self._cached_events = asyncio.Queue(cache_size)
        logger.info("Event agent cache buffer size: %s",
                    self._cached_events.maxsize)

    async def _connect_to_dashboard(self):
        """Connect to the dashboard.

        Polls Redis for the dashboard RPC address until one is available, so
        this coroutine does not return while the dashboard is not started.

        Returns:
            The ReportEventServiceStub object.
        """
        while True:
            try:
                address = await self._dashboard_agent.aioredis_client.get(
                    dashboard_consts.REDIS_KEY_DASHBOARD_RPC)
                if address:
                    logger.info("Report events to %s", address)
                    channel = aiogrpc.insecure_channel(
                        address, options=(("grpc.enable_http_proxy", 0), ))
                    return event_pb2_grpc.ReportEventServiceStub(channel)
            except Exception:
                # Any failure is logged and retried after the sleep below.
                logger.exception("Connect to dashboard failed.")
            await asyncio.sleep(
                event_consts.RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS)

    @async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS)
    async def report_events(self):
        """Report one batch of events taken from the cached events queue.

        A failed report triggers a reconnect to the dashboard followed by
        another attempt, up to EVENT_AGENT_RETRY_TIMES attempts. When every
        attempt fails, a truncated dump of the batch is logged as an error
        and the batch is dropped.

        Wrapped by ``async_loop_forever``; per the original documentation
        the wrapped method never returns.
        """
        batch = await self._cached_events.get()
        for _attempt in range(event_consts.EVENT_AGENT_RETRY_TIMES):
            try:
                logger.info("Report %s events.", len(batch))
                await self._stub.ReportEvents(
                    event_pb2.ReportEventsRequest(event_strings=batch))
            except Exception:
                logger.exception(
                    "Report event failed, reconnect to the dashboard.")
                self._stub = await self._connect_to_dashboard()
            else:
                # Delivered; nothing more to do for this batch.
                return

        # Every attempt failed: log a bounded preview of the lost batch.
        batch_repr = str(batch)
        limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT
        ellipsis = "..." if batch_repr[limit:] else ""
        logger.error("Report event failed: %s", batch_repr[:limit] + ellipsis)

    async def run(self, server):
        """Connect to the dashboard, start monitoring, then report events.

        Args:
            server: Unused by this module.
        """

        def _enqueue(data):
            # Queue.put() is a coroutine, so schedule it as a task from this
            # synchronous callback.
            return create_task(self._cached_events.put(data))

        # Connect to dashboard.
        self._stub = await self._connect_to_dashboard()
        # Start monitor task.
        self._monitor = monitor_events(
            self._event_dir,
            _enqueue,
            source_types=event_consts.EVENT_AGENT_MONITOR_SOURCE_TYPES)
        # Start reporting events.
        await self.report_events()
26 changes: 26 additions & 0 deletions dashboard/modules/event/event_consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from ray.ray_constants import env_integer
from ray.core.generated import event_pb2

# Name of the environment variable that enables the event module; both the
# agent and the head module read it through env_bool() with a False default.
EVENT_MODULE_ENVIRONMENT_KEY = "RAY_DASHBOARD_MODULE_EVENT"
# Maximum number of characters of an undeliverable event batch that the
# agent writes to its error log.
LOG_ERROR_EVENT_STRING_LENGTH_LIMIT = 1000
# Seconds the agent sleeps between attempts to look up the dashboard address.
RETRY_CONNECT_TO_DASHBOARD_INTERVAL_SECONDS = 2
# Monitor events
SCAN_EVENT_DIR_INTERVAL_SECONDS = env_integer(
    "SCAN_EVENT_DIR_INTERVAL_SECONDS", 2)
SCAN_EVENT_START_OFFSET_SECONDS = -30 * 60
CONCURRENT_READ_LIMIT = 50
EVENT_READ_LINE_COUNT_LIMIT = 200
EVENT_READ_LINE_LENGTH_LIMIT = env_integer("EVENT_READ_LINE_LENGTH_LIMIT",
                                           2 * 1024 * 1024)  # 2MB
# Report events
EVENT_AGENT_REPORT_INTERVAL_SECONDS = 0.1
EVENT_AGENT_RETRY_TIMES = 10
EVENT_AGENT_CACHE_SIZE = 10240
# Event sources
EVENT_HEAD_MONITOR_SOURCE_TYPES = [
    event_pb2.Event.SourceType.Name(event_pb2.Event.GCS)
]
# Every source type that the head does not monitor itself. Filtering the enum
# names in place (rather than taking a set difference) keeps the list in enum
# declaration order, so it is identical in every process regardless of string
# hash randomization.
EVENT_AGENT_MONITOR_SOURCE_TYPES = [
    source_type for source_type in event_pb2.Event.SourceType.keys()
    if source_type not in EVENT_HEAD_MONITOR_SOURCE_TYPES
]
EVENT_SOURCE_ALL = event_pb2.Event.SourceType.keys()
89 changes: 89 additions & 0 deletions dashboard/modules/event/event_head.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os
import asyncio
import logging
from typing import Union
from collections import OrderedDict, defaultdict

import aiohttp.web

import ray.new_dashboard.utils as dashboard_utils
from ray.ray_constants import env_bool
from ray.new_dashboard.modules.event import event_consts
from ray.new_dashboard.modules.event.event_utils import (
parse_event_strings,
monitor_events,
)
from ray.core.generated import event_pb2
from ray.core.generated import event_pb2_grpc
from ray.new_dashboard.datacenter import DataSource

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Route table whose decorators (e.g. @routes.get) register the HTTP handlers
# defined on EventHead.
routes = dashboard_utils.ClassMethodRouteTable

# Per-job container mapping event id -> event dict; an OrderedDict.
JobEvents = OrderedDict
# Register the container type with the dashboard utils.
# NOTE(review): presumably this lets OrderedDict values stored in
# DataSource.events be handed back as-is (still mutable) instead of being
# rejected or wrapped -- confirm against dashboard_utils.
dashboard_utils._json_compatible_types.add(JobEvents)


@dashboard_utils.dashboard_module(
    enable=env_bool(event_consts.EVENT_MODULE_ENVIRONMENT_KEY, False))
class EventHead(dashboard_utils.DashboardHeadModule,
                event_pb2_grpc.ReportEventServiceServicer):
    """Head-side event module.

    Collects events from two inputs -- the ``ReportEvents`` RPC and the
    ``monitor_events`` callback for the head's own ``events`` directory --
    into ``DataSource.events``, and serves them over HTTP at ``/events``.

    The module is only enabled when the environment variable named by
    ``event_consts.EVENT_MODULE_ENVIRONMENT_KEY`` is truthy.
    """

    def __init__(self, dashboard_head):
        """Create the events directory under the head's log directory.

        Args:
            dashboard_head: The owning dashboard head; its ``log_dir`` is
                the parent of the ``events`` directory.
        """
        super().__init__(dashboard_head)
        self._event_dir = os.path.join(self._dashboard_head.log_dir, "events")
        os.makedirs(self._event_dir, exist_ok=True)
        # Populated in run().
        self._monitor: Union[asyncio.Task, None] = None

    @staticmethod
    def _update_events(event_list):
        """Merge parsed events into ``DataSource.events``.

        Events are grouped by ``custom_fields["job_id"]``; an event without
        custom fields, or with a missing or empty job id, is filed under
        ``"global"``. Within a job, a later event with the same ``event_id``
        replaces the earlier one.

        Args:
            event_list: Iterable of event dicts, each with an ``"event_id"``
                key and optionally a ``"custom_fields"`` mapping.

        Raises:
            KeyError: If an event has no ``"event_id"``.
        """
        # {job_id: {event_id: event}}
        all_job_events = defaultdict(JobEvents)
        for event in event_list:
            event_id = event["event_id"]
            custom_fields = event.get("custom_fields")
            job_id = "global"
            if custom_fields:
                job_id = custom_fields.get("job_id", "global") or "global"
            all_job_events[job_id][event_id] = event
        # TODO(fyrestone): Limit the event count per job.
        for job_id, new_job_events in all_job_events.items():
            job_events = DataSource.events.get(job_id, JobEvents())
            job_events.update(new_job_events)
            # Assign back even when the mapping already existed.
            # NOTE(review): presumably the assignment is what notifies
            # DataSource listeners of the change -- confirm.
            DataSource.events[job_id] = job_events

    async def ReportEvents(self, request, context):
        """Handle the ReportEvents RPC.

        Args:
            request: A ReportEventsRequest carrying ``event_strings``.
            context: The gRPC call context (unused).

        Returns:
            A ReportEventsReply with ``send_success=True``.
        """
        received_events = []
        if request.event_strings:
            received_events.extend(parse_event_strings(request.event_strings))
        logger.info("Received %d events", len(received_events))
        self._update_events(received_events)
        return event_pb2.ReportEventsReply(send_success=True)

    @routes.get("/events")
    @dashboard_utils.aiohttp_cache(2)
    async def get_event(self, req) -> aiohttp.web.Response:
        """Serve stored events, optionally restricted to one job.

        Without a ``job_id`` query parameter the response carries every
        job's events as ``{job_id: [event, ...]}``; with one, it carries
        that job's events as a list (empty when the job is unknown).
        """
        job_id = req.query.get("job_id")
        if job_id is None:
            all_events = {
                stored_job_id: list(stored_events.values())
                for stored_job_id, stored_events in DataSource.events.items()
            }
            return dashboard_utils.rest_response(
                success=True, message="All events fetched.", events=all_events)

        job_events = DataSource.events.get(job_id, {})
        return dashboard_utils.rest_response(
            success=True,
            message="Job events fetched.",
            job_id=job_id,
            events=list(job_events.values()))

    async def run(self, server):
        """Register the gRPC servicer and start monitoring local events.

        Args:
            server: The gRPC server this module registers itself on.
        """
        event_pb2_grpc.add_ReportEventServiceServicer_to_server(self, server)
        self._monitor = monitor_events(
            self._event_dir,
            lambda data: self._update_events(parse_event_strings(data)),
            source_types=event_consts.EVENT_HEAD_MONITOR_SOURCE_TYPES)
Loading

0 comments on commit f1faa79

Please sign in to comment.