Skip to content

Commit

Permalink
rotate runtime env agent
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <dentinyhao@gmail.com>
  • Loading branch information
dentiny committed Feb 25, 2025
1 parent 03308ae commit e10a3b5
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 12 deletions.
2 changes: 2 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2839,6 +2839,8 @@ pyx_library(
"//src/ray/protobuf:serialization_cc_proto",
"//src/ray/util",
"//src/ray/util:memory",
"//src/ray/util:stream_redirection_options",
"//src/ray/util:stream_redirection_utils",
],
)

Expand Down
58 changes: 46 additions & 12 deletions python/ray/_private/runtime_env/agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@
runtime_env_agent_pb2,
)
from ray._private.utils import open_log
from ray._private.ray_logging import (
configure_log_file,
)
from ray._private.utils import (
get_or_create_event_loop,
)
from ray._private.process_watcher import create_check_raylet_task
from ray._raylet import StreamRedirector


def import_libs():
Expand All @@ -30,11 +28,16 @@ def import_libs():
from aiohttp import web # noqa: E402


def open_capture_files(log_dir):
def get_capture_filepaths(log_dir):
"""Get filepaths for the given [log_dir].
log_dir:
Logging directory to place output and error logs.
"""
filename = "runtime_env_agent"
return (
open_log(pathlib.Path(log_dir) / f"{filename}.out"),
open_log(pathlib.Path(log_dir) / f"{filename}.err"),
pathlib.Path(log_dir) / f"{filename}.out",
pathlib.Path(log_dir) / f"{filename}.err",
)


Expand Down Expand Up @@ -97,15 +100,15 @@ def open_capture_files(log_dir):
"--logging-rotate-bytes",
required=False,
type=int,
default=ray_constants.LOGGING_ROTATE_BYTES,
default=sys.maxsize,
help="Specify the max bytes for rotating "
"log file, default is {} bytes.".format(ray_constants.LOGGING_ROTATE_BYTES),
)
parser.add_argument(
"--logging-rotate-backup-count",
required=False,
type=int,
default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
default=1,
help="Specify the backup count of rotated log file, default is {}.".format(
ray_constants.LOGGING_ROTATE_BACKUP_COUNT
),
Expand All @@ -127,18 +130,49 @@ def open_capture_files(log_dir):

args = parser.parse_args()

# Disable log rotation for windows platform.
logging_rotation_bytes = (
args.logging_rotate_bytes if sys.platform != "win32" else sys.maxsize
)
logging_rotation_backup_count = (
args.logging_rotate_backup_count if sys.platform != "win32" else 1
)

logging_params = dict(
logging_level=args.logging_level,
logging_format=args.logging_format,
log_dir=args.log_dir,
filename=args.logging_filename,
max_bytes=args.logging_rotate_bytes,
backup_count=args.logging_rotate_backup_count,
max_bytes=logging_rotation_bytes,
backup_count=logging_rotation_backup_count,
)

stdout_fileno = sys.stdout.fileno()
stderr_fileno = sys.stderr.fileno()
# We also manually set sys.stdout and sys.stderr because that seems to
# have an effect on the output buffering. Without doing this, stdout
# and stderr are heavily buffered resulting in seemingly lost logging
# statements. We never want to close the stdout file descriptor, dup2 will
# close it when necessary and we don't want python's GC to close it.
sys.stdout = open_log(stdout_fileno, unbuffered=True, closefd=False)
sys.stderr = open_log(stderr_fileno, unbuffered=True, closefd=False)

# Setup stdout/stderr redirect files
out_file, err_file = open_capture_files(args.log_dir)
configure_log_file(out_file, err_file)
out_filepath, err_filepath = get_capture_filepaths(args.log_dir)
StreamRedirector.redirect_stdout(
out_filepath,
logging_rotation_bytes,
logging_rotation_backup_count,
False,
False,
)
StreamRedirector.redirect_stderr(
err_filepath,
logging_rotation_bytes,
logging_rotation_backup_count,
False,
False,
)

agent = RuntimeEnvAgent(
runtime_env_dir=args.runtime_env_dir,
Expand Down
45 changes: 45 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ from ray.includes.libcoreworker cimport (
CGeneratorBackpressureWaiter,
CReaderRefInfo,
)
from ray.includes.stream_redirection cimport (
CStreamRedirectionOptions,
RedirectStdout,
RedirectStderr,
)

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
Expand Down Expand Up @@ -2651,6 +2656,46 @@ cdef void terminate_asyncio_thread() nogil:
core_worker = ray._private.worker.global_worker.core_worker
core_worker.stop_and_join_asyncio_threads_if_exist()

cdef class StreamRedirector:
@staticmethod
def redirect_stdout(const c_string &file_path, uint64_t rotation_max_size, uint64_t rotation_max_file_count, c_bool tee_to_stdout, c_bool tee_to_stderr):
cdef CStreamRedirectionOptions opt = CStreamRedirectionOptions()
opt.file_path = file_path
opt.rotation_max_size = rotation_max_size
opt.rotation_max_file_count = rotation_max_file_count
opt.tee_to_stdout = tee_to_stdout
opt.tee_to_stderr = tee_to_stderr
RedirectStdout(opt)

@staticmethod
def redirect_stderr(const c_string &file_path, uint64_t rotation_max_size, uint64_t rotation_max_file_count, c_bool tee_to_stdout, c_bool tee_to_stderr):
cdef CStreamRedirectionOptions opt = CStreamRedirectionOptions()
opt.file_path = file_path
opt.rotation_max_size = rotation_max_size
opt.rotation_max_file_count = rotation_max_file_count
opt.tee_to_stdout = tee_to_stdout
opt.tee_to_stderr = tee_to_stderr
RedirectStderr(opt)

@staticmethod
def redirect_stdout(const c_string &file_path, uint64_t rotation_max_size, uint64_t rotation_max_file_count, c_bool tee_to_stdout, c_bool tee_to_stderr):
cdef CStreamRedirectionOptions opt = CStreamRedirectionOptions()
opt.file_path = file_path
opt.rotation_max_size = rotation_max_size
opt.rotation_max_file_count = rotation_max_file_count
opt.tee_to_stdout = tee_to_stdout
opt.tee_to_stderr = tee_to_stderr
RedirectStdout(opt)

@staticmethod
def redirect_stderr(const c_string &file_path, uint64_t rotation_max_size, uint64_t rotation_max_file_count, c_bool tee_to_stdout, c_bool tee_to_stderr):
cdef CStreamRedirectionOptions opt = CStreamRedirectionOptions()
opt.file_path = file_path
opt.rotation_max_size = rotation_max_size
opt.rotation_max_file_count = rotation_max_file_count
opt.tee_to_stdout = tee_to_stdout
opt.tee_to_stderr = tee_to_stderr
RedirectStderr(opt)

# An empty profile event context to be used when the timeline is disabled.
cdef class EmptyProfileEvent:
Expand Down
16 changes: 16 additions & 0 deletions python/ray/includes/stream_redirection.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from libcpp.string cimport string as c_string
from libc.stdint cimport uint64_t
from libcpp cimport bool as c_bool

cdef extern from "ray/util/stream_redirection_options.h" nogil:
cdef cppclass CStreamRedirectionOptions "ray::StreamRedirectionOption":
CStreamRedirectionOptions()
c_string file_path
uint64_t rotation_max_size
uint64_t rotation_max_file_count
c_bool tee_to_stdout
c_bool tee_to_stderr

cdef extern from "ray/util/stream_redirection_utils.h" namespace "ray" nogil:
void RedirectStdout(const CStreamRedirectionOptions& opt)
void RedirectStderr(const CStreamRedirectionOptions& opt)
2 changes: 2 additions & 0 deletions src/ray/util/stream_redirection_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace ray {
// For example, if redirection file set and `tee_to_stdout` both set to true, the stream
// content is written to both sinks.
struct StreamRedirectionOption {
StreamRedirectionOption() = default;

// Redirected file path on local filesystem.
std::string file_path;
// Max number of bytes in a rotated file.
Expand Down

0 comments on commit e10a3b5

Please sign in to comment.