Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,18 @@ class CeleryKubernetesExecutor(BaseExecutor):
def kubernetes_queue(self) -> str:
return conf.get("celery_kubernetes_executor", "kubernetes_queue")

def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor):
def __init__(
self,
celery_executor: CeleryExecutor | None = None,
kubernetes_executor: KubernetesExecutor | None = None,
):
if AIRFLOW_V_3_0_PLUS or not kubernetes_executor or not celery_executor:
raise RuntimeError(
f"{self.__class__.__name__} does not support Airflow 3.0+. See "
"https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently"
" how to use multiple executors concurrently."
)

super().__init__()
self._job_id: int | str | None = None
self.celery_executor = celery_executor
Expand Down Expand Up @@ -130,13 +141,6 @@ def job_id(self, value: int | str | None) -> None:

def start(self) -> None:
"""Start celery and kubernetes executor."""
if AIRFLOW_V_3_0_PLUS:
raise RuntimeError(
f"{self.__class__.__name__} does not support Airflow 3.0+. See "
"https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently"
" how to use multiple executors concurrently."
)

self.celery_executor.start()
self.kubernetes_executor.start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,18 @@ class LocalKubernetesExecutor(BaseExecutor):

KUBERNETES_QUEUE = conf.get("local_kubernetes_executor", "kubernetes_queue")

def __init__(self, local_executor: LocalExecutor, kubernetes_executor: KubernetesExecutor):
def __init__(
self,
local_executor: LocalExecutor | None = None,
kubernetes_executor: KubernetesExecutor | None = None,
):
if AIRFLOW_V_3_0_PLUS or not local_executor or not kubernetes_executor:
raise RuntimeError(
f"{self.__class__.__name__} does not support Airflow 3.0+. See "
"https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently"
" how to use multiple executors concurrently."
)

super().__init__()
self._job_id: int | str | None = None
self.local_executor = local_executor
Expand Down Expand Up @@ -120,13 +131,6 @@ def job_id(self, value: int | str | None) -> None:

def start(self) -> None:
"""Start local and kubernetes executor."""
if AIRFLOW_V_3_0_PLUS:
raise RuntimeError(
f"{self.__class__.__name__} does not support Airflow 3.0+. See "
"https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently"
" how to use multiple executors concurrently."
)

self.log.info("Starting local and Kubernetes Executor")
self.local_executor.start()
self.kubernetes_executor.start()
Expand Down