-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Add task context logging feature to allow forwarding messages to task logs #32646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
44 commits
Select commit
Hold shift + click to select a range
001372b
Add arbitrary log shipper
pankajkoti bccb93e
Address first set of review comments
pankajkoti a313337
Address second set of comments
pankajkoti 22d6c33
Add tests
pankajkoti 55846d6
Fix mypy failures
pankajkoti 8425a2d
Merge branch 'main' into arbitrary-log-shipper
pankajkoti ea13b23
Update airflow/config_templates/default_airflow.cfg
pankajkoti 24dfadf
Address @uranusjr's comments
pankajkoti b416abb
Merge branch 'main' into arbitrary-log-shipper
pankajkoti c77d70a
Address @hussein-awala's comments
pankajkoti 828ab5e
Ignore type override for redis provider log handler's set_context
pankajkoti 81da99b
Merge branch 'main' into arbitrary-log-shipper
pankajkoti 2e02e13
Remove type ignore by accepting **kwargs
pankajkoti 652ffce
Expose log like methods
pankajkoti abd7c3f
Update airflow/utils/log/file_task_handler.py
pankajkoti d58c36c
Revert renaming of ti->ti_or_ti_key in _render_filename()
pankajkoti 0846479
Merge branch 'main' into arbitrary-log-shipper
pankajkoti 04856aa
Address review comments
pankajkoti 2dda898
Merge branch 'main' into arbitrary-log-shipper
pankajkoti 0cef6d0
Update airflow/utils/log/task_context_logger.py
pankajkoti 6c53958
Merge branch 'main' into arbitrary-log-shipper
pankajkoti aa82275
make signature of _log consistent with stdlib
dstandish ad18098
don't add class method get_from_key
dstandish 188a606
improve ti not found message
dstandish 6c74515
simplify
dstandish 7b598b8
docstring
dstandish 07e14db
config doc
dstandish 98fcd3b
fix caller logger ref
dstandish 4aba065
fix caller logger ref
dstandish c6adb63
move call site logger to init
dstandish 12678f2
make signature uniform
dstandish e4c0fe6
Revert "make signature uniform"
dstandish 6bd7d27
Rename method should_log -> should_log_to_task_context
pankajkoti ab7a644
rename should_log_to_task_context to enabled
dstandish c7f5463
fix static check
dstandish 12a086a
fix test
dstandish 8ca79f7
fix test
dstandish 266077a
Merge branch 'main' into arbitrary-log-shipper
pankajkoti 594be63
Merge branch 'main' into arbitrary-log-shipper
pankajkoti 0ef2acd
Merge branch 'main' into arbitrary-log-shipper
pankajkoti 1a6fa7a
fix test
dstandish f4cd69e
Merge branch 'main' into arbitrary-log-shipper
pankajkoti e356cd1
Merge branch 'main' into arbitrary-log-shipper
dstandish e7ac5d9
Remove test log
pankajkoti File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from contextlib import suppress | ||
| from copy import copy | ||
| from logging import Logger | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from airflow.configuration import conf | ||
|
|
||
| if TYPE_CHECKING: | ||
| from airflow.models.taskinstance import TaskInstance | ||
| from airflow.utils.log.file_task_handler import FileTaskHandler | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class TaskContextLogger: | ||
| """ | ||
| Class for sending messages to task instance logs from outside task execution context. | ||
|
|
||
| This is intended to be used mainly in exceptional circumstances, to give visibility into | ||
| events related to task execution when otherwise there would be none. | ||
|
|
||
| :meta private: | ||
| """ | ||
|
|
||
| def __init__(self, component_name: str, call_site_logger: Logger | None = None): | ||
| """ | ||
| Initialize the task context logger with the component name. | ||
|
|
||
| :param component_name: the name of the component that will be used to identify the log messages | ||
| :param call_site_logger: if provided, message will also be emitted through this logger | ||
| """ | ||
| self.component_name = component_name | ||
| self.task_handler = self._get_task_handler() | ||
| self.enabled = self._should_enable() | ||
| self.call_site_logger = call_site_logger | ||
|
|
||
| def _should_enable(self) -> bool: | ||
| if not conf.getboolean("logging", "enable_task_context_logger"): | ||
| return False | ||
| if not getattr(self.task_handler, "supports_task_context_logging", False): | ||
| logger.warning("Task handler does not support task context logging") | ||
| return False | ||
| logger.info("Task context logging is enabled") | ||
| return True | ||
|
|
||
| @staticmethod | ||
| def _get_task_handler() -> FileTaskHandler | None: | ||
| """Returns the task handler that supports task context logging.""" | ||
| handlers = [ | ||
| handler | ||
| for handler in logging.getLogger("airflow.task").handlers | ||
| if getattr(handler, "supports_task_context_logging", False) | ||
| ] | ||
| if not handlers: | ||
| return None | ||
| h = handlers[0] | ||
| if TYPE_CHECKING: | ||
| assert isinstance(h, FileTaskHandler) | ||
| return h | ||
|
|
||
| def _log(self, level: int, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message to the task instance logs. | ||
|
|
||
| :param level: the log level | ||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| if self.call_site_logger and self.call_site_logger.isEnabledFor(level=level): | ||
| with suppress(Exception): | ||
| self.call_site_logger.log(level, msg, *args) | ||
|
|
||
| if not self.enabled: | ||
| return | ||
|
|
||
| if not self.task_handler: | ||
| return | ||
pankajkoti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| task_handler = copy(self.task_handler) | ||
| try: | ||
| if hasattr(task_handler, "mark_end_on_close"): | ||
| task_handler.mark_end_on_close = False | ||
| task_handler.set_context(ti, identifier=self.component_name) | ||
| filename, lineno, func, stackinfo = logger.findCaller() | ||
| record = logging.LogRecord( | ||
| self.component_name, level, filename, lineno, msg, args, None, func=func | ||
| ) | ||
| task_handler.emit(record) | ||
| finally: | ||
| task_handler.close() | ||
|
|
||
| def critical(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level CRITICAL to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.CRITICAL, msg, *args, ti=ti) | ||
|
|
||
| def fatal(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level FATAL to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.FATAL, msg, *args, ti=ti) | ||
|
|
||
| def error(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level ERROR to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.ERROR, msg, *args, ti=ti) | ||
|
|
||
| def warn(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level WARN to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.WARN, msg, *args, ti=ti) | ||
|
|
||
| def warning(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level WARNING to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.WARNING, msg, *args, ti=ti) | ||
|
|
||
| def info(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level INFO to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.INFO, msg, *args, ti=ti) | ||
|
|
||
| def debug(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level DEBUG to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.DEBUG, msg, *args, ti=ti) | ||
|
|
||
| def notset(self, msg: str, *args, ti: TaskInstance): | ||
| """ | ||
| Emit a log message with level NOTSET to the task instance logs. | ||
|
|
||
| :param msg: the message to relay to task context log | ||
| :param ti: the task instance | ||
| """ | ||
| self._log(logging.NOTSET, msg, *args, ti=ti) | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.