Skip to content
Merged
21 changes: 12 additions & 9 deletions dev/breeze/tests/test_selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"providers/http/tests/file.py",
),
{
"selected-providers-list-as-string": "amazon apache.livy atlassian.jira dbt.cloud dingding discord google http pagerduty",
"selected-providers-list-as-string": "amazon apache.livy atlassian.jira common.compat dbt.cloud dingding discord google http pagerduty",
"all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
Expand All @@ -691,7 +691,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
[
{
"description": "amazon...google",
"test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
"test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,common.compat,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
}
]
),
Expand All @@ -702,18 +702,21 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"test_types": "Providers[amazon] Providers[apache.livy]",
},
{
"description": "atlassian.jir...dbt.cloud",
"test_types": "Providers[atlassian.jira] Providers[dbt.cloud]",
"description": "atlassian.jir...common.compat",
"test_types": "Providers[atlassian.jira] Providers[common.compat]",
},
{
"description": "dingding...discord",
"test_types": "Providers[dingding] Providers[discord]",
"description": "dbt.cloud...dingding",
"test_types": "Providers[dbt.cloud] Providers[dingding]",
},
{
"description": "google...http",
"test_types": "Providers[google] Providers[http]",
"description": "discord...google",
"test_types": "Providers[discord] Providers[google]",
},
{
"description": "http...pagerduty",
"test_types": "Providers[http] Providers[pagerduty]",
},
{"description": "pagerduty", "test_types": "Providers[pagerduty]"},
]
),
"run-mypy": "true",
Expand Down
3 changes: 3 additions & 0 deletions providers/http/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ hooks:
python-modules:
- airflow.providers.http.hooks.http

notifications:
- airflow.providers.http.notifications.HttpNotifier

triggers:
- integration-name: Hypertext Transfer Protocol (HTTP)
python-modules:
Expand Down
8 changes: 8 additions & 0 deletions providers/http/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,19 @@ dependencies = [
"asgiref>=2.3.0",
]

# The optional dependencies should be modified in place in the generated file
# Any change in the dependencies is preserved when the file is regenerated
[project.optional-dependencies]
"common.compat" = [
"apache-airflow-providers-common-compat"
]

[dependency-groups]
dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-common-compat",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def get_provider_info():
"python-modules": ["airflow.providers.http.hooks.http"],
}
],
"notifications": ["airflow.providers.http.notifications.HttpNotifier"],
"triggers": [
{
"integration-name": "Hypertext Transfer Protocol (HTTP)",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.providers.http.notifications.http import HttpNotifier

__all__ = ["HttpNotifier"]
105 changes: 105 additions & 0 deletions providers/http/src/airflow/providers/http/notifications/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any

import aiohttp

from airflow.providers.common.compat.notifier import BaseNotifier
from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook

if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context


class HttpNotifier(BaseNotifier):
"""
HTTP Notifier.

Sends HTTP requests to notify external systems.

:param http_conn_id: HTTP connection id that has the base URL and optional authentication credentials.
:param endpoint: The endpoint to be called i.e. resource/v1/query?
:param method: The HTTP method to use. Defaults to POST.
:param data: Payload to be uploaded or request parameters
:param json: JSON payload to be uploaded
:param headers: Additional headers to be passed through as a dictionary
:param extra_options: Additional options to be used when executing the request
"""

template_fields = ("http_conn_id", "endpoint", "data", "json", "headers", "extra_options")

def __init__(
self,
*,
http_conn_id: str = HttpHook.default_conn_name,
endpoint: str | None = None,
method: str = "POST",
data: dict[str, Any] | str | None = None,
json: dict[str, Any] | str | None = None,
headers: dict[str, Any] | None = None,
extra_options: dict[str, Any] | None = None,
**kwargs,
):
super().__init__(**kwargs)
self.http_conn_id = http_conn_id
self.endpoint = endpoint
self.method = method
self.data = data
self.json = json
self.headers = headers
self.extra_options = extra_options or {}

@cached_property
def hook(self) -> HttpHook:
"""HTTP Hook."""
return HttpHook(method=self.method, http_conn_id=self.http_conn_id)

@cached_property
def async_hook(self) -> HttpAsyncHook:
"""HTTP Async Hook."""
return HttpAsyncHook(method=self.method, http_conn_id=self.http_conn_id)

def notify(self, context: Context) -> None:
"""Send HTTP notification (sync)."""
resp = self.hook.run(
endpoint=self.endpoint,
data=self.data,
headers=self.headers,
extra_options=self.extra_options,
json=self.json,
)
self.log.debug("HTTP notification sent: %s %s", resp.status_code, resp.url)

async def async_notify(self, context: Context) -> None:
"""Send HTTP notification (async)."""
async with aiohttp.ClientSession() as session:
resp = await self.async_hook.run(
session=session,
endpoint=self.endpoint,
data=self.data,
json=self.json,
headers=self.headers,
extra_options=self.extra_options,
)
self.log.debug("HTTP notification sent (async): %s %s", resp.status, resp.url)


send_http_notification = HttpNotifier
16 changes: 16 additions & 0 deletions providers/http/tests/unit/http/notifications/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
95 changes: 95 additions & 0 deletions providers/http/tests/unit/http/notifications/test_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from unittest import mock

import pytest

from airflow.providers.http.notifications.http import HttpNotifier, send_http_notification


class TestHttpNotifier:
def test_class_and_notifier_are_same(self):
assert send_http_notification is HttpNotifier

@mock.patch("airflow.providers.http.notifications.http.HttpHook")
def test_http_notifier(self, mock_http_hook):
notifier = HttpNotifier(
http_conn_id="test_conn_id",
endpoint="/testing",
method="POST",
json={"message": "testing"},
headers={"Content-Type": "application/json"},
)
notifier.notify({})

mock_http_hook.return_value.run.assert_called_once_with(
endpoint="/testing",
data=None,
headers={"Content-Type": "application/json"},
extra_options={},
json={"message": "testing"},
)
mock_http_hook.assert_called_once_with(method="POST", http_conn_id="test_conn_id")

@pytest.mark.asyncio
@mock.patch("airflow.providers.http.notifications.http.HttpAsyncHook")
@mock.patch("aiohttp.ClientSession")
async def test_async_http_notifier(self, mock_session, mock_http_async_hook):
mock_hook = mock_http_async_hook.return_value
mock_hook.run = mock.AsyncMock()

notifier = HttpNotifier(
http_conn_id="test_conn_id",
endpoint="/test",
method="POST",
json={"message": "test"},
)

await notifier.async_notify({})

mock_hook.run.assert_called_once_with(
session=mock_session.return_value.__aenter__.return_value,
endpoint="/test",
data=None,
json={"message": "test"},
headers=None,
extra_options={},
)

@mock.patch("airflow.providers.http.notifications.http.HttpHook")
def test_http_notifier_templated(self, mock_http_hook, create_dag_without_db):
notifier = HttpNotifier(
endpoint="/{{ dag.dag_id }}",
json={"dag_id": "{{ dag.dag_id }}", "user": "{{ username }}"},
)
notifier(
{
"dag": create_dag_without_db("test_http_notification_templated"),
"username": "test-user",
}
)

mock_http_hook.return_value.run.assert_called_once_with(
endpoint="/test_http_notification_templated",
data=None,
headers=None,
extra_options={},
json={"dag_id": "test_http_notification_templated", "user": "test-user"},
)
Loading