Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/actions/migration_tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ runs:
airflow db migrate"
env:
COMPOSE_PROJECT_NAME: "docker-compose"
AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
- name: "Bring composer down"
shell: bash
run: breeze down
Expand All @@ -45,6 +46,7 @@ runs:
COMPOSE_PROJECT_NAME: "docker-compose"
env:
COMPOSE_PROJECT_NAME: "docker-compose"
AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
- name: "Bring compose down again"
shell: bash
run: breeze down
Expand All @@ -58,6 +60,7 @@ runs:
airflow db migrate -s"
env:
COMPOSE_PROJECT_NAME: "docker-compose"
AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
if: env.BACKEND != 'sqlite'
- name: "Bring any containers left down"
shell: bash
Expand Down
547 changes: 121 additions & 426 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,8 @@ database:
from BaseDBManager
version_added: 3.0.0
type: string
example: ~
default: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
example: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
default: ~
logging:
description: ~
options:
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ def _serialize_dag_capturing_errors(
min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
session=session,
)
if dag_was_updated:
_sync_dag_perms(dag, session=session)
else:
if not dag_was_updated:
# Check and update DagCode
DagCode.update_source_code(dag.dag_id, dag.fileloc)
elif "FabAuthManager" in conf.get("core", "auth_manager"):
_sync_dag_perms(dag, session=session)

return []
except OperationalError:
Expand Down
18 changes: 17 additions & 1 deletion airflow-core/src/airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,23 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:

log.info("Attempting downgrade to revision %s", to_revision)
config = _get_alembic_config()

# Check if downgrade is less than 3.0.0 and requires that `ab_user` fab table is present
if _revision_greater(config, _REVISION_HEADS_MAP["3.0.0"], to_revision):
if conf.getboolean("core", "unit_test_mode"):
try:
from airflow.providers.fab.auth_manager.models.db import FABDBManager

dbm = FABDBManager(session)
dbm.initdb()
except ImportError:
log.warning("Import error occurred while importing FABDBManager. Skipping the check.")
pass
if not inspect(settings.engine).has_table("ab_user"):
log.error(
"Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. "
"Please add FabDBManager to [core] external_db_managers and run fab migrations before proceeding"
)
return
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
if show_sql_only:
log.warning("Generating sql scripts for manual migration.")
Expand Down
8 changes: 5 additions & 3 deletions airflow-core/src/airflow/utils/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ class RunDBManager(LoggingMixin):
def __init__(self):
super().__init__()
self._managers: list[BaseDBManager] = []
managers = conf.get("database", "external_db_managers").split(",")

managers_config = conf.get("database", "external_db_managers", fallback=None)
if not managers_config:
managers = []
else:
managers = managers_config.split(",")
# Add DB manager specified by auth manager (if any)
auth_manager_db_manager = create_auth_manager().get_db_manager()
if auth_manager_db_manager and auth_manager_db_manager not in managers:
managers.append(auth_manager_db_manager)

for module in managers:
manager = import_string(module)
self._managers.append(manager)
Expand Down
5 changes: 5 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from sqlalchemy.exc import OperationalError, SAWarning

import airflow.dag_processing.collection
from airflow.configuration import conf
from airflow.dag_processing.collection import (
AssetModelOperation,
_get_latest_runs_stmt,
Expand Down Expand Up @@ -264,6 +265,10 @@ def dag_to_lazy_serdag(self, dag: DAG) -> LazyDeserializedDAG:
ser_dict = SerializedDAG.to_dict(dag)
return LazyDeserializedDAG(data=ser_dict)

@pytest.mark.skipif(
condition="FabAuthManager" not in conf.get("core", "auth_manager"),
reason="This is only for FabAuthManager",
)
@pytest.mark.usefixtures("clean_db") # sync_perms in fab has bad session commit hygiene
def test_sync_perms_syncs_dag_specific_perms_on_update(
self, monkeypatch, spy_agency: SpyAgency, session, time_machine, testing_dag_bundle
Expand Down
48 changes: 43 additions & 5 deletions airflow-core/tests/unit/utils/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
from sqlalchemy import Column, Integer, MetaData, Table, select

from airflow.models import Base as airflow_base
from airflow.providers.fab.auth_manager.models.db import FABDBManager
from airflow.settings import engine
from airflow.utils.db import (
_REVISION_HEADS_MAP,
LazySelectSequence,
_get_alembic_config,
check_migrations,
Expand Down Expand Up @@ -68,7 +70,9 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
for dbmanager in external_db_managers._managers:
for table_name, table in dbmanager.metadata.tables.items():
all_meta_data._add_table(table_name, table.schema, table)

# test FAB models
for table_name, table in FABDBManager.metadata.tables.items():
all_meta_data._add_table(table_name, table.schema, table)
# create diff between database schema and SQLAlchemy model
mctx = MigrationContext.configure(
engine.connect(),
Expand Down Expand Up @@ -131,11 +135,35 @@ def test_check_migrations(self):
check_migrations(0)
check_migrations(1)

@pytest.mark.parametrize(
"auth, expected",
[
(
{
(
"core",
"auth_manager",
): "airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager"
},
1,
),
(
{
(
"core",
"auth_manager",
): "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
},
2,
),
],
)
@mock.patch("alembic.command")
def test_upgradedb(self, mock_alembic_command):
upgradedb()
mock_alembic_command.upgrade.assert_called_with(mock.ANY, revision="heads")
assert mock_alembic_command.upgrade.call_count == 2
def test_upgradedb(self, mock_alembic_command, auth, expected):
with conf_vars(auth):
upgradedb()
mock_alembic_command.upgrade.assert_called_with(mock.ANY, revision="heads")
assert mock_alembic_command.upgrade.call_count == expected

@pytest.mark.parametrize(
"from_revision, to_revision",
Expand Down Expand Up @@ -264,3 +292,13 @@ def scalar(self, stmt):
lss = LazySelectSequence.from_select(select(t.c.id), order_by=[], session=MockSession())

assert bool(lss) is False

@conf_vars({("core", "unit_test_mode"): "False"})
@mock.patch("airflow.utils.db.inspect")
def test_upgradedb_raises_if_lower_than_v3_0_0(self, mock_inspect, caplog):
mock_inspect.return_value.has_table.return_value = False
downgrade(to_revision=_REVISION_HEADS_MAP["2.7.0"])
assert (
"Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. "
"Please add FabDBManager to [core] external_db_managers and run fab migrations before proceeding"
) in caplog.text
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/utils/test_db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TestRunDBManager:
@conf_vars(
{("database", "external_db_managers"): "airflow.providers.fab.auth_manager.models.db.FABDBManager"}
)
def test_fab_db_manager_is_default(self):
def test_db_manager_uses_config(self):
from airflow.providers.fab.auth_manager.models.db import FABDBManager

run_db_manager = RunDBManager()
Expand Down
3 changes: 2 additions & 1 deletion dev/breeze/src/airflow_breeze/params/shell_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,8 @@ def env_variables_for_docker_commands(self) -> dict[str, str]:
_set_var(_env, "AIRFLOW__CELERY__BROKER_URL", self.airflow_celery_broker_url)
_set_var(_env, "AIRFLOW__CORE__AUTH_MANAGER", self.auth_manager_path)
_set_var(_env, "AIRFLOW__CORE__EXECUTOR", self.executor)
_set_var(_env, "AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS", "admin:admin,viewer:viewer")
if self.auth_manager == SIMPLE_AUTH_MANAGER:
_set_var(_env, "AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS", "admin:admin,viewer:viewer")
_set_var(
_env,
"AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_PASSWORDS_FILE",
Expand Down
4 changes: 2 additions & 2 deletions devel-common/src/tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ def _activate_assets(self):
SchedulerJobRunner._activate_referenced_assets(assets, session=self.session)

def __exit__(self, type, value, traceback):
from airflow.configuration import conf
from airflow.models import DagModel
from airflow.models.serialized_dag import SerializedDagModel

Expand All @@ -855,12 +856,11 @@ def __exit__(self, type, value, traceback):
else:
dag.sync_to_db(session=self.session)

if dag.access_control:
if dag.access_control and "FabAuthManager" in conf.get("core", "auth_manager"):
if AIRFLOW_V_3_0_PLUS:
from airflow.providers.fab.www.security_appless import ApplessAirflowSecurityManager
else:
from airflow.www.security_appless import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=self.session)
security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control)
self.dag_model = self.session.get(DagModel, dag.dag_id)
Expand Down
3 changes: 3 additions & 0 deletions devel-common/src/tests_common/test_utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from typing import TYPE_CHECKING

from airflow.configuration import conf
from airflow.jobs.job import Job
from airflow.models import (
Connection,
Expand Down Expand Up @@ -298,6 +299,8 @@ def clear_db_dag_bundles():


def clear_dag_specific_permissions():
if "FabAuthManager" not in conf.get("core", "auth_manager"):
return
try:
from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role
except ImportError:
Expand Down