Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def get_import_errors(
visible_files_cte,
and_(
ParseImportError.filename == visible_files_cte.c.relative_fileloc,
# ParseImportError.bundle_name == visible_files_cte.c.bundle_name, # apparently not needed
ParseImportError.bundle_name == visible_files_cte.c.bundle_name,
),
)
.order_by(ParseImportError.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import pytest

from airflow.models import DagModel
from airflow.models.dagbundle import DagBundleModel
from airflow.models.errors import ParseImportError
from airflow.utils.session import NEW_SESSION, provide_session

from tests_common.test_utils.db import clear_db_dags, clear_db_import_errors
from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, clear_db_import_errors
from tests_common.test_utils.format_datetime import from_datetime_to_zulu_without_ms

if TYPE_CHECKING:
Expand All @@ -51,7 +52,18 @@
@pytest.fixture(scope="class")
@provide_session
def permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
dag_model = DagModel(fileloc=FILENAME1, relative_fileloc=FILENAME1, dag_id="dag_id1", is_paused=False)
# Create the bundle first
bundle = DagBundleModel(name=BUNDLE_NAME)
session.add(bundle)
session.commit()

dag_model = DagModel(
fileloc=FILENAME1,
relative_fileloc=FILENAME1,
dag_id="dag_id1",
is_paused=False,
bundle_name=BUNDLE_NAME,
)
session.add(dag_model)
session.commit()
return dag_model
Expand All @@ -70,11 +82,13 @@ def not_permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
def clear_db():
clear_db_import_errors()
clear_db_dags()
clear_db_dag_bundles()

yield

clear_db_import_errors()
clear_db_dags()
clear_db_dag_bundles()


@pytest.fixture(autouse=True, scope="class")
Expand Down Expand Up @@ -374,3 +388,40 @@ def test_user_can_not_read_all_dags_in_file(
}
],
}

@pytest.mark.usefixtures("permitted_dag_model")
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
def test_bundle_name_join_condition_for_import_errors(
self, mock_get_auth_manager, test_client, permitted_dag_model, import_errors, session
):
"""Test that the bundle_name join condition works correctly."""
set_mock_auth_manager__is_authorized_dag(mock_get_auth_manager)
mock_get_authorized_dag_ids = set_mock_auth_manager__get_authorized_dag_ids(
mock_get_auth_manager, {permitted_dag_model.dag_id}
)
set_mock_auth_manager__batch_is_authorized_dag(mock_get_auth_manager, True)

response = test_client.get("/importErrors")

# Assert
mock_get_authorized_dag_ids.assert_called_once_with(method="GET", user=mock.ANY)
assert response.status_code == 200
response_json = response.json()

# Should return the import error with matching bundle_name and filename
assert response_json["total_entries"] == 1
assert response_json["import_errors"][0]["bundle_name"] == BUNDLE_NAME
assert response_json["import_errors"][0]["filename"] == FILENAME1

# Now test that removing the bundle_name from the DagModel causes the import error to not be returned
permitted_dag_model.bundle_name = None
session.merge(permitted_dag_model)
session.commit()

response2 = test_client.get("/importErrors")

# Assert - should return 0 entries because bundle_name no longer matches
assert response2.status_code == 200
response_json2 = response2.json()
assert response_json2["total_entries"] == 0
assert response_json2["import_errors"] == []