Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{asset_id}/materialize:
post:
tags:
- Asset
summary: Materialize Asset
description: Materialize an asset by triggering a DAG run that produces it.
operationId: materialize_asset
parameters:
- name: asset_id
in: path
required: true
schema:
type: integer
title: Asset Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{asset_id}/queuedEvents:
get:
tags:
Expand Down
64 changes: 62 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from datetime import datetime
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi import Depends, HTTPException, Request, status
from sqlalchemy import delete, select
from sqlalchemy.orm import joinedload, subqueryload

Expand Down Expand Up @@ -51,10 +51,22 @@
QueuedEventCollectionResponse,
QueuedEventResponse,
)
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetAliasModel, AssetDagRunQueue, AssetEvent, AssetModel
from airflow.models.asset import (
AssetAliasModel,
AssetDagRunQueue,
AssetEvent,
AssetModel,
TaskOutletAssetReference,
)
from airflow.models.dag import DAG
from airflow.models.dag_version import DagVersion
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

assets_router = AirflowRouter(tags=["Asset"])

Expand Down Expand Up @@ -243,6 +255,54 @@ def create_asset_event(
return AssetEventResponse.model_validate(assets_event)


@assets_router.post(
"/assets/{asset_id}/materialize",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[Depends(action_logging())],
)
def materialize_asset(
asset_id: int,
request: Request,
session: SessionDep,
) -> DAGRunResponse:
"""Materialize an asset by triggering a DAG run that produces it."""
dag_id_it = iter(
session.scalars(
select(TaskOutletAssetReference.dag_id)
.where(TaskOutletAssetReference.asset_id == asset_id)
.group_by(TaskOutletAssetReference.dag_id)
.limit(2)
)
)

if (dag_id := next(dag_id_it, None)) is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"No DAG materializes asset with ID: {asset_id}")
if next(dag_id_it, None) is not None:
raise HTTPException(
status.HTTP_409_CONFLICT,
f"More than one DAG materializes asset with ID: {asset_id}",
)

dag: DAG | None
if not (dag := request.app.state.dag_bag.get_dag(dag_id)):
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID `{dag_id}` was not found")

return dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
run_after=(run_after := timezone.coerce_datetime(timezone.utcnow())),
data_interval=None,
),
run_after=run_after,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
external_trigger=True,
dag_version=DagVersion.get_latest_version(dag_id, session=session),
state=DagRunState.QUEUED,
session=session,
)


@assets_router.get(
"/assets/{asset_id}/queuedEvents",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,9 @@ export const UseLoginServiceLoginKeyFn = (
export type AssetServiceCreateAssetEventMutationResult = Awaited<
ReturnType<typeof AssetService.createAssetEvent>
>;
export type AssetServiceMaterializeAssetMutationResult = Awaited<
ReturnType<typeof AssetService.materializeAsset>
>;
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
Expand Down
36 changes: 36 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2850,6 +2850,42 @@ export const useAssetServiceCreateAssetEvent = <
AssetService.createAssetEvent({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Materialize Asset
* Materialize an asset by triggering a DAG run that produces it.
* @param data The data for the request.
* @param data.assetId
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceMaterializeAsset = <
TData = Common.AssetServiceMaterializeAssetMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
assetId: number;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
assetId: number;
},
TContext
>({
mutationFn: ({ assetId }) => AssetService.materializeAsset({ assetId }) as unknown as Promise<TData>,
...options,
});
/**
* Create Backfill
* @param data The data for the request.
Expand Down
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import type {
GetAssetEventsResponse,
CreateAssetEventData,
CreateAssetEventResponse,
MaterializeAssetData,
MaterializeAssetResponse,
GetAssetQueuedEventsData,
GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData,
Expand Down Expand Up @@ -381,6 +383,31 @@ export class AssetService {
});
}

/**
* Materialize Asset
* Materialize an asset by triggering a DAG run that produces it.
* @param data The data for the request.
* @param data.assetId
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
public static materializeAsset(data: MaterializeAssetData): CancelablePromise<MaterializeAssetResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/assets/{asset_id}/materialize",
path: {
asset_id: data.assetId,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
},
});
}

/**
* Get Asset Queued Events
* Get queued asset events for an asset.
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1616,6 +1616,12 @@ export type CreateAssetEventData = {

export type CreateAssetEventResponse = AssetEventResponse;

export type MaterializeAssetData = {
assetId: number;
};

export type MaterializeAssetResponse = DAGRunResponse;

export type GetAssetQueuedEventsData = {
assetId: number;
before?: string | null;
Expand Down Expand Up @@ -2604,6 +2610,37 @@ export type $OpenApiTs = {
};
};
};
"/public/assets/{asset_id}/materialize": {
post: {
req: MaterializeAssetData;
res: {
/**
* Successful Response
*/
200: DAGRunResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Conflict
*/
409: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/assets/{asset_id}/queuedEvents": {
get: {
req: GetAssetQueuedEventsData;
Expand Down
63 changes: 59 additions & 4 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TaskOutletAssetReference,
)
from airflow.models.dagrun import DagRun
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
Expand All @@ -47,7 +48,7 @@
pytestmark = pytest.mark.db_test


def _create_assets(session, num: int = 2) -> None:
def _create_assets(session, num: int = 2) -> list[AssetModel]:
assets = [
AssetModel(
id=i,
Expand All @@ -62,6 +63,7 @@ def _create_assets(session, num: int = 2) -> None:
]
session.add_all(assets)
session.commit()
return assets


def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
Expand Down Expand Up @@ -192,8 +194,8 @@ def teardown_method(self) -> None:
clear_db_runs()

@provide_session
def create_assets(self, session, num: int = 2):
_create_assets(session=session, num=num)
def create_assets(self, session, num: int = 2) -> list[AssetModel]:
return _create_assets(session=session, num=num)

@provide_session
def create_assets_with_sensitive_extra(self, session, num: int = 2):
Expand Down Expand Up @@ -921,7 +923,7 @@ def test_should_respond_404_valid_dag_no_adrq(self, test_client, session, create
class TestPostAssetEvents(TestAssets):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session):
self.create_assets()
self.create_assets(session)
event_payload = {"asset_id": 1, "extra": {"foo": "bar"}}
response = test_client.post("/public/assets/events", json=event_payload)
assert response.status_code == 200
Expand Down Expand Up @@ -970,6 +972,59 @@ def test_should_mask_sensitive_extra(self, test_client, session):
}


@pytest.mark.need_serialized_dag
class TestPostAssetMaterialize(TestAssets):
DAG_ASSET1_ID = "test_dag_1"
DAG_ASSET2_ID_A = "test_dag_2a"
DAG_ASSET2_ID_B = "test_dag_2b"
DAG_ASSET_NO = "test_dag_no"

@pytest.fixture(autouse=True)
def create_dags(self, setup, dag_maker, session):
# Depend on 'setup' so it runs first. Otherwise it deletes what we create here.
assets = {am.id: am.to_public() for am in self.create_assets(session=session, num=3)}
with dag_maker(self.DAG_ASSET1_ID, schedule=None, session=session):
EmptyOperator(task_id="task", outlets=assets[1])
with dag_maker(self.DAG_ASSET2_ID_A, schedule=None, session=session):
EmptyOperator(task_id="task", outlets=assets[2])
with dag_maker(self.DAG_ASSET2_ID_B, schedule=None, session=session):
EmptyOperator(task_id="task", outlets=assets[2])
with dag_maker(self.DAG_ASSET_NO, schedule=None, session=session):
EmptyOperator(task_id="task")

def test_should_respond_200(self, test_client):
response = test_client.post("/public/assets/1/materialize")
assert response.status_code == 200
assert response.json() == {
"dag_run_id": mock.ANY,
"dag_id": self.DAG_ASSET1_ID,
"logical_date": None,
"queued_at": mock.ANY,
"run_after": mock.ANY,
"start_date": None,
"end_date": None,
"data_interval_start": None,
"data_interval_end": None,
"last_scheduling_decision": None,
"run_type": "manual",
"state": "queued",
"external_trigger": True,
"triggered_by": "rest_api",
"conf": {},
"note": None,
}

def test_should_respond_409_on_multiple_dags(self, test_client):
response = test_client.post("/public/assets/2/materialize")
assert response.status_code == 409
assert response.json()["detail"] == "More than one DAG materializes asset with ID: 2"

def test_should_respond_404_on_multiple_dags(self, test_client):
response = test_client.post("/public/assets/3/materialize")
assert response.status_code == 404
assert response.json()["detail"] == "No DAG materializes asset with ID: 3"


class TestGetAssetQueuedEvents(TestQueuedEventEndpoint):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session, create_dummy_dag):
Expand Down
Loading