Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
45d4acb
Merge branch 'main' into feature/http-extra-options-check-response
davidblain-infrabel Feb 4, 2025
7db5cd2
refactor: Ignore check_response from extra options so it doesn't get …
davidblain-infrabel Feb 4, 2025
a19ba56
refactor: Re-added changes to support the check_response parameter in…
davidblain-infrabel Feb 4, 2025
7479cb1
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 4, 2025
46cd8bf
refactor: Updated http provider to version 5.1.0 as requested by Jare…
davidblain-infrabel Feb 4, 2025
3e0076d
refactor: Livy provider should depend on version 5.1.0 of http provider
davidblain-infrabel Feb 4, 2025
084ae66
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 5, 2025
8f65f2e
refactor: Updated provider dependencies
dabla Feb 5, 2025
554787e
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 5, 2025
1e5950b
refactor: Fixed provider info in livy and http due to bump of http pr…
davidblain-infrabel Feb 5, 2025
104a15a
refactor: Try fixing patching for TestHttpSensor
davidblain-infrabel Feb 5, 2025
a625772
refactor: Updated README for livy
davidblain-infrabel Feb 5, 2025
eae2b3f
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 5, 2025
51829e9
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 5, 2025
f713e10
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 5, 2025
5ef698c
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 6, 2025
afb1231
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 10, 2025
6d0c763
refactor: Fixed patching of FakeSession in TestHttpOpSensor
Feb 10, 2025
b219500
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 10, 2025
c15d33c
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 11, 2025
fe75b80
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 11, 2025
8aace16
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 11, 2025
56125fb
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 12, 2025
eb3c900
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 12, 2025
09463e8
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 13, 2025
ec34e6c
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 13, 2025
79848ef
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 13, 2025
4dd1461
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 14, 2025
13ccfd1
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 14, 2025
69ead4e
Merge branch 'main' into feature/http-extra-options-check-response
davidblain-infrabel Feb 18, 2025
e736520
Merge remote-tracking branch 'origin/feature/http-extra-options-check…
davidblain-infrabel Feb 18, 2025
8dacc0e
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 18, 2025
77c53b6
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 18, 2025
70410ee
Merge branch 'main' into feature/http-extra-options-check-response
dabla Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@
"apache.livy": {
"deps": [
"aiohttp>=3.9.2",
"apache-airflow-providers-http",
"apache-airflow-providers-http>=5.1.0",
"apache-airflow>=2.9.0",
"asgiref>=2.3.0"
],
Expand Down
2 changes: 1 addition & 1 deletion providers/apache/livy/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Requirements
PIP package Version required
================================= ==================
``apache-airflow`` ``>=2.9.0``
``apache-airflow-providers-http``
``apache-airflow-providers-http`` ``>=5.1.0``
``aiohttp`` ``>=3.9.2``
``asgiref`` ``>=2.3.0``
================================= ==================
Expand Down
2 changes: 1 addition & 1 deletion providers/apache/livy/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ requires-python = "~=3.9"
# Any change in the dependencies is preserved when the file is regenerated
dependencies = [
"apache-airflow>=2.9.0",
"apache-airflow-providers-http",
"apache-airflow-providers-http>=5.1.0",
"aiohttp>=3.9.2",
"asgiref>=2.3.0",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def get_provider_info():
],
"dependencies": [
"apache-airflow>=2.9.0",
"apache-airflow-providers-http",
"apache-airflow-providers-http>=5.1.0",
"aiohttp>=3.9.2",
"asgiref>=2.3.0",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ class LivyHook(HttpHook):
BatchState.ERROR,
}

_def_headers = {"Content-Type": "application/json", "Accept": "application/json"}

conn_name_attr = "livy_conn_id"
default_conn_name = "livy_default"
conn_type = "livy"
hook_name = "Apache Livy"
default_headers = {"Content-Type": "application/json", "Accept": "application/json"}

def __init__(
self,
Expand All @@ -102,18 +101,6 @@ def __init__(
if auth_type:
self.auth_type = auth_type

def get_conn(self, headers: dict[str, Any] | None = None) -> Any:
    """
    Return the parent hook's session, created with the default headers applied.

    The class-level ``_def_headers`` mapping is copied before merging, so the
    shared defaults are never mutated. Entries in ``headers`` replace defaults
    that use the same key.

    :param headers: additional headers to be passed through as a dictionary
    :return: the value returned by the parent ``get_conn`` for the merged headers
    """
    # Start from a private copy of the defaults, then layer caller headers on top.
    merged_headers = dict(self._def_headers)
    merged_headers.update(headers or {})
    return super().get_conn(merged_headers)

def run_method(
self,
endpoint: str,
Expand Down
6 changes: 3 additions & 3 deletions providers/http/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

Package ``apache-airflow-providers-http``

Release: ``5.0.0``
Release: ``5.1.0``


`Hypertext Transfer Protocol (HTTP) <https://www.w3.org/Protocols/>`__
Expand All @@ -37,7 +37,7 @@ This is a provider package for ``http`` provider. All classes for this provider
are in ``airflow.providers.http`` python package.

You can find package information and changelog for the provider
in the `documentation <https://airflow.apache.org/docs/apache-airflow-providers-http/5.0.0/>`_.
in the `documentation <https://airflow.apache.org/docs/apache-airflow-providers-http/5.1.0/>`_.

Installation
------------
Expand All @@ -62,4 +62,4 @@ PIP package Version required
===================== ====================

The changelog for the provider package can be found in the
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-http/5.0.0/changelog.html>`_.
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-http/5.1.0/changelog.html>`_.
2 changes: 1 addition & 1 deletion providers/http/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ apache-airflow-providers-http package
`Hypertext Transfer Protocol (HTTP) <https://www.w3.org/Protocols/>`__


Release: 5.0.0
Release: 5.1.0

Provider package
----------------
Expand Down
1 change: 1 addition & 0 deletions providers/http/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ state: ready
source-date-epoch: 1734534857
# note that those versions are maintained by release manager - do not update them manually
versions:
- 5.1.0
- 5.0.0
- 4.13.3
- 4.13.2
Expand Down
6 changes: 3 additions & 3 deletions providers/http/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"

[project]
name = "apache-airflow-providers-http"
version = "5.0.0"
version = "5.1.0"
description = "Provider package apache-airflow-providers-http for Apache Airflow"
readme = "README.rst"
authors = [
Expand Down Expand Up @@ -66,8 +66,8 @@ dependencies = [
]

[project.urls]
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-http/5.0.0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-http/5.0.0/changelog.html"
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-http/5.1.0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-http/5.1.0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
Expand Down
2 changes: 1 addition & 1 deletion providers/http/src/airflow/providers/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "5.0.0"
__version__ = "5.1.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.9.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def get_provider_info():
"state": "ready",
"source-date-epoch": 1734534857,
"versions": [
"5.1.0",
"5.0.0",
"4.13.3",
"4.13.2",
Expand Down
127 changes: 70 additions & 57 deletions providers/http/src/airflow/providers/http/hooks/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
from urllib.parse import urlparse

import aiohttp
import requests
import tenacity
from aiohttp import ClientResponseError
from asgiref.sync import sync_to_async
from requests import PreparedRequest, Request, Response, Session
from requests.auth import HTTPBasicAuth
from requests.exceptions import ConnectionError, HTTPError
from requests.models import DEFAULT_REDIRECT_LIMIT
from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter

Expand All @@ -47,6 +48,39 @@ def _url_from_endpoint(base_url: str | None, endpoint: str | None) -> str:
return (base_url or "") + (endpoint or "")


def _process_extra_options_from_connection(conn: Connection, extra_options: dict[str, Any]) -> dict:
    """
    Move known request options from the connection extras into ``extra_options``.

    The recognised keys are popped from ``conn.extra_dejson``. Each popped value
    that is not ``None`` is stored in ``extra_options`` under its target key,
    unless that key is already present there (options supplied by the caller win).
    ``proxies``/``proxy`` are stored as ``proxy`` and ``verify``/``verify_ssl`` as
    ``verify_ssl``; when both spellings exist, the first one listed takes precedence.

    :param conn: connection whose ``extra_dejson`` mapping is consumed
    :param extra_options: options dict, updated in place
    :return: the extras left over once the recognised option keys are removed
    """
    remaining = conn.extra_dejson

    # (key written to extra_options, connection-extra spellings in precedence order)
    option_sources = (
        ("stream", ("stream",)),
        ("cert", ("cert",)),
        ("proxy", ("proxies", "proxy")),
        ("timeout", ("timeout",)),
        ("verify_ssl", ("verify", "verify_ssl")),
        ("allow_redirects", ("allow_redirects",)),
        ("max_redirects", ("max_redirects",)),
        ("trust_env", ("trust_env",)),
        ("check_response", ("check_response",)),
    )

    # First pass: strip every spelling out of the extras so none is left behind.
    # Walking the spellings in reverse makes the lower-precedence value the
    # fallback for the higher-precedence one.
    collected = []
    for option, spellings in option_sources:
        value = None
        for spelling in reversed(spellings):
            value = remaining.pop(spelling, value)
        collected.append((option, value))

    # Second pass: fill in only the options the caller has not already set.
    for option, value in collected:
        if value is None or option in extra_options:
            continue
        extra_options[option] = value

    return remaining


class HttpHook(BaseHook):
"""
Interact with HTTP servers.
Expand All @@ -69,6 +103,8 @@ class HttpHook(BaseHook):
default_conn_name = "http_default"
conn_type = "http"
hook_name = "HTTP"
default_host = ""
default_headers: dict[str, str] = {}

def __init__(
self,
Expand Down Expand Up @@ -109,26 +145,31 @@ def auth_type(self, v):

# headers may be passed through directly or in the "extra" field in the connection
# definition
def get_conn(self, headers: dict[Any, Any] | None = None) -> requests.Session:
def get_conn(
self, headers: dict[Any, Any] | None = None, extra_options: dict[str, Any] | None = None
) -> Session:
"""
Create a Requests HTTP session.

:param headers: Additional headers to be passed through as a dictionary.
:param extra_options: additional options to be used when executing the request
:return: A configured requests.Session object.
"""
session = requests.Session()
session = Session()
connection = self.get_connection(self.http_conn_id)
self._set_base_url(connection)
session = self._configure_session_from_auth(session, connection)
if connection.extra:
session = self._configure_session_from_extra(session, connection)
session = self._configure_session_from_extra(session, connection, extra_options)
session = self._configure_session_from_mount_adapters(session)
if self.default_headers:
session.headers.update(self.default_headers)
if headers:
session.headers.update(headers)
return session

def _set_base_url(self, connection: Connection) -> None:
host = connection.host or ""
host = connection.host or self.default_host
schema = connection.schema or "http"
# RFC 3986 (https://www.rfc-editor.org/rfc/rfc3986.html#page-16)
if "://" in host:
Expand All @@ -141,9 +182,7 @@ def _set_base_url(self, connection: Connection) -> None:
if not parsed.scheme:
raise ValueError(f"Invalid base URL: Missing scheme in {self.base_url}")

def _configure_session_from_auth(
self, session: requests.Session, connection: Connection
) -> requests.Session:
def _configure_session_from_auth(self, session: Session, connection: Connection) -> Session:
session.auth = self._extract_auth(connection)
return session

Expand All @@ -155,24 +194,24 @@ def _extract_auth(self, connection: Connection) -> Any | None:
return None

def _configure_session_from_extra(
self, session: requests.Session, connection: Connection
) -> requests.Session:
extra = connection.extra_dejson
extra.pop("timeout", None)
extra.pop("allow_redirects", None)
session.proxies = extra.pop("proxies", extra.pop("proxy", {}))
session.stream = extra.pop("stream", False)
session.verify = extra.pop("verify", extra.pop("verify_ssl", True))
session.cert = extra.pop("cert", None)
session.max_redirects = extra.pop("max_redirects", DEFAULT_REDIRECT_LIMIT)
session.trust_env = extra.pop("trust_env", True)
self, session: Session, connection: Connection, extra_options: dict[str, Any] | None = None
) -> Session:
if extra_options is None:
extra_options = {}
headers = _process_extra_options_from_connection(connection, extra_options)
session.proxies = extra_options.pop("proxies", extra_options.pop("proxy", {}))
session.stream = extra_options.pop("stream", False)
session.verify = extra_options.pop("verify", extra_options.pop("verify_ssl", True))
session.cert = extra_options.pop("cert", None)
session.max_redirects = extra_options.pop("max_redirects", DEFAULT_REDIRECT_LIMIT)
session.trust_env = extra_options.pop("trust_env", True)
try:
session.headers.update(extra)
session.headers.update(headers)
except TypeError:
self.log.warning("Connection to %s has invalid extra field.", connection.host)
return session

def _configure_session_from_mount_adapters(self, session: requests.Session) -> requests.Session:
def _configure_session_from_mount_adapters(self, session: Session) -> Session:
scheme = urlparse(self.base_url).scheme
if not scheme:
raise ValueError(
Expand Down Expand Up @@ -207,25 +246,25 @@ def run(
"""
extra_options = extra_options or {}

session = self.get_conn(headers)
session = self.get_conn(headers, extra_options)

url = self.url_from_endpoint(endpoint)

if self.method == "GET":
# GET uses params
req = requests.Request(self.method, url, params=data, headers=headers, **request_kwargs)
req = Request(self.method, url, params=data, headers=headers, **request_kwargs)
elif self.method == "HEAD":
# HEAD doesn't use params
req = requests.Request(self.method, url, headers=headers, **request_kwargs)
req = Request(self.method, url, headers=headers, **request_kwargs)
else:
# Others use data
req = requests.Request(self.method, url, data=data, headers=headers, **request_kwargs)
req = Request(self.method, url, data=data, headers=headers, **request_kwargs)

prepped_request = session.prepare_request(req)
self.log.debug("Sending '%s' to url: %s", self.method, url)
return self.run_and_check(session, prepped_request, extra_options)

def check_response(self, response: requests.Response) -> None:
def check_response(self, response: Response) -> None:
"""
Check the status code and raise on failure.

Expand All @@ -235,15 +274,15 @@ def check_response(self, response: requests.Response) -> None:
"""
try:
response.raise_for_status()
except requests.exceptions.HTTPError:
except HTTPError:
self.log.error("HTTP error: %s", response.reason)
self.log.error(response.text)
raise AirflowException(str(response.status_code) + ":" + response.reason)

def run_and_check(
self,
session: requests.Session,
prepped_request: requests.PreparedRequest,
session: Session,
prepped_request: PreparedRequest,
extra_options: dict[Any, Any],
) -> Any:
"""
Expand Down Expand Up @@ -279,7 +318,7 @@ def run_and_check(
self.check_response(response)
return response

except requests.exceptions.ConnectionError as ex:
except ConnectionError as ex:
self.log.warning("%s Tenacity will retry to execute the operation", ex)
raise ex

Expand Down Expand Up @@ -400,7 +439,7 @@ async def run(
if conn.login:
auth = self.auth_type(conn.login, conn.password)
if conn.extra:
extra = self._process_extra_options_from_connection(conn=conn, extra_options=extra_options)
extra = _process_extra_options_from_connection(conn=conn, extra_options=extra_options)

try:
_headers.update(extra)
Expand Down Expand Up @@ -457,32 +496,6 @@ async def run(

raise NotImplementedError # should not reach this, but makes mypy happy

@classmethod
def _process_extra_options_from_connection(cls, conn: Connection, extra_options: dict) -> dict:
    """
    Move known request options from the connection extras into ``extra_options``.

    ``stream`` and ``cert`` are removed from ``conn.extra_dejson`` and discarded.
    The other recognised keys are popped, and each value that is not ``None`` is
    stored in ``extra_options`` unless the target key is already present there.
    ``proxies``/``proxy`` are stored as ``proxy`` and ``verify``/``verify_ssl`` as
    ``verify_ssl``.

    :param conn: connection whose ``extra_dejson`` mapping is consumed
    :param extra_options: options dict, updated in place
    :return: the extras left over once the recognised option keys are removed
    """
    leftover = conn.extra_dejson

    # These two are dropped and never forwarded.
    for discarded in ("stream", "cert"):
        leftover.pop(discarded, None)

    # The inner pop removes the alternative spelling and acts as the fallback value.
    from_connection = {
        "proxy": leftover.pop("proxies", leftover.pop("proxy", None)),
        "timeout": leftover.pop("timeout", None),
        "verify_ssl": leftover.pop("verify", leftover.pop("verify_ssl", None)),
        "allow_redirects": leftover.pop("allow_redirects", None),
        "max_redirects": leftover.pop("max_redirects", None),
        "trust_env": leftover.pop("trust_env", None),
    }

    for option, value in from_connection.items():
        if value is not None:
            # setdefault leaves any value the caller already supplied untouched.
            extra_options.setdefault(option, value)

    return leftover

def _retryable_error_async(self, exception: ClientResponseError) -> bool:
"""
Determine whether an exception may successful on a subsequent attempt.
Expand Down
Loading
Loading