feat: implement HITL shared links functionality #53189

Closed
wants to merge 4 commits into from

Conversation

sunank200
Collaborator

@sunank200 sunank200 commented Jul 11, 2025

Implements HITL shared links functionality as specified in AIP-90, enabling external users to approve/reject tasks via secure links with TTL.

What's Changed

  • HITLSharedLinkManager with HMAC token generation/verification
  • API Endpoints:
    • POST /api/v2/hitl-details/share-link/{dag_id}/{dag_run_id}/{task_id} - Create shared links
    • GET /api/v2/hitl-details/share-link/{dag_id}/{dag_run_id}/{task_id} - Redirect links (UI dialog)
    • POST /api/v2/hitl-details/share-link/{dag_id}/{dag_run_id}/{task_id}/action - Action links (direct execution)
    • Mapped task variants with /{map_index} suffix
  • Configuration: [api] hitl_enable_shared_links (default: False), hitl_shared_link_expiration_hours (default: 24)
  • HMAC signatures, configurable expiration, explicit enablement required
  • Moved to standard providers package
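As a rough illustration of the HMAC token generation/verification and TTL listed above, here is a minimal standard-library sketch. It is not the PR's HITLSharedLinkManager: the function names, the choice of HMAC-SHA256, and the base64-encoded JSON payload layout are all assumptions made for this example.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
from datetime import datetime, timedelta, timezone


def sign_payload(
    data: dict, secret: str, ttl_hours: int = 24, now: datetime | None = None
) -> tuple[str, str]:
    """Return (payload, signature) as URL-safe base64 strings (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    # Embed the expiry in the signed body so it cannot be changed without detection.
    body = dict(data, expires_at=(now + timedelta(hours=ttl_hours)).isoformat())
    payload = base64.urlsafe_b64encode(json.dumps(body, sort_keys=True).encode()).decode()
    digest = hmac.new(secret.encode(), payload.encode(), hashlib.sha256).digest()
    return payload, base64.urlsafe_b64encode(digest).decode()


def verify_payload(
    payload: str, signature: str, secret: str, now: datetime | None = None
) -> dict:
    """Return the decoded body, or raise ValueError if it was tampered with or expired."""
    digest = hmac.new(secret.encode(), payload.encode(), hashlib.sha256).digest()
    expected = base64.urlsafe_b64encode(digest).decode()
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        raise ValueError("invalid signature")
    body = json.loads(base64.urlsafe_b64decode(payload))
    now = now or datetime.now(timezone.utc)
    if now >= datetime.fromisoformat(body["expires_at"]):
        raise ValueError("link expired")
    return body
```

The signature is checked before the payload is decoded, so untrusted input is never parsed unless it was produced with the configured secret. A suitably random secret can be generated with `secrets.token_urlsafe(32)`.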

Testing:

  1. DAG code I used for local testing
from __future__ import annotations

import datetime

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import DAG, task

with DAG(
    dag_id="aip_90_dag",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    approve_or_reject = ApprovalOperator(
        task_id="approve_or_reject",
        subject="This is subject",
        body="This is body",
    )

    approve_if_timeout = ApprovalOperator(
        task_id="approve_if_timeout",
        subject="This is subject",
        defaults="Approve",
        execution_timeout=datetime.timedelta(seconds=1),
    )

    @task
    def run_after_approve() -> None:
        pass

    reject_if_timeout = ApprovalOperator(
        task_id="reject_if_timeout",
        subject="This is subject",
        defaults="Reject",
        execution_timeout=datetime.timedelta(seconds=1),
    )

    @task
    def skip_due_to_reject() -> None:
        pass

    @task
    def skip_due_to_upstream_skip() -> None:
        pass

    approve_if_timeout >> run_after_approve()
    reject_if_timeout >> skip_due_to_reject() >> skip_due_to_upstream_skip()
  1. Create test_hitl_api.py
#!/usr/bin/env python3
"""
Test script for HITL API functionality
"""

from __future__ import annotations

from typing import Any

import requests

host = "http://localhost:28080"
username = "admin"
password = "admin"


def get_auth_token() -> str | None:
    """Get authentication token."""
    token_endpoint = "/auth/token"
    resp = requests.post(f"{host}{token_endpoint}", json={"username": username, "password": password})
    if resp.status_code in [200, 201]:
        token = resp.json()["access_token"]
        print(f" Authentication successful")
        return token
    else:
        print(f" Authentication failed: {resp.status_code} - {resp.text}")
        return None


def get_all_hitl_details(headers: dict[str, str]) -> dict[str, Any] | None:
    """Get all HITL details."""
    get_all_endpoint = "/api/v2/hitl-details/"
    resp = requests.get(f"{host}{get_all_endpoint}", headers=headers)
    if resp.status_code == 200:
        data = resp.json()
        print(f" Found {data['total_entries']} HITL details")
        return data
    else:
        print(f" Failed to get HITL details: {resp.status_code} - {resp.text}")
        return None


def list_dag_runs(headers: dict[str, str], dag_id: str) -> list[dict[str, Any]]:
    """List all DAG runs for a given dag_id."""
    endpoint = f"/api/v2/dags/{dag_id}/dagRuns"
    resp = requests.get(f"{host}{endpoint}", headers=headers)
    if resp.status_code == 200:
        data = resp.json()
        runs = data.get("dag_runs", [])
        print(f"\nAvailable DAG runs for {dag_id}:")
        for i, run in enumerate(runs):
            print(f"  [{i}] {run['dag_run_id']} (state: {run['state']})")
        return runs
    else:
        print(f" Failed to get DAG runs: {resp.status_code} - {resp.text}")
        return []


def get_hitl_detail(
    headers: dict[str, str], dag_id: str, dag_run_id: str, task_id: str
) -> dict[str, Any] | None:
    """Get specific HITL detail."""
    hitl_ti_endpoint = f"/api/v2/hitl-details/{dag_id}/{dag_run_id}/{task_id}"
    resp = requests.get(f"{host}{hitl_ti_endpoint}", headers=headers)
    if resp.status_code == 200:
        data = resp.json()
        print(f"  HITL detail for {task_id}:")
        print(f"   Subject: {data['subject']}")
        print(f"   Options: {data['options']}")
        print(f"   Response received: {data['response_received']}")
        return data
    else:
        print(f" Failed to get HITL detail: {resp.status_code} - {resp.text}")
        return None


def update_hitl_detail(
    headers: dict[str, str],
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    chosen_options: list[str],
    params_input: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """Update HITL detail with response."""
    hitl_ti_endpoint = f"/api/v2/hitl-details/{dag_id}/{dag_run_id}/{task_id}"
    data = {"chosen_options": chosen_options, "params_input": params_input or {}}
    resp = requests.patch(f"{host}{hitl_ti_endpoint}", headers=headers, json=data)
    if resp.status_code == 200:
        result = resp.json()
        print(f" Successfully updated HITL detail for {task_id}")
        print(f"   Chosen options: {result['chosen_options']}")
        print(f"   User ID: {result['user_id']}")
        return result
    else:
        print(f" Failed to update HITL detail: {resp.status_code} - {resp.text}")
        return None


def create_shared_link(
    headers: dict[str, str],
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    link_type: str = "action",
    action: str = "Approve",
) -> dict[str, Any] | None:
    """Create a shared link for HITL task."""
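    # NOTE: this path repeats the /api/v2/hitl-details prefix and differs from the
    # POST /api/v2/hitl-details/share-link/... route listed under "What's Changed".
    share_link_endpoint = (
        f"/api/v2/hitl-details/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}"
    )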
    data = {"link_type": link_type, "action": action, "chosen_options": [action], "params_input": {}}
    resp = requests.post(f"{host}{share_link_endpoint}", headers=headers, json=data)
    if resp.status_code == 201:
        result = resp.json()
        print(f" Successfully created shared link for {task_id}")
        print(f"   Link URL: {result['link_url']}")
        print(f"   Expires at: {result['expires_at']}")
        print(f"   Action: {result['action']}")
        return result
    else:
        print(f" Failed to create shared link: {resp.status_code} - {resp.text}")
        return None


def test_shared_link_action(
    headers: dict[str, str], dag_id: str, dag_run_id: str, task_id: str, payload: str, signature: str
) -> dict[str, Any] | None:
    """Test shared link action."""
    action_endpoint = f"/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}/action"
    params = {"payload": payload, "signature": signature}
    data = {"chosen_options": ["Approve"], "params_input": {"reason": "Approved via shared link"}}
    resp = requests.post(f"{host}{action_endpoint}", headers=headers, json=data, params=params)
    if resp.status_code == 200:
        result = resp.json()
        print(f" Successfully executed shared link action for {task_id}")
        return result
    else:
        print(f" Failed to execute shared link action: {resp.status_code} - {resp.text}")
        return None


def main() -> None:
    print("Starting HITL API Test")
    print("=" * 50)

    token = get_auth_token()
    if not token:
        return

    headers = {"Authorization": f"Bearer {token}"}

    print("\nGetting all HITL details...")
    all_details = get_all_hitl_details(headers)
    if not all_details:
        return

    dag_id = "aip_90_dag"
    dag_run_id = None
    task_id = "approve_or_reject"

    for detail in all_details.get("hitl_details", []):
        if detail.get("ti_id", "").startswith(dag_id):
            parts = detail["ti_id"].split(".")
            if len(parts) >= 3:
                dag_run_id = parts[1]
                break

    if not dag_run_id:
        print(f" Could not find DAG run for {dag_id} automatically.")
        runs = list_dag_runs(headers, dag_id)
        if not runs:
            print("No DAG runs found. Please trigger the DAG and try again.")
            return
        choice = input(f"Enter the number of the DAG run to use [0-{len(runs) - 1}]: ")
        try:
            dag_run_id = runs[int(choice)]["dag_run_id"]
        except (ValueError, IndexError):
            # Non-numeric input or an index outside the listed runs.
            print("Invalid selection. Exiting.")
            return

    print(f" Using DAG run ID: {dag_run_id}")

    print(f"\n📋 Getting HITL detail for {task_id}...")
    hitl_detail = get_hitl_detail(headers, dag_id, dag_run_id, task_id)
    if not hitl_detail:
        return

    print(f"\nTesting shared link creation for {task_id}...")
    shared_link = create_shared_link(headers, dag_id, dag_run_id, task_id, "action", "Approve")

    print(f"\n Testing direct API update - Approve {task_id}...")
    result = update_hitl_detail(headers, dag_id, dag_run_id, task_id, ["Approve"])

    print(f"\n Testing direct API update - Reject {task_id}...")
    result = update_hitl_detail(headers, dag_id, dag_run_id, task_id, ["Reject"])

    print(f"\n Testing timeout tasks...")
    timeout_tasks = ["approve_if_timeout", "reject_if_timeout"]
    for timeout_task in timeout_tasks:
        print(f"\n📋 Getting HITL detail for {timeout_task}...")
        timeout_detail = get_hitl_detail(headers, dag_id, dag_run_id, timeout_task)
        if timeout_detail:
            print(f"   Default option: {timeout_detail.get('defaults', 'None')}")

    print("\n HITL API Test completed!")
    print("=" * 50)


if __name__ == "__main__":
    main()
  1. Set the following config in airflow.cfg:
[api]
hitl_enable_shared_links = true
hitl_shared_link_secret_key = your-secret-key-here
hitl_shared_link_expiration_hours = 24
  1. Run Airflow:
breeze start-airflow
  • Open Airflow UI at http://localhost:28080
  • Navigate to DAGs → aip_90_dag
  • Click "Trigger DAG" to create a new DAG run
  • Wait for tasks to reach "deferred" state (waiting for human input)
  1. Run the test script:
python test_hitl_api.py

The script will:

  • Authenticate with Airflow API
  • List available DAG runs
  • Prompt you to select the correct dag_run_id
  • Test shared link creation
  • Test direct API updates (approve/reject)
  1. Expected Output:
Starting HITL API Test
==================================================
Authentication successful

Getting all HITL details...
Found 12 HITL details
Could not find DAG run for aip_90_dag automatically.

Available DAG runs for aip_90_dag:
  [0] manual__2025-07-11T09:16:30.052730+00:00 (state: success)
  [1] scheduled__2025-07-11T00:00:00+00:00 (state: running)
  [2] manual__2025-07-11T09:33:33.679564+00:00 (state: success)
  [3] manual__2025-07-11T14:40:39.281924+00:00 (state: running)
Enter the number of the DAG run to use [0-3]: 3
Using DAG run ID: manual__2025-07-11T14:40:39.281924+00:00

Getting HITL detail for approve_or_reject...
HITL detail for approve_or_reject:
   Subject: This is subject
   Options: ['Approve', 'Reject']
   Response received: False

Testing shared link creation for approve_or_reject...
Successfully created shared link for approve_or_reject
   Link URL: http://localhost:8080/api/v2/hitl-details-share-link/aip_90_dag/manual__2025-07-11T14:40:39.281924+00:00/approve_or_reject?payload=eyJhY3Rpb24iOiAiQXBwcm92ZSIsICJkYWdfaWQiOiAiYWlwXzkwX2RhZyIsICJkYWdfcnVuX2lkIjogIm1hbnVhbF9fMjAyNS0wNy0xMVQxNDo0MDozOS4yODE5MjQrMDA6MDAiLCAiZXhwaXJlc19hdCI6ICIyMDI1LTA3LTEyVDE0OjQxOjI3LjMwNDExNCswMDowMCIsICJsaW5rX3R5cGUiOiAiYWN0aW9uIiwgIm1hcF9pbmRleCI6IG51bGwsICJ0YXNrX2lkIjogImFwcHJvdmVfb3JfcmVqZWN0In0%3D&signature=cSLSfsplex4Fed8-nQdOkS21WiEZSmCqMcqNp7x5log%3D
   Expires at: 2025-07-12T14:41:27.304114Z
   Action: Approve

Testing direct API update - Approve approve_or_reject...
Successfully updated HITL detail for approve_or_reject
   Chosen options: ['Approve']
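
The Link URL printed above carries the `payload` and `signature` query parameters that `test_shared_link_action` expects. A small sketch of pulling them back out of a link is shown here; `split_shared_link` is a hypothetical helper, and treating the payload as URL-safe base64-encoded JSON is an assumption based on the sample output.

```python
import base64
import json
from urllib.parse import parse_qs, urlsplit


def split_shared_link(link_url: str) -> tuple[str, str, dict]:
    """Return (payload, signature, decoded_payload) from a shared link URL."""
    # parse_qs percent-decodes the values, so "%3D" padding becomes "=" again.
    query = parse_qs(urlsplit(link_url).query)
    payload = query["payload"][0]
    signature = query["signature"][0]
    # Assumption: the payload is URL-safe base64 wrapping a JSON object.
    decoded = json.loads(base64.urlsafe_b64decode(payload))
    return payload, signature, decoded
```

In the test script, the result of `create_shared_link` could then be fed forward, for example `payload, signature, _ = split_shared_link(shared_link["link_url"])` followed by `test_shared_link_action(headers, dag_id, dag_run_id, task_id, payload, signature)`.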

closes: #52202



Member

@jason810496 jason810496 left a comment

Nice! Thanks for the PR!

raise HTTPException(
    status.HTTP_400_BAD_REQUEST,
    str(e),
)
Member

How about having common functions in service module for

  • create_hitl_share_link and create_mapped_hitl_share_link
  • get_hitl_share_link and get_mapped_ti_hitl_share_link
  • execute_hitl_share_link_action and execute_mapped_ti_hitl_share_link_action

The logic for the simple and mapped variants seems to be the same.
Only the route parameters differ.
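
One way to read this suggestion is sketched below: both route variants delegate to a single helper, with `map_index` defaulting to -1, the value Airflow uses for unmapped task instances. The two function names come from the comment, but their bodies, `ShareLinkTarget`, and `build_share_link_path` are hypothetical and only illustrate the shape of the refactor.

```python
from __future__ import annotations

from dataclasses import dataclass

UNMAPPED = -1  # Airflow's map_index value for task instances that are not mapped


@dataclass(frozen=True)
class ShareLinkTarget:
    """Identifies the task instance a shared link points at (hypothetical type)."""

    dag_id: str
    dag_run_id: str
    task_id: str
    map_index: int = UNMAPPED


def build_share_link_path(target: ShareLinkTarget) -> str:
    """Shared service logic: one implementation serves both route variants."""
    path = f"/api/v2/hitl-details/share-link/{target.dag_id}/{target.dag_run_id}/{target.task_id}"
    if target.map_index != UNMAPPED:
        path += f"/{target.map_index}"
    return path


def create_hitl_share_link(dag_id: str, dag_run_id: str, task_id: str) -> str:
    # Route handler for the simple variant: only the parameters differ.
    return build_share_link_path(ShareLinkTarget(dag_id, dag_run_id, task_id))


def create_mapped_hitl_share_link(
    dag_id: str, dag_run_id: str, task_id: str, map_index: int
) -> str:
    # Route handler for the mapped variant: same logic, one extra parameter.
    return build_share_link_path(ShareLinkTarget(dag_id, dag_run_id, task_id, map_index))
```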

@dstandish
Contributor

@sunank200 I noticed that the API design doesn't include the try number. Shouldn't it require an approval on every try?

Moreover, why aren't we just using the UUID so we can easily and uniquely identify the TI-try?
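
To make the concern concrete, here is a tiny sketch of binding a link to one specific try. It assumes, as the comment implies, that each try of a task instance has its own UUID; the `ti_id` payload key and the function name are hypothetical.

```python
from __future__ import annotations

import uuid


def link_matches_current_try(payload: dict, current_ti_id: uuid.UUID) -> bool:
    """True only if the link was issued for the try that is currently waiting."""
    try:
        return uuid.UUID(str(payload["ti_id"])) == current_ti_id
    except (KeyError, ValueError):
        # Missing or malformed ti_id: treat the link as not valid for this try.
        return False
```

Under that assumption, a link created for an earlier try would stop matching once the task is retried, so each try would need its own approval.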

@sunank200 sunank200 force-pushed the hitl-shared-links branch from 5c1c3e8 to de74e16 Compare July 15, 2025 13:10
@sunank200 sunank200 force-pushed the hitl-shared-links branch from 5f844ed to 01a1285 Compare July 15, 2025 18:01
@sunank200 sunank200 force-pushed the hitl-shared-links branch from 01a1285 to cd43358 Compare July 15, 2025 18:01
@sunank200
Collaborator Author

@sunank200 I noticed that the API design doesn't include the try number. Shouldn't it require an approval on every try?

Moreover, why aren't we just using the UUID so we can easily and uniquely identify the TI-try?

That is right. I have changed the logic now.

@Lee-W
Member

Lee-W commented Jul 20, 2025

I think we'll need to rebase this from main before the next review round. Thanks!

@sunank200 sunank200 force-pushed the hitl-shared-links branch 2 times, most recently from 9a6e3ef to 99c62cd Compare July 22, 2025 16:26
Fix the CI

feat: implement HITL shared links functionality

feat: implement HITL shared links functionality

# Conflicts:
#	airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
#	airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
#	airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
#	airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
#	airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
#	airflow-ctl/src/airflowctl/api/datamodels/generated.py
@sunank200 sunank200 force-pushed the hitl-shared-links branch from 99c62cd to 8a66849 Compare July 22, 2025 17:26
@sunank200 sunank200 marked this pull request as draft July 22, 2025 20:07
@sunank200
Collaborator Author

I think we'll need to rebase this from main before the next review round. Thanks!

@Lee-W I will create a new PR, as the issue was updated and a lot has changed in main. It will be easier to start from a fresh PR.

@sunank200 sunank200 closed this Jul 22, 2025
@github-project-automation github-project-automation bot moved this from In progress to Done in AIP-90 - Human in the loop Jul 22, 2025
Labels
area:airflow-ctl, area:API (Airflow's REST/HTTP API), area:ConfigTemplates, area:UI (Related to UI/UX. For Frontend Developers.)
Projects
Status: Done

5 participants