Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions providers/http/docs/triggers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ enabling event-driven DAGs based on API responses.
How It Works
------------

1. Sends requests to an API.
1. Sends requests to an API every ``poll_interval`` seconds (default 60).
2. Uses the callable at ``response_check_path`` to evaluate the API response.
3. If the callable returns ``True``, a ``TriggerEvent`` is emitted. This will trigger DAGs using this ``AssetWatcher`` for scheduling.

Expand Down Expand Up @@ -85,6 +85,7 @@ Here's an example of using the HttpEventTrigger in an AssetWatcher to monitor th
http_conn_id="http_default", # HTTP connection with https://api.github.com/ as the Host
headers=headers,
response_check_path="dags.check_airflow_releases.check_github_api_response", # Path to the check_github_api_response callable
poll_interval=600, # Poll API every 600 seconds
)

asset = Asset(
Expand Down Expand Up @@ -133,11 +134,15 @@ Parameters
``response_check_path``
Path to callable that evaluates whether the API response passes the conditions set by the user to trigger DAGs

``poll_interval``
How often, in seconds, the trigger should send a request to the API.


Important Notes
---------------

1. A ``response_check_path`` value is required.
2. The ``response_check_path`` must contain the path to an asynchronous callable. Synchronous callables will raise an exception.
3. This trigger does not automatically record the previous API response.
4. The previous response may have to be persisted manually though ``Variable.set()`` in the ``response_check_path`` callable to prevent the trigger from emitting events repeatedly for the same API response.
3. The ``poll_interval`` defaults to 60 seconds. This may be changed to avoid hitting API rate limits.
4. This trigger does not automatically record the previous API response.
5. The previous response may have to be persisted manually though ``Variable.set()`` in the ``response_check_path`` callable to prevent the trigger from emitting events repeatedly for the same API response.
7 changes: 6 additions & 1 deletion providers/http/src/airflow/providers/http/triggers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
:param headers: Additional headers to be passed through as a dict.
:param data: Payload to be uploaded or request parameters.
:param extra_options: Additional kwargs to pass when creating a request.
:parama poll_interval: How often, in seconds, the trigger should send a request to the API.
"""

def __init__(
Expand All @@ -265,9 +266,11 @@ def __init__(
headers: dict[str, str] | None = None,
data: dict[str, Any] | str | None = None,
extra_options: dict[str, Any] | None = None,
poll_interval: float = 60.0,
):
super().__init__(http_conn_id, auth_type, method, endpoint, headers, data, extra_options)
self.response_check_path = response_check_path
self.poll_interval = poll_interval

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize HttpEventTrigger arguments and classpath."""
Expand All @@ -276,12 +279,13 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
{
"http_conn_id": self.http_conn_id,
"method": self.method,
"auth_type": self.auth_type,
"auth_type": serialize_auth_type(self.auth_type),
"endpoint": self.endpoint,
"headers": self.headers,
"data": self.data,
"extra_options": self.extra_options,
"response_check_path": self.response_check_path,
"poll_interval": self.poll_interval,
},
)

Expand All @@ -293,6 +297,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
response = await super()._get_response(hook)
if await self._run_response_check(response):
break
await asyncio.sleep(self.poll_interval)
yield TriggerEvent(
{
"status": "success",
Expand Down
3 changes: 3 additions & 0 deletions providers/http/tests/unit/http/triggers/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
TEST_DATA = {"key": "value"}
TEST_EXTRA_OPTIONS: dict[str, Any] = {}
TEST_RESPONSE_CHECK_PATH = "mock.path"
TEST_POLL_INTERVAL = 5


@pytest.fixture
Expand Down Expand Up @@ -81,6 +82,7 @@ def event_trigger():
data=TEST_DATA,
extra_options=TEST_EXTRA_OPTIONS,
response_check_path=TEST_RESPONSE_CHECK_PATH,
poll_interval=TEST_POLL_INTERVAL,
)


Expand Down Expand Up @@ -232,6 +234,7 @@ def test_serialization(self, event_trigger):
"data": TEST_DATA,
"extra_options": TEST_EXTRA_OPTIONS,
"response_check_path": TEST_RESPONSE_CHECK_PATH,
"poll_interval": TEST_POLL_INTERVAL,
}

@pytest.mark.asyncio
Expand Down