Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def _serialize_dag_capturing_errors(
if not dag_was_updated:
# Check and update DagCode
DagCode.update_source_code(dag.dag_id, dag.fileloc)
elif "FabAuthManager" in conf.get("core", "auth_manager"):
if "FabAuthManager" in conf.get("core", "auth_manager"):
_sync_dag_perms(dag, session=session)

return []
Expand Down
124 changes: 119 additions & 5 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
if TYPE_CHECKING:
from kgb import SpyAgency

mark_fab_auth_manager_test = pytest.mark.skipif(
condition="FabAuthManager" not in conf.get("core", "auth_manager"),
reason="This is only for FabAuthManager. Please set the environment variable `AIRFLOW__CORE__AUTH_MANAGER` to `airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager` in `files/airflow-breeze-config/environment_variables.env` before running breeze shell. To run the test, add the flag `--keep-env-variables` to the pytest command.",
)


def test_statement_latest_runs_one_dag():
with warnings.catch_warnings():
Expand Down Expand Up @@ -340,10 +345,7 @@ def dag_to_lazy_serdag(self, dag: DAG) -> LazyDeserializedDAG:
ser_dict = SerializedDAG.to_dict(dag)
return LazyDeserializedDAG(data=ser_dict)

@pytest.mark.skipif(
condition="FabAuthManager" not in conf.get("core", "auth_manager"),
reason="This is only for FabAuthManager",
)
@mark_fab_auth_manager_test
@pytest.mark.usefixtures("clean_db") # sync_perms in fab has bad session commit hygiene
def test_sync_perms_syncs_dag_specific_perms_on_update(
self, monkeypatch, spy_agency: SpyAgency, session, time_machine, testing_dag_bundle
Expand Down Expand Up @@ -378,7 +380,8 @@ def _sync_to_db():

# DAG isn't updated
_sync_to_db()
spy_agency.assert_spy_not_called(sync_perms_spy)
# `_sync_dag_perms` should be called even the DAG isn't updated. Otherwise, any import error will not show up until DAG is updated.
spy_agency.assert_spy_called_with(sync_perms_spy, dag, session=session)

# DAG is updated
dag.tags = {"new_tag"}
Expand Down Expand Up @@ -492,6 +495,117 @@ def test_serialized_dag_errors_are_import_errors(
assert len(dag_import_error_listener.existing) == 0
assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace

@patch.object(ParseImportError, "full_file_path")
@mark_fab_auth_manager_test
@pytest.mark.usefixtures("clean_db")
def test_import_error_persist_for_invalid_access_control_role(
self,
mock_full_path,
monkeypatch,
session,
time_machine,
dag_import_error_listener,
testing_dag_bundle,
):
"""
Test that import errors related to invalid access control role are tracked in the DB until being fixed.
"""
from airflow import settings

serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
assert serialized_dags_count == 0

monkeypatch.setattr(settings, "MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False)

# create a DAG and assign it a non-exist role.
dag = DAG(
dag_id="test_nonexist_access_control",
access_control={
"non_existing_role": {"can_edit", "can_read", "can_delete"},
},
)
dag.fileloc = "test_nonexist_access_control.py"
dag.relative_fileloc = "test_nonexist_access_control.py"
mock_full_path.return_value = "test_nonexist_access_control.py"

# the DAG processor should raise an import error when processing the DAG above.
import_errors = {}
# run the DAG parsing.
update_dag_parsing_results_in_db("testing", None, [dag], import_errors, set(), session)
# expect to get an error with "role does not exist" message.
err = import_errors.get(("testing", dag.relative_fileloc))
assert "AirflowException" in err
assert "role does not exist" in err
dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
# the DAG should contain an import error.
assert dag_model.has_import_errors is True

prev_import_errors = session.query(ParseImportError).all()
# the import error message should match.
assert len(prev_import_errors) == 1
prev_import_error = prev_import_errors[0]
assert prev_import_error.filename == dag.relative_fileloc
assert "AirflowException" in prev_import_error.stacktrace
assert "role does not exist" in prev_import_error.stacktrace

# this is a new import error.
assert len(dag_import_error_listener.new) == 1
assert len(dag_import_error_listener.existing) == 0
assert (
dag_import_error_listener.new["test_nonexist_access_control.py"] == prev_import_error.stacktrace
)

# the DAG is serialized into the DB.
serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
assert serialized_dags_count == 1

# run the update again. Even though the DAG is not updated, the processor should raise import error since the access control is not fixed.
time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 5), tick=False)
update_dag_parsing_results_in_db("testing", None, [dag], dict(), set(), session)

dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
# the DAG should contain an import error.
assert dag_model.has_import_errors is True

import_errors = session.query(ParseImportError).all()
# the import error should still in the DB.
assert len(import_errors) == 1
import_error = import_errors[0]
assert import_error.filename == dag.relative_fileloc
assert "AirflowException" in import_error.stacktrace
assert "role does not exist" in import_error.stacktrace

# the new import error should be the same as the previous one
assert len(import_errors) == len(prev_import_errors)
assert import_error.filename == prev_import_error.filename
assert import_error.filename == dag.relative_fileloc
assert import_error.stacktrace == prev_import_error.stacktrace

# there is a new error and an existing error.
assert len(dag_import_error_listener.new) == 1
assert len(dag_import_error_listener.existing) == 1
assert (
dag_import_error_listener.new["test_nonexist_access_control.py"] == prev_import_error.stacktrace
)

# run the update again, but the incorrect access control configuration is removed.
time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 10), tick=False)
dag.access_control = None
update_dag_parsing_results_in_db("testing", None, [dag], dict(), set(), session)

dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
# the import error should be cleared.
assert dag_model.has_import_errors is False

import_errors = session.query(ParseImportError).all()
# the import error should be cleared.
assert len(import_errors) == 0

# no import error should be introduced.
assert len(dag_import_error_listener.new) == 1
assert len(dag_import_error_listener.existing) == 1

@patch.object(ParseImportError, "full_file_path")
@pytest.mark.usefixtures("clean_db")
def test_new_import_error_replaces_old(
Expand Down
Loading