Skip to content

Commit

Permalink
[Serve] integrate with Ray structured logging (ray-project#46215)
Browse files Browse the repository at this point in the history
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

Integrated Ray structured logging into Serve's logger. Two things are
done in this PR:
1. Moved the logics to flatten serve's special key for extra logging
attributes to `_append_flatten_attributes()` and called it from
`generate_record_format_attrs()` so Ray structured logging can apply
those nested attributes
2. Refactored Serve's json formatter into context filters and applied
them onto Serve's log handlers. Reuses core's `JSONFormatter` when
formatting structured logs

The main difference after integrated with Ray structured logging is
there are new attributes provided by Ray (e.g. `node_id`, `filename`,
and `lineno`). Also, the message format is just the message body itself,
without filename and line number prefix.

Example log before the change
```
{
   "levelname":"INFO",
   "asctime":"2024-06-24 22:15:34,526",
   "actor_id":"59ef544fa6127fffd1b6c4ad01000000",
   "worker_id":"ce0efd00c47b08f51a5c04e2cdfb8919ed6df4d0b4b938d3e75b4321",
   "deployment":"app_Counter",
   "replica":"j8fhh1bm",
   "component_name":"replica",
   "request_id":"6f04a2fd-db85-4687-8302-ef478e7abee8",
   "route":"/favicon.ico",
   "application":"app",
   "message":"replica.py:373 - __CALL__ OK 2.8ms"
}
```

Example log after the change
```
{
   "asctime":"2024-06-24 22:10:57,906",
   "levelname":"INFO",
   "message":"__CALL__ OK 4.3ms",
   "filename":"replica.py",
   "lineno":373,
   "worker_id":"833490a0fcc662b86c04403694c85b1d14364a05b5c6d6dfec5d5d47",
   "node_id":"1f3b2424bccc7f407fad2bf549954b723ea46405dded64d7386239a7",
   "actor_id":"b5f78e9af7148f019d9e89ab01000000",
   "route":"/favicon.ico",
   "request_id":"59f62350-7da6-47a2-a964-b6dd4e512003",
   "application":"app",
   "deployment":"app_Counter",
   "replica":"8p7guoov",
   "component_name":"replica"
}
```

## Related issue number

Closes ray-project#46125

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Gene Su <e870252314@gmail.com>
Signed-off-by: Gene Der Su <gdsu@ucdavis.edu>
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
GeneDer and jjyao authored Jun 26, 2024
1 parent b257b49 commit f21b7f8
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 136 deletions.
4 changes: 4 additions & 0 deletions python/ray/_private/ray_logging/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
"taskName",
}

LOGGER_FLATTEN_KEYS = {
"ray_serve_extra_fields",
}


class LogKey(str, Enum):
# Core context
Expand Down
29 changes: 27 additions & 2 deletions python/ray/_private/ray_logging/formatters.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
import logging
import json
from ray._private.ray_logging.constants import LogKey, LOGRECORD_STANDARD_ATTRS
from ray._private.ray_logging.constants import (
LogKey,
LOGRECORD_STANDARD_ATTRS,
LOGGER_FLATTEN_KEYS,
)
from ray._private.ray_constants import LOGGER_FORMAT
from typing import Any, Dict


def _append_flatten_attributes(formatted_attrs: Dict[str, Any], key: str, value: Any):
"""Flatten the dictionary values for special keys and append the values in place.
If the key is in `LOGGER_FLATTEN_KEYS`, the value will be flattened and appended
to the `formatted_attrs` dictionary. Otherwise, the key-value pair will be appended
directly.
"""
if key in LOGGER_FLATTEN_KEYS:
if not isinstance(value, dict):
raise ValueError(
f"Expected a dictionary passing into {key}, but got {type(value)}"
)
for k, v in value.items():
if k in formatted_attrs:
raise KeyError(f"Found duplicated key in the log record: {k}")
formatted_attrs[k] = v
else:
formatted_attrs[key] = value


def generate_record_format_attrs(
Expand All @@ -28,7 +53,7 @@ def generate_record_format_attrs(
for key, value in record.__dict__.items():
# Both Ray and user-provided context are stored in `record_format`.
if key not in LOGRECORD_STANDARD_ATTRS:
record_format_attrs[key] = value
_append_flatten_attributes(record_format_attrs, key, value)
return record_format_attrs


Expand Down
8 changes: 8 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@
SERVE_LOG_TIME: "%(asctime)s",
}

# There are some attributes that we only use internally or don't provide values to the
# users. Adding to this set will remove them from structured logs.
SERVE_LOG_UNWANTED_ATTRS = {
"serve_access_log",
"task_id",
"job_id",
}

SERVE_LOG_EXTRA_FIELDS = "ray_serve_extra_fields"

# Serve HTTP request header key for routing requests.
Expand Down
147 changes: 63 additions & 84 deletions python/ray/serve/_private/logging_utils.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
import builtins
import copy
import json
import logging
import os
import sys
import traceback
from typing import Any, Optional, Tuple

import ray
from ray._private.ray_logging.filters import CoreContextFilter
from ray._private.ray_logging.formatters import JSONFormatter
from ray.serve._private.common import ServeComponentType
from ray.serve._private.constants import (
RAY_SERVE_ENABLE_CPU_PROFILING,
RAY_SERVE_ENABLE_JSON_LOGGING,
RAY_SERVE_ENABLE_MEMORY_PROFILING,
RAY_SERVE_LOG_TO_STDERR,
SERVE_LOG_ACTOR_ID,
SERVE_LOG_APPLICATION,
SERVE_LOG_COMPONENT,
SERVE_LOG_COMPONENT_ID,
SERVE_LOG_DEPLOYMENT,
SERVE_LOG_EXTRA_FIELDS,
SERVE_LOG_LEVEL_NAME,
SERVE_LOG_MESSAGE,
SERVE_LOG_RECORD_FORMAT,
SERVE_LOG_REPLICA,
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
SERVE_LOG_TIME,
SERVE_LOG_WORKER_ID,
SERVE_LOG_UNWANTED_ATTRS,
SERVE_LOGGER_NAME,
)
from ray.serve._private.utils import get_component_file_name
Expand All @@ -42,85 +40,74 @@
buildin_print = builtins.print


class ServeJSONFormatter(logging.Formatter):
"""Serve Logging Json Formatter
class ServeComponentFilter(logging.Filter):
"""Serve component filter.
The formatter will generate the json log format on the fly
based on the field of record.
The filter will add the component name, id, and type to the log record.
"""

ADD_IF_EXIST_FIELDS = [
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
SERVE_LOG_APPLICATION,
]

def __init__(
self,
component_name: str,
component_id: str,
component_type: Optional[ServeComponentType] = None,
):
self.component_log_fmt = {
SERVE_LOG_LEVEL_NAME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_LEVEL_NAME],
SERVE_LOG_TIME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_TIME],
}
try:
runtime_context = ray.get_runtime_context()
actor_id = runtime_context.get_actor_id()
if actor_id:
self.component_log_fmt[SERVE_LOG_ACTOR_ID] = actor_id
worker_id = runtime_context.get_worker_id()
if worker_id:
self.component_log_fmt[SERVE_LOG_WORKER_ID] = worker_id
except Exception:
# If get_runtime_context() fails for any reason, do nothing (no adding
# actor_id and/or worker_id to the fmt)
pass

if component_type and component_type == ServeComponentType.REPLICA:
self.component_log_fmt[SERVE_LOG_DEPLOYMENT] = component_name
self.component_log_fmt[SERVE_LOG_REPLICA] = component_id
self.component_log_fmt[SERVE_LOG_COMPONENT] = component_type
self.component_name = component_name
self.component_id = component_id
self.component_type = component_type

def filter(self, record: logging.LogRecord) -> bool:
"""Add component attributes to the log record.
Note: the filter doesn't do any filtering, it only adds the component
attributes.
"""
if self.component_type and self.component_type == ServeComponentType.REPLICA:
setattr(record, SERVE_LOG_DEPLOYMENT, self.component_name)
setattr(record, SERVE_LOG_REPLICA, self.component_id)
setattr(record, SERVE_LOG_COMPONENT, self.component_type)
else:
self.component_log_fmt[SERVE_LOG_COMPONENT] = component_name
self.component_log_fmt[SERVE_LOG_COMPONENT_ID] = component_id
self.message_formatter = logging.Formatter(
SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE]
)
self.asctime_formatter = logging.Formatter("%(asctime)s")
setattr(record, SERVE_LOG_COMPONENT, self.component_name)
setattr(record, SERVE_LOG_REPLICA, self.component_id)

def format(self, record: logging.LogRecord) -> str:
"""Format the log record into json format.
return True

Args:
record: The log record to be formatted.

Returns:
The formatted log record in json format.
"""
record_format = copy.deepcopy(self.component_log_fmt)
record_format[SERVE_LOG_LEVEL_NAME] = record.levelname
record_format[SERVE_LOG_TIME] = self.asctime_formatter.format(record)
class ServeContextFilter(logging.Filter):
"""Serve context filter.
for field in ServeJSONFormatter.ADD_IF_EXIST_FIELDS:
if field in record.__dict__:
record_format[field] = record.__dict__[field]
The filter will add the route, request id, app name to the log record.
record_format[SERVE_LOG_MESSAGE] = self.message_formatter.format(record)
Note: the filter doesn't do any filtering, it only adds the serve request context
attributes.
"""

if SERVE_LOG_EXTRA_FIELDS in record.__dict__:
if not isinstance(record.__dict__[SERVE_LOG_EXTRA_FIELDS], dict):
raise ValueError(
f"Expected a dictionary passing into {SERVE_LOG_EXTRA_FIELDS}, "
f"but got {type(record.__dict__[SERVE_LOG_EXTRA_FIELDS])}"
)
for k, v in record.__dict__[SERVE_LOG_EXTRA_FIELDS].items():
if k in record_format:
raise KeyError(f"Found duplicated key in the log record: {k}")
record_format[k] = v
def filter(self, record):
request_context = ray.serve.context._serve_request_context.get()
if request_context.route:
setattr(record, SERVE_LOG_ROUTE, request_context.route)
if request_context.request_id:
setattr(record, SERVE_LOG_REQUEST_ID, request_context.request_id)
if request_context.app_name:
setattr(record, SERVE_LOG_APPLICATION, request_context.app_name)
return True


class ServeLogAttributeRemovalFilter(logging.Filter):
"""Serve log attribute removal filter.
The filter will remove unwanted attributes on the log record so they won't be
included in the structured logs.
return json.dumps(record_format)
Note: the filter doesn't do any filtering, it only removes unwanted attributes.
"""

def filter(self, record):
for key in SERVE_LOG_UNWANTED_ATTRS:
if hasattr(record, key):
delattr(record, key)

return True


class ServeFormatter(logging.Formatter):
Expand Down Expand Up @@ -304,26 +291,12 @@ def configure_component_logger(
logger.setLevel(logging_config.log_level)
logger.handlers.clear()

factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
request_context = ray.serve.context._serve_request_context.get()
record = factory(*args, **kwargs)
if request_context.route:
setattr(record, SERVE_LOG_ROUTE, request_context.route)
if request_context.request_id:
setattr(record, SERVE_LOG_REQUEST_ID, request_context.request_id)
if request_context.app_name:
setattr(record, SERVE_LOG_APPLICATION, request_context.app_name)
return record

logging.setLogRecordFactory(record_factory)

# Only add stream handler if RAY_SERVE_LOG_TO_STDERR is True.
if RAY_SERVE_LOG_TO_STDERR:
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ServeFormatter(component_name, component_id))
stream_handler.addFilter(log_to_stderr_filter)
stream_handler.addFilter(ServeContextFilter())
logger.addHandler(stream_handler)

if logging_config.logs_dir:
Expand Down Expand Up @@ -355,15 +328,21 @@ def record_factory(*args, **kwargs):
"'LoggingConfig' to enable json format."
)
if RAY_SERVE_ENABLE_JSON_LOGGING or logging_config.encoding == EncodingType.JSON:
file_handler.setFormatter(
ServeJSONFormatter(component_name, component_id, component_type)
file_handler.addFilter(CoreContextFilter())
file_handler.addFilter(ServeContextFilter())
file_handler.addFilter(
ServeComponentFilter(component_name, component_id, component_type)
)
file_handler.setFormatter(JSONFormatter())
else:
file_handler.setFormatter(ServeFormatter(component_name, component_id))

if logging_config.enable_access_log is False:
file_handler.addFilter(log_access_log_filter)

# Remove unwanted attributes from the log record.
file_handler.addFilter(ServeLogAttributeRemovalFilter())

# Redirect print, stdout, and stderr to Serve logger.
if not RAY_SERVE_LOG_TO_STDERR:
builtins.print = redirected_print
Expand Down
Loading

0 comments on commit f21b7f8

Please sign in to comment.