1 change: 1 addition & 0 deletions airflow/api_connexion/openapi/v1.yaml
@@ -3372,6 +3372,7 @@ components:
owner:
description: Name of the user who triggered these events.
type: string
nullable: true
readOnly: true
extra:
description: |
42 changes: 26 additions & 16 deletions airflow/executors/base_executor.py
@@ -21,19 +21,18 @@
import logging
import sys
import warnings
from collections import defaultdict
from collections import defaultdict, deque
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple

import pendulum

from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import Log
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.task_context_logger import TaskContextLogger
from airflow.utils.state import TaskInstanceState

PARALLELISM: int = conf.getint("core", "PARALLELISM")
@@ -131,6 +130,16 @@ def __init__(self, parallelism: int = PARALLELISM):
self.queued_tasks: dict[TaskInstanceKey, QueuedTaskInstanceType] = {}
self.running: set[TaskInstanceKey] = set()
self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {}
self._task_event_logs: deque[Log] = deque()
"""
Deque for storing task event log messages.

This attribute is only internally public and should not be manipulated
directly by subclasses.

:meta private:
"""

self.attempts: dict[TaskInstanceKey, RunningRetryAttemptType] = defaultdict(RunningRetryAttemptType)

def __repr__(self):
@@ -139,6 +148,10 @@ def __repr__(self):
def start(self): # pragma: no cover
"""Executors may need to get things started."""

def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
"""Add an event to the log table."""
self._task_event_logs.append(Log(event=event, task_instance=ti_key, extra=extra))

def queue_command(
self,
task_instance: TaskInstance,
@@ -288,13 +301,20 @@ def trigger_tasks(self, open_slots: int) -> None:
# if it hasn't been much time since first check, let it be checked again next time
self.log.info("queued but still running; attempt=%s task=%s", attempt.total_tries, key)
continue

# Otherwise, we give up and remove the task from the queue.
self.send_message_to_task_logs(
logging.ERROR,
self.log.error(
"could not queue task %s (still running after %d attempts).",
key,
attempt.total_tries,
ti=ti,
)
self.log_task_event(
event="task launch failure",
extra=(
"Task was in running set and could not be queued "
f"after {attempt.total_tries} attempts."
),
ti_key=key,
)
del self.attempts[key]
del self.queued_tasks[key]
@@ -526,16 +546,6 @@ def send_callback(self, request: CallbackRequest) -> None:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)

@cached_property
def _task_context_logger(self) -> TaskContextLogger:
return TaskContextLogger(
component_name="Executor",
call_site_logger=self.log,
)

def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey):
self._task_context_logger._log(level, msg, *args, ti=ti)

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
"""
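
As a minimal illustration (not part of the diff): the new log_task_event hook buffers a Log row in _task_event_logs instead of writing through the removed TaskContextLogger. An executor subclass built against this API might use it roughly as below; the FailingExecutor name and the failing "submission" are hypothetical stand-ins.

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstancekey import TaskInstanceKey


class FailingExecutor(BaseExecutor):  # hypothetical subclass, for illustration only
    def execute_async(self, key: TaskInstanceKey, command, queue=None, executor_config=None):
        try:
            raise RuntimeError("backend rejected the task")  # stand-in for a real submission call
        except RuntimeError as err:
            # Buffer a Log row in self._task_event_logs; the scheduler loop drains
            # and bulk-saves these rows (see _process_task_event_logs below).
            self.log_task_event(
                event="task launch failure",
                extra=f"Could not launch task: {err}",
                ti_key=key,
            )
            self.fail(key)
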
82 changes: 57 additions & 25 deletions airflow/jobs/scheduler_job_runner.py
@@ -24,7 +24,7 @@
import sys
import time
import warnings
from collections import Counter, defaultdict
from collections import Counter, defaultdict, deque
from dataclasses import dataclass
from datetime import timedelta
from functools import lru_cache, partial
@@ -44,6 +44,7 @@
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import Job, perform_heartbeat
from airflow.models import Log
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
@@ -62,7 +63,6 @@
from airflow.utils import timezone
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.task_context_logger import TaskContextLogger
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.sqlalchemy import (
@@ -237,10 +237,6 @@ def __init__(
self.processor_agent: DagFileProcessorAgent | None = None

self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True, load_op_links=False)
self._task_context_logger: TaskContextLogger = TaskContextLogger(
component_name=self.job_type,
call_site_logger=self.log,
)

@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
@@ -740,6 +736,11 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:

return len(queued_tis)

@staticmethod
def _process_task_event_logs(log_records: deque[Log], session: Session):
objects = (log_records.popleft() for _ in range(len(log_records)))
session.bulk_save_objects(objects=objects, preserve_order=False)

def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int:
"""Respond to executor events."""
if not self._standalone_dag_processor and not self.processor_agent:
@@ -842,7 +843,8 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) ->
)
if info is not None:
msg += " Extra info: %s" % info # noqa: RUF100, UP031, flynt
self._task_context_logger.error(msg, ti=ti)
self.log.error(msg)
session.add(Log(event="state mismatch", extra=msg, task_instance=ti.key))

# Get task from the Serialized DAG
try:
@@ -1066,6 +1068,14 @@ def _run_scheduler_loop(self) -> None:
num_finished_events += self._process_executor_events(
executor=executor, session=session
)

for executor in self.job.executors:
try:
with create_session() as session:
self._process_task_event_logs(executor._task_event_logs, session)
except Exception:
self.log.exception("Something went wrong when trying to save task event logs.")

if self.processor_agent:
self.processor_agent.heartbeat()

@@ -1649,11 +1659,20 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
for ti in stuck_tis:
if repr(ti) in cleaned_up_task_instances:
self._task_context_logger.warning(
self.log.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
ti=ti,
)
session.add(
Log(
event="stuck in queued",
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
"available retries, it will be retried."
),
)
)
except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
@@ -1817,22 +1836,35 @@ def _find_zombies(self) -> None:
if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)

for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
log_message = (
f"Detected zombie job: {request} "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks)"
)
self._task_context_logger.error(log_message, ti=ti)
self.job.executor.send_callback(request)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
with create_session() as session:
for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
session.add(
Log(
event="heartbeat timeout",
task_instance=ti.key,
extra=(
f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
"seconds) and will be terminated. "
"See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks"
),
)
)
self.log.error(
"Detected zombie job: %s "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks)",
request,
)
self.job.executor.send_callback(request)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})

# [END find_zombies]

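
The drain in _process_task_event_logs pops from the left over a snapshot of the deque's current length, presumably so that rows an executor appends while bulk_save_objects is running are kept for the next scheduler-loop pass rather than being lost or re-read. A small self-contained sketch of that behaviour (event values made up):

from collections import deque

from airflow.models import Log

records: deque[Log] = deque(Log(event="task launch failure", extra=str(i)) for i in range(3))

# The length is evaluated once when the generator is created, and each row is
# popped as it is consumed, so later appends stay queued for the next pass.
objects = (records.popleft() for _ in range(len(records)))
drained = list(objects)

assert len(drained) == 3 and not records
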
24 changes: 20 additions & 4 deletions airflow/models/log.py
@@ -17,12 +17,18 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import Column, Index, Integer, String, Text

from airflow.models.base import Base, StringID
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey


class Log(Base):
"""Used to actively log events to the database."""
@@ -49,22 +55,32 @@ class Log(Base):
Index("idx_log_task_instance", dag_id, task_id, run_id, map_index, try_number),
)

def __init__(self, event, task_instance=None, owner=None, owner_display_name=None, extra=None, **kwargs):
def __init__(
self,
event,
task_instance: TaskInstance | TaskInstanceKey | None = None,
owner=None,
owner_display_name=None,
extra=None,
**kwargs,
):
self.dttm = timezone.utcnow()
self.event = event
self.extra = extra

task_owner = None

self.execution_date = None
if task_instance:
self.dag_id = task_instance.dag_id
self.task_id = task_instance.task_id
self.execution_date = task_instance.execution_date
if execution_date := getattr(task_instance, "execution_date", None):
self.execution_date = execution_date
self.run_id = task_instance.run_id
self.try_number = task_instance.try_number
self.map_index = task_instance.map_index
if getattr(task_instance, "task", None):
task_owner = task_instance.task.owner
if task := getattr(task_instance, "task", None):
task_owner = task.owner

if "task_id" in kwargs:
self.task_id = kwargs["task_id"]
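
The getattr guards added to Log.__init__ exist because the executor path above passes a bare TaskInstanceKey, which carries dag_id/task_id/run_id/try_number/map_index but has no execution_date or task attribute. A minimal sketch (all identifiers made up for illustration):

from airflow.models import Log
from airflow.models.taskinstancekey import TaskInstanceKey

key = TaskInstanceKey(
    dag_id="example_dag",
    task_id="example_task",
    run_id="manual__2024-01-01T00:00:00+00:00",
    try_number=1,
)

entry = Log(event="task launch failure", task_instance=key, extra="could not queue task")

# dag_id, task_id, run_id, try_number and map_index are copied from the key;
# execution_date stays None and no task owner is resolved, since a key has neither.
assert entry.execution_date is None
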
33 changes: 20 additions & 13 deletions airflow/providers/amazon/aws/executors/batch/batch_executor.py
@@ -19,10 +19,9 @@

from __future__ import annotations

import contextlib
import logging
import time
from collections import deque
from contextlib import suppress
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Dict, List, Sequence

@@ -292,12 +291,19 @@ def attempt_submit_jobs(self):

if failure_reason:
if attempt_number >= int(self.__class__.MAX_SUBMIT_JOB_ATTEMPTS):
self.send_message_to_task_logs(
logging.ERROR,
"This job has been unsuccessfully attempted too many times (%s). Dropping the task. Reason: %s",
self.log.error(
(
"This job has been unsuccessfully attempted too many times (%s). "
"Dropping the task. Reason: %s"
),
attempt_number,
failure_reason,
ti=key,
)
self.log_task_event(
event="batch job submit failure",
extra=f"This job has been unsuccessfully attempted too many times ({attempt_number}). "
f"Dropping the task. Reason: {failure_reason}",
ti_key=key,
)
self.fail(key=key)
else:
@@ -317,7 +323,7 @@
exec_config=exec_config,
attempt_number=attempt_number,
)
with contextlib.suppress(AttributeError):
with suppress(AttributeError):
# TODO: Remove this when min_airflow_version is 2.10.0 or higher in Amazon provider.
# running_state is added in Airflow 2.10 and only needed to support task adoption
# (an optional executor feature).
@@ -458,10 +464,11 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
not_adopted_tis = [ti for ti in tis if ti not in adopted_tis]
return not_adopted_tis

def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey):
def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
# TODO: remove this method when min_airflow_version is set to higher than 2.10.0
try:
super().send_message_to_task_logs(level, msg, *args, ti=ti)
except AttributeError:
# ``send_message_to_task_logs`` is added in 2.10.0
self.log.error(msg, *args)
with suppress(AttributeError):
super().log_task_event(
event=event,
extra=extra,
ti_key=ti_key,
)
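
The provider override above wraps the super() call in suppress(AttributeError) so that, on Airflow versions below 2.10.0 where BaseExecutor has no log_task_event, the call quietly becomes a no-op instead of raising. A generic sketch of that compatibility pattern (class names hypothetical):

from contextlib import suppress


class OldBaseExecutor:  # stand-in for a pre-2.10 BaseExecutor without log_task_event
    pass


class CompatExecutor(OldBaseExecutor):  # hypothetical provider executor
    def log_task_event(self, *, event, extra, ti_key):
        with suppress(AttributeError):
            # On a 2.10+ base class this forwards the event; on the old base it
            # raises AttributeError, which suppress() swallows, so nothing is logged.
            super().log_task_event(event=event, extra=extra, ti_key=ti_key)


CompatExecutor().log_task_event(event="example", extra="example", ti_key=None)
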