Add user impersonation (run_as_user) support for task execution
#51780
Merged: amoghrajesh merged 24 commits into apache:main from astronomer:user-impersonation-reworked on Jun 18, 2025
Conversation
The existing JSON Lines based approach had three major drawbacks:
1. In the case of really large lines (in the region of 10 or 20 MB), Python's
line buffering could _sometimes_ result in a partial read.
2. The JSON-based approach didn't have the ability to carry any metadata (such
as errors).
3. Not every message type/call site waited for a response, which meant those
client functions could never be told about an error.
One of the ways this line-based approach fell down was that if you suddenly tried
to run hundreds of triggers at the same time you would get an error like this:
```
Traceback (most recent call last):
File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
line = await self.readuntil(sep)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```
The other way this caused problems was that if you parsed a large DAG (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read, which would be invalid JSON.
This changes the communications protocol in a couple of ways.
First off, at the Python level the separate send and receive methods on the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response, and raises an error if one is returned.
(Note that right now almost nothing on the supervisor side sets the error;
that will be a future PR.)
Secondly, the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4-byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines.
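As a rough illustration of the framing (a hedged sketch, not the code from this PR; the function names are made up), the sender prefixes each payload with its length and the receiver reads exactly that many bytes before attempting to decode anything:

```python
# Sketch of a length-prefixed frame protocol: 4-byte big-endian length, then payload.
import socket
import struct

def send_frame(sock: socket.socket, payload: bytes) -> None:
    # Prefix the payload with its length as an unsigned 32-bit big-endian integer.
    sock.sendall(struct.pack(">I", len(payload)) + payload)

def recv_exact(sock: socket.socket, n: int) -> bytes:
    # Keep reading until exactly n bytes have arrived, avoiding partial reads.
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed mid-frame")
        buf += chunk
    return buf

def recv_frame(sock: socket.socket) -> bytes:
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return recv_exact(sock, length)
```

A client-side `send()` in this style would write one request frame, block on the matching response frame, and raise if the decoded response carries an error field.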
Finally, the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination of this comms path I realised that
this socket is unnecessary: since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature, which is
currently broken: without extra config in the `sudoers` file, `sudo` will close
all file handles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add `run_as_user` support.
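A minimal sketch of the idea (illustrative only; names are not from the Airflow code base): one `socketpair` carries traffic in both directions, so the child only needs to inherit a single file descriptor instead of a separate "requests" channel.

```python
# One bidirectional AF_UNIX socketpair shared between supervisor and child.
import os
import socket

parent_sock, child_sock = socket.socketpair()

pid = os.fork()  # POSIX-only sketch
if pid == 0:
    # Child: talk to the supervisor over the single inherited socket.
    parent_sock.close()
    child_sock.sendall(b"hello from child")
    reply = child_sock.recv(1024)
    os._exit(0)
else:
    # Parent/supervisor: the same socket carries requests and responses both ways.
    child_sock.close()
    msg = parent_sock.recv(1024)
    parent_sock.sendall(b"ack: " + msg)
    os.waitpid(pid, 0)
```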
In order to support this in the DagFileProcessor (since the proc manager uses
a single selector for multiple processes), I have moved the `on_close`
callback to be part of the object we store in the `selector` in the
supervisors. Previously it was just the "on_read" callback; now we store a
tuple of `(on_read, on_close)` and `on_close` is called once, universally.
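A sketch of that pattern (not the actual supervisor code; names are assumptions): the selector's per-fd `data` holds both callbacks, so the loop that multiplexes many child sockets can run the close callback exactly once when a socket reaches EOF.

```python
# Store (on_read, on_close) as the selector data for each registered socket.
import selectors

sel = selectors.DefaultSelector()

def register(sock, on_read, on_close):
    sel.register(sock, selectors.EVENT_READ, data=(on_read, on_close))

def poll_once(timeout: float = 1.0) -> None:
    for key, _events in sel.select(timeout):
        on_read, on_close = key.data
        still_open = on_read(key.fileobj)
        if not still_open:
            # EOF: unregister first, then run the close callback exactly once.
            sel.unregister(key.fileobj)
            on_close(key.fileobj)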
This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async event loop via `async_to_sync`, so
that only async code deals with the socket and we can use an async lock
(rather than the hybrid sync-and-async lock we tried before). This seems to
help the deadlock issue. I'm not 100% sure it will remove it entirely, but
it makes it much, much harder to hit -- I've not been able to reproduce it with
this change.
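A rough sketch of the pattern (class and attribute names are illustrative, not Airflow's; it assumes asgiref is available): the synchronous `send()` hops back onto the event loop via `async_to_sync`, so only async code touches the socket and a plain `asyncio.Lock` is enough.

```python
import asyncio
from asgiref.sync import async_to_sync

class CommsChannel:
    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self._reader = reader
        self._writer = writer
        self._lock = asyncio.Lock()  # async lock, only ever taken on the loop

    async def _asend(self, frame: bytes) -> bytes:
        async with self._lock:
            self._writer.write(len(frame).to_bytes(4, "big") + frame)
            await self._writer.drain()
            length = int.from_bytes(await self._reader.readexactly(4), "big")
            return await self._reader.readexactly(length)

    def send(self, frame: bytes) -> bytes:
        # Called from sync code (e.g. inside a trigger running via sync_to_async);
        # async_to_sync schedules _asend on the running event loop and waits for it.
        return async_to_sync(self._asend)(frame)
```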
This compat issue is only in tests, as nothing in the runtime of airflow-core imports/calls methods directly on SUPERVISOR_COMMS; we only import it in tests to make assertions about the behaviour / to stub the return values.
ashb
approved these changes
Jun 18, 2025
closes: #50423
Why?
Airflow 2 had support for user impersonation: https://airflow.apache.org/docs/apache-airflow/stable/security/workload.html. Quoting from the docs:
The intention here is to de-elevate the user running the task, reducing privileges from the process/worker launching it, which runs as root. We can configure the task to impersonate a user with lesser privileges and control the behaviour of the tasks for a more secure task run.
Quoting a use case from one of the Airflow users as well:
https://apache-airflow.slack.com/archives/CCQB40SQJ/p1746728794387939
Approach
We implement user impersonation by re-executing the task process with `sudo` as the specified user (`run_as_user`). The key is preserving the communication channel between the task and the supervisor across this re-execution. To do this we use `os.execvp`, which basically swaps out the current process with a new one. We run the task runner process again, but as the specified user.
Process Flow
When `run_as_user` is set, we:
Things to note
Startup Details Storage
The startup details returned by the `/run` call to the API server are stored before re-exec, which makes the operation idempotent. This ensures the re-executed process has all necessary context without needing to re-parse the DAG or re-establish connections.
Socket Inheritance
This is important because:
Why os.execvp?
We use `os.execvp` instead of `subprocess` or `os.fork` because it replaces the current process image in place, so the inherited context carries over; `sudo` is invoked with the `-E` flag, and `-H` too.
Communication Channel
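A hedged sketch of the re-exec idea (the exact command line this PR builds is not reproduced in the description, so the argv below is an assumption, illustrative only): swap the current task-runner process for `sudo -E -H -u <run_as_user>` running the same interpreter and entrypoint, so inherited file descriptors stay open across the exec.

```python
import os
import sys

def reexec_as_user(run_as_user: str) -> None:
    # Hypothetical helper: re-run the current entrypoint under sudo as run_as_user.
    cmd = [
        "sudo",
        "-E",            # preserve the environment
        "-H",            # set HOME to the target user's home directory
        "-u", run_as_user,
        sys.executable,
        *sys.argv,       # same entrypoint, same arguments
    ]
    # os.execvp replaces the current process image; nothing after this line runs.
    os.execvp(cmd[0], cmd)
```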
Introduced a helper to access `SUPERVISOR_COMMS` via a utility function. This is because when we re-exec, other modules were failing to access the `SUPERVISOR_COMMS` defined at module level in `task_runner` (I am not so sure of the exact reason, but it seemed like an init-sequence thing or a Python quirk), and this approach does no harm. The new way to access things is the `get_supervisor_comms` helper, which returns the existing instance if available or initialises one.
The communication channel is initialized lazily via `get_supervisor_comms()`. This ensures:
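A minimal sketch of such a lazily-initialised accessor (the real `get_supervisor_comms` in this PR may differ in signature and in how the underlying object is constructed; the placeholder class is an assumption):

```python
from typing import Optional

class CommsDecoder:
    """Placeholder standing in for the real comms class (assumption)."""

_SUPERVISOR_COMMS: Optional[CommsDecoder] = None

def get_supervisor_comms() -> CommsDecoder:
    """Return the existing comms instance, creating it on first use."""
    global _SUPERVISOR_COMMS
    if _SUPERVISOR_COMMS is None:
        _SUPERVISOR_COMMS = CommsDecoder()
    return _SUPERVISOR_COMMS
```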
Testing
The intention is to run Airflow as "root" and switch to a lesser-privileged user, "airflowuser". We will intentionally use a user that cannot list some files, like `/root/airflow/airflow.cfg`.
Setup for testing
`sudo useradd -m -s /bin/bash airflowuser`
General Testing:
Test 1: Running a simple task that runs with `airflowuser` and pushes an XCom.
DAG:
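The test DAGs themselves are not reproduced in this description. Purely as an illustration (the dag id, dates, and pushed value are assumptions, not the DAG actually used in this test), such a DAG might look like:

```python
# Illustrative only: a tiny DAG whose task runs as "airflowuser" and pushes an XCom.
import datetime
import getpass

from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime.datetime(2025, 1, 1), catchup=False)
def impersonation_demo():
    @task(run_as_user="airflowuser")
    def whoami() -> str:
        # Runs under the impersonated user; the return value is pushed as an XCom.
        return getpass.getuser()

    whoami()

impersonation_demo()
```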
Test 2: Running a simple task that tries to get a connection and set a variable with `run_as_user`.
DAG:
Testing `run_as_user` functionality
Test 1: `run_as_user` trying to access `/root/airflow/airflow.cfg`.
DAG used:
Errors out, logs:
Test 2: Do not provide `run_as_user`, but override with the conf instead: "airflowuser" itself.
Set env in worker:
DAG used:
Same error as before:
Test 3: Provide both `run_as_user` and the conf, to check which one is picked up.
In worker, create a new user `randomuser` and set the env to "airflowuser".
DAG used:
Random user picked up:
Test 4: User not present.
DAG:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in airflow-core/newsfragments.