Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ class TaskInfo(InfoJsonEncodable):
"postoperator", # SQLInsertRowsOperator
"table_name_with_schema", # SQLInsertRowsOperator
"column_names", # SQLInsertRowsOperator
"hitl_summary", # All HITLOperator based operators
]
casts = {
"operator_class": lambda task: task.task_type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,7 @@ def __init__(self, *args, **kwargs):
self.tol = "tol" # SQLValueCheckOperator
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
self.hitl_summary = "hitl_summary" # HITLOperator
super().__init__(*args, **kwargs)

with DAG(
Expand Down Expand Up @@ -2512,6 +2513,7 @@ def __init__(self, *args, **kwargs):
"downstream_task_ids": "['task_1']",
"execution_timeout": None,
"executor_config": {},
"hitl_summary": "hitl_summary",
"ignore_first_depends_on_past": False,
"inlets": "[{'uri': 'uri1', 'extra': {'a': 1}}]",
"mapped": False,
Expand Down Expand Up @@ -2599,6 +2601,7 @@ def __init__(self, *args, **kwargs):
self.tol = "tol" # SQLValueCheckOperator
self.trigger_dag_id = "trigger_dag_id" # TriggerDagRunOperator
self.trigger_run_id = "trigger_run_id" # TriggerDagRunOperator
self.hitl_summary = "hitl_summary" # HITLOperator
super().__init__(*args, **kwargs)

with DAG(
Expand Down Expand Up @@ -2637,6 +2640,7 @@ def __init__(self, *args, **kwargs):
"downstream_task_ids": "['task_1']",
"execution_timeout": None,
"executor_config": {},
"hitl_summary": "hitl_summary",
"ignore_first_depends_on_past": True,
"is_setup": False,
"is_teardown": False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ def __init__(
self.validate_params()
self.validate_defaults()

# HITL summary for the use of listeners; subclasses can extend it.
self.hitl_summary: dict[str, Any] = {
"subject": self.subject,
"body": self.body,
"options": self.options,
"defaults": self.defaults,
"multiple": self.multiple,
"assigned_users": self.assigned_users,
"serialized_params": self.serialized_params or None,
}

def validate_options(self) -> None:
"""
Validate the `options` attribute of the instance.
Expand Down Expand Up @@ -156,6 +167,9 @@ def execute(self, context: Context):
else:
timeout_datetime = None

# Enrich summary with runtime info
self.hitl_summary["timeout_datetime"] = timeout_datetime.isoformat() if timeout_datetime else None

self.log.info("Waiting for response")
for notifier in self.notifiers:
notifier(context)
Expand All @@ -181,12 +195,23 @@ def serialized_params(self) -> dict[str, dict[str, Any]]:

def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
if "error" in event:
self.hitl_summary["error_type"] = event["error_type"]
self.process_trigger_event_error(event)

chosen_options = event["chosen_options"]
params_input = event["params_input"] or {}
self.validate_chosen_options(chosen_options)
self.validate_params_input(params_input)

self.hitl_summary.update(
{
"chosen_options": chosen_options,
"params_input": params_input,
"responded_at": event["responded_at"].isoformat(),
"responded_by_user": event["responded_by_user"],
}
)

return HITLTriggerEventSuccessPayload(
chosen_options=chosen_options,
params_input=params_input,
Expand Down Expand Up @@ -356,10 +381,14 @@ def __init__(
**kwargs,
)

self.hitl_summary["ignore_downstream_trigger_rules"] = self.ignore_downstream_trigger_rules
self.hitl_summary["fail_on_reject"] = self.fail_on_reject

def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
ret = super().execute_complete(context=context, event=event)

chosen_option = ret["chosen_options"][0]
self.hitl_summary["approved"] = chosen_option == self.APPROVE
if chosen_option == self.APPROVE:
self.log.info("Approved. Proceeding with downstream tasks...")
return ret
Expand Down Expand Up @@ -413,6 +442,7 @@ def __init__(self, *, options_mapping: dict[str, str] | None = None, **kwargs) -
super().__init__(**kwargs)
self.options_mapping = options_mapping or {}
self.validate_options_mapping()
self.hitl_summary["options_mapping"] = self.options_mapping

def validate_options_mapping(self) -> None:
"""
Expand Down Expand Up @@ -447,6 +477,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:

# Map options to task IDs using the mapping, fallback to original option
chosen_options = [self.options_mapping.get(option, option) for option in chosen_options]
self.hitl_summary["branches_to_execute"] = chosen_options
return self.do_branch(context=context, branches_to_execute=chosen_options)


Expand Down
Loading
Loading