Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler
from airflow.utils.session import provide_session
from airflow.utils.state import State, TaskInstanceState

Expand Down Expand Up @@ -176,14 +176,24 @@ class FileTaskHandler(logging.Handler):

:param base_log_folder: Base log folder to place logs.
:param filename_template: template filename string
:param max_bytes: max bytes size for the log file
:param backup_count: backup file count for the log file
:param delay: default False -> StreamHandler, True -> Handler
"""

trigger_should_wrap = True
inherits_from_empty_operator_log_message = (
"Operator inherits from empty operator and thus does not have logs"
)

def __init__(self, base_log_folder: str, filename_template: str | None = None):
def __init__(
self,
base_log_folder: str,
filename_template: str | None = None,
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
):
super().__init__()
self.handler: logging.Handler | None = None
self.local_base = base_log_folder
Expand All @@ -196,6 +206,9 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
stacklevel=(2 if type(self) == FileTaskHandler else 3),
)
self.maintain_propagate: bool = False
self.max_bytes = max_bytes
self.backup_count = backup_count
self.delay = delay
"""
If true, overrides default behavior of setting propagate=False

Expand Down Expand Up @@ -224,7 +237,13 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
to task logs from a context other than task or trigger run
"""
local_loc = self._init_file(ti, identifier=identifier)
self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
self.handler = NonCachingRotatingFileHandler(
local_loc,
encoding="utf-8",
maxBytes=self.max_bytes,
backupCount=self.backup_count,
delay=self.delay,
)
if self.formatter:
self.handler.setFormatter(self.formatter)
self.handler.setLevel(self.level)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,24 @@ Example of custom logger name:
}
},
)

If you want to limit the size of task log files, you can add the ``handlers.task.max_bytes`` parameter.

Example of limiting the size of task logs:

.. code-block:: python

from copy import deepcopy
from pydantic.utils import deep_update
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deep_update(
deepcopy(DEFAULT_LOGGING_CONFIG),
{
"handlers": {
"task": {
"max_bytes": 104857600, # 100MB
}
}
},
)