Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e491b0c58188f06ab4696cc09c765413065069f90d78e27eb53fdac5e2e92c82
35e9e07930e138664fb6ff23bc299567a88946734630d84f3d7d95deacf2f4b8
12 changes: 6 additions & 6 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ def to_orm(self, select: Select) -> Select:
sql_select(func.count(HITLDetail.ti_id))
.join(TaskInstance, HITLDetail.ti_id == TaskInstance.id)
.where(
HITLDetail.response_at.is_(None),
HITLDetail.responded_at.is_(None),
TaskInstance.state == TaskInstanceState.DEFERRED,
)
.where(TaskInstance.dag_id == DagModel.dag_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class HITLDetailResponse(BaseModel):
"""Response of updating a Human-in-the-loop detail."""

responded_by: HITLUser
response_at: datetime
responded_at: datetime
chosen_options: list[str] = Field(min_length=1)
params_input: Mapping = Field(default_factory=dict)

Expand Down Expand Up @@ -66,7 +66,7 @@ class HITLDetail(BaseModel):

# Response Content Detail
responded_by_user: HITLUser | None = None
response_at: datetime | None = None
responded_at: datetime | None = None
chosen_options: list[str] | None = None
params_input: dict[str, Any] = Field(default_factory=dict)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2003,12 +2003,12 @@ components:
anyOf:
- $ref: '#/components/schemas/HITLUser'
- type: 'null'
response_at:
responded_at:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Response At
title: Responded At
chosen_options:
anyOf:
- items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10865,12 +10865,12 @@ components:
anyOf:
- $ref: '#/components/schemas/HITLUser'
- type: 'null'
response_at:
responded_at:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Response At
title: Responded At
chosen_options:
anyOf:
- items:
Expand Down Expand Up @@ -10913,10 +10913,10 @@ components:
properties:
responded_by:
$ref: '#/components/schemas/HITLUser'
response_at:
responded_at:
type: string
format: date-time
title: Response At
title: Responded At
chosen_options:
items:
type: string
Expand All @@ -10930,7 +10930,7 @@ components:
type: object
required:
- responded_by
- response_at
- responded_at
- chosen_options
title: HITLDetailResponse
description: Response of updating a Human-in-the-loop detail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def update_hitl_detail(
)

hitl_detail_model.responded_by = hitl_user
hitl_detail_model.response_at = timezone.utcnow()
hitl_detail_model.responded_at = timezone.utcnow()
hitl_detail_model.chosen_options = update_hitl_detail_payload.chosen_options
hitl_detail_model.params_input = update_hitl_detail_payload.params_input
session.add(hitl_detail_model)
Expand Down Expand Up @@ -204,7 +204,7 @@ def get_hitl_details(
allowed_attrs=[
"ti_id",
"subject",
"response_at",
"responded_at",
],
model=HITLDetailModel,
to_replace={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def get_dags(
)
.join(TaskInstance, HITLDetail.ti_id == TaskInstance.id)
.where(
HITLDetail.response_at.is_(None),
HITLDetail.responded_at.is_(None),
TaskInstance.state == TaskInstanceState.DEFERRED,
)
.where(TaskInstance.dag_id.in_([dag.dag_id for dag in dags]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class HITLDetailResponse(BaseModel):

response_received: bool
responded_by_user: HITLUser | None = None
response_at: datetime | None
responded_at: datetime | None
# It's empty if the user has not yet responded.
chosen_options: list[str] | None
params_input: dict[str, Any] = Field(default_factory=dict)
Expand All @@ -77,7 +77,7 @@ def from_hitl_detail_orm(cls, hitl_detail: HITLDetail) -> HITLDetailResponse:

return HITLDetailResponse(
response_received=hitl_detail.response_received,
response_at=hitl_detail.response_at,
responded_at=hitl_detail.responded_at,
responded_by_user=hitl_user,
chosen_options=hitl_detail.chosen_options,
params_input=hitl_detail.params_input or {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def upsert_hitl_detail(
# Cleanup the response part of HITLDetail as we only store one response for one task instance.
# It normally happens after retry, we keep only the latest response.
hitl_detail_model.responded_by = None
hitl_detail_model.response_at = None
hitl_detail_model.responded_at = None
hitl_detail_model.chosen_options = None
hitl_detail_model.params_input = {}
session.add(hitl_detail_model)
Expand Down Expand Up @@ -117,7 +117,7 @@ def update_hitl_detail(
)

hitl_detail_model.responded_by = None
hitl_detail_model.response_at = datetime.now(timezone.utc)
hitl_detail_model.responded_at = datetime.now(timezone.utc)
hitl_detail_model.chosen_options = payload.chosen_options
hitl_detail_model.params_input = payload.params_input
session.add(hitl_detail_model)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def upgrade():
Column("multiple", Boolean, unique=False, default=False),
Column("params", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
Column("assignees", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("response_at", UtcDateTime, nullable=True),
Column("responded_at", UtcDateTime, nullable=True),
Column("responded_by", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("chosen_options", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("params_input", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/models/hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class HITLDetail(Base):
assignees = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)

# Response Content Detail
response_at = Column(UtcDateTime, nullable=True)
responded_at = Column(UtcDateTime, nullable=True)
responded_by = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
chosen_options = Column(
sqlalchemy_jsonfield.JSONField(json=json),
Expand All @@ -125,11 +125,11 @@ class HITLDetail(Base):

@hybrid_property
def response_received(self) -> bool:
return self.response_at is not None
return self.responded_at is not None

@response_received.expression # type: ignore[no-redef]
def response_received(cls):
return cls.response_at.is_not(None)
return cls.responded_at.is_not(None)

@hybrid_property
def responded_by_user_id(self) -> str | None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3588,7 +3588,7 @@ export const $HITLDetail = {
}
]
},
response_at: {
responded_at: {
anyOf: [
{
type: 'string',
Expand All @@ -3598,7 +3598,7 @@ export const $HITLDetail = {
type: 'null'
}
],
title: 'Response At'
title: 'Responded At'
},
chosen_options: {
anyOf: [
Expand Down Expand Up @@ -3656,10 +3656,10 @@ export const $HITLDetailResponse = {
responded_by: {
'$ref': '#/components/schemas/HITLUser'
},
response_at: {
responded_at: {
type: 'string',
format: 'date-time',
title: 'Response At'
title: 'Responded At'
},
chosen_options: {
items: {
Expand All @@ -3676,7 +3676,7 @@ export const $HITLDetailResponse = {
}
},
type: 'object',
required: ['responded_by', 'response_at', 'chosen_options'],
required: ['responded_by', 'responded_at', 'chosen_options'],
title: 'HITLDetailResponse',
description: 'Response of updating a Human-in-the-loop detail.'
} as const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ export type HITLDetail = {
};
assigned_users?: Array<HITLUser>;
responded_by_user?: HITLUser | null;
response_at?: string | null;
responded_at?: string | null;
chosen_options?: Array<(string)> | null;
params_input?: {
[key: string]: unknown;
Expand All @@ -963,7 +963,7 @@ export type HITLDetailCollection = {
*/
export type HITLDetailResponse = {
responded_by: HITLUser;
response_at: string;
responded_at: string;
chosen_options: Array<(string)>;
params_input?: {
[key: string]: unknown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export const HITLResponseForm = ({ hitlDetail }: HITLResponseFormProps) => {
{hitlDetail.response_received ? (
<Text color="fg.muted" fontSize="sm">
{translate("response.received")}
<Time datetime={hitlDetail.response_at} format={DEFAULT_DATETIME_FORMAT} />
<Time datetime={hitlDetail.responded_at} format={DEFAULT_DATETIME_FORMAT} />
</Text>
) : undefined}
<Accordion.Root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ const taskInstanceColumns = ({
header: translate("common:mapIndex"),
},
{
accessorKey: "response_at",
cell: ({ row: { original } }) => <Time datetime={original.response_at} />,
accessorKey: "responded_at",
cell: ({ row: { original } }) => <Time datetime={original.responded_at} />,
header: translate("response.received"),
},
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def sample_hitl_details(sample_tis: list[TaskInstance], session: Session) -> lis
defaults=["1"],
multiple=False,
params={"input": 1},
response_at=utcnow(),
responded_at=utcnow(),
chosen_options=[str(i)],
params_input={"input": i},
responded_by={"id": "test", "name": "test"},
Expand Down Expand Up @@ -210,7 +210,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]:
"params": {"input_1": 1},
"assigned_users": [],
"params_input": {},
"response_at": None,
"responded_at": None,
"chosen_options": None,
"response_received": False,
"subject": "This is subject",
Expand Down Expand Up @@ -321,7 +321,7 @@ def test_should_respond_200_with_existing_response(
"params_input": {"input_1": 2},
"chosen_options": ["Approve"],
"responded_by": {"id": "test", "name": "test"},
"response_at": "2025-07-03T00:00:00Z",
"responded_at": "2025-07-03T00:00:00Z",
}

audit_log = session.scalar(select(Log))
Expand Down Expand Up @@ -350,7 +350,7 @@ def test_should_respond_200_to_assigned_users(
"params_input": {"input_1": 2},
"chosen_options": ["Approve"],
"responded_by": {"id": "test", "name": "test"},
"response_at": "2025-07-03T00:00:00Z",
"responded_at": "2025-07-03T00:00:00Z",
}

audit_log = session.scalar(select(Log))
Expand Down Expand Up @@ -451,7 +451,7 @@ def test_should_respond_409(
"params_input": {"input_1": 2},
"chosen_options": ["Approve"],
"responded_by": {"id": "test", "name": "test"},
"response_at": "2025-07-03T00:00:00Z",
"responded_at": "2025-07-03T00:00:00Z",
}
assert response.status_code == 200
assert response.json() == expected_response
Expand Down Expand Up @@ -620,7 +620,7 @@ def test_should_respond_200_with_existing_response_and_query(
("task_instance_operator", lambda x: x["task_instance"]["operator_name"]),
# htil key
("subject", itemgetter("subject")),
("response_at", itemgetter("response_at")),
("responded_at", itemgetter("responded_at")),
],
ids=[
"ti_id",
Expand All @@ -630,7 +630,7 @@ def test_should_respond_200_with_existing_response_and_query(
"rendered_map_index",
"task_instance_operator",
"subject",
"response_at",
"responded_at",
],
)
def test_should_respond_200_with_existing_response_and_order_by(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def setup_hitl_data(self, create_task_instance: TaskInstance, session: Session):
options=["Approve", "Reject"],
subject=f"This is subject {i}",
defaults=["Approve"],
response_at=utcnow(),
responded_at=utcnow(),
chosen_options=["Approve"],
responded_by={"id": "test", "name": "test"},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"assignees": None,
}
expected_empty_hitl_detail_response_part: dict[str, Any] = {
"response_at": None,
"responded_at": None,
"chosen_options": None,
"responded_by_user": None,
"params_input": {},
Expand Down Expand Up @@ -91,7 +91,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]:
**default_hitl_detail_request_kwargs,
**{
"params_input": {"input_1": 2},
"response_at": convert_to_utc(datetime(2025, 7, 3, 0, 0, 0)),
"responded_at": convert_to_utc(datetime(2025, 7, 3, 0, 0, 0)),
"chosen_options": ["Reject"],
"responded_by": None,
},
Expand Down Expand Up @@ -171,7 +171,7 @@ def test_update_hitl_detail(client: Client, sample_ti: TaskInstance) -> None:
assert response.status_code == 200
assert response.json() == {
"params_input": {"input_1": 2},
"response_at": "2025-07-03T00:00:00Z",
"responded_at": "2025-07-03T00:00:00Z",
"chosen_options": ["Reject"],
"response_received": True,
"responded_by_user": None,
Expand Down
4 changes: 2 additions & 2 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ class HITLDetailResponse(BaseModel):
"""

responded_by: HITLUser
response_at: Annotated[datetime, Field(title="Response At")]
responded_at: Annotated[datetime, Field(title="Responded At")]
chosen_options: Annotated[list[str], Field(min_length=1, title="Chosen Options")]
params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None

Expand Down Expand Up @@ -1838,7 +1838,7 @@ class HITLDetail(BaseModel):
params: Annotated[dict[str, Any] | None, Field(title="Params")] = None
assigned_users: Annotated[list[HITLUser] | None, Field(title="Assigned Users")] = None
responded_by_user: HITLUser | None = None
response_at: Annotated[datetime | None, Field(title="Response At")] = None
responded_at: Annotated[datetime | None, Field(title="Responded At")] = None
chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None
params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None
response_received: Annotated[bool | None, Field(title="Response Received")] = False
Expand Down
Loading