Commit
Propagate logs to stdout when in k8s executor pod (#28440)
This is necessary because the file task handler reads from pod logs (i.e. stdout) while the pod is running.

Previously we did not propagate task logs from `airflow.task`, presumably to avoid duplicate entries, because we had copied the handler to root. But if we simply remove the handler from `airflow.task`, we can safely enable propagation, since there will no longer be multiple task handlers floating around.
dstandish authored Jan 9, 2023
1 parent ff48ba1 commit 3ececb2
Showing 6 changed files with 362 additions and 169 deletions.
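As a sketch of the logging arrangement the commit moves to (standard library only, not Airflow code; the logger name follows Airflow's convention), consider why the old setup could not simply turn propagation on: with a copy of the handler on both `airflow.task` and root, every record would be emitted twice. Moving the handler to root and re-enabling propagation emits each record exactly once, and to stdout:

import logging
import sys

root = logging.getLogger()
task = logging.getLogger("airflow.task")

console = logging.StreamHandler(sys.stdout)
console.name = "console"

# Old arrangement: the task logger owns the handler and does not propagate;
# copying the handler to root *and* enabling propagate would double every line.
task.addHandler(console)
task.propagate = False

# New arrangement: the handler lives only on root and the task logger
# propagates, so each record is emitted once, and it reaches stdout,
# which is where the k8s pod log reader looks.
task.handlers.clear()
task.propagate = True
root.addHandler(console)

task.error("reaches stdout exactly once")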
124 changes: 96 additions & 28 deletions airflow/cli/commands/task_command.py
@@ -23,6 +23,7 @@
 import json
 import logging
 import os
+import sys
 import textwrap
 from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress
 from typing import Generator, Union

@@ -43,6 +44,7 @@
 from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.operator import needs_expansion
+from airflow.settings import IS_K8S_EXECUTOR_POD
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
 from airflow.typing_compat import Literal

@@ -283,38 +285,52 @@ def _extract_external_executor_id(args) -> str | None:
 
 
 @contextmanager
-def _capture_task_logs(ti: TaskInstance) -> Generator[None, None, None]:
+def _move_task_handlers_to_root(ti: TaskInstance) -> Generator[None, None, None]:
     """
-    Manage logging context for a task run.
+    Move handlers for task logging to root logger.
 
-    - Replace the root logger configuration with the airflow.task configuration
-      so we can capture logs from any custom loggers used in the task.
+    We want anything logged during task run to be propagated to task log handlers.
+    If running in a k8s executor pod, also keep the stream handler on root logger
+    so that logs are still emitted to stdout.
+    """
+    # nothing to do
+    if not ti.log.handlers or settings.DONOT_MODIFY_HANDLERS:
+        yield
+        return
+
+    # Move task handlers to root and reset task logger and restore original logger settings after exit.
+    # If k8s executor, we need to ensure that root logger has a console handler, so that
+    # task logs propagate to stdout (this is how webserver retrieves them while task is running).
+    root_logger = logging.getLogger()
+    console_handler = next((h for h in root_logger.handlers if h.name == "console"), None)
+    with LoggerMutationHelper(root_logger), LoggerMutationHelper(ti.log) as task_helper:
+        task_helper.move(root_logger)
+        if IS_K8S_EXECUTOR_POD:
+            if console_handler and console_handler not in root_logger.handlers:
+                root_logger.addHandler(console_handler)
+        yield
 
-    - Redirect stdout and stderr to the task instance log, as INFO and WARNING
-      level messages, respectively.
 
+@contextmanager
+def _redirect_stdout_to_ti_log(ti: TaskInstance) -> Generator[None, None, None]:
+    """
+    Redirect stdout to ti logger.
+
+    Redirect stdout and stderr to the task instance log as INFO and WARNING
+    level messages, respectively.
+
+    If stdout already redirected (possible when task running with option
+    `--local`), don't redirect again.
     """
-    modify = not settings.DONOT_MODIFY_HANDLERS
-    if modify:
-        root_logger, task_logger = logging.getLogger(), logging.getLogger("airflow.task")
-
-        orig_level = root_logger.level
-        root_logger.setLevel(task_logger.level)
-        orig_handlers = root_logger.handlers.copy()
-        root_logger.handlers[:] = task_logger.handlers
-
-    try:
+    # if sys.stdout is StreamLogWriter, it means we already redirected
+    # likely before forking in LocalTaskJob
+    if not isinstance(sys.stdout, StreamLogWriter):
         info_writer = StreamLogWriter(ti.log, logging.INFO)
         warning_writer = StreamLogWriter(ti.log, logging.WARNING)
 
         with redirect_stdout(info_writer), redirect_stderr(warning_writer):
             yield
-
-    finally:
-        if modify:
-            # Restore the root logger to its original state.
-            root_logger.setLevel(orig_level)
-            root_logger.handlers[:] = orig_handlers
+    else:
+        yield
 
 
 class TaskCommandMarker:
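To make the `--local` caveat above concrete, a small usage sketch (the logger name is illustrative): once stdout is redirected, plain print calls become log records and `sys.stdout` itself is a `StreamLogWriter`, which is exactly the condition the new context manager checks before redirecting again:

import logging
import sys
from contextlib import redirect_stdout

from airflow.utils.log.logging_mixin import StreamLogWriter

task_log = logging.getLogger("airflow.task")
writer = StreamLogWriter(task_log, logging.INFO)

with redirect_stdout(writer):
    print("emitted as an INFO record on the task log")
    # When a parent process (e.g. LocalTaskJob before forking) has already
    # redirected, sys.stdout is a StreamLogWriter and a second redirect
    # would merely wrap one log writer in another.
    assert isinstance(sys.stdout, StreamLogWriter)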
@@ -366,12 +382,6 @@ def task_run(args, dag=None):

     settings.MASK_SECRETS_IN_LOGS = True
 
-    # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
-    # behind multiple open sleeping connections while heartbeating, which could
-    # easily exceed the database connection limit when
-    # processing hundreds of simultaneous tasks.
-    settings.reconfigure_orm(disable_connection_pool=True)
-
     get_listener_manager().hook.on_starting(component=TaskCommandMarker())
 
     if args.pickle:

@@ -390,11 +400,19 @@

     log.info("Running %s on host %s", ti, hostname)
 
+    # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
+    # behind multiple open sleeping connections while heartbeating, which could
+    # easily exceed the database connection limit when
+    # processing hundreds of simultaneous tasks.
+    # this should be last thing before running, to reduce likelihood of an open session
+    # which can cause trouble if running process in a fork.
+    settings.reconfigure_orm(disable_connection_pool=True)
+
     try:
         if args.interactive:
             _run_task_by_selected_method(args, dag, ti)
         else:
-            with _capture_task_logs(ti):
+            with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
                 _run_task_by_selected_method(args, dag, ti)
     finally:
         try:

@@ -644,3 +662,53 @@ def task_clear(args):
         include_subdags=not args.exclude_subdags,
         include_parentdag=not args.exclude_parentdag,
     )
+
+
+class LoggerMutationHelper:
+    """
+    Helper for moving and resetting handlers and other logger attrs.
+
+    :meta private:
+    """
+
+    def __init__(self, logger):
+        self.handlers = logger.handlers[:]
+        self.level = logger.level
+        self.propagate = logger.propagate
+        self.source_logger = logger
+
+    def apply(self, logger, replace=True):
+        """
+        Set ``logger`` with attrs stored on instance.
+
+        If ``logger`` is root logger, don't change propagate.
+        """
+        if replace:
+            logger.handlers[:] = self.handlers
+        else:
+            for h in self.handlers:
+                if h not in logger.handlers:
+                    logger.addHandler(h)
+        logger.level = self.level
+        if logger is not logging.getLogger():
+            logger.propagate = self.propagate
+
+    def move(self, logger, replace=True):
+        """
+        Replace ``logger`` attrs with those from source.
+
+        :param logger: target logger
+        :param replace: if True, remove all handlers from target first; otherwise add if not present.
+        """
+        self.apply(logger, replace=replace)
+        self.source_logger.propagate = True
+        self.source_logger.handlers[:] = []
+
+    def reset(self):
+        self.apply(self.source_logger)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.reset()
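A usage sketch for the new helper (assuming its import path at this commit): each instance snapshots a logger's handlers, level, and propagate flag on entry and restores them on exit, which is how `_move_task_handlers_to_root` uses a pair of them:

import logging

from airflow.cli.commands.task_command import LoggerMutationHelper

root_logger = logging.getLogger()
task_logger = logging.getLogger("airflow.task")

with LoggerMutationHelper(root_logger), LoggerMutationHelper(task_logger) as helper:
    # Root takes over the task handlers; the task logger is left empty
    # with propagate=True, so its records flow up to root.
    helper.move(root_logger)
    assert task_logger.propagate and not task_logger.handlers
# On exit, both loggers are restored from their snapshots.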
2 changes: 2 additions & 0 deletions airflow/settings.py
@@ -624,6 +624,8 @@ def initialize():
     executor_constants.CELERY_KUBERNETES_EXECUTOR,
     executor_constants.LOCAL_KUBERNETES_EXECUTOR,
 }
+IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
+"""Will be True if running in kubernetes executor pod."""
 
 HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")
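One nuance in the flag above: taking `bool()` of the raw string means any non-empty value counts as true, so even AIRFLOW_IS_K8S_EXECUTOR_POD=False would mark the process as an executor pod; and since the flag is evaluated at import time, the variable must be set before `airflow.settings` is imported. A standalone sketch of the semantics:

import os

for value in (None, "", "True", "False"):
    if value is None:
        os.environ.pop("AIRFLOW_IS_K8S_EXECUTOR_POD", None)
    else:
        os.environ["AIRFLOW_IS_K8S_EXECUTOR_POD"] = value
    flag = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
    print(repr(value), "->", flag)  # only None and "" give False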
35 changes: 24 additions & 11 deletions airflow/utils/log/file_task_handler.py
@@ -30,13 +30,13 @@
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
+from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 
 if TYPE_CHECKING:
     from airflow.models import TaskInstance
-    from airflow.utils.log.logging_mixin import SetContextPropagate


 class FileTaskHandler(logging.Handler):

@@ -62,19 +62,31 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
             # handler, not the one that calls super()__init__.
             stacklevel=(2 if type(self) == FileTaskHandler else 3),
         )
+        self.maintain_propagate: bool = False
+        """
+        If true, overrides default behavior of setting propagate=False
+
+        :meta private:
+        """
 
     def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
         """
         Provide task_instance context to airflow task handler.
 
+        Generally speaking returns None. But if attr `maintain_propagate` has
+        been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
+        has the effect of overriding the default behavior to set `propagate`
+        to False whenever set_context is called. At time of writing, this
+        functionality is only used in unit testing.
+
         :param ti: task instance object
         """
         local_loc = self._init_file(ti)
         self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
         if self.formatter:
             self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
-        return None
+        return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None

     def emit(self, record):
         if self.handler:

@@ -92,16 +104,17 @@ def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
         with create_session() as session:
             dag_run = ti.get_dagrun(session=session)
             template = dag_run.get_log_template(session=session).filename
-        str_tpl, jinja_tpl = parse_template_string(template)
+            str_tpl, jinja_tpl = parse_template_string(template)
 
-        if jinja_tpl:
-            if hasattr(ti, "task"):
-                context = ti.get_template_context()
-            else:
-                context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
-            context["try_number"] = try_number
-            return render_template_to_string(jinja_tpl, context)
-        elif str_tpl:
+            if jinja_tpl:
+                if hasattr(ti, "task"):
+                    context = ti.get_template_context(session=session)
+                else:
+                    context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
+                context["try_number"] = try_number
+                return render_template_to_string(jinja_tpl, context)
+
+        if str_tpl:
             try:
                 dag = ti.task.dag
             except AttributeError:  # ti.task is not always set.
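For context on what consumes the `MAINTAIN_PROPAGATE` sentinel, the following paraphrases the module-level `set_context` helper in `airflow.utils.log.logging_mixin` (a sketch of the contract, not a verbatim copy): it walks up the logger tree handing the task instance to each handler, and flips `propagate` to False unless a handler opts out:

from __future__ import annotations

import logging

from airflow.utils.log.logging_mixin import SetContextPropagate


def set_context_sketch(logger: logging.Logger | None, ti) -> None:
    """Walk the logger tree, passing ti to any handler that accepts context."""
    while logger:
        orig_propagate = logger.propagate
        for handler in logger.handlers:
            set_ctx = getattr(handler, "set_context", None)
            if set_ctx and set_ctx(ti) is not SetContextPropagate.MAINTAIN_PROPAGATE:
                # default: stop records from also being emitted further up
                logger.propagate = False
        if orig_propagate:
            logger = logger.parent  # keep walking only if we were propagating
        else:
            break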
13 changes: 10 additions & 3 deletions airflow/utils/log/logging_mixin.py
@@ -26,6 +26,8 @@
 from logging import Handler, Logger, StreamHandler
 from typing import IO, cast
 
+from airflow.settings import IS_K8S_EXECUTOR_POD
+
 # 7-bit C1 ANSI escape sequences
 ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")

@@ -165,9 +167,10 @@ def isatty(self):

 class RedirectStdHandler(StreamHandler):
     """
-    This class is like a StreamHandler using sys.stderr/stdout, but always uses
+    This class is like a StreamHandler using sys.stderr/stdout, but uses
     whatever sys.stderr/stderr is currently set to rather than the value of
-    sys.stderr/stdout at handler construction time.
+    sys.stderr/stdout at handler construction time, except when running a
+    task in a kubernetes executor pod.
     """

     def __init__(self, stream):

@@ -179,13 +182,17 @@
         self._use_stderr = True
         if "stdout" in stream:
             self._use_stderr = False
-
+            self._orig_stream = sys.stdout
+        else:
+            self._orig_stream = sys.stderr
         # StreamHandler tries to set self.stream
         Handler.__init__(self)
 
     @property
     def stream(self):
         """Returns current stream."""
+        if IS_K8S_EXECUTOR_POD:
+            return self._orig_stream
         if self._use_stderr:
             return sys.stderr

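A sketch of the resulting behavior (runnable outside a pod; the pod branch is described in comments because `IS_K8S_EXECUTOR_POD` is fixed when `airflow.settings` is imported):

import sys
from io import StringIO

from airflow.utils.log.logging_mixin import RedirectStdHandler

handler = RedirectStdHandler("stdout")

# Outside a k8s executor pod the handler follows the *current* sys.stdout,
# so redirections such as StreamLogWriter are honored:
saved, sys.stdout = sys.stdout, StringIO()
assert handler.stream is sys.stdout
sys.stdout = saved

# With AIRFLOW_IS_K8S_EXECUTOR_POD set before Airflow is imported,
# handler.stream would instead be the stdout captured at construction time,
# so task records keep reaching the container's real stdout rather than
# looping back through the redirected stream.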
(Diffs for the remaining two changed files are not shown.)
