Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

PARALLELISM: int = conf.getint("core", "PARALLELISM")

NOT_STARTED_MESSAGE = "The executor should be started first!"

Comment on lines -36 to -37
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to keep this constant for backwards compatibility, but realistically almost nobody depends on it — that's basically xkcd 1172 ("every change breaks someone's workflow").

QUEUEING_ATTEMPTS = 5

# Command to execute - list of strings
Expand Down
32 changes: 17 additions & 15 deletions airflow/executors/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
from __future__ import annotations

import subprocess
from typing import Any
from typing import TYPE_CHECKING, Any

from distributed import Client, Future, as_completed
from distributed.security import Security

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.models.taskinstance import TaskInstanceKey

# queue="default" is a special case since this is the base config default queue name,
Expand Down Expand Up @@ -78,15 +78,14 @@ def execute_async(
queue: str | None = None,
executor_config: Any | None = None,
) -> None:
if TYPE_CHECKING:
assert self.client

self.validate_airflow_tasks_run_command(command)

def airflow_run():
return subprocess.check_call(command, close_fds=True)

if not self.client:
raise AirflowException(NOT_STARTED_MESSAGE)

resources = None
if queue not in _UNDEFINED_QUEUES:
scheduler_info = self.client.scheduler_info()
Expand All @@ -102,8 +101,9 @@ def airflow_run():
self.futures[future] = key # type: ignore

def _process_future(self, future: Future) -> None:
if not self.futures:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.futures

if future.done():
key = self.futures[future]
if future.exception():
Expand All @@ -117,23 +117,25 @@ def _process_future(self, future: Future) -> None:
self.futures.pop(future)

def sync(self) -> None:
if self.futures is None:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.futures

# make a copy so futures can be popped during iteration
for future in self.futures.copy():
self._process_future(future)

def end(self) -> None:
if not self.client:
raise AirflowException(NOT_STARTED_MESSAGE)
if self.futures is None:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.client
assert self.futures

self.client.cancel(list(self.futures.keys()))
for future in as_completed(self.futures.copy()):
self._process_future(future)

def terminate(self):
if self.futures is None:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.futures

self.client.cancel(self.futures.keys())
self.end()
81 changes: 42 additions & 39 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
import time
from datetime import timedelta
from queue import Empty, Queue
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple

from kubernetes import client, watch
from kubernetes.client import Configuration, models as k8s
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ReadTimeoutError

from airflow.exceptions import AirflowException, PodMutationHookException, PodReconciliationError
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.kube_config import KubeConfig
Expand Down Expand Up @@ -96,9 +96,10 @@ def __init__(

def run(self) -> None:
"""Performs watching"""
if TYPE_CHECKING:
assert self.scheduler_job_id

kube_client: client.CoreV1Api = get_kube_client()
if not self.scheduler_job_id:
raise AirflowException(NOT_STARTED_MESSAGE)
while True:
try:
self.resource_version = self._run(
Expand Down Expand Up @@ -463,10 +464,10 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
is around, and if not, and there's no matching entry in our own
task_queue, marks it for re-execution.
"""
self.log.debug("Clearing tasks that have not been launched")
if not self.kube_client:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.kube_client

self.log.debug("Clearing tasks that have not been launched")
query = session.query(TaskInstance).filter(
TaskInstance.state == State.QUEUED, TaskInstance.queued_by_job_id == self.job_id
)
Expand Down Expand Up @@ -555,6 +556,9 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Executes task asynchronously"""
if TYPE_CHECKING:
assert self.task_queue

if self.log.isEnabledFor(logging.DEBUG):
self.log.debug("Add task %s with command %s, executor_config %s", key, command, executor_config)
else:
Expand All @@ -571,8 +575,6 @@ def execute_async(
pod_template_file = executor_config.get("pod_template_file", None)
else:
pod_template_file = None
if not self.task_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
self.event_buffer[key] = (State.QUEUED, self.scheduler_job_id)
self.task_queue.put((key, command, kube_executor_config, pod_template_file))
# We keep a temporary local record that we've handled this so we don't
Expand All @@ -581,22 +583,18 @@ def execute_async(

def sync(self) -> None:
"""Synchronize task state."""
if TYPE_CHECKING:
assert self.scheduler_job_id
assert self.kube_scheduler
assert self.kube_config
assert self.result_queue
assert self.task_queue
assert self.event_scheduler

if self.running:
self.log.debug("self.running: %s", self.running)
if self.queued_tasks:
self.log.debug("self.queued: %s", self.queued_tasks)
if not self.scheduler_job_id:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.kube_config:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.result_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.task_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.event_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.sync()

last_resource_version = None
Expand Down Expand Up @@ -671,8 +669,9 @@ def sync(self) -> None:

def _check_worker_pods_pending_timeout(self):
"""Check if any pending worker pods have timed out"""
if not self.scheduler_job_id:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.scheduler_job_id

timeout = self.kube_config.worker_pods_pending_timeout
self.log.debug("Looking for pending worker pods older than %d seconds", timeout)

Expand Down Expand Up @@ -706,10 +705,11 @@ def _check_worker_pods_pending_timeout(self):
self.kube_scheduler.delete_pod(pod.metadata.name, pod.metadata.namespace)

def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, namespace: str) -> None:
if TYPE_CHECKING:
assert self.kube_scheduler

if state != State.RUNNING:
if self.kube_config.delete_worker_pods:
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
self.kube_scheduler.delete_pod(pod_id, namespace)
self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
Expand Down Expand Up @@ -744,8 +744,9 @@ def adopt_launched_task(
:param pod: V1Pod spec that we will patch with new label
:param pod_ids: pod_ids we expect to patch.
"""
if not self.scheduler_job_id:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.scheduler_job_id

self.log.info("attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels["airflow-worker"] = pod_generator.make_safe_label_value(self.scheduler_job_id)
pod_id = annotations_to_key(pod.metadata.annotations)
Expand All @@ -771,8 +772,9 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:

:param kube_client: kubernetes client for speaking to kube API
"""
if not self.scheduler_job_id:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.scheduler_job_id

new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
kwargs = {
"field_selector": "status.phase=Succeeded",
Expand All @@ -792,8 +794,9 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)

def _flush_task_queue(self) -> None:
if not self.task_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.task_queue

self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
while True:
try:
Expand All @@ -805,8 +808,9 @@ def _flush_task_queue(self) -> None:
break

def _flush_result_queue(self) -> None:
if not self.result_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.result_queue

self.log.debug("Executor shutting down, result_queue approximate size=%d", self.result_queue.qsize())
while True:
try:
Expand All @@ -833,12 +837,11 @@ def _flush_result_queue(self) -> None:

def end(self) -> None:
"""Called when the executor shuts down"""
if not self.task_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.result_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.task_queue
assert self.result_queue
assert self.kube_scheduler

self.log.info("Shutting down Kubernetes executor")
self.log.debug("Flushing task_queue...")
self._flush_task_queue()
Expand Down
39 changes: 21 additions & 18 deletions airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
from multiprocessing import Manager, Process
from multiprocessing.managers import SyncManager
from queue import Empty, Queue
from typing import Any, Optional, Tuple
from typing import TYPE_CHECKING, Any, Optional, Tuple

from setproctitle import getproctitle, setproctitle

from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
from airflow.executors.base_executor import PARALLELISM, BaseExecutor, CommandType
from airflow.models.taskinstance import TaskInstanceKey, TaskInstanceStateType
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
Expand Down Expand Up @@ -245,8 +245,9 @@ def execute_async(
:param queue: Name of the queue
:param executor_config: configuration for the executor
"""
if not self.executor.result_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.executor.result_queue

local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
self.executor.workers_used += 1
self.executor.workers_active += 1
Expand Down Expand Up @@ -284,11 +285,11 @@ def __init__(self, executor: LocalExecutor):

def start(self) -> None:
"""Starts limited parallelism implementation."""
if not self.executor.manager:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.executor.manager
assert self.executor.result_queue

self.queue = self.executor.manager.Queue()
if not self.executor.result_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
self.executor.workers = [
QueuedLocalWorker(self.queue, self.executor.result_queue)
for _ in range(self.executor.parallelism)
Expand All @@ -314,8 +315,9 @@ def execute_async(
:param queue: name of the queue
:param executor_config: configuration for the executor
"""
if not self.queue:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.queue

self.queue.put((key, command))

def sync(self):
Expand Down Expand Up @@ -365,28 +367,29 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.impl

self.validate_airflow_tasks_run_command(command)

self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)

def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.impl

self.impl.sync()

def end(self) -> None:
"""
Ends the executor.
:return:
"""
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.manager:
raise AirflowException(NOT_STARTED_MESSAGE)
if TYPE_CHECKING:
assert self.impl
assert self.manager

self.log.info(
"Shutting down LocalExecutor"
"; waiting for running tasks to finish. Signal again if you don't want to wait."
Expand Down