
Conversation


@amoghrajesh amoghrajesh commented May 28, 2025

closes: #50423

Why?

Airflow 2 had support for user impersonation: https://airflow.apache.org/docs/apache-airflow/stable/security/workload.html. Quoting from docs:

Airflow has the ability to impersonate a unix user while running task instances based on the task’s run_as_user parameter, which takes a user’s name.

The intention is to drop privileges for the process that runs the task, compared to the worker launching it, which typically runs as root. We can configure the task to impersonate a user with fewer privileges and thereby control the behaviour of the running task for a more secure task run.

Quoting a use case reported by one of the Airflow users:

I suspect there might be something wrong with user impersonation (run_as_user) in 3.0.0+ with docker (tested on 3.0.1rc1). My setup uses an overlay filesystem so I can fake-interact with a shared network drive. When I connect to the docker cli, I can do sudo -u data_user -i and create/delete files without problem. However, the celery worker encounters a permission error when running a task that writes to the network drive as that very same data_user. Does anyone have any idea what might be the reason? Anything I can try to pinpoint the issue? In 2.10.5 my setup worked fine. The difference in configurations comes down to changes to the "official" docker-compose from 2.x to 3.x (on top of which my own additions, which didn't change, are included).

https://apache-airflow.slack.com/archives/CCQB40SQJ/p1746728794387939

Implementation

Airflow 2 essentially did: sudo -u user "your_bash_command_here".

For Airflow 3, we should do something similar: run the task runner that executes the workload as the provided run_as_user.
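
Conceptually, the Airflow 2 behaviour amounted to prefixing the task command so the subprocess starts as the target user. A rough sketch, not the actual Airflow 2 code (the user and command here are examples):

run_as_user = "airflowuser"  # example user
command = ["bash", "-c", "your_bash_command_here"]
# Prefix with sudo so the child process runs as the target user
wrapped_command = ["sudo", "-u", run_as_user] + command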

  • Introduced a helper method to attempt impersonation (a rough sketch follows this list):
    • Uses pwd.getpwnam() to resolve the UID and GID of the user.
    • Applies os.setgid() followed by os.setuid() to drop privileges.
    • Handles KeyError and PermissionError.
    The order setgid() → setuid() is intentional: once the process drops to a non-root user via setuid(), it can no longer change its group, so group privileges could not be fully dropped.

  • At task runner startup, we check whether run_as_user is set on the task; if not, we fall back to the default_impersonation config option: https://airflow.apache.org/docs/apache-airflow/stable/security/workload.html#default-impersonation.

  • If neither is set, the task continues as the current user (usually root, at least in the Celery Executor setup).
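
A minimal sketch of this logic, using illustrative names (the actual helper in the Task SDK task runner may differ):

from __future__ import annotations

import os
import pwd

from airflow.configuration import conf


def _maybe_impersonate(run_as_user: str | None) -> None:
    """Drop privileges to the requested user before executing the task (sketch only)."""
    # Task-level run_as_user wins; otherwise fall back to [core] default_impersonation.
    username = run_as_user or conf.get("core", "default_impersonation", fallback=None)
    if not username:
        return  # keep running as the current user (usually root on the Celery worker)

    try:
        user_info = pwd.getpwnam(username)  # resolve UID and GID for the user
    except KeyError:
        raise RuntimeError(f"Cannot impersonate {username!r}: user not found")

    try:
        # setgid() must come first: once setuid() drops root, the process can
        # no longer change its group, so group privileges would not be fully dropped.
        os.setgid(user_info.pw_gid)
        os.setuid(user_info.pw_uid)
    except PermissionError:
        raise RuntimeError(f"Not permitted to impersonate {username!r}; the worker needs to run as root")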

Testing

The intention is to run Airflow as "root" and switch to a less privileged user, "airflowuser". We intentionally use a user that cannot list certain files, such as /root/airflow/airflow.cfg.

Setup for testing

  1. Run Airflow with the Celery Executor.
  2. Create an "airflowuser": sudo useradd -m -s /bin/bash airflowuser
  3. Switch to "airflowuser" and verify that its privileges are indeed lower, e.g.:
root@1b92a329d570:/opt/airflow# sudo -u airflowuser -i
airflowuser@1b92a329d570:~$ namei -l /root/airflow/airflow.cfg
f: /root/airflow/airflow.cfg
drwxr-xr-x root root /
drwx------ root root root
                     airflow - Permission denied
  4. Now run the Celery worker as root:
root@1b92a329d570:/opt/airflow# airflow celery worker
2025-05-29 07:41:56.804540 [info     ] starting stale bundle cleanup process [airflow.providers.celery.cli.celery_command]
[2025-05-29 07:41:56 +0000] [418] [INFO] Starting gunicorn 23.0.0
[2025-05-29 07:41:56 +0000] [418] [INFO] Listening at: http://[::]:8793 (418)
[2025-05-29 07:41:56 +0000] [418] [INFO] Using worker: sync
[2025-05-29 07:41:56 +0000] [420] [INFO] Booting worker with pid: 420
[2025-05-29 07:41:56 +0000] [421] [INFO] Booting worker with pid: 421
2025-05-29 07:41:58.159935 [warning  ] You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0
 [py.warnings] category=SecurityWarning filename=/usr/local/lib/python3.9/site-packages/celery/platforms.py lineno=84

DAG:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
    )

Running this without run_as_user, the task can access that file:

[screenshot]

Test 1: Check that a task can run with run_as_user provided at the task level.

DAG Used:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
        run_as_user="airflowuser",
    )

The task runs as the impersonated user "airflowuser", and "airflowuser" is present on the worker.

The task errors out with a permission error, as expected.
[screenshot]

Logs:

[2025-05-29, 13:18:01] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-29, 13:18:01] INFO - Filling up the DagBag from /files/dags/bashoperator-trying-ls.py: source="airflow.models.dagbag.DagBag"
[2025-05-29, 13:18:01] INFO - Running task as impersonated user: impersonated_user="airflowuser": source="task"
[2025-05-29, 13:18:01] INFO - Tmp dir root location: /tmp: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:18:01] INFO - Running command: ['/usr/bin/bash', '-c', 'ls -l /root/airflow/airflow.cfg']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:18:01] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:18:01] INFO - ls: cannot access '/root/airflow/airflow.cfg': Permission denied: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:18:01] INFO - Command exited with return code 2: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:18:01] ERROR - Task failed with exception: source="task"
AirflowException: Bash command failed. The command returned a non-zero exit code 2.
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 90[4](http://localhost:28080/dags/check_cfg_file_access/runs/manual__2025-05-29T07:48:00.771934+00:00/tasks/ls_root_cfg?try_number=1#4) in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1196 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/bash.py", line 233 in execute

Test 2: Do not provide run_as_user; instead set the default_impersonation config to "airflowuser"

Set env in worker:

export AIRFLOW__CORE__DEFAULT_IMPERSONATION="airflowuser"

DAG Used:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
    )

Same error as before:

[screenshot]

Logs:

[2025-05-29, 13:21:13] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-29, 13:21:13] INFO - Filling up the DagBag from /files/dags/bashoperator-trying-ls.py: source="airflow.models.dagbag.DagBag"
[2025-05-29, 13:21:13] INFO - Running task as impersonated user: impersonated_user="airflowuser": source="task"
[2025-05-29, 13:21:13] INFO - Tmp dir root location: /tmp: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:21:13] INFO - Running command: ['/usr/bin/bash', '-c', 'ls -l /root/airflow/airflow.cfg']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:21:13] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:21:13] INFO - ls: cannot access '/root/airflow/airflow.cfg': Permission denied: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:21:13] INFO - Command exited with return code 2: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-05-29, 13:21:13] ERROR - Task failed with exception: source="task"
AirflowException: Bash command failed. The command returned a non-zero exit code 2.
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 90[4](http://localhost:28080/dags/check_cfg_file_access/runs/manual__2025-05-29T07:51:12.790374+00:00/tasks/ls_root_cfg?try_number=1#4) in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1196 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/bash.py", line 233 in execute

Test 3: Provide both run_as_user and the config option, to check which one is picked up

On the worker, create a new user "randomuser" and keep the env var set to "airflowuser":

sudo useradd -m -s /bin/bash randomuser
export AIRFLOW__CORE__DEFAULT_IMPERSONATION="airflowuser"

DAG used:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
        run_as_user="randomuser"
    )

"randomuser" is picked up, i.e. the task-level run_as_user takes precedence over the config:
[screenshot]

TODO:

  • Add tests in the Task SDK
  • Update docs if needed
  • Figure out what happens in Airflow 2 for the PermissionError / UserNotFound cases and replicate it here


@amoghrajesh amoghrajesh requested review from ashb and kaxil as code owners May 28, 2025 11:40
@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:task-sdk labels May 28, 2025
@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label May 28, 2025
@amoghrajesh amoghrajesh added this to the Airflow 3.0.2 milestone May 28, 2025
@amoghrajesh amoghrajesh force-pushed the user-impersonation branch from 0260880 to 7122b9d Compare May 28, 2025 12:00
@amoghrajesh amoghrajesh added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label May 28, 2025
@kaxil kaxil modified the milestones: Airflow 3.0.2, Airflow 3.0.3 May 30, 2025
@amoghrajesh amoghrajesh removed the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label May 30, 2025
Review comment on the following diff snippet:

# otherwise, group privileges may not be able to be fully dropped.

os.setgid(gid)
os.setuid(uid)
A reviewer asked:

Does this work if Airflow worker is run with a sudo user instead of root?

The author replied:

Yeah, there's a limitation that the user has to be "root" in order to run this, and I am trying to work on an alternative proposal due to that limitation. Ideas are welcome @codenamelxl

@amoghrajesh amoghrajesh self-assigned this Jun 2, 2025
@amoghrajesh (author) commented:

Closing in favour of: #51780

Reworking it there


Labels

area:API Airflow's REST/HTTP API area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk

Development

Successfully merging this pull request may close these issues.

Handle run_as_user / impersonation in the Supervisor

3 participants