Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify log context setting / propagation handling #28571

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@
"airflow.task": {
"handlers": ["task"],
"level": LOG_LEVEL,
# Set to true here (and reset via set_context) so that if no file is configured we still get logs!
"propagate": True,
"filters": ["mask_secrets"],
},
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/log/file_processor_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow import settings
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler


Expand Down Expand Up @@ -65,7 +65,7 @@ def set_context(self, filename):
self._symlink_latest_log_directory()
self._cur_date = datetime.today()

return DISABLE_PROPOGATE
return SetContextPropagate.DISABLE_PROPAGATE

def emit(self, record):
if self.handler is not None:
Expand Down
16 changes: 1 addition & 15 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session
from airflow.utils.state import State
Expand Down Expand Up @@ -63,31 +62,18 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
# handler, not the one that calls super()__init__.
stacklevel=(2 if type(self) == FileTaskHandler else 3),
)
self.maintain_propagate: bool = False
"""
If true, overrides default behavior of setting propagate=False

:meta private:
"""

def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
def set_context(self, ti: TaskInstance) -> None:
"""
Provide task_instance context to airflow task handler.

Generally speaking returns None. But if attr `maintain_propagate` has
been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
has the effect of overriding the default behavior to set `propagate`
to False whenever set_context is called. At time of writing, this
functionality is only used in unit testing.

:param ti: task instance object
"""
local_loc = self._init_file(ti)
self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
if self.formatter:
self.handler.setFormatter(self.formatter)
self.handler.setLevel(self.level)
return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None

def emit(self, record):
if self.handler:
Expand Down
40 changes: 18 additions & 22 deletions airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,22 @@
ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")


# Private: A sentinel objects
# Private: Sentinel objects
class SetContextPropagate(enum.Enum):
""":meta private:"""
"""
Deprecated. Previously used to allow disabling of propagation (the default) of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a private class, can we remove this outright?

"airflow.task" logger. But now that we move these handlers to root (instead of
copy) we do not want to disable propagation -- messages will only be
processed by root logger.

:meta private:
"""

# If a `set_context` function wants to _keep_ propagation set on it's logger it needs to return this
# special value.
# Not used anymore.
MAINTAIN_PROPAGATE = object()
# Don't use this one anymore!

DISABLE_PROPAGATE = object()
"""Return this sentinel from set_context to disable propagation."""


def __getattr__(name):
Expand Down Expand Up @@ -87,8 +94,7 @@ def log(self) -> Logger:
return LoggingMixin._get_log(self, self.__class__)

def _set_context(self, context):
if context is not None:
set_context(self.log, context)
set_context(self.log, context)


class ExternalLoggingMixin:
Expand Down Expand Up @@ -218,24 +224,14 @@ def set_context(logger, value):
:param logger: logger
:param value: value to set
"""
while logger:
orig_propagate = logger.propagate
if value is not None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this function are mainly what this PR is about. We no longer need to "walk" the logger hierarchy because we no longer duplicate task logging handlers.

And as a result we no longer need to disable propagation by default, so we can revert back to the prior approach of just disabling when explicitly asked to do so.

for handler in logger.handlers:
# Not all handlers need to have context passed in so we ignore
# the error when handlers do not have set_context defined.

# Don't use getatrr so we have type checking. And we don't care if handler is actually a
# FileTaskHandler, it just needs to have a set_context function!
# Not all handlers need to have context passed
if hasattr(handler, "set_context"):
# Don't use getattr so we have type checking. And we don't care if handler is actually a
# FileTaskHandler, it just needs to have a set_context function!
from airflow.utils.log.file_task_handler import FileTaskHandler

flag = cast(FileTaskHandler, handler).set_context(value)
# By default we disable propagate once we have configured the logger, unless that handler
# explicitly asks us to keep it on.
if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
if flag is SetContextPropagate.DISABLE_PROPAGATE:
logger.propagate = False
if orig_propagate is True:
# If we were set to propagate before we turned if off, then keep passing set_context up
logger = logger.parent
else:
break
2 changes: 1 addition & 1 deletion tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ def test_weight_rule_override(self):

# ensure the default logging config is used for this test, no matter what ran before
@pytest.mark.usefixtures("reset_logging_config")
def test_logging_propogated_by_default(self, caplog):
def test_logging_propagated_by_default(self, caplog):
"""Test that when set_context hasn't been called that log records are emitted"""
BaseOperator(task_id="test").log.warning("test")
# This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing
Expand Down
31 changes: 0 additions & 31 deletions tests/task/task_runner/test_standard_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import logging
import os
import time
from contextlib import contextmanager
from pathlib import Path
from unittest import mock
from unittest.mock import patch
Expand All @@ -34,7 +33,6 @@
from airflow.models.taskinstance import TaskInstance
from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.platform import getuser
from airflow.utils.state import State
from airflow.utils.timeout import timeout
Expand All @@ -48,34 +46,6 @@
TASK_FORMAT = "%(filename)s:%(lineno)d %(levelname)s - %(message)s"


@contextmanager
def propagate_task_logger():
"""
Set `airflow.task` logger to propagate.

Apparently, caplog doesn't work if you don't propagate messages to root.

But the normal behavior of the `airflow.task` logger is not to propagate.

When freshly configured, the logger is set to propagate. However,
ordinarily when set_context is called, this is set to False.

To override this behavior, so that the messages make it to caplog, we
must tell the handler to maintain its current setting.
"""
logger = logging.getLogger("airflow.task")
h = logger.handlers[0]
assert isinstance(h, FileTaskHandler) # just to make sure / document
_propagate = h.maintain_propagate
if _propagate is False:
h.maintain_propagate = True
try:
yield
finally:
if _propagate is False:
h.maintain_propagate = _propagate


@pytest.mark.usefixtures("reset_logging_config")
class TestStandardTaskRunner:
def setup_class(self):
Expand Down Expand Up @@ -201,7 +171,6 @@ def test_start_and_terminate_run_as_user(self, mock_init):

assert runner.return_code() is not None

@propagate_task_logger()
@patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file")
def test_early_reap_exit(self, mock_init, caplog):
"""
Expand Down
51 changes: 8 additions & 43 deletions tests/utils/test_logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
from __future__ import annotations

import logging
import sys
import warnings
from unittest import mock

import pytest

from airflow.utils.log.logging_mixin import SetContextPropagate, StreamLogWriter, set_context
from airflow.utils.log.logging_mixin import StreamLogWriter, set_context


@pytest.fixture
Expand Down Expand Up @@ -66,22 +65,22 @@ def setup_method(self):
warnings.filterwarnings(action="always")

def test_set_context(self, child_logger, parent_child_handlers):
handler1, handler2 = parent_child_handlers
handler1.set_context = mock.MagicMock()
handler2.set_context = mock.MagicMock()
h_parent, h_child = parent_child_handlers
h_parent.set_context = mock.MagicMock()
h_child.set_context = mock.MagicMock()

parent = logging.getLogger(__name__)
parent.propagate = False
parent.addHandler(handler1)
parent.addHandler(h_parent)
log = parent.getChild("child")
log.addHandler(handler2),
log.addHandler(h_child),
log.propagate = True

value = "test"
set_context(log, value)

handler1.set_context.assert_called_once_with(value)
handler2.set_context.assert_called_once_with(value)
h_parent.set_context.assert_not_called()
h_child.set_context.assert_called_once_with(value)

def teardown_method(self):
warnings.resetwarnings()
Expand Down Expand Up @@ -140,37 +139,3 @@ def test_iobase_compatibility(self):
assert not log.closed
# has no specific effect
log.close()


@pytest.mark.parametrize(["maintain_propagate"], [[SetContextPropagate.MAINTAIN_PROPAGATE], [None]])
def test_set_context_propagation(parent_child_handlers, child_logger, maintain_propagate):
# Test the behaviour of set_context and logger propagation and the MAINTAIN_PROPAGATE return

parent_handler, handler = parent_child_handlers
handler.set_context = mock.MagicMock(return_value=maintain_propagate)

# Before setting context (set_context), ensure logs make it to the parent
line = sys._getframe().f_lineno + 1
record = child_logger.makeRecord(
child_logger.name, logging.INFO, __file__, line, "test message", [], None
)
child_logger.handle(record)

handler.handle.assert_called_once_with(record)
# Should call the parent handler too in the default/unconfigured case
parent_handler.handle.assert_called_once_with(record)

parent_handler.handle.reset_mock()
handler.handle.reset_mock()

# Ensure that once we've called set_context on the handler we disable propagation to parent loggers by
# default!
set_context(child_logger, {})

child_logger.handle(record)

handler.handle.assert_called_once_with(record)
if maintain_propagate is SetContextPropagate.MAINTAIN_PROPAGATE:
parent_handler.handle.assert_called_once_with(record)
else:
parent_handler.handle.assert_not_called()