Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.common import BaseGraphResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.api_fastapi.core_api.security import ReadableDagsFilterDep, requires_access_dag
from airflow.api_fastapi.core_api.services.ui.dependencies import extract_single_connected_component
from airflow.models.serialized_dag import SerializedDagModel

Expand All @@ -41,12 +41,20 @@
),
dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.DEPENDENCIES))],
)
def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGraphResponse:
def get_dependencies(
session: SessionDep,
readable_dags_filter: ReadableDagsFilterDep,
node_id: str | None = None,
) -> BaseGraphResponse:
"""Dependencies graph."""
nodes_dict: dict[str, dict] = {}
edge_tuples: set[tuple[str, str]] = set()

for dag, dependencies in sorted(SerializedDagModel.get_dag_dependencies().items()):
dag_dependencies = SerializedDagModel.get_dag_dependencies()
readable_dag_ids = readable_dags_filter.value
for dag, dependencies in sorted(dag_dependencies.items()):
if readable_dag_ids is not None and dag not in readable_dag_ids:
continue
dag_node_id = f"dag:{dag}"
if dag_node_id not in nodes_dict:
for dep in dependencies:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from unittest import mock

import pendulum
import pytest
from sqlalchemy import select
Expand Down Expand Up @@ -204,7 +206,7 @@ def expected_secondary_component_response(asset2_id):
class TestGetDependencies:
@pytest.mark.usefixtures("make_primary_connected_component")
def test_should_response_200(self, test_client, expected_primary_component_response):
with assert_queries_count(5):
with assert_queries_count(6):
response = test_client.get("/dependencies")
assert response.status_code == 200

Expand Down Expand Up @@ -240,7 +242,7 @@ def test_delete_dag_should_response_403(self, unauthorized_test_client):
@pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component")
def test_with_node_id_filter(self, test_client, node_id, expected_response_fixture, request):
expected_response = request.getfixturevalue(expected_response_fixture)
with assert_queries_count(5):
with assert_queries_count(6):
response = test_client.get("/dependencies", params={"node_id": node_id})
assert response.status_code == 200

Expand All @@ -258,7 +260,7 @@ def test_with_node_id_filter_with_asset(
(asset1_id, expected_primary_component_response),
(asset2_id, expected_secondary_component_response),
):
with assert_queries_count(5):
with assert_queries_count(6):
response = test_client.get("/dependencies", params={"node_id": f"asset:{asset_id}"})
assert response.status_code == 200

Expand All @@ -272,3 +274,26 @@ def test_with_node_id_filter_not_found(self, test_client):
assert response.json() == {
"detail": "Unique connected component not found, got [] for connected components of node missing_node_id, expected only 1 connected component.",
}

@mock.patch(
"airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids",
return_value={"upstream", "downstream"},
)
@pytest.mark.usefixtures("make_primary_connected_component", "make_secondary_connected_component")
def test_scheduling_dependencies_respects_readable_dags_filter(self, _, test_client):
response = test_client.get("/dependencies")
assert response.status_code == 200

result = response.json()
dag_node_ids = {node["id"] for node in result["nodes"] if node["type"] == "dag"}
expected_present = ["dag:upstream", "dag:downstream"]
expected_absent = [
"dag:other_dag",
"dag:external_trigger_dag_id",
"dag:upstream_secondary",
"dag:downstream_secondary",
]
for node_id in expected_present:
assert node_id in dag_node_ids
for node_id in expected_absent:
assert node_id not in dag_node_ids