Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions airflow-core/docs/administration-and-deployment/dag-bundles.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,32 @@ Starting Airflow 3.0.2 git is pre installed in the base image. However, if you a
ENV GIT_PYTHON_REFRESH=quiet


Using DAG Bundles with User Impersonation
-----------------------------------------

When using ``run_as_user`` (user impersonation) with DAG bundles, ensure proper file permissions
are configured so that impersonated users can access bundle files created by the main Airflow process.

1. All impersonated users and the Airflow user should be in the same group
2. Configure appropriate umask settings (e.g., ``umask 0002``)
3. Set :ref:`config:dag_processor__dag_bundle_new_folder_permissions` to ``0o775`` (default)
4. Set :ref:`config:dag_processor__dag_bundle_new_file_permissions` to ``0o664`` (default)

Example configuration:

.. code-block:: ini

[dag_processor]
dag_bundle_new_folder_permissions = 0o775
dag_bundle_new_file_permissions = 0o664

.. note::

This permission-based approach is a temporary solution. Future versions of Airflow
will handle multi-user access through supervisor-based bundle operations, eliminating
the need for shared group permissions.


Writing custom Dag bundles
--------------------------

Expand Down
30 changes: 30 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2465,6 +2465,36 @@ dag_processor:
example: "/tmp/some-place"
default: ~

dag_bundle_new_folder_permissions:
description: |
Permissions to set on new DAG bundle directories. When using user impersonation
(``run_as_user``), these should be group-writable (e.g., ``0o775``) so that
impersonated users can access the bundle files.

The value should be an octal string (e.g., ``0o775``). The default allows
owner read/write/execute and group read/write/execute.

This is similar to ``[logging] file_task_handler_new_folder_permissions``.
version_added: 3.2.0
type: string
example: "0o775"
default: "0o775"

dag_bundle_new_file_permissions:
description: |
Permissions to set on new DAG bundle files (lock files, tracking files, etc.).
When using user impersonation (``run_as_user``), these should be group-writable
(e.g., ``0o664``) so that impersonated users can access the files.

The value should be an octal string (e.g., ``0o664``). The default allows
owner read/write and group read/write.

This is similar to ``[logging] file_task_handler_new_file_permissions``.
version_added: 3.2.0
type: string
example: "0o664"
default: "0o664"

dag_bundle_config_list:
description: |
List of backend configs. Must supply name, classpath, and kwargs for each backend.
Expand Down
36 changes: 36 additions & 0 deletions airflow-core/src/airflow/dag_processing/bundles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,35 @@ def get_bundle_version_path(bundle_name: str, version: str) -> Path:
return base_folder / version


def get_bundle_permissions() -> tuple[int, int]:
"""
Return configured permissions for bundle directories and files.

When using user impersonation (run_as_user), bundle directories and files
should be group-writable so that impersonated users can access them.

:return: Tuple of (folder_permissions, file_permissions) as integers
"""
# Fallback needed for backward compatibility with old config files
folder_perms = int(conf.get("dag_processor", "dag_bundle_new_folder_permissions", fallback="0o775"), 8)
file_perms = int(conf.get("dag_processor", "dag_bundle_new_file_permissions", fallback="0o664"), 8)
return folder_perms, file_perms


def apply_bundle_permissions(path: Path, is_directory: bool = True) -> None:
"""
Apply configured bundle permissions to a path.

:param path: The path to apply permissions to
:param is_directory: Whether the path is a directory (True) or file (False)
"""
folder_perms, file_perms = get_bundle_permissions()
try:
path.chmod(folder_perms if is_directory else file_perms)
except OSError:
log.debug("Could not set permissions on %s", path)


@dataclass(frozen=True)
class TrackedBundleVersionInfo:
"""
Expand Down Expand Up @@ -377,9 +406,11 @@ def lock(self):

lock_dir_path = get_bundle_storage_root_path() / "_locks"
lock_dir_path.mkdir(parents=True, exist_ok=True)
apply_bundle_permissions(lock_dir_path, is_directory=True)
lock_file_path = lock_dir_path / f"{self.name}.lock"

with open(lock_file_path, "w") as lock_file:
apply_bundle_permissions(lock_file_path, is_directory=False)
# Exclusive lock - blocks until it is available
fcntl.flock(lock_file, fcntl.LOCK_EX)
try:
Expand Down Expand Up @@ -426,12 +457,17 @@ def _update_version_file(self):
if TYPE_CHECKING:
assert self.lock_file_path
self.lock_file_path.parent.mkdir(parents=True, exist_ok=True)
tracking_root = STALE_BUNDLE_TRACKING_FOLDER
if tracking_root.exists():
apply_bundle_permissions(tracking_root, is_directory=True)
apply_bundle_permissions(self.lock_file_path.parent, is_directory=True)

with tempfile.TemporaryDirectory() as td:
temp_file = Path(td, self.lock_file_path)
now = pendulum.now(tz=pendulum.UTC)
temp_file.write_text(now.isoformat())
os.replace(temp_file, self.lock_file_path)
apply_bundle_permissions(self.lock_file_path, is_directory=False)

def acquire(self):
if not self.version:
Expand Down
110 changes: 110 additions & 0 deletions airflow-core/tests/unit/dag_processing/bundles/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import time
from datetime import timedelta
from pathlib import Path
from stat import S_IMODE
from unittest.mock import patch

import pytest
Expand All @@ -34,6 +35,8 @@
BaseDagBundle,
BundleUsageTrackingManager,
BundleVersionLock,
apply_bundle_permissions,
get_bundle_permissions,
get_bundle_storage_root_path,
)

Expand Down Expand Up @@ -268,3 +271,110 @@ def test_that_stale_bundles_are_removed(
assert len(lock_files) == expected_remaining
bundle_folders = list(b.versions_dir.iterdir())
assert len(bundle_folders) == expected_remaining


class TestBundlePermissions:
"""Tests for bundle permission helper functions."""

def test_get_bundle_permissions_default(self):
"""Test that default permissions are returned when not configured."""
with conf_vars(
{
("dag_processor", "dag_bundle_new_folder_permissions"): None,
("dag_processor", "dag_bundle_new_file_permissions"): None,
}
):
folder_perms, file_perms = get_bundle_permissions()
assert folder_perms == 0o775
assert file_perms == 0o664

def test_get_bundle_permissions_custom(self):
"""Test that custom permissions are read from config."""
with conf_vars(
{
("dag_processor", "dag_bundle_new_folder_permissions"): "0o755",
("dag_processor", "dag_bundle_new_file_permissions"): "0o644",
}
):
folder_perms, file_perms = get_bundle_permissions()
assert folder_perms == 0o755
assert file_perms == 0o644

def test_apply_bundle_permissions_directory(self, tmp_path: Path):
"""Test that permissions are applied to directories."""
test_dir = tmp_path / "test_dir"
test_dir.mkdir()
# Set restrictive permissions first
test_dir.chmod(0o700)

with conf_vars(
{
("dag_processor", "dag_bundle_new_folder_permissions"): "0o775",
}
):
apply_bundle_permissions(test_dir, is_directory=True)
mode = S_IMODE(test_dir.stat().st_mode)
assert mode == 0o775

def test_apply_bundle_permissions_file(self, tmp_path: Path):
"""Test that permissions are applied to files."""
test_file = tmp_path / "test_file"
test_file.touch()
# Set restrictive permissions first
test_file.chmod(0o600)

with conf_vars(
{
("dag_processor", "dag_bundle_new_file_permissions"): "0o664",
}
):
apply_bundle_permissions(test_file, is_directory=False)
mode = S_IMODE(test_file.stat().st_mode)
assert mode == 0o664

def test_lock_applies_permissions(self, bundle_temp_dir):
"""Test that lock() applies configured permissions to lock directory and file."""
with conf_vars(
{
("dag_processor", "dag_bundle_new_folder_permissions"): "0o775",
("dag_processor", "dag_bundle_new_file_permissions"): "0o664",
}
):
bundle = BasicBundle(name="permtest")
lock_dir = get_bundle_storage_root_path() / "_locks"
lock_file = lock_dir / f"{bundle.name}.lock"

with bundle.lock():
# Verify lock directory has correct permissions
dir_mode = S_IMODE(lock_dir.stat().st_mode)
assert dir_mode == 0o775

# Verify lock file has correct permissions
file_mode = S_IMODE(lock_file.stat().st_mode)
assert file_mode == 0o664

def test_bundle_version_lock_applies_permissions(self, bundle_temp_dir):
"""Test that BundleVersionLock applies permissions to tracking directories."""
from airflow.dag_processing.bundles.base import STALE_BUNDLE_TRACKING_FOLDER

with conf_vars(
{
("dag_processor", "dag_bundle_new_folder_permissions"): "0o775",
("dag_processor", "dag_bundle_new_file_permissions"): "0o664",
}
):
bundle_name = "permtest"
version = "v1"

with BundleVersionLock(bundle_name=bundle_name, bundle_version=version):
# Verify tracking directory has correct permissions
tracking_dir = STALE_BUNDLE_TRACKING_FOLDER / bundle_name
if tracking_dir.exists():
dir_mode = S_IMODE(tracking_dir.stat().st_mode)
assert dir_mode == 0o775

# Verify tracking file has correct permissions
tracking_file = tracking_dir / version
if tracking_file.exists():
file_mode = S_IMODE(tracking_file.stat().st_mode)
assert file_mode == 0o664