Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ class DagRun(StrictBaseModel):
run_type: DagRunType
state: DagRunState
conf: Annotated[dict[str, Any], Field(default_factory=dict)]
triggering_user_name: str | None = None
consumed_asset_events: list[AssetEventDagRunReference]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
AddIncludePriorDatesToGetXComSlice,
)
from airflow.api_fastapi.execution_api.versions.v2025_09_23 import AddDagVersionIdField
from airflow.api_fastapi.execution_api.versions.v2025_10_10 import AddTriggeringUserNameField

bundle = VersionBundle(
HeadVersion(),
Version("2025-10-10", AddTriggeringUserNameField),
Version("2025-09-23", AddDagVersionIdField),
Version(
"2025-08-10",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema

from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext


class AddTriggeringUserNameField(VersionChange):
"""Add the `triggering_user_name` field to DagRun model."""

description = __doc__

instructions_to_migrate_to_previous_version = (schema(DagRun).field("triggering_user_name").didnt_exist,)

@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
def remove_triggering_user_name_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc]
"""Remove the `triggering_user_name` field from the dag_run object when converting to the previous version."""
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
response.body["dag_run"].pop("triggering_user_name", None)
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def test_ti_run_state_to_running(
"end_date": None,
"run_type": "manual",
"conf": {},
"triggering_user_name": None,
"consumed_asset_events": [],
},
"task_reschedule_count": 0,
Expand Down Expand Up @@ -637,6 +638,62 @@ def test_xcom_not_cleared_for_deferral(self, client, session, create_task_instan
assert response.status_code == 200
assert ti.xcom_pull(task_ids="test_xcom_not_cleared_for_deferral", key="key") == "value"

def test_ti_run_with_triggering_user_name(
self,
client,
session,
dag_maker,
time_machine,
):
"""
Test that the triggering_user_name field is correctly returned when it has a non-None value.
"""
instant_str = "2024-09-30T12:00:00Z"
instant = timezone.parse(instant_str)
time_machine.move_to(instant, tick=False)

with dag_maker(dag_id=str(uuid4()), session=session):
EmptyOperator(task_id="test_ti_run_with_triggering_user_name")

# Create DagRun with triggering_user_name set to a specific value
dr = dag_maker.create_dagrun(
run_id="test",
logical_date=instant,
state=DagRunState.RUNNING,
start_date=instant,
triggering_user_name="test_user",
)

ti = dr.get_task_instance(task_id="test_ti_run_with_triggering_user_name")
ti.set_state(State.QUEUED)
session.commit()

response = client.patch(
f"/execution/task-instances/{ti.id}/run",
json={
"state": "running",
"hostname": "test-hostname",
"unixname": "test-unixname",
"pid": 12345,
"start_date": instant_str,
},
)

assert response.status_code == 200
json_response = response.json()

# Verify the dag_run is present
assert "dag_run" in json_response
dag_run = json_response["dag_run"]

# The triggering_user_name field should be present with the correct value
assert dag_run["triggering_user_name"] == "test_user"

# Verify other expected fields are still present
assert dag_run["dag_id"] == ti.dag_id
assert dag_run["run_id"] == "test"
assert dag_run["state"] == "running"


class TestTIUpdateState:
def setup_method(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import pytest

from airflow._shared.timezones import timezone
from airflow.utils.state import DagRunState, State

from tests_common.test_utils.db import clear_db_runs

pytestmark = pytest.mark.db_test


@pytest.fixture
def ver_client(client):
"""Client configured to use API version 2025-09-23."""
client.headers["Airflow-API-Version"] = "2025-09-23"
return client


class TestTIRunStateV20250923:
"""Test that API version 2025-09-23 does NOT include triggering_user_name field."""

def setup_method(self):
clear_db_runs()

def teardown_method(self):
clear_db_runs()

def test_ti_run_excludes_triggering_user_name(
self,
ver_client,
session,
create_task_instance,
time_machine,
):
"""
Test that the triggering_user_name field is NOT present in API version 2025-09-23.

This field was added in version 2025-10-10, so older API clients should not
receive it in the response.
"""
instant_str = "2024-09-30T12:00:00Z"
instant = timezone.parse(instant_str)
time_machine.move_to(instant, tick=False)

ti = create_task_instance(
task_id="test_triggering_user_exclusion",
state=State.QUEUED,
dagrun_state=DagRunState.RUNNING,
session=session,
start_date=instant,
)
session.commit()

response = ver_client.patch(
f"/execution/task-instances/{ti.id}/run",
json={
"state": "running",
"hostname": "test-hostname",
"unixname": "test-user",
"pid": 12345,
"start_date": instant_str,
},
)

assert response.status_code == 200
json_response = response.json()

# Verify the dag_run is present
assert "dag_run" in json_response
dag_run = json_response["dag_run"]

# The triggering_user_name field should NOT be present in this API version
assert "triggering_user_name" not in dag_run, (
"triggering_user_name should not be present in API version 2025-09-23"
)

# Verify other expected fields are still present
assert dag_run["dag_id"] == ti.dag_id
assert dag_run["run_id"] == "test"
assert dag_run["state"] == "running"
assert dag_run["conf"] == {}
3 changes: 2 additions & 1 deletion task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue, RootModel

API_VERSION: Final[str] = "2025-09-23"
API_VERSION: Final[str] = "2025-10-10"


class AssetAliasReferenceAssetEventDagRun(BaseModel):
Expand Down Expand Up @@ -539,6 +539,7 @@ class DagRun(BaseModel):
run_type: DagRunType
state: DagRunState
conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
triggering_user_name: Annotated[str | None, Field(title="Triggering User Name")] = None
consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")]


Expand Down
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class DagRunProtocol(Protocol):
run_type: Any
run_after: AwareDatetime
conf: dict[str, Any] | None
triggering_user_name: str | None


class RuntimeTaskInstanceProtocol(Protocol):
Expand Down
2 changes: 2 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,7 @@ class RequestTestCase:
"end_date": None,
"clear_number": 0,
"conf": None,
"triggering_user_name": None,
},
"type": "PreviousDagRunResult",
},
Expand All @@ -1866,6 +1867,7 @@ class RequestTestCase:
run_after=timezone.parse("2024-01-15T12:00:00Z"),
consumed_asset_events=[],
state=DagRunState.SUCCESS,
triggering_user_name=None,
)
),
),
Expand Down
Loading