Skip to content

Commit

Permalink
sync APIDataSet from kedro's develop (#184)
Browse files Browse the repository at this point in the history
* Update APIDataSet

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Sync ParquetDataSet

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Sync Test

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Linting

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Revert Unnecessary ParquetDataSet Changes

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Sync release notes

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

---------

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>
  • Loading branch information
noklam authored Apr 24, 2023
1 parent a203f53 commit 6911f03
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 147 deletions.
2 changes: 1 addition & 1 deletion kedro-datasets/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Major features and improvements:
* Added pandas 2.0 support.
* Added SQLAlchemy 2.0 support (and dropped support for versions below 1.4).

* Reduced constructor arguments for `APIDataSet` by replacing most arguments with a single constructor argument `load_args`. This makes it more consistent with other Kedro DataSets and the underlying `requests` API, and automatically enables the full configuration domain: stream, certificates, proxies, and more.
## Bug fixes and other changes
* Relaxed `delta-spark` upper bound to allow compatibility with Spark 3.1.x and 3.2.x.

Expand Down
111 changes: 59 additions & 52 deletions kedro-datasets/kedro_datasets/api/api_dataset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/
"""
from typing import Any, Dict, Iterable, List, NoReturn, Union
from typing import Any, Dict, List, NoReturn, Tuple, Union

import requests
from kedro.io.core import AbstractDataSet, DataSetError
from requests import Session, sessions
from requests.auth import AuthBase

# NOTE: kedro.extras.datasets will be removed in Kedro 0.19.0.
# Any contribution to datasets should be made in kedro-datasets
# in kedro-plugins (https://github.com/kedro-org/kedro-plugins)


class APIDataSet(AbstractDataSet[None, requests.Response]):
"""``APIDataSet`` loads the data from HTTP(S) APIs.
Expand Down Expand Up @@ -34,88 +39,89 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
data_catalog.html#use-the-data-catalog-with-the-code-api>`_:
::
>>> from kedro_datasets.api import APIDataSet
>>> from kedro.extras.datasets.api import APIDataSet
>>>
>>>
>>> data_set = APIDataSet(
>>> url="https://quickstats.nass.usda.gov",
>>> params={
>>> "key": "SOME_TOKEN",
>>> "format": "JSON",
>>> "commodity_desc": "CORN",
>>> "statisticcat_des": "YIELD",
>>> "agg_level_desc": "STATE",
>>> "year": 2000
>>> }
>>> load_args={
>>> "params": {
>>> "key": "SOME_TOKEN",
>>> "format": "JSON",
>>> "commodity_desc": "CORN",
>>> "statisticcat_des": "YIELD",
>>> "agg_level_desc": "STATE",
>>> "year": 2000
>>> }
>>> },
>>> credentials=("username", "password")
>>> )
>>> data = data_set.load()
"""

# pylint: disable=too-many-arguments
def __init__(
self,
url: str,
method: str = "GET",
data: Any = None,
params: Dict[str, Any] = None,
headers: Dict[str, Any] = None,
auth: Union[Iterable[str], AuthBase] = None,
json: Union[List, Dict[str, Any]] = None,
timeout: int = 60,
credentials: Union[Iterable[str], AuthBase] = None,
load_args: Dict[str, Any] = None,
credentials: Union[Tuple[str, str], List[str], AuthBase] = None,
) -> None:
"""Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.
Args:
url: The API URL endpoint.
method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
data: The request payload, used for POST, PUT, etc requests
https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
params: The url parameters of the API.
https://requests.readthedocs.io/en/latest/user/quickstart/#passing-parameters-in-urls
headers: The HTTP headers.
https://requests.readthedocs.io/en/latest/user/quickstart/#custom-headers
auth: Anything ``requests`` accepts. Normally it's either ``('login', 'password')``,
or ``AuthBase``, ``HTTPBasicAuth`` instance for more complex cases. Any
iterable will be cast to a tuple.
json: The request payload, used for POST, PUT, etc requests, passed in
to the json kwarg in the requests object.
https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
timeout: The wait time in seconds for a response, defaults to 1 minute.
https://requests.readthedocs.io/en/latest/user/quickstart/#timeouts
credentials: same as ``auth``. Allows specifying ``auth`` secrets in
credentials.yml.
load_args: Additional parameters to be fed to requests.request.
https://requests.readthedocs.io/en/latest/api/#requests.request
credentials: Allows specifying secrets in credentials.yml.
Expected format is ``('login', 'password')`` if given as a tuple or list.
An ``AuthBase`` instance can be provided for more complex cases.
Raises:
ValueError: if both ``credentials`` and ``auth`` are specified.
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are specified.
"""
super().__init__()

if credentials is not None and auth is not None:
self._load_args = load_args or {}
self._load_args_auth = self._load_args.pop("auth", None)

if credentials is not None and self._load_args_auth is not None:
raise ValueError("Cannot specify both auth and credentials.")

auth = credentials or auth
self._auth = credentials or self._load_args_auth

if "cert" in self._load_args:
self._load_args["cert"] = self._convert_type(self._load_args["cert"])

if isinstance(auth, Iterable):
auth = tuple(auth)
if "timeout" in self._load_args:
self._load_args["timeout"] = self._convert_type(self._load_args["timeout"])

self._request_args: Dict[str, Any] = {
"url": url,
"method": method,
"data": data,
"params": params,
"headers": headers,
"auth": auth,
"json": json,
"timeout": timeout,
"auth": self._convert_type(self._auth),
**self._load_args,
}

@staticmethod
def _convert_type(value: Any):
"""
From the Data Catalog, iterables are provided as Lists.
However, for some parameters in the Python requests library,
only Tuples are allowed.
"""
if isinstance(value, List):
return tuple(value)
return value

def _describe(self) -> Dict[str, Any]:
return {**self._request_args}
# prevent auth from logging
request_args_cp = self._request_args.copy()
request_args_cp.pop("auth", None)
return request_args_cp

def _execute_request(self) -> requests.Response:
def _execute_request(self, session: Session) -> requests.Response:
try:
response = requests.request(**self._request_args)
response = session.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to fetch data", exc) from exc
Expand All @@ -125,12 +131,13 @@ def _execute_request(self) -> requests.Response:
return response

def _load(self) -> requests.Response:
return self._execute_request()
with sessions.Session() as session:
return self._execute_request(session)

def _save(self, data: None) -> NoReturn:
raise DataSetError(f"{self.__class__.__name__} is a read only data set type")

def _exists(self) -> bool:
response = self._execute_request()

with sessions.Session() as session:
response = self._execute_request(session)
return response.ok
Loading

0 comments on commit 6911f03

Please sign in to comment.