Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6e452af5ca150404e6bd7553fb6a9adfcd7081ca60dfd09efea0f9aae7460cc6
88337ee3cd515161de57099c2582d4b26d22666a30387fa32aec89e86d7beeb3
828 changes: 416 additions & 412 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 15 additions & 3 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,15 +957,27 @@ def _optional_boolean(value: bool | None) -> bool | None:
)
),
]
QueryHITLDetailUserIdFilter = Annotated[
QueryHITLDetailRespondedUserIdFilter = Annotated[
FilterParam[list[str]],
Depends(
filter_param_factory(
HITLDetail.user_id,
HITLDetail.responded_user_id,
list[str],
FilterOptionEnum.ANY_EQUAL,
default_factory=list,
filter_name="user_id",
filter_name="responded_user_id",
)
),
]
QueryHITLDetailRespondedUserNameFilter = Annotated[
FilterParam[list[str]],
Depends(
filter_param_factory(
HITLDetail.responded_user_name,
list[str],
FilterOptionEnum.ANY_EQUAL,
default_factory=list,
filter_name="responded_user_name",
)
),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class UpdateHITLDetailPayload(BaseModel):
class HITLDetailResponse(BaseModel):
"""Response of updating a Human-in-the-loop detail."""

user_id: str
responded_user_id: str
responded_user_name: str
response_at: datetime
chosen_options: list[str] = Field(min_length=1)
params_input: Mapping = Field(default_factory=dict)
Expand All @@ -58,7 +59,8 @@ class HITLDetail(BaseModel):
respondents: list[str] | None = None

# Response Content Detail
user_id: str | None = None
responded_user_id: str | None = None
responded_user_name: str | None = None
response_at: datetime | None = None
chosen_options: list[str] | None = None
params_input: dict[str, Any] = Field(default_factory=dict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8179,14 +8179,22 @@ paths:
- type: boolean
- type: 'null'
title: Response Received
- name: user_id
- name: responded_user_id
in: query
required: false
schema:
type: array
items:
type: string
title: User Id
title: Responded User Id
- name: responded_user_name
in: query
required: false
schema:
type: array
items:
type: string
title: Responded User Name
- name: subject_search
in: query
required: false
Expand Down Expand Up @@ -10822,11 +10830,16 @@ components:
type: array
- type: 'null'
title: Respondents
user_id:
responded_user_id:
anyOf:
- type: string
- type: 'null'
title: User Id
title: Responded User Id
responded_user_name:
anyOf:
- type: string
- type: 'null'
title: Responded User Name
response_at:
anyOf:
- type: string
Expand Down Expand Up @@ -10873,9 +10886,12 @@ components:
description: Schema for a collection of Human-in-the-loop details.
HITLDetailResponse:
properties:
user_id:
responded_user_id:
type: string
title: Responded User Id
responded_user_name:
type: string
title: User Id
title: Responded User Name
response_at:
type: string
format: date-time
Expand All @@ -10892,7 +10908,8 @@ components:
title: Params Input
type: object
required:
- user_id
- responded_user_id
- responded_user_name
- response_at
- chosen_options
title: HITLDetailResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
QueryHITLDetailDagIdFilter,
QueryHITLDetailDagIdPatternSearch,
QueryHITLDetailDagRunIdFilter,
QueryHITLDetailRespondedUserIdFilter,
QueryHITLDetailRespondedUserNameFilter,
QueryHITLDetailResponseReceivedFilter,
QueryHITLDetailSubjectSearch,
QueryHITLDetailTaskIdFilter,
QueryHITLDetailTaskIdPatternSearch,
QueryHITLDetailUserIdFilter,
QueryLimit,
QueryOffset,
QueryTIStateFilter,
Expand Down Expand Up @@ -121,19 +122,21 @@ def _update_hitl_detail(
),
)

user_id = user.get_id()
user_name = user.get_name()
if hitl_detail_model.respondents:
user_id = user.get_id()
if isinstance(user_id, int):
# FabAuthManager (ab_user) store user id as integer, but common interface is string type
user_id = str(user_id)
if user_id not in hitl_detail_model.respondents:
log.error("User=%s is not a respondent for the task", user_id)
log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id)
raise HTTPException(
status.HTTP_403_FORBIDDEN,
f"User={user_id} is not a respondent for the task.",
f"User={user_name} (id={user_id}) is not a respondent for the task.",
)

hitl_detail_model.user_id = user.get_id()
hitl_detail_model.responded_user_id = user_id
hitl_detail_model.responded_user_name = user_name
hitl_detail_model.response_at = timezone.utcnow()
hitl_detail_model.chosen_options = update_hitl_detail_payload.chosen_options
hitl_detail_model.params_input = update_hitl_detail_payload.params_input
Expand Down Expand Up @@ -269,7 +272,8 @@ def get_hitl_details(
ti_state: QueryTIStateFilter,
# hitl detail related filter
response_received: QueryHITLDetailResponseReceivedFilter,
user_id: QueryHITLDetailUserIdFilter,
responded_user_id: QueryHITLDetailRespondedUserIdFilter,
responded_user_name: QueryHITLDetailRespondedUserNameFilter,
subject_patten: QueryHITLDetailSubjectSearch,
body_patten: QueryHITLDetailBodySearch,
) -> HITLDetailCollection:
Expand All @@ -292,7 +296,8 @@ def get_hitl_details(
ti_state,
# hitl detail related filter
response_received,
user_id,
responded_user_id,
responded_user_name,
subject_patten,
body_patten,
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class HITLDetailResponse(BaseModel):
"""Schema for the response part of a Human-in-the-loop detail for a specific task instance."""

response_received: bool
user_id: str | None
responded_user_name: str | None
responded_user_id: str | None
response_at: datetime | None
# It's empty if the user has not yet responded.
chosen_options: list[str] | None
Expand All @@ -62,7 +63,8 @@ def from_hitl_detail_orm(cls, hitl_detail: HITLDetail) -> HITLDetailResponse:
return HITLDetailResponse(
response_received=hitl_detail.response_received,
response_at=hitl_detail.response_at,
user_id=hitl_detail.user_id,
responded_user_id=hitl_detail.responded_user_id,
responded_user_name=hitl_detail.responded_user_name,
chosen_options=hitl_detail.chosen_options,
params_input=hitl_detail.params_input or {},
)
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def upsert_hitl_detail(
elif hitl_detail_model.response_received:
# Cleanup the response part of HITLDetail as we only store one response for one task instance.
# It normally happens after retry, we keep only the latest response.
hitl_detail_model.user_id = None
hitl_detail_model.responded_by = None
hitl_detail_model.response_at = None
hitl_detail_model.chosen_options = None
hitl_detail_model.params_input = {}
Expand Down Expand Up @@ -116,7 +116,8 @@ def update_hitl_detail(
f"Human-in-the-loop detail for Task Instance with id {ti_id_str} already exists.",
)

hitl_detail_model.user_id = "Fallback to defaults"
hitl_detail_model.responded_user_id = HITLDetail.DEFAULT_USER_NAME
hitl_detail_model.responded_user_name = HITLDetail.DEFAULT_USER_NAME
hitl_detail_model.response_at = datetime.now(timezone.utc)
hitl_detail_model.chosen_options = payload.chosen_options
hitl_detail_model.params_input = payload.params_input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def upgrade():
Column("params", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
Column("respondents", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("response_at", UtcDateTime, nullable=True),
Column("user_id", String(128), nullable=True),
Column("responded_user_id", String(128), nullable=True),
Column("responded_user_name", String(128), nullable=True),
Column("chosen_options", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("params_input", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
ForeignKeyConstraint(
Expand Down
5 changes: 4 additions & 1 deletion airflow-core/src/airflow/models/hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class HITLDetail(Base):

# Response Content Detail
response_at = Column(UtcDateTime, nullable=True)
user_id = Column(String(128), nullable=True)
responded_user_id = Column(String(128), nullable=True)
responded_user_name = Column(String(128), nullable=True)
chosen_options = Column(
sqlalchemy_jsonfield.JSONField(json=json),
nullable=True,
Expand Down Expand Up @@ -78,3 +79,5 @@ def response_received(self) -> bool:
@response_received.expression # type: ignore[no-redef]
def response_received(cls):
return cls.response_at.is_not(None)

DEFAULT_USER_NAME = "Fallback to defaults"
7 changes: 4 additions & 3 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -713,21 +713,22 @@ export const UseHumanInTheLoopServiceGetHitlDetailKeyFn = ({ dagId, dagRunId, ma
export type HumanInTheLoopServiceGetHitlDetailsDefaultResponse = Awaited<ReturnType<typeof HumanInTheLoopService.getHitlDetails>>;
export type HumanInTheLoopServiceGetHitlDetailsQueryResult<TData = HumanInTheLoopServiceGetHitlDetailsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useHumanInTheLoopServiceGetHitlDetailsKey = "HumanInTheLoopServiceGetHitlDetails";
export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: {
export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: {
bodySearch?: string;
dagId?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string[];
respondedUserId?: string[];
respondedUserName?: string[];
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
taskId?: string;
taskIdPattern?: string;
userId?: string[];
} = {}, queryKey?: Array<unknown>) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }])];
} = {}, queryKey?: Array<unknown>) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }])];
export type MonitorServiceGetHealthDefaultResponse = Awaited<ReturnType<typeof MonitorService.getHealth>>;
export type MonitorServiceGetHealthQueryResult<TData = MonitorServiceGetHealthDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1363,27 +1363,29 @@ export const ensureUseHumanInTheLoopServiceGetHitlDetailData = (queryClient: Que
* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.state
* @param data.responseReceived
* @param data.userId
* @param data.respondedUserId
* @param data.respondedUserName
* @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns HITLDetailCollection Successful Response
* @throws ApiError
*/
export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: {
export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: {
bodySearch?: string;
dagId?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string[];
respondedUserId?: string[];
respondedUserName?: string[];
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
taskId?: string;
taskIdPattern?: string;
userId?: string[];
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }) });
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) });
/**
* Get Health
* @returns HealthInfoResponse Successful Response
Expand Down
10 changes: 6 additions & 4 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1363,27 +1363,29 @@ export const prefetchUseHumanInTheLoopServiceGetHitlDetail = (queryClient: Query
* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.state
* @param data.responseReceived
* @param data.userId
* @param data.respondedUserId
* @param data.respondedUserName
* @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns HITLDetailCollection Successful Response
* @throws ApiError
*/
export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: {
export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: {
bodySearch?: string;
dagId?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string[];
respondedUserId?: string[];
respondedUserName?: string[];
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
taskId?: string;
taskIdPattern?: string;
userId?: string[];
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }) });
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) });
/**
* Get Health
* @returns HealthInfoResponse Successful Response
Expand Down
10 changes: 6 additions & 4 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1363,27 +1363,29 @@ export const useHumanInTheLoopServiceGetHitlDetail = <TData = Common.HumanInTheL
* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.state
* @param data.responseReceived
* @param data.userId
* @param data.respondedUserId
* @param data.respondedUserName
* @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns HITLDetailCollection Successful Response
* @throws ApiError
*/
export const useHumanInTheLoopServiceGetHitlDetails = <TData = Common.HumanInTheLoopServiceGetHitlDetailsDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: {
export const useHumanInTheLoopServiceGetHitlDetails = <TData = Common.HumanInTheLoopServiceGetHitlDetailsDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: {
bodySearch?: string;
dagId?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string[];
respondedUserId?: string[];
respondedUserName?: string[];
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
taskId?: string;
taskIdPattern?: string;
userId?: string[];
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }) as TData, ...options });
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) as TData, ...options });
/**
* Get Health
* @returns HealthInfoResponse Successful Response
Expand Down
Loading
Loading