Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ def get(
if name or uri:
resp = self.client.get("asset-events/by-asset", params={"name": name, "uri": uri})
elif alias_name:
resp = self.client.get("asset-events/by-asset-alias", params={"name": name})
resp = self.client.get("asset-events/by-asset-alias", params={"name": alias_name})
else:
raise ValueError("Either `name`, `uri` or `alias_name` must be provided")

Expand Down
47 changes: 47 additions & 0 deletions task-sdk/tests/task_sdk/api/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from airflow.sdk.api.client import RemoteValidationError, ServerResponseError
from airflow.sdk.api.datamodels._generated import (
AssetEventsResponse,
AssetResponse,
ConnectionResponse,
DagRunState,
Expand Down Expand Up @@ -875,6 +876,52 @@ def handle_request(request: httpx.Request) -> httpx.Response:
assert result.error == ErrorType.CONNECTION_NOT_FOUND


class TestAssetEventOperations:
@pytest.mark.parametrize(
"request_params",
[
({"name": "this_asset", "uri": "s3://bucket/key"}),
({"alias_name": "this_asset_alias"}),
],
)
def test_by_name_get_success(self, request_params):
def handle_request(request: httpx.Request) -> httpx.Response:
params = request.url.params
if request.url.path == "/asset-events/by-asset":
assert params.get("name") == request_params.get("name")
assert params.get("uri") == request_params.get("uri")
elif request.url.path == "/asset-events/by-asset-alias":
assert params.get("name") == request_params.get("alias_name")
else:
return httpx.Response(status_code=400, json={"detail": "Bad Request"})

return httpx.Response(
status_code=200,
json={
"asset_events": [
{
"id": 1,
"asset": {
"name": "this_asset",
"uri": "s3://bucket/key",
"group": "asset",
},
"created_dagruns": [],
"timestamp": "2023-01-01T00:00:00Z",
}
]
},
)

client = make_client(transport=httpx.MockTransport(handle_request))
result = client.asset_events.get(**request_params)

assert isinstance(result, AssetEventsResponse)
assert len(result.asset_events) == 1
assert result.asset_events[0].asset.name == "this_asset"
assert result.asset_events[0].asset.uri == "s3://bucket/key"


class TestAssetOperations:
@pytest.mark.parametrize(
"request_params",
Expand Down