Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,16 +394,19 @@ def _make_inc_temp(self, suffix="", prefix="", directory_name=None):
raise FileExistsError(errno.EEXIST,
"No usable temporary filename found")

def get_log_file_names(self, name, unique=False):
"""Generate partially randomized filenames for log files.
def get_log_file_handles(self, name, unique=False):
"""Open log files with partially randomized filenames, returning the
file handles. If output redirection has been disabled, no files will
be opened and `(None, None)` will be returned.

Args:
name (str): descriptive string for this log file.
unique (bool): if true, a counter will be attached to `name` to
ensure the returned filename is not already used.

Returns:
A tuple of two file names for redirecting (stdout, stderr).
A tuple of two file handles for redirecting (stdout, stderr), or
`(None, None)` if output redirection is disabled.
"""
redirect_output = self._ray_params.redirect_output

Expand All @@ -414,6 +417,21 @@ def get_log_file_names(self, name, unique=False):
if not redirect_output:
return None, None

log_stdout, log_stderr = self._get_log_file_names(name, unique=unique)
return open_log(log_stdout), open_log(log_stderr)

def _get_log_file_names(self, name, unique=False):
"""Generate partially randomized filenames for log files.

Args:
name (str): descriptive string for this log file.
unique (bool): if true, a counter will be attached to `name` to
ensure the returned filename is not already used.

Returns:
A tuple of two file names for redirecting (stdout, stderr).
"""

if unique:
log_stdout = self._make_inc_temp(
suffix=".out", prefix=name, directory_name=self._logs_dir)
Expand Down Expand Up @@ -501,15 +519,11 @@ def start_reaper_process(self):
def start_redis(self):
"""Start the Redis servers."""
assert self._redis_address is None
redis_out_name, redis_err_name = self.get_log_file_names(
"redis", unique=True)
redis_log_files = [(open_log(redis_out_name),
open_log(redis_err_name))]
redis_log_files = [self.get_log_file_handles("redis", unique=True)]
for i in range(self._ray_params.num_redis_shards):
shard_out_name, shard_err_name = self.get_log_file_names(
"redis-shard_{}".format(i), unique=True)
redis_log_files.append((open_log(shard_out_name),
open_log(shard_err_name)))
redis_log_files.append(
self.get_log_file_handles(
"redis-shard_{}".format(i), unique=True))

(self._redis_address, redis_shards,
process_infos) = ray.services.start_redis(
Expand All @@ -531,10 +545,8 @@ def start_redis(self):

def start_log_monitor(self):
"""Start the log monitor."""
log_out_name, log_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"log_monitor", unique=True)
stdout_file, stderr_file = open_log(log_out_name), open_log(
log_err_name)
process_info = ray.services.start_log_monitor(
self.redis_address,
self._logs_dir,
Expand All @@ -549,10 +561,8 @@ def start_log_monitor(self):

def start_reporter(self):
"""Start the reporter."""
reporter_out_name, reporter_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"reporter", unique=True)
stdout_file, stderr_file = (open_log(reporter_out_name),
open_log(reporter_err_name))
process_info = ray.services.start_reporter(
self.redis_address,
self._ray_params.metrics_agent_port,
Expand All @@ -574,10 +584,8 @@ def start_dashboard(self, require_dashboard):
if we fail to start the dashboard. Otherwise it will print
a warning if we fail to start the dashboard.
"""
dashboard_out_name, dashboard_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"dashboard", unique=True)
stdout_file, stderr_file = (open_log(dashboard_out_name),
open_log(dashboard_err_name))
self._webui_url, process_info = ray.services.start_dashboard(
require_dashboard,
self._ray_params.dashboard_host,
Expand All @@ -598,10 +606,8 @@ def start_dashboard(self, require_dashboard):

def start_plasma_store(self):
"""Start the plasma store."""
plasma_out_name, plasma_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"plasma_store", unique=True)
stdout_file, stderr_file = (open_log(plasma_out_name),
open_log(plasma_err_name))
process_info = ray.services.start_plasma_store(
self.get_resource_spec(),
self._plasma_store_socket_name,
Expand All @@ -620,10 +626,8 @@ def start_plasma_store(self):
def start_gcs_server(self):
"""Start the gcs server.
"""
gcs_out_name, gcs_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"gcs_server", unique=True)
stdout_file, stderr_file = (open_log(gcs_out_name),
open_log(gcs_err_name))
process_info = ray.services.start_gcs_server(
self._redis_address,
stdout_file=stdout_file,
Expand All @@ -647,10 +651,8 @@ def start_raylet(self, use_valgrind=False, use_profiler=False):
use_profiler (bool): True if we should start the process in the
valgrind profiler.
"""
raylet_out_name, raylet_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"raylet", unique=True)
stdout_file, stderr_file = (open_log(raylet_out_name),
open_log(raylet_err_name))
process_info = ray.services.start_raylet(
self._redis_address,
self._node_ip_address,
Expand Down Expand Up @@ -713,7 +715,7 @@ def get_job_redirected_log_file(self,
else:
name = "worker-{}".format(ray.utils.binary_to_hex(worker_id))

worker_stdout_file, worker_stderr_file = self.get_log_file_names(
worker_stdout_file, worker_stderr_file = self._get_log_file_names(
name, unique=False)
return worker_stdout_file, worker_stderr_file

Expand All @@ -723,10 +725,8 @@ def start_worker(self):

def start_monitor(self):
"""Start the monitor."""
monitor_out_name, monitor_err_name = self.get_log_file_names(
stdout_file, stderr_file = self.get_log_file_handles(
"monitor", unique=True)
stdout_file, stderr_file = (open_log(monitor_out_name),
open_log(monitor_err_name))
process_info = ray.services.start_monitor(
self._redis_address,
stdout_file=stdout_file,
Expand Down
4 changes: 4 additions & 0 deletions python/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ def setup_logger(logging_level, logging_format):


def open_log(path, **kwargs):
"""
Opens the log file at `path`, with the provided kwargs being given to
`open`.
"""
kwargs.setdefault("buffering", 1)
kwargs.setdefault("mode", "a")
kwargs.setdefault("encoding", "utf-8")
Expand Down