Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dashboard] New dashboard skeleton #9099

Merged
merged 14 commits into from
Jul 27, 2020
Empty file added dashboard/__init__.py
Empty file.
229 changes: 229 additions & 0 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import argparse
import asyncio
import logging
import logging.handlers
import os
import sys
import traceback

import aiohttp
import aioredis
from grpc.experimental import aio as aiogrpc

import ray
import ray.new_dashboard.consts as dashboard_consts
import ray.new_dashboard.utils as dashboard_utils
import ray.ray_constants as ray_constants
import ray.services
import ray.utils
import psutil

# Module-level logger for the dashboard agent process.
logger = logging.getLogger(__name__)

# Initialize the asyncio-compatible gRPC runtime once at import time; this is
# required before creating `aiogrpc` servers/channels (grpc.experimental.aio).
aiogrpc.init_grpc_aio()


class DashboardAgent(object):
    """Per-node agent that hosts dashboard agent modules.

    The agent exposes a grpc.aio server on an ephemeral port and publishes
    that port to Redis (under DASHBOARD_AGENT_PORT_PREFIX) so the dashboard
    head process can reach it. It fate-shares with the raylet that spawned it.
    """

    def __init__(self,
                 redis_address,
                 redis_password=None,
                 temp_dir=None,
                 log_dir=None,
                 node_manager_port=None,
                 object_store_name=None,
                 raylet_name=None):
        """Initialize the DashboardAgent object.

        Args:
            redis_address (str): Redis address formatted as "ip:port".
            redis_password (str): Optional password for Redis.
            temp_dir (str): Path of the Ray temporary directory.
            log_dir (str): Directory where agent logs are written.
            node_manager_port (int): Port of the local raylet's node manager.
            object_store_name (str): Socket name of the plasma store.
            raylet_name (str): Socket path of the raylet process.
        """
        self._agent_cls_list = dashboard_utils.get_all_modules(
            dashboard_utils.DashboardAgentModule)
        ip, port = redis_address.split(":")
        # Public attributes are accessible for all agent modules.
        self.redis_address = (ip, int(port))
        self.redis_password = redis_password
        self.temp_dir = temp_dir
        self.log_dir = log_dir
        self.node_manager_port = node_manager_port
        self.object_store_name = object_store_name
        self.raylet_name = raylet_name
        self.ip = ray.services.get_node_ip_address()
        # Disable SO_REUSEPORT so two agents on the same host can never be
        # handed the same kernel-chosen port.
        self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), ))
        listen_address = "[::]:0"
        logger.info("Dashboard agent listen at: %s", listen_address)
        self.port = self.server.add_insecure_port(listen_address)
        # Created lazily inside run(), once an event loop is running.
        self.aioredis_client = None
        self.aiogrpc_raylet_channel = aiogrpc.insecure_channel("{}:{}".format(
            self.ip, self.node_manager_port))
        # NOTE(review): the explicit `loop=` argument is deprecated in newer
        # aiohttp releases; kept here for compatibility with the pinned
        # version — confirm before upgrading aiohttp.
        self.http_session = aiohttp.ClientSession(
            loop=asyncio.get_event_loop())

    def _load_modules(self):
        """Instantiate every discovered dashboard agent module.

        Returns:
            list: Instantiated DashboardAgentModule objects.
        """
        modules = []
        for cls in self._agent_cls_list:
            logger.info("Load %s: %s",
                        dashboard_utils.DashboardAgentModule.__name__, cls)
            c = cls(self)
            modules.append(c)
        # Lazy %-formatting for consistency with the other log calls above.
        logger.info("Load %s modules.", len(modules))
        return modules

    async def run(self):
        """Run the agent: connect Redis, start grpc, and launch all modules.

        This coroutine only returns when the grpc server terminates.
        """
        # Create an aioredis client shared by all modules.
        self.aioredis_client = await aioredis.create_redis_pool(
            address=self.redis_address, password=self.redis_password)

        # Start a grpc asyncio server.
        await self.server.start()

        # Write the dashboard agent port to redis so the head can find us.
        await self.aioredis_client.set(
            "{}{}".format(dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX,
                          self.ip), self.port)

        async def _check_parent():
            """Exit the agent if the parent raylet process has died."""
            curr_proc = psutil.Process()
            while True:
                parent = curr_proc.parent()
                # A reparented process (parent pid 1 on POSIX) means the
                # raylet is gone.
                if parent is None or parent.pid == 1:
                    logger.error("raylet is dead, agent will die because "
                                 "it fate-shares with raylet.")
                    sys.exit(0)
                await asyncio.sleep(
                    dashboard_consts.
                    DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS)

        modules = self._load_modules()
        await asyncio.gather(_check_parent(),
                             *(m.run(self.server) for m in modules))
        await self.server.wait_for_termination()


if __name__ == "__main__":
    # Command-line entry point: parse arguments, configure logging, then run
    # the agent on an asyncio event loop forever.
    parser = argparse.ArgumentParser(description="Dashboard agent.")
    parser.add_argument(
        "--redis-address",
        required=True,
        type=str,
        help="The address to use for Redis.")
    parser.add_argument(
        "--node-manager-port",
        required=True,
        type=int,
        help="The port to use for starting the node manager")
    parser.add_argument(
        "--object-store-name",
        required=True,
        type=str,
        help="The socket name of the plasma store")
    parser.add_argument(
        "--raylet-name",
        required=True,
        type=str,
        help="The socket path of the raylet process")
    parser.add_argument(
        "--redis-password",
        required=False,
        type=str,
        default=None,
        help="The password to use for Redis")
    parser.add_argument(
        "--logging-level",
        required=False,
        type=lambda s: logging.getLevelName(s.upper()),
        default=ray_constants.LOGGER_LEVEL,
        choices=ray_constants.LOGGER_LEVEL_CHOICES,
        help=ray_constants.LOGGER_LEVEL_HELP)
    parser.add_argument(
        "--logging-format",
        required=False,
        type=str,
        default=ray_constants.LOGGER_FORMAT,
        help=ray_constants.LOGGER_FORMAT_HELP)
    parser.add_argument(
        "--logging-filename",
        required=False,
        type=str,
        default=dashboard_consts.DASHBOARD_AGENT_LOG_FILENAME,
        help="Specify the name of log file, "
        "log to stdout if set empty, default is \"{}\".".format(
            dashboard_consts.DASHBOARD_AGENT_LOG_FILENAME))
    parser.add_argument(
        "--logging-rotate-bytes",
        required=False,
        type=int,
        default=dashboard_consts.LOGGING_ROTATE_BYTES,
        help="Specify the max bytes for rotating "
        "log file, default is {} bytes.".format(
            dashboard_consts.LOGGING_ROTATE_BYTES))
    parser.add_argument(
        "--logging-rotate-backup-count",
        required=False,
        type=int,
        default=dashboard_consts.LOGGING_ROTATE_BACKUP_COUNT,
        help="Specify the backup count of rotated log file, default is {}.".
        format(dashboard_consts.LOGGING_ROTATE_BACKUP_COUNT))
    parser.add_argument(
        "--log-dir",
        required=False,
        type=str,
        default=None,
        help="Specify the path of log directory.")
    parser.add_argument(
        "--temp-dir",
        required=False,
        type=str,
        default=None,
        help="Specify the path of the temporary directory use by Ray process.")

    args = parser.parse_args()
    try:
        # Normalize the temp dir to an absolute path rooted at "/".
        if args.temp_dir:
            temp_dir = "/" + args.temp_dir.strip("/")
        else:
            temp_dir = "/tmp/ray"
        os.makedirs(temp_dir, exist_ok=True)

        if args.log_dir:
            log_dir = args.log_dir
        else:
            log_dir = os.path.join(temp_dir, "session_latest/logs")
        os.makedirs(log_dir, exist_ok=True)

        # An empty --logging-filename means "log to stdout" (basicConfig's
        # default handler).
        if args.logging_filename:
            logging_handlers = [
                logging.handlers.RotatingFileHandler(
                    os.path.join(log_dir, args.logging_filename),
                    maxBytes=args.logging_rotate_bytes,
                    backupCount=args.logging_rotate_backup_count)
            ]
        else:
            logging_handlers = None
        logging.basicConfig(
            level=args.logging_level,
            format=args.logging_format,
            handlers=logging_handlers)

        agent = DashboardAgent(
            args.redis_address,
            redis_password=args.redis_password,
            temp_dir=temp_dir,
            log_dir=log_dir,
            node_manager_port=args.node_manager_port,
            object_store_name=args.object_store_name,
            raylet_name=args.raylet_name)

        loop = asyncio.get_event_loop()
        # loop.create_task is available since Python 3.5 (unlike the
        # module-level asyncio.create_task, which is 3.7+).
        loop.create_task(agent.run())
        loop.run_forever()
    except Exception:
        # Something went wrong, so push an error to all drivers.
        redis_client = ray.services.create_redis_client(
            args.redis_address, password=args.redis_password)
        traceback_str = ray.utils.format_error_message(traceback.format_exc())
        message = ("The agent on node {} failed with the following "
                   "error:\n{}".format(os.uname()[1], traceback_str))
        ray.utils.push_error_to_driver_through_redis(
            redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR, message)
        # Bare raise preserves the original traceback (re-raising `e`
        # would append a redundant chained frame).
        raise
16 changes: 16 additions & 0 deletions dashboard/consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Redis key prefix under which each agent publishes its grpc port
# (keyed by node IP).
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
# How often the agent checks whether its parent raylet is still alive.
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2
MAX_COUNT_OF_GCS_RPC_ERROR = 10
UPDATE_NODES_INTERVAL_SECONDS = 5
CONNECT_GCS_INTERVAL_SECONDS = 2
PURGE_DATA_INTERVAL_SECONDS = 60 * 10
REDIS_KEY_DASHBOARD = "dashboard"
REDIS_KEY_GCS_SERVER_ADDRESS = "GcsServerAddress"
REPORT_METRICS_TIMEOUT_SECONDS = 2
REPORT_METRICS_INTERVAL_SECONDS = 10
# Named signals
SIGNAL_NODE_INFO_FETCHED = "node_info_fetched"
# Default params for RotatingFileHandler
LOGGING_ROTATE_BYTES = 100 * 1000  # maxBytes
LOGGING_ROTATE_BACKUP_COUNT = 5  # backupCount
Loading