Merged
2 changes: 1 addition & 1 deletion .github/boring-cyborg.yml
@@ -167,7 +167,7 @@ labelPRBasedOnFilePath:
- airflow/kubernetes/**/*
- airflow/kubernetes_executor_templates/**/*
- airflow/executors/kubernetes_executor.py
- airflow/executors/celery_kubernetes_executor.py
- airflow/providers/celery/executors/celery_kubernetes_executor.py
- docs/apache-airflow/core-concepts/executor/kubernetes.rst
- docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst
- docs/apache-airflow-providers-cncf-kubernetes/**/*
6 changes: 4 additions & 2 deletions airflow/cli/cli_config.py
@@ -67,7 +67,7 @@ def _check_value(self, action, value):
executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
classes = ()
try:
from airflow.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor

classes += (CeleryExecutor,)
except ImportError:
@@ -77,7 +77,9 @@ def _check_value(self, action, value):
)
raise ArgumentError(action, message)
try:
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import (
CeleryKubernetesExecutor,
)

classes += (CeleryKubernetesExecutor,)
except ImportError:
2 changes: 1 addition & 1 deletion airflow/cli/commands/celery_command.py
@@ -34,7 +34,7 @@

from airflow import settings
from airflow.configuration import conf
from airflow.executors.celery_executor import app as celery_app
from airflow.providers.celery.executors.celery_executor import app as celery_app
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging
from airflow.utils.serve_logs import serve_logs
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml
@@ -2023,7 +2023,7 @@ celery:
version_added: ~
type: string
example: ~
default: "airflow.executors.celery_executor"
default: "airflow.providers.celery.executors.celery_executor"
worker_concurrency:
description: |
The concurrency that will be used when starting workers with the
2 changes: 1 addition & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -1037,7 +1037,7 @@ kubernetes_queue = kubernetes
# This section only applies if you are using the CeleryExecutor in
# ``[core]`` section above
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
celery_app_name = airflow.providers.celery.executors.celery_executor

# The concurrency that will be used when starting workers with the
# ``airflow celery worker`` command. This defines the number of task instances that
16 changes: 16 additions & 0 deletions airflow/executors/__init__.py
@@ -15,3 +15,19 @@
# specific language governing permissions and limitations
# under the License.
"""Executors."""
from __future__ import annotations

from airflow.utils.deprecation_tools import add_deprecated_classes

__deprecated_classes = {
"celery_executor": {
"app": "airflow.providers.celery.executors.celery_executor_utils.app",
"CeleryExecutor": "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
},
"celery_kubernetes_executor": {
"CeleryKubernetesExecutor": "airflow.providers.celery.executors."
"celery_kubernetes_executor.CeleryKubernetesExecutor",
},
}

add_deprecated_classes(__deprecated_classes, __name__)
5 changes: 3 additions & 2 deletions airflow/executors/executor_loader.py
@@ -60,8 +60,9 @@ class ExecutorLoader:
LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
LOCAL_KUBERNETES_EXECUTOR: "airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor",
SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
CELERY_EXECUTOR: "airflow.executors.celery_executor.CeleryExecutor",
CELERY_KUBERNETES_EXECUTOR: "airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
"executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
Comment on lines +63 to +65
Contributor
Perhaps this is good for back-compat, but I wonder if Airflow should be so opinionated about executors that live in providers. For example, for the new AWS executors we are working on, would we add them here as well? I was assuming not. But I'm interested to hear input from others

Member Author
@potiuk Jul 12, 2023
I think Celery and Kubernetes yes - mostly because of tradition and being "vendor agnostic" they should stay here.

While we are moving them to providers, they will still be available in the core and pre-installed (we are adding both celery and kubernetes packages as pre-installed). In that way those executors are special: always available, described in the "apache-airflow" part of our documentation, not in "apache-airflow-providers-". I think it will stay like that "forever".

So in that sense, no - we are not going to add Amazon executors here. Even more - this has not been discussed yet, but I am going to propose moving "DASK_EXECUTOR" out of the core and out of this list as well, strictly to a new dask provider we are going to have. Precisely for the same reason.

Contributor
Yupp, I agree it's best for back-compat to keep Celery and K8s. And I also agree we should move dask to its own provider and drop it from this list as well.

DASK_EXECUTOR: "airflow.executors.dask_executor.DaskExecutor",
KUBERNETES_EXECUTOR: "airflow.executors.kubernetes_executor.KubernetesExecutor",
DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
8 changes: 8 additions & 0 deletions airflow/providers/celery/CHANGELOG.rst
@@ -27,6 +27,14 @@
Changelog
---------

3.3.0
.....

.. note::
This provider release is the first release that has Celery Executor and
Celery Kubernetes Executor moved from the core ``apache-airflow`` package to a Celery
provider package.

3.2.1
.....

36 changes: 36 additions & 0 deletions airflow/providers/celery/executors/__init__.py
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import packaging.version

from airflow.exceptions import AirflowOptionalProviderFeatureException

try:
from airflow import __version__ as airflow_version
except ImportError:
from airflow.version import version as airflow_version

base_version = packaging.version.parse(airflow_version).base_version

if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
raise AirflowOptionalProviderFeatureException(
"Celery Executor from Celery Provider should only be used with Airflow 2.7.0+.\n"
f"This is Airflow {airflow_version} and Celery and CeleryKubernetesExecutor are "
f"available in the 'airflow.executors' package. You should not use "
f"the provider's executors in this version of Airflow."
)
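The guard compares `base_version`, so pre-release and dev builds of 2.7.0 still pass the check. A small illustrative wrapper around the same comparison (the function name is mine, not Airflow's; it uses the same `packaging` library as the snippet above):

```python
from packaging.version import parse


def supports_provider_executors(airflow_version: str) -> bool:
    """True if this Airflow is 2.7.0+. base_version strips dev/rc/post
    suffixes, so '2.7.0.dev0' counts as 2.7.0, mirroring the guard above."""
    base = parse(airflow_version).base_version
    return parse(base) >= parse("2.7.0")
```

Without the `base_version` step, `parse("2.7.0.dev0") < parse("2.7.0")` would be true and development builds of 2.7.0 would wrongly fail the check.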
@@ -64,7 +64,7 @@ def __getattr__(name):
# celery_executor module without the time cost of its import and
# construction
if name == "app":
from airflow.executors.celery_executor_utils import app
from airflow.providers.celery.executors.celery_executor_utils import app

return app
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
@@ -99,7 +99,7 @@ def __init__(self):
self._sync_parallelism = conf.getint("celery", "SYNC_PARALLELISM")
if self._sync_parallelism == 0:
self._sync_parallelism = max(1, cpu_count() - 1)
from airflow.executors.celery_executor_utils import BulkStateFetcher
from airflow.providers.celery.executors.celery_executor_utils import BulkStateFetcher

self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism)
self.tasks = {}
@@ -118,7 +118,7 @@ def _num_tasks_per_send_process(self, to_send_count: int) -> int:
return max(1, int(math.ceil(1.0 * to_send_count / self._sync_parallelism)))

def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
from airflow.executors.celery_executor_utils import execute_command
from airflow.providers.celery.executors.celery_executor_utils import execute_command

task_tuples_to_send = [task_tuple[:3] + (execute_command,) for task_tuple in task_tuples]
first_task = next(t[3] for t in task_tuples_to_send)
@@ -129,7 +129,7 @@ def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:

key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
self.log.debug("Sent all tasks.")
from airflow.executors.celery_executor_utils import ExceptionWithTraceback
from airflow.providers.celery.executors.celery_executor_utils import ExceptionWithTraceback

for key, _, result in key_and_async_results:
if isinstance(result, ExceptionWithTraceback) and isinstance(
@@ -165,7 +165,7 @@ def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
self.update_task_state(key, result.state, getattr(result, "info", None))

def _send_tasks_to_celery(self, task_tuples_to_send: list[TaskInstanceInCelery]):
from airflow.executors.celery_executor_utils import send_task_to_executor
from airflow.providers.celery.executors.celery_executor_utils import send_task_to_executor

if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
# One tuple, or max one process -> send it in the main thread.
@@ -304,7 +304,7 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
:return: List of readable task instances for a warning message
"""
readable_tis = []
from airflow.executors.celery_executor_utils import app
from airflow.providers.celery.executors.celery_executor_utils import app

for ti in tis:
readable_tis.append(repr(ti))
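The `_num_tasks_per_send_process` helper visible in the hunks above ceil-divides the queued task count across the sync processes. An equivalent standalone sketch (in Python 3, `math.ceil` on numbers already returns an `int`, so the `int(math.ceil(1.0 * ...))` dance in the original is a Python 2 leftover):

```python
import math


def num_tasks_per_send_process(to_send_count: int, sync_parallelism: int) -> int:
    """Split the queued tasks evenly (rounding up) across the sync
    processes, never returning less than one task per process."""
    return max(1, math.ceil(to_send_count / sync_parallelism))
```

So 10 tasks across 4 processes gives 3 per process, and the `max(1, ...)` floor keeps the chunk size sane when there is nothing to send.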
@@ -27,6 +27,7 @@
import os
import subprocess
import traceback
import warnings
from concurrent.futures import ProcessPoolExecutor
from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, Optional, Tuple

@@ -40,7 +41,7 @@
import airflow.settings as settings
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstanceKey
from airflow.stats import Stats
@@ -66,7 +67,7 @@
else:
celery_configuration = DEFAULT_CELERY_CONFIG

app = Celery(conf.get("celery", "CELERY_APP_NAME"), config_source=celery_configuration)
celery_app_name = conf.get("celery", "CELERY_APP_NAME")
if celery_app_name == "airflow.executors.celery_executor":
warnings.warn(
"The celery.CELERY_APP_NAME configuration uses deprecated package name: "
"'airflow.executors.celery_executor'. "
"Change it to `airflow.providers.celery.executors.celery_executor`, and "
"update the `--app` flag in your Celery Health Checks "
"to use `airflow.providers.celery.executors.celery_executor.app`.",
RemovedInAirflow3Warning,
)
app = Celery(celery_app_name, config_source=celery_configuration)


@celery_import_modules.connect
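The new guard only warns; it still honors whatever name is configured. A simplified sketch of that check (using the stdlib `DeprecationWarning` as a stand-in for Airflow's `RemovedInAirflow3Warning`):

```python
import warnings

OLD_APP_NAME = "airflow.executors.celery_executor"
NEW_APP_NAME = "airflow.providers.celery.executors.celery_executor"


def check_celery_app_name(configured: str) -> str:
    """Pass the configured app name through unchanged, but warn when it
    still points at the pre-2.7 module path."""
    if configured == OLD_APP_NAME:
        warnings.warn(
            f"celery.CELERY_APP_NAME uses deprecated package name {OLD_APP_NAME!r}; "
            f"change it to {NEW_APP_NAME!r} and update the --app flag in your "
            "Celery health checks.",
            DeprecationWarning,
        )
    return configured
```

Because the old default is matched exactly, deployments that never customized `celery_app_name` get the warning, while those already on the new path are untouched.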
@@ -22,8 +22,8 @@
from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.kubernetes_executor import KubernetesExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
1 change: 1 addition & 0 deletions airflow/providers/celery/provider.yaml
@@ -23,6 +23,7 @@ description: |

suspended: false
versions:
- 3.3.0
- 3.2.1
- 3.2.0
- 3.1.0
2 changes: 1 addition & 1 deletion chart/templates/workers/worker-deployment.yaml
@@ -205,7 +205,7 @@ spec:
{{- else }}
- sh
- -c
- CONNECTION_CHECK_MAX_COUNT=0 exec /entrypoint python -m celery --app airflow.executors.celery_executor.app inspect ping -d celery@$(hostname)
- CONNECTION_CHECK_MAX_COUNT=0 exec /entrypoint python -m celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d celery@$(hostname)
{{- end }}
{{- end }}
ports:
@@ -147,12 +147,12 @@ To check if the worker running on the local host is working correctly, run:

.. code-block:: bash

celery --app airflow.executors.celery_executor.app inspect ping -d celery@${HOSTNAME}
celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d celery@${HOSTNAME}

To check if all workers in the cluster are running correctly, run:

.. code-block:: bash

celery --app airflow.executors.celery_executor.app inspect ping
celery --app airflow.providers.celery.executors.celery_executor.app inspect ping

For more information, see: `Management Command-line Utilities (inspect/control) <https://docs.celeryproject.org/en/stable/userguide/monitoring.html#monitoring-control>`__ and `Workers Guide <https://docs.celeryproject.org/en/stable/userguide/workers.html>`__ in the Celery documentation.
@@ -21,7 +21,7 @@
CeleryKubernetes Executor
=========================

The :class:`~airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor` allows users
The :class:`~airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor` allows users
to run simultaneously a ``CeleryExecutor`` and a ``KubernetesExecutor``.
An executor is chosen to run a task based on the task's queue.
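The routing rule the doc describes is deliberately simple. A toy sketch of the queue check (the names and the hard-coded queue value are illustrative, not the executor's real implementation; in a real deployment the queue name comes from configuration):

```python
KUBERNETES_QUEUE = "kubernetes"  # illustrative; configured in airflow.cfg


def pick_executor(task_queue: str) -> str:
    """Tasks on the dedicated kubernetes queue go to the KubernetesExecutor;
    everything else is handed to the CeleryExecutor."""
    if task_queue == KUBERNETES_QUEUE:
        return "KubernetesExecutor"
    return "CeleryExecutor"
```

Authors opt a task into Kubernetes simply by setting `queue="kubernetes"` on it; all other tasks keep the default queue and run under Celery.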
