
Conversation

@ihnokim
Contributor

@ihnokim ihnokim commented Apr 18, 2025

closes: #48667
related: #48667

Changes

  • The default pod_template's AIRFLOW__CORE__EXECUTOR value, previously hardcoded to LocalExecutor, now respects the setting from values.yaml.

Why do we need this?

There is an issue when using multiple executors: if a specific executor is explicitly defined in a task parameter, the pod created by the pod_template may fail to run properly. This happens because the default pod_template generated by the Helm chart hardcodes the executor to LocalExecutor via the AIRFLOW__CORE__EXECUTOR environment variable.

For example, in an environment where KubernetesExecutor is used, if a task is defined in the DAG with the executor="KubernetesExecutor" parameter, the pod will still have AIRFLOW__CORE__EXECUTOR=LocalExecutor in its environment variables. This leads to the following error:

airflow.exceptions.UnknownExecutorException: Unknown executor being loaded: KubernetesExecutor

To resolve this and improve the user experience for Helm chart users, I’ve updated the default pod_template so that the AIRFLOW__CORE__EXECUTOR value matches the executor defined in values.yaml.
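
Concretely, the idea is to template the executor value in the chart's default worker pod template instead of hardcoding it. Roughly, as a sketch (the exact file and surrounding templating in the chart may differ):

# sketch of the default pod template change, not the literal diff
env:
  - name: AIRFLOW__CORE__EXECUTOR
    # previously hardcoded to "LocalExecutor"
    value: "{{ .Values.executor }}"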

After this change, I confirmed that tasks using KubernetesExecutor run successfully with the following DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2023, 1, 1),
}

with DAG(
    dag_id='kubernetes_hello_world',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    tags=['test'],
) as dag:
    hello_task = BashOperator(
        task_id='print_hello',
        executor='KubernetesExecutor',  # explicitly assign this task to the KubernetesExecutor
        bash_command='echo "Hello, World from KubernetesExecutor!"'
    )

Let me know if any additional clarification is needed!



@ihnokim ihnokim force-pushed the bugfix/default-pod-template-executor branch 4 times, most recently from ef2edfa to f00caa2 on April 23, 2025 12:19
@potiuk
Member

potiuk commented Apr 23, 2025

@kaxil @jedcunningham -> this one should be included in the future chart with 3.0.0 support as well.

Member

@kaxil kaxil left a comment


This is intentionally LocalExecutor; it is local to that pod -- it has been like that since forever (probably since KE became a thing)!

@kaxil
Member

kaxil commented Apr 23, 2025

@potiuk 's comment on the issue is still accurate:

#48667 (comment)

> But I think this is quite a fundamental change in the way the K8S executor works. It's always worked this way (from what I remember) - when a POD was run by the K8S executor, LocalExecutor has been set unconditionally - because fundamentally we did not want to trigger the executor again when run - so the task in the K8S executor is effectively executed by the LocalExecutor.

Member

@potiuk potiuk left a comment


Yeah. That one needs to be looked at - the verification of the executor should be fixed in order to make things work, not the executor configured in the pod.

@o-nikolas
Contributor

I'd push back on this; perhaps I need to hear more about how Helm is operating differently from a normal Airflow deployment. But in normal cases Airflow expects to operate with the same configuration on all hosts (until multi-team, of course).

If a pod is being started as a result of an executor triggering a pod to run a task, the core.executor field will not do anything. In AF2 the airflow tasks run ... command will use the LocalTaskRunner, or in AF3 a workload will be executed via the correct execution_time module.

If the context is something else than the above, I'd need to hear more.

@ihnokim ihnokim force-pushed the bugfix/default-pod-template-executor branch from f00caa2 to 904c245 on April 24, 2025 01:21
@ihnokim
Contributor Author

ihnokim commented Apr 24, 2025

@kaxil @potiuk I respect your opinions, but I believe this might add to the confusion for Helm chart users. According to what you’re saying, setting the AIRFLOW__CORE__EXECUTOR value inside a pod becomes essentially meaningless. If that's the case, are you suggesting that users (and this PR) should completely remove this value instead of overriding it?

Scenario: The user has configured the global Airflow configuration to use "KubernetesExecutor,CeleryExecutor".
Error in Pod: airflow.exceptions.UnknownExecutorException: Unknown executor being loaded: KubernetesExecutor

[CASE1] "AIRFLOW__CORE__EXECUTOR=KubernetesExecutor,CeleryExecutor" in Pod
User: "Oh, it’s correctly set just as I configured it! The pod is starting properly! Great!"

[CASE2] "AIRFLOW__CORE__EXECUTOR=LocalExecutor" in Pod
User: "Hmm, the value is different from what I configured. Is this why the pod isn’t starting properly? I guess I need to modify the pod template to use KubernetesExecutor or CeleryExecutor... Wait, is it even safe to change it like this?"

Personally, as @o-nikolas mentioned, I believe that configuration values managed by Airflow should be applied consistently across all components.

As noted in issue #48667 and confirmed through testing, even when a pod is launched using the KubernetesExecutor, the task still behaves as if it's executed with the LocalExecutor, regardless of what value is passed to AIRFLOW__CORE__EXECUTOR. The problem arises during the validation phase, when the user explicitly specifies a particular executor for the task via the executor parameter: the pod is hardcoded to expect LocalExecutor rather than referencing the global configuration. In other words, the validation logic assumes that a consistent global configuration is in place, just as @o-nikolas pointed out.
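
For reference, the failing lookup can be reproduced in isolation; a minimal sketch (assuming an Airflow 2.10-style installation, run outside a real deployment):

import os

# Simulate what the default pod template currently injects into the worker pod.
os.environ["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"

from airflow.executors.executor_loader import ExecutorLoader

# Raises UnknownExecutorException, because the lookup only knows about the
# executors configured via [core] executor in this environment.
ExecutorLoader.lookup_executor_name_by_str("KubernetesExecutor")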

To summarize: updating this PR to follow the global configuration instead of using a hardcoded value doesn't break actual task execution. It simply ensures that the validation logic behaves as expected. Even after this PR is merged, Helm chart users can still freely override AIRFLOW__CORE__EXECUTOR (to one of the globally configured executors) if they want. The default value is just being changed to improve out-of-the-box usability, reducing the need for manual tweaks.

If your intention is to prevent users from modifying this value at all, I can update the pod template to remove the environment variable entirely. (+ fixing validation part)

@kaxil
Member

kaxil commented Apr 24, 2025

If the value provided to it is meaningless in the pod, why set it at all?

@kaxil
Member

kaxil commented Apr 24, 2025

And if we are certain we want to delete it or change it, we should do the same for the pod_template_files examples in the K8s provider:

spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor

(the same snippet appears in each of the provider's example pod_template_files)

--

We should also add some tests

@kaxil
Member

kaxil commented Apr 24, 2025

I think the initial intention with this was just to run airflow tasks run with the --local flag, but LocalExecutor was added instead - check

env["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"

which may have changed somewhere; I am still digging into the code and history. This change might break the Helm chart with KE on Airflow <3.

@kaxil
Member

kaxil commented Apr 24, 2025

OK, I see what's going on. That LocalExecutor should never have been required. We already pass --local in the CLI. But the bug was that we showed a different command for the K8s Rendered Template on the UI (which does not include --local).

From a test with the Helm Chart with KE and 2.10.5.

Containers:
  base:
    Container ID:  docker://5861ce7cefaf3359356060379033c496712187ef5dd84cb8644ca4766fe1ed24
    Image:         apache/airflow:2.10.5
    Image ID:      docker-pullable://apache/airflow@sha256:6499a680a93463846d3a6be980e85d601dc97b0d81e82eed9ef5e5cb9da31b79
    Port:          <none>
    Host Port:     <none>
    Args:
      airflow
      tasks
      run
      example_bash_operator
      runme_0
      manual__2025-04-24T07:57:01+00:00
      --local
      --subdir
      /home/airflow/.local/lib/python3.12/site-packages/airflow/example_dags/example_bash_operator.py

But the UI shows

[screenshot: the Rendered Template command shown in the UI, without the --local flag]

Tested it with helm install $RELEASE_NAME apache-airflow/airflow --namespace $NAMESPACE -f values.yaml

values.yaml:

executor: KubernetesExecutor
env:
  - name: AIRFLOW__CORE__LOAD_EXAMPLES
    value: "True"
  - name: AIRFLOW__CORE__EXECUTOR
    value: "KubernetesExecutor"
podTemplate: |-
  apiVersion: v1
  kind: Pod
  metadata:
    name: placeholder-name
    labels:
      tier: airflow
      component: worker
      release: example-release
    annotations:
      cluster-autoscaler.kubernetes.io/safe-to-evict: "false"
  spec:
    initContainers:
    containers:
      - envFrom:
          []
        env:
          - name: AIRFLOW__CORE__FERNET_KEY
            valueFrom:
              secretKeyRef:
                name: example-release-fernet-key
                key: fernet-key
          - name: AIRFLOW_HOME
            value: /opt/airflow
          # For Airflow <2.3, backward compatibility; moved to [database] in 2.3
          - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
            valueFrom:
              secretKeyRef:
                name: example-release-metadata
                key: connection
          - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
            valueFrom:
              secretKeyRef:
                name: example-release-metadata
                key: connection
          - name: AIRFLOW_CONN_AIRFLOW_DB
            valueFrom:
              secretKeyRef:
                name: example-release-metadata
                key: connection
          - name: AIRFLOW__WEBSERVER__SECRET_KEY
            valueFrom:
              secretKeyRef:
                name: example-release-webserver-secret-key
                key: webserver-secret-key
        image: apache/airflow:2.10.5
        imagePullPolicy: IfNotPresent
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
              - ALL
        name: base
        volumeMounts:
          - mountPath: "/opt/airflow/logs"
            name: logs
          - name: config
            mountPath: "/opt/airflow/airflow.cfg"
            subPath: airflow.cfg
            readOnly: true
          - name: config
            mountPath: "/opt/airflow/config/airflow_local_settings.py"
            subPath: airflow_local_settings.py
            readOnly: true
    restartPolicy: Never
    securityContext:
      runAsUser: 50000
      fsGroup: 0
    terminationGracePeriodSeconds: 600
    serviceAccountName: "example-release-airflow-worker"
    volumes:
    - emptyDir:
        {}
      name: logs
    - configMap:
        name: example-release-config
      name: config

That --local is passed from the following place in the Scheduler, which then sends it to all the executors:

https://github.com/apache/airflow/blob/2.10.5/airflow/jobs/scheduler_job_runner.py#L685C13-L688

            command = ti.command_as_list(
                local=True,
                pickle_id=ti.dag_model.pickle_id,
            )

def queue_command(
    self,
    task_instance: TaskInstance,
    command: CommandType,
    priority: int = 1,
    queue: str | None = None,
):
    """Queues command to task."""
    if task_instance.key not in self.queued_tasks:
        self.log.info("Adding to queue: %s", command)
        self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)

self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)

and used by KE here

https://github.com/apache/airflow/blob/main/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py#L282

https://github.com/apache/airflow/blob/main/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py#L361-L365

which then generates the pod spec.

But the UI command is generated directly using args=task_instance.command_as_list() instead of args=task_instance.command_as_list(local=True) (like the scheduler does), hence the mismatch.
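
For illustration, the difference comes down to the local flag; a small sketch (assuming the Airflow 2.x TaskInstance API):

from airflow.models.taskinstance import TaskInstance

def rendered_vs_scheduler_command(ti: TaskInstance) -> tuple[list[str], list[str]]:
    # The scheduler builds the worker command with local=True, which appends "--local".
    scheduler_command = ti.command_as_list(local=True)
    # The UI's Rendered Template view builds it without local=True, so "--local" is missing.
    ui_command = ti.command_as_list()
    return scheduler_command, ui_command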

@kaxil
Member

kaxil commented Apr 24, 2025

That said, the error reported in the GH issue around validation does not need to happen when the task is already in the pod, i.e. there is no need to load or create an executor:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dag.py", line 852, in validate_executor_field
    ExecutorLoader.lookup_executor_name_by_str(task.executor)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/executor_loader.py", line 215, in lookup_executor_name_by_str
    raise UnknownExecutorException(f"Unknown executor being loaded: {executor_name_str}")
airflow.exceptions.UnknownExecutorException: Unknown executor being loaded: KubernetesExecutor

@ihnokim
Contributor Author

ihnokim commented Apr 24, 2025

@kaxil Oh, thank you for thoroughly reviewing the history and carefully analyzing the original intention behind the code. Based on your insights, I believe the following changes would be appropriate—could you please confirm if this approach makes sense?

  • Remove executor-related environment variables from the default K8s pod template and example files.
  • Resolve the mismatch issue where the UI displays a command that differs from the actual one executed by the pod.
  • Address the validation failure caused by the executor in tasks running on worker pods.
  • Add appropriate tests.

P.S. This is my first contribution to this project, so I hope you'll excuse my rough edges. Regarding the scope of testing—do you have any guidance on what would be expected? I think I can follow the guidelines for unit and K8s tests, but I'm a bit unfamiliar with how to write tests for the UI part. Would providing a screenshot to demonstrate that the mismatch issue has been resolved be acceptable?

@kaxil
Member

kaxil commented Apr 24, 2025

> @kaxil Oh, thank you for thoroughly reviewing the history and carefully analyzing the original intention behind the code. Based on your insights, I believe the following changes would be appropriate—could you please confirm if this approach makes sense?
>
>   • Remove executor-related environment variables from the default K8s pod template and example files.
>   • Resolve the mismatch issue where the UI displays a command that differs from the actual one executed by the pod.
>   • Address the validation failure caused by the executor in tasks running on worker pods.
>   • Add appropriate tests.

Yeah, that sounds about right. The UI thing is no longer an issue in Airflow 3 as we don't use the airflow tasks run command anymore, so you can ignore it.

> Remove executor-related environment variables from the default K8s pod template and example files.

Yeah, feel free to create separate PRs for them: one for the Helm chart and one for the K8s provider.

> P.S. This is my first contribution to this project, so I hope you'll excuse my rough edges. Regarding the scope of testing—do you have any guidance on what would be expected? I think I can follow the guidelines for unit and K8s tests, but I'm a bit unfamiliar with how to write tests for the UI part. Would providing a screenshot to demonstrate that the mismatch issue has been resolved be acceptable?

I am glad that you are contributing, definitely here to help wherever needed. The existing Helm chart tests give a good idea, for example: https://github.com/apache/airflow/blob/main/helm-tests/tests/helm_tests/airflow_aux/test_pod_template_file.py
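
For example, a chart-level test along these lines could cover the new behaviour; a rough sketch only (the render_chart import path and exact assertions are assumptions based on the existing helm-tests suite):

import jmespath

from chart_utils.helm_template_generator import render_chart  # assumed helper location


def test_worker_pod_template_executor_follows_values():
    docs = render_chart(
        values={"executor": "KubernetesExecutor"},
        show_only=["templates/pod-template-file.yaml"],
    )
    env = jmespath.search("spec.containers[0].env", docs[0])
    # Depending on the final approach, assert the configured value is used
    # (or that the variable is absent, if it ends up being removed entirely).
    assert {"name": "AIRFLOW__CORE__EXECUTOR", "value": "KubernetesExecutor"} in env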

@o-nikolas
Contributor

Sorry folks, I've been out sick since Wednesday!

Glad you did a deep dive, @kaxil, and you are now convinced 😀 I was absolutely certain it was not needed for the Airflow executor workflow, which I've worked on quite a bit!

@ihnokim

> could you please confirm if this approach makes sense?
>
> Remove executor-related environment variables from the default K8s pod template and example files.

I still think Airflow configuration should be as consistent as possible across all nodes/workers/pods executing it, personally. But happy to commit if I'm outvoted on that.

> Resolve the mismatch issue where the UI displays a command that differs from the actual one executed by the pod.
> Address the validation failure caused by the executor in tasks running on worker pods.

To me the code is nice and simple as is, with no branching logic, and we also get one last check that we're not launching a task for which there is no executor configured to take it, which is a critical piece of multi-executor config (so tasks don't get stranded in the scheduled state). As long as the config is the same (as we expect), everything functions correctly, which to me is the real fix. But again, happy to commit if I'm outvoted on that one.

> Add appropriate tests.

@kaxil
Member

kaxil commented Apr 28, 2025

> I still think Airflow configuration should be as consistent as possible across all nodes/workers/pods executing it, personally. But happy to commit if I'm outvoted on that.

I am fine with that, no strong opinion :)

@ihnokim ihnokim force-pushed the bugfix/default-pod-template-executor branch from e2cc84f to 8cf45bc on April 28, 2025 22:24
@kaxil kaxil enabled auto-merge (squash) April 29, 2025 09:39
@o-nikolas
Contributor

Hey @kaxil FYI I saw that you enabled auto merge on this one (assuming you were happy with it once it went green) but you also have changes requested which is blocking its merge (same as well, @potiuk).

@potiuk potiuk disabled auto-merge May 3, 2025 05:09
Member

@potiuk potiuk left a comment


> Hey @kaxil FYI I saw that you enabled auto merge on this one (assuming you were happy with it once it went green) but you also have changes requested which is blocking its merge (same as well, @potiuk).

Yeah. I read the explanation and now it's all clear - I guess we need to re-learn some of the things that we took for granted in Airflow 2 :) .... And yeah, I am fine either way - but consistency is likely more important than not having the executor information at all.

That said - I just have another question, however..

Our Helm chart still supports both Airflow 2 and Airflow 3. Isn't that change handling only the Airflow 3 case? Should we make it:

  • if Airflow 2 -> LocalExecutor
  • else -> executor

?

Or am I missing something?

@ihnokim ihnokim force-pushed the bugfix/default-pod-template-executor branch from 8cf45bc to 9f2efc5 on May 4, 2025 01:57
@o-nikolas
Contributor

> Our Helm chart still supports both Airflow 2 and Airflow 3. Isn't that change handling only the Airflow 3 case? Should we make it:
>
>   • if Airflow 2 -> LocalExecutor
>   • else -> executor
>
> ?
>
> Or am I missing something?

Under Airflow 2 you should not need to specifically set core.executor=LocalExecutor either; the tasks will run on the local task runner either way because --local is already passed to the workers (see @kaxil's investigation). That is all the same for Airflow 2 as well. But it also won't cause any harm if you'd like to set it to LocalExecutor in the Airflow 2 branch (other than, again, there is no need and config should be consistent everywhere).

@kaxil kaxil self-requested a review May 6, 2025 13:30
@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale label (Stale PRs per the .github/workflows/stale.yml policy file) on Jun 21, 2025
@o-nikolas
Contributor

Hey folks, still interested in seeing this one merged. @potiuk does the answer above satisfy your question? Looks like everyone is approved and ready to merge if we can get the request for changes removed.

@o-nikolas o-nikolas removed the stale label (Stale PRs per the .github/workflows/stale.yml policy file) on Jun 23, 2025
@potiuk
Member

potiuk commented Jun 23, 2025

> Hey folks, still interested in seeing this one merged. @potiuk does the answer above satisfy your question? Looks like everyone is approved and ready to merge if we can get the request for changes removed.

Ah sure - dismissed my review. I missed it. Sorry

@potiuk potiuk force-pushed the bugfix/default-pod-template-executor branch from 9f2efc5 to 6b8667e on June 23, 2025 17:33
@potiuk
Member

potiuk commented Jun 23, 2025

I rebased it now - just in case.

@kaxil kaxil merged commit 5066561 into apache:main Jun 23, 2025
68 checks passed
@boring-cyborg

boring-cyborg bot commented Jun 23, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

@ihnokim
Contributor Author

ihnokim commented Jun 23, 2025

@o-nikolas @kaxil @potiuk

Thank you so much for your thoughtful and patient reviews — especially while I was away on vacation and not as responsive as I would have liked.

I'm incredibly grateful for the opportunity to make my very first contribution to Airflow, a project I truly love. Your guidance made that possible.

This has been such a meaningful experience for me, and I’m more motivated than ever to keep learning and continue contributing to the Airflow community. Looking forward to many more!

@o-nikolas
Contributor

Glad to hear that @ihnokim ! Sorry it took so long to get this one merged and welcome to the community ☺️

@kaxil
Member

kaxil commented Jun 24, 2025

Thanks for your contribution @ihnokim . Hope you had a good vacation 🏖️


Labels

area:helm-chart Airflow Helm Chart

Development

Successfully merging this pull request may close these issues.

Task execution failure with multiple executors
