Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions airflow-core/docs/administration-and-deployment/dag-bundles.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ Starting Airflow 3.0.2 git is pre installed in the base image. However, if you a
ENV GIT_PYTHON_REFRESH=quiet


Using DAG Bundles with User Impersonation
-----------------------------------------

When using ``run_as_user`` (user impersonation) with DAG bundles, ensure proper file permissions
are configured so that impersonated users can access bundle files created by the main Airflow process.

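1. Add every impersonated user and the user running Airflow to the same group.
2. Configure an appropriate umask for the Airflow process (e.g., ``umask 0002``) so that bundle
   files and directories it creates remain accessible to that group.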


.. note::

    This permission-based approach is a temporary solution. Future versions of Airflow
    will handle multi-user access through supervisor-based bundle operations, eliminating
    the need for shared group permissions.


Writing custom Dag bundles
--------------------------

Expand Down
37 changes: 37 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
version=bundle_info.version,
)
bundle_instance.initialize()
_verify_bundle_access(bundle_instance, log)

dag_absolute_path = os.fspath(Path(bundle_instance.path, what.dag_rel_path))
bag = BundleDagBag(
Expand Down Expand Up @@ -750,6 +751,42 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# 3. Shutdown and report status


def _verify_bundle_access(bundle_instance: BaseDagBundle, log: Logger) -> None:
    """
    Verify that the bundle path is accessible by the current user.

    This is called after user impersonation (if any) to ensure the bundle
    is actually accessible. The decision is delegated to ``os.access()``:
    read permission is always required, plus execute (search) permission
    when the path is a directory so that its contents can be reached.

    A path that does not exist is skipped silently; that case is expected to
    have been reported by ``initialize()`` already.

    :param bundle_instance: The bundle instance to check
    :param log: Logger instance (currently unused; kept so call sites stay unchanged)
    :raises AirflowException: if the bundle path is not accessible
    """
    bundle_path = bundle_instance.path

    try:
        if not bundle_path.exists():
            # Already handled by initialize() with a warning
            return

        # Check read permission (and execute for directories to list contents)
        access_mode = os.R_OK
        if bundle_path.is_dir():
            access_mode |= os.X_OK
        accessible = os.access(bundle_path, access_mode)
    except PermissionError:
        # The stat() behind exists()/is_dir() can itself be denied, e.g. when a
        # parent directory is not searchable by the impersonated user. Report
        # that as "not accessible" instead of leaking a bare PermissionError.
        accessible = False

    if accessible:
        return

    # Only needed on the failure path, so keep the imports off the happy path.
    from getpass import getuser

    from airflow.exceptions import AirflowException

    try:
        current_user = getuser()
    except (KeyError, OSError):
        # The uid may have no passwd entry (e.g. arbitrary uids in containers);
        # do not let the user-name lookup mask the real permission problem.
        current_user = f"uid={os.getuid()}"

    raise AirflowException(
        f"Bundle '{bundle_instance.name}' path '{bundle_path}' is not accessible "
        f"by user '{current_user}'. When using run_as_user, ensure bundle directories "
        f"are readable by the impersonated user. "
        f"See: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-bundles.html"
    )


def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
# The parent sends us a StartupDetails message un-prompted. After this, every single message is only sent
# in response to us sending a request.
Expand Down
50 changes: 50 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,56 @@ def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
assert ti.task.dag.dag_id == "dag_name"


def test_verify_bundle_access_raises_when_not_accessible(tmp_path: Path):
    """Test that _verify_bundle_access raises AirflowException when bundle path is not accessible."""
    from airflow.sdk.execution_time.task_runner import _verify_bundle_access

    # The directory must exist: a missing path is skipped before os.access() is consulted.
    bundle_path = tmp_path / "test_bundle"
    bundle_path.mkdir()

    # Create a mock bundle instance. ``name`` is assigned after construction
    # because Mock(name=...) would name the mock itself instead.
    mock_bundle = mock.Mock()
    mock_bundle.path = bundle_path
    mock_bundle.name = "test-bundle"

    # Mock os.access to simulate permission denied (avoids root user issues in CI)
    deny_access = patch("airflow.sdk.execution_time.task_runner.os.access", return_value=False)
    with deny_access as mock_access, pytest.raises(AirflowException) as exc_info:
        _verify_bundle_access(mock_bundle, mock.Mock())

    # The failure must come from the os.access() check itself.
    assert mock_access.called

    message = str(exc_info.value)
    assert "not accessible" in message
    assert "test-bundle" in message
    assert str(bundle_path) in message


def test_verify_bundle_access_succeeds_when_readable(tmp_path: Path):
    """Test that _verify_bundle_access succeeds when bundle path is accessible."""
    from airflow.sdk.execution_time.task_runner import _verify_bundle_access

    # A freshly created directory under tmp_path is readable by the test user.
    bundle_path = tmp_path / "accessible_bundle"
    bundle_path.mkdir()

    mock_bundle = mock.Mock()
    mock_bundle.path = bundle_path
    mock_bundle.name = "test-bundle"

    # Should not raise; the function has no return value.
    assert _verify_bundle_access(mock_bundle, mock.Mock()) is None


def test_verify_bundle_access_skips_nonexistent_path(tmp_path: Path):
    """Check that a bundle whose path is missing is silently skipped by _verify_bundle_access."""
    from airflow.sdk.execution_time.task_runner import _verify_bundle_access

    # Nothing is created under tmp_path, so this path does not exist.
    missing_path = tmp_path / "nonexistent"

    bundle = mock.Mock(path=missing_path)
    # Assigned separately: Mock(name=...) would name the mock rather than set the attribute.
    bundle.name = "test-bundle"
    logger = mock.Mock()

    # No exception expected here - missing paths are dealt with by initialize()
    _verify_bundle_access(bundle, logger)


@pytest.mark.parametrize("use_queues", [False, True])
def test_run_deferred_basic(time_machine, create_runtime_ti, mock_supervisor_comms, use_queues: bool):
"""Test that a task can transition to a deferred state."""
Expand Down