Skip to content

Commit

Permalink
🎉 CDK: Allow setting request non-JSON data (airbytehq#5161)
Browse files Browse the repository at this point in the history
* add the function request_body_data

* gradlew format

* Update airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update airbyte-cdk/python/airbyte_cdk/sources/streams/http/exceptions.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* add test for application/x-www-form-urlencoded

Co-authored-by: Maksym Pavlenok <maksym.pavlenok@globallogic.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
3 people authored Aug 6, 2021
1 parent 137257b commit 106c3cd
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class BaseBackoffException(requests.exceptions.HTTPError):
pass


class RequestBodyException(Exception):
"""
Raised when there are issues in configuring a request body
"""


class UserDefinedBackoffException(BaseBackoffException):
"""
An exception that exposes how long it attempted to backoff
Expand Down
47 changes: 37 additions & 10 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@


from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream

from .auth.core import HttpAuthenticator, NoAuth
from .exceptions import DefaultBackoffException, UserDefinedBackoffException
from .exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from .rate_limiting import default_backoff_handler, user_defined_backoff_handler

# list of all possible HTTP methods which can be used for sending of request bodies
BODY_REQUEST_METHODS = ("POST", "PUT", "PATCH")


class HttpStream(Stream, ABC):
"""
Expand All @@ -56,7 +59,7 @@ def url_base(self) -> str:
@property
def http_method(self) -> str:
"""
Override if needed. See get_request_data if using POST.
Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
"""
return "GET"

Expand Down Expand Up @@ -106,15 +109,33 @@ def request_headers(
"""
return {}

def request_body_data(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Union[Mapping, str]]:
"""
Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.
If returns a ready text that it will be sent as is.
If returns a dict that it will be converted to a urlencoded form.
E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
"""
return None

def request_body_json(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
"""
TODO make this possible to do for non-JSON APIs
Override when creating POST requests to populate the body of the request with a JSON payload.
Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
"""
return None

Expand Down Expand Up @@ -171,13 +192,18 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
return None

def _create_prepared_request(
self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None
self, path: str, headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None
) -> requests.PreparedRequest:
args = {"method": self.http_method, "url": self.url_base + path, "headers": headers, "params": params}

if self.http_method.upper() == "POST":
# TODO support non-json bodies
args["json"] = json
if self.http_method.upper() in BODY_REQUEST_METHODS:
if json and data:
raise RequestBodyException(
"At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
)
elif json:
args["json"] = json
elif data:
args["data"] = data

return self._session.prepare_request(requests.Request(**args))

Expand Down Expand Up @@ -235,6 +261,7 @@ def read_records(
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
response = self._send_request(request, request_kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
def default_backoff_handler(max_tries: int, factor: int, **kwargs):
def log_retry_attempt(details):
_, exc, _ = sys.exc_info()
logger.info(str(exc))
logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying...")
logger.info(
f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
)

def should_give_up(exc):
# If a non-rate-limiting related 4XX error makes it this far, it means it was unexpected and probably consistent, so we shouldn't back off
Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.6",
version="0.1.7",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -66,7 +66,7 @@
packages=find_packages(exclude=("unit_tests",)),
install_requires=[
"backoff",
"jsonschema==2.6.0",
"jsonschema~=3.2.0",
"pendulum",
"pydantic~=1.6",
"PyYAML~=5.4",
Expand Down
86 changes: 85 additions & 1 deletion airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
#


import json
from typing import Any, Iterable, Mapping, Optional
from unittest.mock import ANY

import pytest
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import UserDefinedBackoffException
from airbyte_cdk.sources.streams.http.exceptions import RequestBodyException, UserDefinedBackoffException


class StubBasicReadHttpStream(HttpStream):
Expand Down Expand Up @@ -153,3 +154,86 @@ def test_stub_custom_backoff_http_stream(mocker):
list(stream.read_records(SyncMode.full_refresh))

# TODO(davin): Figure out how to assert calls.


class PostHttpStream(StubBasicReadHttpStream):
http_method = "POST"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""Returns response data as is"""
yield response.json()


class TestRequestBody:
"""Suite of different tests for request bodies"""

json_body = {"key": "value"}
data_body = "key:value"
form_body = {"key1": "value1", "key2": 1234}
urlencoded_form_body = "key1=value1&key2=1234"

def request2response(self, request, context):
return json.dumps({"body": request.text, "content_type": request.headers.get("Content-Type")})

def test_json_body(self, mocker, requests_mock):

stream = PostHttpStream()
mocker.patch.object(stream, "request_body_json", return_value=self.json_body)

requests_mock.register_uri("POST", stream.url_base, text=self.request2response)
response = list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]

assert response["content_type"] == "application/json"
assert json.loads(response["body"]) == self.json_body

def test_text_body(self, mocker, requests_mock):

stream = PostHttpStream()
mocker.patch.object(stream, "request_body_data", return_value=self.data_body)

requests_mock.register_uri("POST", stream.url_base, text=self.request2response)
response = list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]

assert response["content_type"] is None
assert response["body"] == self.data_body

def test_form_body(self, mocker, requests_mock):

stream = PostHttpStream()
mocker.patch.object(stream, "request_body_data", return_value=self.form_body)

requests_mock.register_uri("POST", stream.url_base, text=self.request2response)
response = list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]

assert response["content_type"] == "application/x-www-form-urlencoded"
assert response["body"] == self.urlencoded_form_body

def test_text_json_body(self, mocker, requests_mock):
"""checks a exception if both functions were overridden"""
stream = PostHttpStream()
mocker.patch.object(stream, "request_body_data", return_value=self.data_body)
mocker.patch.object(stream, "request_body_json", return_value=self.json_body)
requests_mock.register_uri("POST", stream.url_base, text=self.request2response)
with pytest.raises(RequestBodyException):
list(stream.read_records(sync_mode=SyncMode.full_refresh))

def test_body_for_all_methods(self, mocker, requests_mock):
"""Stream must send a body for POST/PATCH/PUT methods only"""
stream = PostHttpStream()
methods = {
"POST": True,
"PUT": True,
"PATCH": True,
"GET": False,
"DELETE": False,
"OPTIONS": False,
}
for method, with_body in methods.items():
stream.http_method = method
mocker.patch.object(stream, "request_body_data", return_value=self.data_body)
requests_mock.register_uri(method, stream.url_base, text=self.request2response)
response = list(stream.read_records(sync_mode=SyncMode.full_refresh))[0]
if with_body:
assert response["body"] == self.data_body
else:
assert response["body"] is None

0 comments on commit 106c3cd

Please sign in to comment.