Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
001372b
Add arbitrary log shipper
pankajkoti Jul 17, 2023
bccb93e
Address first set of review comments
pankajkoti Jul 18, 2023
a313337
Address second set of comments
pankajkoti Jul 18, 2023
22d6c33
Add tests
pankajkoti Jul 18, 2023
55846d6
Fix mypy failures
pankajkoti Jul 19, 2023
8425a2d
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Jul 24, 2023
ea13b23
Update airflow/config_templates/default_airflow.cfg
pankajkoti Jul 24, 2023
24dfadf
Address @uranusjr's comments
pankajkoti Jul 24, 2023
b416abb
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Jul 24, 2023
c77d70a
Address @hussein-awala's comments
pankajkoti Jul 25, 2023
828ab5e
Ignore type override for redis provider log handler's set_context
pankajkoti Jul 25, 2023
81da99b
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Jul 25, 2023
2e02e13
Remove type ignore by accepting **kwargs
pankajkoti Jul 26, 2023
652ffce
Expose log like methods
pankajkoti Jul 26, 2023
abd7c3f
Update airflow/utils/log/file_task_handler.py
pankajkoti Jul 28, 2023
d58c36c
Revert renaming of ti->ti_or_ti_key in _render_filename()
pankajkoti Jul 28, 2023
0846479
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Jul 28, 2023
04856aa
Address review comments
pankajkoti Jul 29, 2023
2dda898
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Sep 1, 2023
0cef6d0
Update airflow/utils/log/task_context_logger.py
pankajkoti Sep 1, 2023
6c53958
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Nov 14, 2023
aa82275
make signature of _log consistent with stdlib
dstandish Nov 14, 2023
ad18098
don't add class method get_from_key
dstandish Nov 14, 2023
188a606
improve ti not found message
dstandish Nov 14, 2023
6c74515
simplify
dstandish Nov 14, 2023
7b598b8
docstring
dstandish Nov 14, 2023
07e14db
config doc
dstandish Nov 14, 2023
98fcd3b
fix caller logger ref
dstandish Nov 14, 2023
4aba065
fix caller logger ref
dstandish Nov 15, 2023
c6adb63
move call site logger to init
dstandish Nov 15, 2023
12678f2
make signature uniform
dstandish Nov 15, 2023
e4c0fe6
Revert "make signature uniform"
dstandish Nov 15, 2023
6bd7d27
Rename method should_log -> should_log_to_task_context
pankajkoti Nov 15, 2023
ab7a644
rename should_log_to_task_context to enabled
dstandish Nov 15, 2023
c7f5463
fix static check
dstandish Nov 15, 2023
12a086a
fix test
dstandish Nov 15, 2023
8ca79f7
fix test
dstandish Nov 16, 2023
266077a
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Nov 16, 2023
594be63
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Nov 16, 2023
0ef2acd
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Nov 16, 2023
1a6fa7a
fix test
dstandish Nov 16, 2023
f4cd69e
Merge branch 'main' into arbitrary-log-shipper
pankajkoti Nov 16, 2023
e356cd1
Merge branch 'main' into arbitrary-log-shipper
dstandish Nov 17, 2023
e7ac5d9
Remove test log
pankajkoti Nov 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,18 @@ logging:
type: boolean
example: ~
default: "False"
enable_task_context_logger:
description: |
If enabled, Airflow may ship messages to task logs from outside the task run context, e.g. from
the scheduler, executor, or callback execution context. This can help in circumstances such as
when there's something blocking the execution of the task and ordinarily there may be no task
logs at all.
This is set to True by default. If you encounter issues with this feature
(e.g. scheduler performance issues), it can be disabled.
version_added: 2.8.0
type: boolean
example: ~
default: "True"
metrics:
description: |
StatsD (https://github.com/etsy/statsd) integration settings.
Expand Down
7 changes: 6 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from airflow.utils import timezone
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.task_context_logger import TaskContextLogger
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.sqlalchemy import (
Expand Down Expand Up @@ -233,6 +234,10 @@ def __init__(
self.processor_agent: DagFileProcessorAgent | None = None

self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True, load_op_links=False)
self._task_context_logger: TaskContextLogger = TaskContextLogger(
component_name=self.job_type,
call_site_logger=self.log,
)

@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
Expand Down Expand Up @@ -773,7 +778,7 @@ def _process_executor_events(self, session: Session) -> int:
"Executor reports task instance %s finished (%s) although the "
"task says it's %s. (Info: %s) Was the task killed externally?"
)
self.log.error(msg, ti, state, ti.state, info)
self._task_context_logger.error(msg, ti, state, ti.state, info, ti=ti)

# Get task from the Serialized DAG
try:
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def emit(self, record):
setattr(record, self.offset_field, int(time.time() * (10**9)))
self.handler.emit(record)

def set_context(self, ti: TaskInstance) -> None:
def set_context(self, ti: TaskInstance, **kwargs) -> None:
"""
Provide task_instance context to airflow task handler.

Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/microsoft/azure/log/wasb_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
if TYPE_CHECKING:
import logging

from airflow.models.taskinstance import TaskInstance


def get_default_delete_local_copy():
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not.
Expand Down Expand Up @@ -93,7 +95,7 @@ def hook(self):
)
return None

def set_context(self, ti) -> None:
def set_context(self, ti: TaskInstance, **kwargs) -> None:
super().set_context(ti)
# Local location and remote location is needed to open and
# upload local log file to Wasb remote storage.
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/redis/log/redis_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _read(
).decode()
return log_str, {"end_of_log": True}

def set_context(self, ti: TaskInstance):
def set_context(self, ti: TaskInstance, **kwargs) -> None:
super().set_context(ti)
self.handler = _RedisHandler(
self.conn,
Expand Down
50 changes: 43 additions & 7 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""File logging handler for tasks."""
from __future__ import annotations

import inspect
import logging
import os
import warnings
Expand All @@ -31,7 +32,7 @@
import pendulum

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
Expand All @@ -41,7 +42,7 @@
from airflow.utils.state import State, TaskInstanceState

if TYPE_CHECKING:
from airflow.models import TaskInstance
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -131,6 +132,32 @@ def _interleave_logs(*logs):
last = v


def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
    """Coerce a TaskInstance or TaskInstanceKey into a TaskInstance.

    A TaskInstance argument is returned unchanged; a TaskInstanceKey is
    resolved against the database. Raises if no matching row is found.
    """
    from airflow.models.taskinstance import TaskInstance, TaskInstanceKey

    if not isinstance(ti, TaskInstanceKey):
        return ti
    ti_from_db = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.task_id == ti.task_id,
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.run_id == ti.run_id,
            TaskInstance.map_index == ti.map_index,
        )
        .one_or_none()
    )
    if not isinstance(ti_from_db, TaskInstance):
        raise AirflowException(f"Could not find TaskInstance for {ti}")
    # The key carries try_number separately; propagate it onto the loaded row.
    ti_from_db._try_number = ti.try_number
    return ti_from_db


class FileTaskHandler(logging.Handler):
"""
FileTaskHandler is a python log handler that handles and reads task instance logs.
Expand Down Expand Up @@ -170,7 +197,7 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
Some handlers emit "end of log" markers, and may not wish to do so when task defers.
"""

def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None | SetContextPropagate:
"""
Provide task_instance context to airflow task handler.

Expand All @@ -181,14 +208,20 @@ def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
functionality is only used in unit testing.

:param ti: task instance object
:param identifier: if set, adds suffix to log file. For use when relaying exceptional messages
to task logs from a context other than task or trigger run
"""
local_loc = self._init_file(ti)
local_loc = self._init_file(ti, identifier=identifier)
self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
if self.formatter:
self.handler.setFormatter(self.formatter)
self.handler.setLevel(self.level)
return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None

@cached_property
def supports_task_context_logging(self) -> bool:
    """Whether this handler's ``set_context`` accepts an ``identifier`` kwarg.

    Handlers that do can receive messages relayed from outside the
    task-run context (written to a suffixed log file); this flag is what
    TaskContextLogger checks before routing such messages here.
    """
    return "identifier" in inspect.signature(self.set_context).parameters

@staticmethod
def add_triggerer_suffix(full_path, job_id=None):
"""
Expand Down Expand Up @@ -217,9 +250,10 @@ def close(self):
if self.handler:
self.handler.close()

def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
def _render_filename(self, ti: TaskInstance | TaskInstanceKey, try_number: int) -> str:
"""Return the worker log filename."""
with create_session() as session:
ti = _ensure_ti(ti, session)
dag_run = ti.get_dagrun(session=session)
template = dag_run.get_log_template(session=session).filename
str_tpl, jinja_tpl = parse_template_string(template)
Expand Down Expand Up @@ -458,7 +492,7 @@ def _prepare_log_folder(self, directory: Path):
print(f"Failed to change {directory} permission to {new_folder_permissions}: {e}")
pass

def _init_file(self, ti):
def _init_file(self, ti, *, identifier: str | None = None):
"""
Create log directory and give it permissions that are configured.

Expand All @@ -472,7 +506,9 @@ def _init_file(self, ti):
)
local_relative_path = self._render_filename(ti, ti.try_number)
full_path = os.path.join(self.local_base, local_relative_path)
if ti.is_trigger_log_context is True:
if identifier:
full_path += f".{identifier}.log"
elif ti.is_trigger_log_context is True:
# if this is true, we're invoked via set_context in the context of
# setting up individual trigger logging. return trigger log path.
full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
Expand Down
182 changes: 182 additions & 0 deletions airflow/utils/log/task_context_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from contextlib import suppress
from copy import copy
from logging import Logger
from typing import TYPE_CHECKING

from airflow.configuration import conf

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.utils.log.file_task_handler import FileTaskHandler

logger = logging.getLogger(__name__)


class TaskContextLogger:
    """
    Class for sending messages to task instance logs from outside task execution context.

    Intended for exceptional circumstances only: it gives visibility into
    events related to task execution when otherwise there would be none.

    :meta private:
    """

    def __init__(self, component_name: str, call_site_logger: Logger | None = None):
        """
        Initialize the task context logger with the component name.

        :param component_name: the name of the component that will be used to identify the log messages
        :param call_site_logger: if provided, message will also be emitted through this logger
        """
        self.component_name = component_name
        # Resolve the handler first; _should_enable reads self.task_handler.
        self.task_handler = self._get_task_handler()
        self.enabled = self._should_enable()
        self.call_site_logger = call_site_logger

    def _should_enable(self) -> bool:
        # Feature must be switched on in config AND the configured task
        # handler must accept the ``identifier`` kwarg in set_context.
        if not conf.getboolean("logging", "enable_task_context_logger"):
            return False
        if not getattr(self.task_handler, "supports_task_context_logging", False):
            logger.warning("Task handler does not support task context logging")
            return False
        logger.info("Task context logging is enabled")
        return True

    @staticmethod
    def _get_task_handler() -> FileTaskHandler | None:
        """Returns the task handler that supports task context logging."""
        found = next(
            (
                h
                for h in logging.getLogger("airflow.task").handlers
                if getattr(h, "supports_task_context_logging", False)
            ),
            None,
        )
        if found is None:
            return None
        if TYPE_CHECKING:
            assert isinstance(found, FileTaskHandler)
        return found

    def _log(self, level: int, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message to the task instance logs.

        :param level: the log level
        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        # Best-effort mirror to the caller's own logger; never let this
        # secondary emission break the task-context write below.
        if self.call_site_logger and self.call_site_logger.isEnabledFor(level=level):
            with suppress(Exception):
                self.call_site_logger.log(level, msg, *args)

        if not self.enabled:
            return

        if not self.task_handler:
            return

        # Work on a copy so the shared handler's state is untouched.
        ctx_handler = copy(self.task_handler)
        try:
            if hasattr(ctx_handler, "mark_end_on_close"):
                ctx_handler.mark_end_on_close = False
            ctx_handler.set_context(ti, identifier=self.component_name)
            filename, lineno, func, stackinfo = logger.findCaller()
            record = logging.LogRecord(
                self.component_name, level, filename, lineno, msg, args, None, func=func
            )
            ctx_handler.emit(record)
        finally:
            ctx_handler.close()

    def critical(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level CRITICAL to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.CRITICAL, msg, *args, ti=ti)

    def fatal(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level FATAL to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.FATAL, msg, *args, ti=ti)

    def error(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level ERROR to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.ERROR, msg, *args, ti=ti)

    def warn(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level WARN to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.WARN, msg, *args, ti=ti)

    def warning(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level WARNING to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.WARNING, msg, *args, ti=ti)

    def info(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level INFO to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.INFO, msg, *args, ti=ti)

    def debug(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level DEBUG to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.DEBUG, msg, *args, ti=ti)

    def notset(self, msg: str, *args, ti: TaskInstance):
        """
        Emit a log message with level NOTSET to the task instance logs.

        :param msg: the message to relay to task context log
        :param ti: the task instance
        """
        self._log(logging.NOTSET, msg, *args, ti=ti)
Loading