Propagate logs to stdout when in k8s executor pod #28440

Merged

Commits (47)
febf15f  Propagate logs to stdout when in k8s executor pod (dstandish, Dec 18, 2022)
170402e  don't capture stdout unless in local task (dstandish, Dec 19, 2022)
f867323  don't modify handlers or redirect stdout if already done (dstandish, Dec 19, 2022)
c0301cd  don't modify handlers or redirect stdout if already done (dstandish, Dec 20, 2022)
3947617  add test (dstandish, Dec 20, 2022)
cba5be4  refactor test (dstandish, Dec 20, 2022)
b744c41  restore try / finally (dstandish, Dec 20, 2022)
1b4e967  resolve failing tests (dstandish, Dec 20, 2022)
54124d2  try fixing test again (dstandish, Dec 21, 2022)
beaf6df  actually fix test (dstandish, Dec 21, 2022)
71f270b  actually _actually_ fix test (dstandish, Dec 21, 2022)
937535c  improve readability of move handlers ctxmgr (dstandish, Dec 22, 2022)
3e7baa8  remove modify indent too (dstandish, Dec 22, 2022)
9face81  simplify tests (dstandish, Dec 22, 2022)
8602005  fixup (dstandish, Dec 22, 2022)
ac3bfd4  fix test (dstandish, Dec 23, 2022)
9817191  mock fth render filename (dstandish, Dec 23, 2022)
b980c01  remove FTH mock and supply session to get_template_context (dstandish, Dec 24, 2022)
6c0dbe0  move reconfigure below context init (dstandish, Dec 24, 2022)
095a509  not fixture (dstandish, Dec 24, 2022)
6aa0dfc  simplify (dstandish, Dec 31, 2022)
0f54b60  fixup (dstandish, Dec 31, 2022)
59bdf38  add helper for moving logger attrs (dstandish, Dec 31, 2022)
a032e2c  move lower (dstandish, Dec 31, 2022)
650220f  rename mutation helper (dstandish, Dec 31, 2022)
65ebef6  doc (dstandish, Dec 31, 2022)
52ce222  clarify (dstandish, Dec 31, 2022)
c0ceed0  cosmetic (dstandish, Dec 31, 2022)
9adb850  context mgr (dstandish, Dec 31, 2022)
dd0f252  simplify (dstandish, Dec 31, 2022)
0adb2b3  simplify (dstandish, Dec 31, 2022)
590b3f5  simplify (dstandish, Dec 31, 2022)
486d5b5  typo (dstandish, Dec 31, 2022)
4f8aeea  simplify (dstandish, Dec 31, 2022)
67bc6a7  simplify (dstandish, Dec 31, 2022)
7e148b5  simplify (dstandish, Dec 31, 2022)
1a9db65  don't check TI (dstandish, Jan 3, 2023)
6b8b153  set private (dstandish, Jan 3, 2023)
6280275  inline some code (dstandish, Jan 3, 2023)
776b819  fix tests (dstandish, Jan 4, 2023)
0d4a208  static check (dstandish, Jan 4, 2023)
777dd4e  Update tests/cli/commands/test_task_command.py (dstandish, Jan 9, 2023)
1c1f1c9  remove freeze time (dstandish, Jan 9, 2023)
49228a2  Update tests/task/task_runner/test_standard_task_runner.py (dstandish, Jan 9, 2023)
1695b44  Update airflow/utils/log/file_task_handler.py (dstandish, Jan 9, 2023)
da74320  remove freeze time (dstandish, Jan 9, 2023)
b7e8d7c  fix whitespace (dstandish, Jan 9, 2023)
124 changes: 96 additions & 28 deletions airflow/cli/commands/task_command.py
@@ -23,6 +23,7 @@
 import json
 import logging
 import os
+import sys
 import textwrap
 from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress
 from typing import Generator, Union
@@ -43,6 +44,7 @@
 from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.operator import needs_expansion
+from airflow.settings import IS_K8S_EXECUTOR_POD
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
 from airflow.typing_compat import Literal
@@ -283,38 +285,52 @@ def _extract_external_executor_id(args) -> str | None:


 @contextmanager
-def _capture_task_logs(ti: TaskInstance) -> Generator[None, None, None]:
+def _move_task_handlers_to_root(ti: TaskInstance) -> Generator[None, None, None]:
     """
-    Manage logging context for a task run.
+    Move handlers for task logging to root logger.
 
-    - Replace the root logger configuration with the airflow.task configuration
-      so we can capture logs from any custom loggers used in the task.
-
-    - Redirect stdout and stderr to the task instance log, as INFO and WARNING
-      level messages, respectively.
+    We want anything logged during task run to be propagated to task log handlers.
+    If running in a k8s executor pod, also keep the stream handler on root logger
+    so that logs are still emitted to stdout.
     """
-    modify = not settings.DONOT_MODIFY_HANDLERS
-    if modify:
-        root_logger, task_logger = logging.getLogger(), logging.getLogger("airflow.task")
-
-        orig_level = root_logger.level
-        root_logger.setLevel(task_logger.level)
-        orig_handlers = root_logger.handlers.copy()
-        root_logger.handlers[:] = task_logger.handlers
-
-    try:
-        info_writer = StreamLogWriter(ti.log, logging.INFO)
-        warning_writer = StreamLogWriter(ti.log, logging.WARNING)
-
-        with redirect_stdout(info_writer), redirect_stderr(warning_writer):
-            yield
-
-    finally:
-        if modify:
-            # Restore the root logger to its original state.
-            root_logger.setLevel(orig_level)
-            root_logger.handlers[:] = orig_handlers
+    # nothing to do
+    if not ti.log.handlers or settings.DONOT_MODIFY_HANDLERS:
+        yield
+        return
+
+    # Move task handlers to root and reset task logger and restore original logger settings after exit.
+    # If k8s executor, we need to ensure that root logger has a console handler, so that
+    # task logs propagate to stdout (this is how webserver retrieves them while task is running).
+    root_logger = logging.getLogger()
+    console_handler = next((h for h in root_logger.handlers if h.name == "console"), None)
+    with LoggerMutationHelper(root_logger), LoggerMutationHelper(ti.log) as task_helper:
+        task_helper.move(root_logger)
+        if IS_K8S_EXECUTOR_POD:
+            if console_handler and console_handler not in root_logger.handlers:
+                root_logger.addHandler(console_handler)
+        yield
+
+
+@contextmanager
+def _redirect_stdout_to_ti_log(ti: TaskInstance) -> Generator[None, None, None]:
+    """
+    Redirect stdout to ti logger.
+
+    Redirect stdout and stderr to the task instance log as INFO and WARNING
+    level messages, respectively.
+
+    If stdout already redirected (possible when task running with option
+    `--local`), don't redirect again.
+    """
+    # if sys.stdout is StreamLogWriter, it means we already redirected
+    # likely before forking in LocalTaskJob
+    if not isinstance(sys.stdout, StreamLogWriter):
+        info_writer = StreamLogWriter(ti.log, logging.INFO)
+        warning_writer = StreamLogWriter(ti.log, logging.WARNING)
+
+        with redirect_stdout(info_writer), redirect_stderr(warning_writer):
+            yield
+    else:
+        yield
 
 
 class TaskCommandMarker:

Review thread on lines -301 to -304 (the old copy of task handlers onto root):

dstandish (author):
@malthe observe, we already copy (temporarily) to root logger the handlers from airflow.task.

This causes problems because we then need complicated propagation rules at the airflow.task logger, since we leave the handlers there.

While my solution here is a few more lines (and a lot more comments), it's not very complicated. There are two parts.

  1. Instead of copying the handlers to root, I move them to root. We don't need them at task if they are already at root. This could ultimately allow for simplification of our propagation logic.
  2. Previously we remove our console handler from root at run time. Now, if we're in a k8s executor pod, I keep the console handler there.

Contributor:
I didn't quite understand what "stdout redirection" means -- is that within Python or is that something happening outside of Python (in a shell script perhaps)?

When I think about this problem, it seems that there is an orthogonal concern in the logging setup of Airflow, which is whether or not the task logger (during execution) should be emitted to the stdout stream.

That's something you could want in any situation, not just K8S.

dstandish (author):
> I didn't quite understand what "stdout redirection" means -- is that within Python or is that something happening outside of Python (in a shell script perhaps)?

i am not sure which comment you refer to, but what it means is: there are stdlib helpers to take things that would ordinarily go to stdout (e.g. print) and send them to some other stream.

see from contextlib import redirect_stdout. it just monkey patches sys.stdout temporarily.

we use them (for better or worse) to redirect stdout to task logs.

our log read logic while a task is running is usually "read from the flask log server". most task logging is redirected to the task log (and therefore to file), and our "console" handler respects this redirection -- for obvious reasons: the celery worker log or local executor log would otherwise get unreasonably chatty. but in the k8s executor context there is no log server on the worker, there is no problem with keeping the stdout output, and, importantly, our log read logic assumes everything will be forwarded to stdout -- and that is what's broken and what i am fixing here.
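To illustrate the first point above, here is a standalone sketch (not part of the diff; the logger names are only illustrative). A handler copied onto root while also left on airflow.task emits each record twice, once directly and once via propagation; moving it leaves a single emission path.

import logging

root = logging.getLogger()
task = logging.getLogger("airflow.task")
handler = logging.StreamHandler()
task.addHandler(handler)

# copy: the same handler now sits on both loggers
root.addHandler(handler)
task.warning("emitted twice")  # once via task's own handler, once via propagation to root

# move: handler lives only on root; the task logger just propagates
task.handlers[:] = []
task.propagate = True
root.handlers[:] = [handler]
task.warning("emitted once")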


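As a minimal sketch of the mechanism described in the thread above (TinyLogWriter is a hypothetical stand-in; Airflow's actual StreamLogWriter has a richer interface), contextlib.redirect_stdout temporarily swaps sys.stdout for any object with a write() method:

import logging
from contextlib import redirect_stdout

logging.basicConfig(level=logging.INFO)

class TinyLogWriter:
    """Hypothetical stand-in for StreamLogWriter: forwards writes to a logger."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        if message.strip():  # skip the bare newlines that print() emits
            self.logger.log(self.level, message.rstrip())

    def flush(self):  # print(..., flush=True) expects this to exist
        pass

with redirect_stdout(TinyLogWriter(logging.getLogger("task"), logging.INFO)):
    print("this print() lands in the task logger")  # sys.stdout is temporarily our writer
print("back to the real stdout")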
class TaskCommandMarker:
@@ -366,12 +382,6 @@ def task_run(args, dag=None):

     settings.MASK_SECRETS_IN_LOGS = True
 
-    # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
-    # behind multiple open sleeping connections while heartbeating, which could
-    # easily exceed the database connection limit when
-    # processing hundreds of simultaneous tasks.
-    settings.reconfigure_orm(disable_connection_pool=True)
-
     get_listener_manager().hook.on_starting(component=TaskCommandMarker())
 
     if args.pickle:
@@ -390,11 +400,19 @@

     log.info("Running %s on host %s", ti, hostname)
 
+    # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
+    # behind multiple open sleeping connections while heartbeating, which could
+    # easily exceed the database connection limit when
+    # processing hundreds of simultaneous tasks.
+    # this should be last thing before running, to reduce likelihood of an open session
+    # which can cause trouble if running process in a fork.
+    settings.reconfigure_orm(disable_connection_pool=True)

Review thread:

Member:
Is this related to this logging PR in any way?

dstandish (author):
i don't think this will fail if we remove this change. you could call it a drive-by, and i can remove it if you like.

task runner tests failed with this change. then i was like "oh, why is it just throwing away the whole log config, let's try and not do that". then, after leaving the FTH intact for that test, i discovered there was an extraneous session being created in _render_filename. as part of troubleshooting and discovering that, i also saw that this reconfigure happens earlier than it probably ought to. it should be done as late as possible, i think.

     try:
         if args.interactive:
             _run_task_by_selected_method(args, dag, ti)
         else:
-            with _capture_task_logs(ti):
+            with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
                 _run_task_by_selected_method(args, dag, ti)
     finally:
         try:
@@ -643,3 +661,53 @@ def task_clear(args):
         include_subdags=not args.exclude_subdags,
         include_parentdag=not args.exclude_parentdag,
     )


+class LoggerMutationHelper:
+    """
+    Helper for moving and resetting handlers and other logger attrs.
+
+    :meta private:
+    """
+
+    def __init__(self, logger):
+        self.handlers = logger.handlers[:]
+        self.level = logger.level
+        self.propagate = logger.propagate
+        self.source_logger = logger
+
+    def apply(self, logger, replace=True):
+        """
+        Set ``logger`` with attrs stored on instance.
+
+        If ``logger`` is root logger, don't change propagate.
+        """
+        if replace:
+            logger.handlers[:] = self.handlers
+        else:
+            for h in self.handlers:
+                if h not in logger.handlers:
+                    logger.addHandler(h)
+        logger.level = self.level
+        if logger is not logging.getLogger():
+            logger.propagate = self.propagate
+
+    def move(self, logger, replace=True):
+        """
+        Replace ``logger`` attrs with those from source.
+
+        :param logger: target logger
+        :param replace: if True, remove all handlers from target first; otherwise add if not present.
+        """
+        self.apply(logger, replace=replace)
+        self.source_logger.propagate = True
+        self.source_logger.handlers[:] = []
+
+    def reset(self):
+        self.apply(self.source_logger)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.reset()
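A quick sketch of how the helper composes (logger names are illustrative; this mirrors the usage in _move_task_handlers_to_root above): the context manager snapshots a logger's state on entry and restores it on exit, so a temporary move is automatically undone.

import logging

root_logger = logging.getLogger()
task_logger = logging.getLogger("airflow.task")

with LoggerMutationHelper(root_logger), LoggerMutationHelper(task_logger) as task_helper:
    task_helper.move(root_logger)  # task handlers now emit via root only
    task_logger.warning("reaches the moved handlers via propagation to root")
# on exit, both helpers restore the original handlers/level/propagate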
2 changes: 2 additions & 0 deletions airflow/settings.py
@@ -624,6 +624,8 @@ def initialize():
     executor_constants.CELERY_KUBERNETES_EXECUTOR,
     executor_constants.LOCAL_KUBERNETES_EXECUTOR,
 }
+IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
+"""Will be True if running in kubernetes executor pod."""
 
 HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")
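The flag is truthy for any non-empty string, so the pod only needs the variable set at all. A quick sketch of the semantics (how the executor injects the variable into the pod spec is outside this diff):

import os

os.environ["AIRFLOW_IS_K8S_EXECUTOR_POD"] = "True"
print(bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", "")))  # True

# note: any non-empty value is truthy here, including "false"
os.environ["AIRFLOW_IS_K8S_EXECUTOR_POD"] = "false"
print(bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", "")))  # still True

del os.environ["AIRFLOW_IS_K8S_EXECUTOR_POD"]
print(bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", "")))  # False -- unset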
33 changes: 22 additions & 11 deletions airflow/utils/log/file_task_handler.py
@@ -30,13 +30,13 @@
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
+from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 
 if TYPE_CHECKING:
     from airflow.models import TaskInstance
-    from airflow.utils.log.logging_mixin import SetContextPropagate
 
 
 class FileTaskHandler(logging.Handler):
@@ -62,19 +62,29 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
             # handler, not the one that calls super()__init__.
             stacklevel=(2 if type(self) == FileTaskHandler else 3),
         )
+        self.maintain_propagate: bool = False
+        """If true, overrides default behavior of setting propagate=False
+
+        :meta private:
+        """
 
     def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
         """
         Provide task_instance context to airflow task handler.
 
+        Generally speaking returns None. But if attr `maintain_propagate` has
+        been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
+        has the effect of overriding the default behavior to set `propagate`
+        to False whenever set_context is called. At time of writing, this
+        functionality is only used in unit testing.
+
         :param ti: task instance object
         """
         local_loc = self._init_file(ti)
         self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
         if self.formatter:
             self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
-        return None
+        return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None
 
     def emit(self, record):
         if self.handler:
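A simplified sketch of the sentinel pattern at work (this is not Airflow's actual set_context helper, whose details differ): the caller that wires handlers to a task instance checks the return value before flipping propagate off.

import logging
from enum import Enum

class SetContextPropagate(Enum):
    MAINTAIN_PROPAGATE = 1

def provide_context(logger: logging.Logger, ti) -> None:
    flags = []
    for handler in logger.handlers:
        set_context = getattr(handler, "set_context", None)
        if set_context:
            flags.append(set_context(ti))
    # default behavior: stop propagation once handlers have task context,
    # unless some handler returned the MAINTAIN_PROPAGATE sentinel
    if SetContextPropagate.MAINTAIN_PROPAGATE not in flags:
        logger.propagate = False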
@@ -92,16 +102,17 @@ def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
         with create_session() as session:
             dag_run = ti.get_dagrun(session=session)
             template = dag_run.get_log_template(session=session).filename
-        str_tpl, jinja_tpl = parse_template_string(template)
+            str_tpl, jinja_tpl = parse_template_string(template)
 
-        if jinja_tpl:
-            if hasattr(ti, "task"):
-                context = ti.get_template_context()
-            else:
-                context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
-            context["try_number"] = try_number
-            return render_template_to_string(jinja_tpl, context)
-        elif str_tpl:
+            if jinja_tpl:
+                if hasattr(ti, "task"):
+                    context = ti.get_template_context(session=session)
+                else:
+                    context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
+                context["try_number"] = try_number
+                return render_template_to_string(jinja_tpl, context)
+
+        if str_tpl:
             try:
                 dag = ti.task.dag
             except AttributeError:  # ti.task is not always set.

Review thread:

Contributor:
Why is this parsed within the session context now?

dstandish (author):
if you look a few lines below, we need to keep context = ti.get_template_context(session=session) in the same session. otherwise it creates a new session (provided by provide_session), and this can cause problems post-os.fork. this is why the test for the standard task runner previously blew away the entire log config. with this, we don't have to do that anymore.
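To illustrate the session concern above, here is a simplified sketch of the provide_session-style pattern (not Airflow's actual decorator; create_session is assumed to be in scope): a function that defaults to "make me a session" silently opens a second session when called inside an existing one without passing session= through.

import functools

def provide_session(func):
    """Sketch: open a new session when the caller didn't pass one -- the behavior at issue."""
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            return func(*args, session=session, **kwargs)
        with create_session() as session:  # a *second* session if one is already open
            return func(*args, session=session, **kwargs)
    return wrapper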
13 changes: 10 additions & 3 deletions airflow/utils/log/logging_mixin.py
@@ -26,6 +26,8 @@
 from logging import Handler, Logger, StreamHandler
 from typing import IO, cast
 
+from airflow.settings import IS_K8S_EXECUTOR_POD
+
 # 7-bit C1 ANSI escape sequences
 ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
@@ -165,9 +167,10 @@ def isatty(self):
 
 class RedirectStdHandler(StreamHandler):
     """
-    This class is like a StreamHandler using sys.stderr/stdout, but always uses
+    This class is like a StreamHandler using sys.stderr/stdout, but uses
     whatever sys.stderr/stderr is currently set to rather than the value of
-    sys.stderr/stdout at handler construction time.
+    sys.stderr/stdout at handler construction time, except when running a
+    task in a kubernetes executor pod.
     """
 
     def __init__(self, stream):
@@ -179,13 +182,17 @@ def __init__(self, stream):
         self._use_stderr = True
         if "stdout" in stream:
             self._use_stderr = False
-
+            self._orig_stream = sys.stdout
+        else:
+            self._orig_stream = sys.stderr
         # StreamHandler tries to set self.stream
         Handler.__init__(self)
 
     @property
     def stream(self):
         """Returns current stream."""
+        if IS_K8S_EXECUTOR_POD:
+            return self._orig_stream
         if self._use_stderr:
             return sys.stderr
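A standalone sketch of why the stream must normally be looked up at emit time, and what the k8s-pod branch opts out of (illustrative only; RedirectStdHandler itself is a logging handler rather than a raw stream): an object bound to sys.stdout at construction keeps writing to the original stream even after a redirect.

import io
import sys
from contextlib import redirect_stdout

captured = io.StringIO()
bound_early = sys.stdout            # construction-time binding (the k8s-pod behavior)

with redirect_stdout(captured):
    sys.stdout.write("follows the redirect\n")   # late lookup, like the handler's default
    bound_early.write("ignores the redirect\n")  # early binding writes to the real stdout

print("captured:", captured.getvalue().strip())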
