-
Notifications
You must be signed in to change notification settings - Fork 6.8k
[Dashboard] New dashboard skeleton #9099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4bd24e1
c1518f3
6bae695
21b1cea
4c653d4
2de975c
d8afdd3
87a7b8c
52460a3
aae9e9e
9cb1a3c
8e0a1be
5cd38c3
d956703
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
import argparse | ||
import asyncio | ||
import logging | ||
import logging.handlers | ||
import os | ||
import sys | ||
import traceback | ||
|
||
import aiohttp | ||
import aioredis | ||
from grpc.experimental import aio as aiogrpc | ||
|
||
import ray | ||
import ray.new_dashboard.consts as dashboard_consts | ||
import ray.new_dashboard.utils as dashboard_utils | ||
import ray.ray_constants as ray_constants | ||
import ray.services | ||
import ray.utils | ||
import psutil | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
aiogrpc.init_grpc_aio() | ||
|
||
|
||
class DashboardAgent(object): | ||
def __init__(self, | ||
redis_address, | ||
redis_password=None, | ||
temp_dir=None, | ||
log_dir=None, | ||
node_manager_port=None, | ||
object_store_name=None, | ||
raylet_name=None): | ||
"""Initialize the DashboardAgent object.""" | ||
self._agent_cls_list = dashboard_utils.get_all_modules( | ||
dashboard_utils.DashboardAgentModule) | ||
ip, port = redis_address.split(":") | ||
# Public attributes are accessible for all agent modules. | ||
self.redis_address = (ip, int(port)) | ||
self.redis_password = redis_password | ||
self.temp_dir = temp_dir | ||
self.log_dir = log_dir | ||
self.node_manager_port = node_manager_port | ||
self.object_store_name = object_store_name | ||
self.raylet_name = raylet_name | ||
self.ip = ray.services.get_node_ip_address() | ||
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), )) | ||
listen_address = "[::]:0" | ||
logger.info("Dashboard agent listen at: %s", listen_address) | ||
self.port = self.server.add_insecure_port(listen_address) | ||
self.aioredis_client = None | ||
self.aiogrpc_raylet_channel = aiogrpc.insecure_channel("{}:{}".format( | ||
self.ip, self.node_manager_port)) | ||
self.http_session = aiohttp.ClientSession( | ||
loop=asyncio.get_event_loop()) | ||
|
||
def _load_modules(self): | ||
"""Load dashboard agent modules.""" | ||
modules = [] | ||
for cls in self._agent_cls_list: | ||
logger.info("Load %s: %s", | ||
dashboard_utils.DashboardAgentModule.__name__, cls) | ||
c = cls(self) | ||
modules.append(c) | ||
logger.info("Load {} modules.".format(len(modules))) | ||
return modules | ||
|
||
async def run(self): | ||
# Create an aioredis client for all modules. | ||
self.aioredis_client = await aioredis.create_redis_pool( | ||
address=self.redis_address, password=self.redis_password) | ||
|
||
# Start a grpc asyncio server. | ||
await self.server.start() | ||
|
||
# Write the dashboard agent port to redis. | ||
await self.aioredis_client.set( | ||
"{}{}".format(dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX, | ||
self.ip), self.port) | ||
|
||
async def _check_parent(): | ||
"""Check if raylet is dead.""" | ||
curr_proc = psutil.Process() | ||
while True: | ||
parent = curr_proc.parent() | ||
if parent is None or parent.pid == 1: | ||
logger.error("raylet is dead, agent will die because " | ||
"it fate-shares with raylet.") | ||
sys.exit(0) | ||
await asyncio.sleep( | ||
dashboard_consts. | ||
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS) | ||
|
||
modules = self._load_modules() | ||
await asyncio.gather(_check_parent(), | ||
*(m.run(self.server) for m in modules)) | ||
await self.server.wait_for_termination() | ||
|
||
|
||
if __name__ == "__main__": | ||
parser = argparse.ArgumentParser(description="Dashboard agent.") | ||
parser.add_argument( | ||
"--redis-address", | ||
required=True, | ||
type=str, | ||
help="The address to use for Redis.") | ||
parser.add_argument( | ||
"--node-manager-port", | ||
required=True, | ||
type=int, | ||
help="The port to use for starting the node manager") | ||
parser.add_argument( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need object store name? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the job module to start the driver. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. We were planning to wait to merge the jobs portion of the new dashboard code until we get approval from Robert and the like to add jobs. I'm not sure what the latest status is there, but I'll look into it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We want to keep this argument in order to share the same code base. |
||
"--object-store-name", | ||
required=True, | ||
type=str, | ||
default=None, | ||
help="The socket name of the plasma store") | ||
parser.add_argument( | ||
"--raylet-name", | ||
required=True, | ||
type=str, | ||
default=None, | ||
help="The socket path of the raylet process") | ||
parser.add_argument( | ||
"--redis-password", | ||
required=False, | ||
type=str, | ||
default=None, | ||
help="The password to use for Redis") | ||
parser.add_argument( | ||
"--logging-level", | ||
required=False, | ||
type=lambda s: logging.getLevelName(s.upper()), | ||
default=ray_constants.LOGGER_LEVEL, | ||
choices=ray_constants.LOGGER_LEVEL_CHOICES, | ||
help=ray_constants.LOGGER_LEVEL_HELP) | ||
parser.add_argument( | ||
"--logging-format", | ||
required=False, | ||
type=str, | ||
default=ray_constants.LOGGER_FORMAT, | ||
help=ray_constants.LOGGER_FORMAT_HELP) | ||
parser.add_argument( | ||
"--logging-filename", | ||
required=False, | ||
type=str, | ||
default=dashboard_consts.DASHBOARD_AGENT_LOG_FILENAME, | ||
help="Specify the name of log file, " | ||
"log to stdout if set empty, default is \"{}\".".format( | ||
dashboard_consts.DASHBOARD_AGENT_LOG_FILENAME)) | ||
parser.add_argument( | ||
"--logging-rotate-bytes", | ||
required=False, | ||
type=int, | ||
default=dashboard_consts.LOGGING_ROTATE_BYTES, | ||
help="Specify the max bytes for rotating " | ||
"log file, default is {} bytes.".format( | ||
dashboard_consts.LOGGING_ROTATE_BYTES)) | ||
parser.add_argument( | ||
"--logging-rotate-backup-count", | ||
required=False, | ||
type=int, | ||
default=dashboard_consts.LOGGING_ROTATE_BACKUP_COUNT, | ||
help="Specify the backup count of rotated log file, default is {}.". | ||
format(dashboard_consts.LOGGING_ROTATE_BACKUP_COUNT)) | ||
parser.add_argument( | ||
"--log-dir", | ||
required=False, | ||
type=str, | ||
default=None, | ||
help="Specify the path of log directory.") | ||
parser.add_argument( | ||
"--temp-dir", | ||
required=False, | ||
type=str, | ||
default=None, | ||
help="Specify the path of the temporary directory use by Ray process.") | ||
|
||
args = parser.parse_args() | ||
try: | ||
if args.temp_dir: | ||
temp_dir = "/" + args.temp_dir.strip("/") | ||
else: | ||
temp_dir = "/tmp/ray" | ||
os.makedirs(temp_dir, exist_ok=True) | ||
|
||
if args.log_dir: | ||
log_dir = args.log_dir | ||
else: | ||
log_dir = os.path.join(temp_dir, "session_latest/logs") | ||
os.makedirs(log_dir, exist_ok=True) | ||
|
||
if args.logging_filename: | ||
logging_handlers = [ | ||
logging.handlers.RotatingFileHandler( | ||
os.path.join(log_dir, args.logging_filename), | ||
maxBytes=args.logging_rotate_bytes, | ||
backupCount=args.logging_rotate_backup_count) | ||
] | ||
else: | ||
logging_handlers = None | ||
logging.basicConfig( | ||
level=args.logging_level, | ||
format=args.logging_format, | ||
handlers=logging_handlers) | ||
|
||
agent = DashboardAgent( | ||
args.redis_address, | ||
redis_password=args.redis_password, | ||
temp_dir=temp_dir, | ||
log_dir=log_dir, | ||
node_manager_port=args.node_manager_port, | ||
object_store_name=args.object_store_name, | ||
raylet_name=args.raylet_name) | ||
|
||
loop = asyncio.get_event_loop() | ||
loop.create_task(agent.run()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Can you use Python 3.5 compatible API? I believe it was There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method |
||
loop.run_forever() | ||
except Exception as e: | ||
# Something went wrong, so push an error to all drivers. | ||
redis_client = ray.services.create_redis_client( | ||
args.redis_address, password=args.redis_password) | ||
traceback_str = ray.utils.format_error_message(traceback.format_exc()) | ||
message = ("The agent on node {} failed with the following " | ||
"error:\n{}".format(os.uname()[1], traceback_str)) | ||
ray.utils.push_error_to_driver_through_redis( | ||
redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR, message) | ||
raise e |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:" | ||
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log" | ||
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2 | ||
MAX_COUNT_OF_GCS_RPC_ERROR = 10 | ||
UPDATE_NODES_INTERVAL_SECONDS = 5 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you set it to 1 second? We'd like to make it real time. It'll be also great if this value is configurable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. I will make the values configurable in the future. Here are two ways:
I prefer the first one, It's cleaner but not easy to use. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the first one is fine. It'll be pretty easy to iterate or upgrade if necessary. |
||
CONNECT_GCS_INTERVAL_SECONDS = 2 | ||
PURGE_DATA_INTERVAL_SECONDS = 60 * 10 | ||
REDIS_KEY_DASHBOARD = "dashboard" | ||
REDIS_KEY_GCS_SERVER_ADDRESS = "GcsServerAddress" | ||
REPORT_METRICS_TIMEOUT_SECONDS = 2 | ||
REPORT_METRICS_INTERVAL_SECONDS = 10 | ||
# Named signals | ||
SIGNAL_NODE_INFO_FETCHED = "node_info_fetched" | ||
# Default param for RotatingFileHandler | ||
LOGGING_ROTATE_BYTES = 100 * 1000 # maxBytes | ||
LOGGING_ROTATE_BACKUP_COUNT = 5 # backupCount |
Uh oh!
There was an error while loading. Please reload this page.