@@ -176,9 +176,7 @@ Example of limiting the size of tasks:
     deepcopy(DEFAULT_LOGGING_CONFIG),
     {
         "handlers": {
-            "task": {
-                "max_bytes": 104857600,  # 100MB
-            }
+            "task": {"max_bytes": 104857600, "backup_count": 1}  # 100 MB; keep 1 rotated log as history.
         }
     },
 )
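These options correspond to the stdlib RotatingFileHandler parameters maxBytes and backupCount. A minimal sketch of the rollover behavior being configured, assuming the task handler follows stdlib rotation semantics (the logger name and file path below are illustrative, not Airflow's real ones):

import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger("rotation_demo")  # hypothetical logger, not an Airflow one
handler = RotatingFileHandler(
    "task.log",          # illustrative path; Airflow derives the real path per task instance
    maxBytes=104857600,  # roll over once the file reaches ~100 MB
    backupCount=1,       # keep a single rotated file, task.log.1; older rotations are dropped
)
logger.addHandler(handler)
logger.warning("once task.log exceeds maxBytes, it is renamed to task.log.1 and a fresh file starts")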
tests/utils/test_log_handlers.py
@@ -30,6 +30,7 @@
 import pendulum
 import pytest
 from kubernetes.client import models as k8s
+from pydantic.v1.utils import deep_update
 from requests.adapters import Response
 
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
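The test below uses pydantic's deep_update to override only the nested handler keys while leaving the rest of DEFAULT_LOGGING_CONFIG intact. A minimal sketch of that merge behavior, with hypothetical dict values chosen for illustration:

from pydantic.v1.utils import deep_update

base = {"handlers": {"task": {"max_bytes": 0}, "console": {"level": "INFO"}}}
override = {"handlers": {"task": {"max_bytes": 60000, "backup_count": 1}}}

merged = deep_update(base, override)
# Nested dicts are merged recursively: "console" is untouched and only the
# "task" keys are updated; deep_update returns a new dict instead of mutating base.
assert merged["handlers"]["console"] == {"level": "INFO"}
assert merged["handlers"]["task"] == {"max_bytes": 60000, "backup_count": 1}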
@@ -245,6 +246,91 @@ def task_callable(ti):
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
+    def test_file_task_handler_rotate_size_limit(self):
+        def reset_log_config(update_conf):
+            import logging.config
+
+            logging_config = DEFAULT_LOGGING_CONFIG
+            logging_config = deep_update(logging_config, update_conf)
+            logging.config.dictConfig(logging_config)
+
+        def task_callable(ti):
+            pass
+
+        max_bytes_size = 60000
+        update_conf = {"handlers": {"task": {"max_bytes": max_bytes_size, "backup_count": 1}}}
+        reset_log_config(update_conf)
+        dag = DAG("dag_for_testing_file_task_handler_rotate_size_limit", start_date=DEFAULT_DATE)
+        task = PythonOperator(
+            task_id="task_for_testing_file_log_handler_rotate_size_limit",
+            python_callable=task_callable,
+            dag=dag,
+        )
+        dagrun = dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
+        )
+        ti = TaskInstance(task=task, run_id=dagrun.run_id)
+
+        ti.try_number = 1
+        ti.state = State.RUNNING
+
+        logger = ti.log
+        ti.log.disabled = False
+
+        file_handler = next(
+            (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+        )
+        assert file_handler is not None
+
+        set_context(logger, ti)
+        assert file_handler.handler is not None
+        # set_context should create a log file locally; it is the first log file in this
+        # test, and a second (rotated) one should exist by the end.
+        log_filename = file_handler.handler.baseFilename
+        assert os.path.isfile(log_filename)
+        assert log_filename.endswith("1.log"), log_filename
+
+        # Write ~2000 log lines so the total size exceeds max_bytes_size and triggers a rollover.
+        for i in range(1, 2000):
+            logger.info("this is a Test. %s", i)
+
+        # The rotated log file produced by the rollover.
+        log_rotate_1_name = log_filename + ".1"
+        assert os.path.isfile(log_rotate_1_name)
+
+        current_file_size = os.path.getsize(log_filename)
+        rotate_file_1_size = os.path.getsize(log_rotate_1_name)
+        assert rotate_file_1_size > max_bytes_size * 0.9
+        assert rotate_file_1_size < max_bytes_size
+        assert current_file_size < max_bytes_size
+
+        # Return value of read must be a tuple of list and list.
+        logs, metadatas = file_handler.read(ti)
+
+        # The returned logs should mention both the current log file and the rotated one.
+        find_current_log = False
+        find_rotate_log_1 = False
+        for log in logs:
+            if log_filename in str(log):
+                find_current_log = True
+            if log_rotate_1_name in str(log):
+                find_rotate_log_1 = True
+        assert find_current_log is True
+        assert find_rotate_log_1 is True
+
+        # Logs for running tasks should show up too.
+        assert isinstance(logs, list)
+        assert isinstance(metadatas, list)
+        assert len(logs) == len(metadatas)
+        assert isinstance(metadatas[0], dict)
+
+        # Remove the two generated tmp log files.
+        os.remove(log_filename)
+        os.remove(log_rotate_1_name)
+
     @patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
     def test__read_when_local(self, mock_read_local, create_task_instance):
         """