feat: Dataset API add save method (#180)
* [FEAT] add save method to APIDataset

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [ENH] create save_args parameter for api_dataset

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [ENH] add tests for socket + http errors

Signed-off-by: <jmcdonnell@fieldbox.ai>
Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [ENH] check save data is json

Signed-off-by: <jmcdonnell@fieldbox.ai>
Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [FIX] clean code

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [ENH] handle different data types

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [FIX] test coverage for exceptions

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [ENH] add examples in APIDataSet docstring

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* sync APIDataSet from kedro's `develop` (#184)

* Update APIDataSet

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Sync ParquetDataSet

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Sync Test

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Linting

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Revert Unnecessary ParquetDataSet Changes

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Sync release notes

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

---------

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>
Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [FIX] remove support for delete method

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [FIX] lint files

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [FIX] fix conflicts

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [FIX] remove fail save test

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [ENH] review suggestions

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [ENH] fix tests

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

* [FIX] reorder arguments

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>

---------

Signed-off-by: jmcdonnell <jmcdonnell@fieldbox.ai>
Signed-off-by: <jmcdonnell@fieldbox.ai>
Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>
Co-authored-by: jmcdonnell <jmcdonnell@fieldbox.ai>
Co-authored-by: Nok Lam Chan <mediumnok@gmail.com>
3 people authored May 22, 2023
1 parent 9366f86 commit 4570cb0
Showing 5 changed files with 246 additions and 43 deletions.
10 changes: 6 additions & 4 deletions kedro-datasets/RELEASE.md
@@ -3,6 +3,8 @@
## Major features and improvements:
* Added pandas 2.0 support.
* Added SQLAlchemy 2.0 support (and dropped support for versions below 1.4).
* Added a save method to the `APIDataSet`.

* Reduced constructor arguments for `APIDataSet` by replacing most arguments with a single constructor argument `load_args`. This makes it more consistent with other Kedro DataSets and the underlying `requests` API, and automatically enables the full configuration domain: stream, certificates, proxies, and more.
* Relaxed Kedro version pin to `>=0.16`
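
The consolidated `load_args` mentioned above funnels every `requests.request` option through one dictionary. A minimal sketch of what this looks like in practice (the endpoint and values are placeholders; the import path is assumed from the package layout):

```python
from kedro_datasets.api import APIDataSet  # import path assumed

data_set = APIDataSet(
    url="https://example.com/api/resource",  # placeholder endpoint
    method="GET",
    load_args={
        "params": {"format": "JSON", "year": 2000},
        "timeout": 60,    # forwarded directly to requests.request
        "stream": False,  # certificates, proxies, etc. are passed the same way
    },
    credentials=("username", "password"),
)
response = data_set.load()  # returns a requests.Response
```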

@@ -42,10 +44,10 @@ Many thanks to the following Kedroids for contributing PRs to this release:

* Added the following new datasets:

| Type | Description | Location |
| ------------------------------------ | -------------------------------------------------------------------------- | ----------------------------- |
| `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lightning fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |
| `snowflake.SnowparkTableDataSet` | Work with [Snowpark](https://www.snowflake.com/en/data-cloud/snowpark/) DataFrames from tables in Snowflake. | `kedro_datasets.snowflake` |
| Type | Description | Location |
| -------------------------------- | --------------------------------------------------------------------------------------------------------------------- | -------------------------- |
| `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lightning fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |
| `snowflake.SnowparkTableDataSet` | Work with [Snowpark](https://www.snowflake.com/en/data-cloud/snowpark/) DataFrames from tables in Snowflake. | `kedro_datasets.snowflake` |

## Bug fixes and other changes
* Add `mssql` backend to the `SQLQueryDataSet` DataSet using `pyodbc` library.
138 changes: 113 additions & 25 deletions kedro-datasets/kedro_datasets/api/api_dataset.py
@@ -1,7 +1,9 @@
"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/
"""
from typing import Any, Dict, List, NoReturn, Tuple, Union
import json as json_ # make pylint happy
from copy import deepcopy
from typing import Any, Dict, List, Tuple, Union

import requests
from kedro.io.core import AbstractDataSet, DataSetError
@@ -14,11 +16,10 @@


class APIDataSet(AbstractDataSet[None, requests.Response]):
"""``APIDataSet`` loads the data from HTTP(S) APIs.
"""``APIDataSet`` loads/saves data from/to HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/
Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
Example usage for the `YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
@@ -34,10 +35,8 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
agg_level_desc: STATE,
year: 2000
Example usage for the
`Python API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-code-api>`_:
::
Example usage for the `Python API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-code-api>`_: ::
>>> from kedro.extras.datasets.api import APIDataSet
>>>
@@ -57,49 +56,101 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
>>> credentials=("username", "password")
>>> )
>>> data = data_set.load()
``APIDataSet`` can also be used to save output on a remote server using HTTP(S)
methods.
>>> example_table = '{"col1":["val1", "val2"], "col2":["val3", "val4"]}'
>>> data_set = APIDataSet(
>>>     method="POST",
>>>     url="url_of_remote_server",
>>>     save_args={"chunk_size": 1}
>>> )
>>> data_set.save(example_table)
On initialisation, all the necessary parameters can be specified in the ``save_args``
dictionary. Saving supports the POST and PUT methods. Two important parameters to
keep in mind are ``timeout`` and ``chunk_size``. ``timeout`` defines how long the
program waits for a response after a request. ``chunk_size`` is only used if the
input of the save method is a list: the payload is then split into chunks of
``chunk_size`` elements and each chunk is sent in its own request. For example, a
two-element list saved with ``chunk_size=1`` results in two requests, each
containing one element.
If the data passed to the save method is not a list, ``APIDataSet`` will check if it
can be loaded as JSON. If true, it will send the data unchanged in a single request.
Otherwise, the ``_save`` method will try to dump the data in JSON format and execute
the request.
"""

DEFAULT_SAVE_ARGS = {
"params": None,
"headers": None,
"auth": None,
"json": None,
"timeout": 60,
"chunk_size": 100,
}
# pylint: disable=too-many-arguments

def __init__(
self,
url: str,
method: str = "GET",
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
credentials: Union[Tuple[str, str], List[str], AuthBase] = None,
) -> None:
"""Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.
Args:
url: The API URL endpoint.
method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
method: The method of the request. GET, POST and PUT are the only supported
methods.
load_args: Additional parameters to be fed to requests.request.
https://requests.readthedocs.io/en/latest/api/#requests.request
credentials: Allows specifying secrets in credentials.yml.
Expected format is ``('login', 'password')`` if given as a tuple or list.
An ``AuthBase`` instance can be provided for more complex cases.
Expected format is ``('login', 'password')`` if given as a tuple or
list. An ``AuthBase`` instance can be provided for more complex cases.
save_args: Options for saving data on the server. Includes all parameters used
during the load method, plus an optional parameter, ``chunk_size``, which
determines the size of the payload sent with each request.
Raises:
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are specified.
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are
specified.
"""
super().__init__()

self._load_args = load_args or {}
self._load_args_auth = self._load_args.pop("auth", None)
# GET method means load
if method == "GET":
self._params = load_args or {}

# PUT, POST methods mean save
elif method in ["PUT", "POST"]:
self._params = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._params.update(save_args)
self._chunk_size = self._params.pop("chunk_size", 1)
else:
raise ValueError("Only GET, POST and PUT methods are supported")

self._param_auth = self._params.pop("auth", None)

if credentials is not None and self._load_args_auth is not None:
if credentials is not None and self._param_auth is not None:
raise ValueError("Cannot specify both auth and credentials.")

self._auth = credentials or self._load_args_auth
self._auth = credentials or self._param_auth

if "cert" in self._load_args:
self._load_args["cert"] = self._convert_type(self._load_args["cert"])
if "cert" in self._params:
self._params["cert"] = self._convert_type(self._params["cert"])

if "timeout" in self._load_args:
self._load_args["timeout"] = self._convert_type(self._load_args["timeout"])
if "timeout" in self._params:
self._params["timeout"] = self._convert_type(self._params["timeout"])

self._request_args: Dict[str, Any] = {
"url": url,
"method": method,
"auth": self._convert_type(self._auth),
**self._load_args,
**self._params,
}
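
To make the credentials handling above concrete, a small sketch (placeholder values; import path assumed):

```python
from requests.auth import HTTPBasicAuth
from kedro_datasets.api import APIDataSet  # import path assumed

# A tuple or list is treated as ("login", "password").
ds_basic = APIDataSet(url="https://example.com/api", credentials=("login", "password"))

# Any requests AuthBase instance is accepted for more complex cases.
ds_token = APIDataSet(url="https://example.com/api", credentials=HTTPBasicAuth("login", "password"))

# Supplying auth both in load_args and credentials raises a ValueError.
try:
    APIDataSet(
        url="https://example.com/api",
        load_args={"auth": ("login", "password")},
        credentials=("login", "password"),
    )
except ValueError as err:
    print(err)  # Cannot specify both auth and credentials.
```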

@staticmethod
@@ -131,11 +182,48 @@ def _execute_request(self, session: Session) -> requests.Response:
return response

def _load(self) -> requests.Response:
with sessions.Session() as session:
return self._execute_request(session)
if self._request_args["method"] == "GET":
with sessions.Session() as session:
return self._execute_request(session)

raise DataSetError("Only GET method is supported for load")

def _execute_save_with_chunks(
self,
json_data: List[Dict[str, Any]],
) -> requests.Response:
chunk_size = self._chunk_size
n_chunks = len(json_data) // chunk_size + 1

for i in range(n_chunks):
send_data = json_data[i * chunk_size : (i + 1) * chunk_size]
response = self._execute_save_request(json_data=send_data)

return response
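
For illustration, a standalone sketch of the chunking arithmetic used in `_execute_save_with_chunks` above, with made-up data:

```python
# Standalone sketch of the slicing performed above.
json_data = [{"row": i} for i in range(6)]
chunk_size = 4

n_chunks = len(json_data) // chunk_size + 1  # 6 // 4 + 1 == 2
chunks = [json_data[i * chunk_size:(i + 1) * chunk_size] for i in range(n_chunks)]
# chunks == [[{"row": 0}, {"row": 1}, {"row": 2}, {"row": 3}], [{"row": 4}, {"row": 5}]]
# Note: when len(json_data) is an exact multiple of chunk_size, this formula
# yields one extra, empty chunk.
```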

def _execute_save_request(self, json_data: Any) -> requests.Response:
try:
json_.loads(json_data)
except TypeError:
self._request_args["json"] = json_.dumps(json_data)
try:
response = requests.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to send data", exc) from exc

except OSError as exc:
raise DataSetError("Failed to connect to the remote server") from exc
return response

def _save(self, data: Any) -> requests.Response:
if self._request_args["method"] in ["PUT", "POST"]:
if isinstance(data, list):
return self._execute_save_with_chunks(json_data=data)

return self._execute_save_request(json_data=data)

def _save(self, data: None) -> NoReturn:
raise DataSetError(f"{self.__class__.__name__} is a read only data set type")
raise DataSetError("Use PUT or POST methods for save")

def _exists(self) -> bool:
with sessions.Session() as session:
2 changes: 0 additions & 2 deletions kedro-datasets/kedro_datasets/pandas/generic_dataset.py
@@ -181,7 +181,6 @@ def _ensure_file_system_target(self) -> None:
)

def _load(self) -> pd.DataFrame:

self._ensure_file_system_target()

load_path = get_filepath_str(self._get_load_path(), self._protocol)
@@ -196,7 +195,6 @@
)

def _save(self, data: pd.DataFrame) -> None:

self._ensure_file_system_target()

save_path = get_filepath_str(self._get_save_path(), self._protocol)
1 change: 0 additions & 1 deletion kedro-datasets/kedro_datasets/spark/spark_jdbc_dataset.py
@@ -126,7 +126,6 @@ def __init__(

# Update properties in load_args and save_args with credentials.
if credentials is not None:

# Check credentials for bad inputs.
for cred_key, cred_value in credentials.items():
if cred_value is None: