Skip to content

Commit

Permalink
[KED-1411] Added layer attribute to datasets. (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-ivaniuk authored Feb 24, 2020
1 parent 77dc811 commit 01335c7
Show file tree
Hide file tree
Showing 22 changed files with 166 additions and 42 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* Kedro CLI arguments `--node` and `--tag` support comma-separated values; alternative methods will be deprecated in future releases.
* Config loaders to be deprecated:
- `kedro.contrib.config.TemplatedConfigLoader`
* Added `layer` attribute for datasets in `kedro.extras.datasets` to specify the name of a layer according to a data engineering convention.
* Modified `DataCatalog`'s load order for datasets; it now tries to load them in the following order:
- `kedro.io`
- `kedro.extras.datasets`
Expand Down
1 change: 0 additions & 1 deletion kedro/config/templated_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ class TemplatedConfigLoader(ConfigLoader):
https://github.com/jmespath/jmespath.py and http://jmespath.org/.
"""

# pylint: disable=missing-type-doc
def __init__(
self,
conf_paths: Union[str, Iterable[str]],
Expand Down
5 changes: 5 additions & 0 deletions kedro/extras/datasets/biosequence/biosequence_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(
save_args: Dict[str, Any] = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""
Creates a new instance of ``BioSequenceDataSet`` pointing
Expand All @@ -92,6 +93,8 @@ def __init__(
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class.
E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`.
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
Note: Here you can find all supported file formats: https://biopython.org/wiki/SeqIO
"""
Expand All @@ -101,6 +104,7 @@ def __init__(

protocol, path = get_protocol_and_path(filepath)

self._layer = layer
self._filepath = PurePosixPath(path)
self._protocol = protocol
self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)
Expand All @@ -119,6 +123,7 @@ def _describe(self) -> Dict[str, Any]:
protocol=self._protocol,
load_args=self._load_args,
save_args=self._save_args,
layer=self._layer,
)

def _load(self) -> List:
Expand Down
5 changes: 5 additions & 0 deletions kedro/extras/datasets/dask/parquet_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(
save_args: Dict[str, Any] = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``ParquetDataSet`` pointing to concrete
parquet files.
Expand All @@ -96,10 +97,13 @@ def __init__(
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Optional parameters to the backend file system driver:
https://docs.dask.org/en/latest/remote-data-services.html#optional-parameters
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
self._filepath = filepath
self._fs_args = deepcopy(fs_args) or {}
self._credentials = deepcopy(credentials) or {}
self._layer = layer

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
Expand All @@ -125,6 +129,7 @@ def _describe(self) -> Dict[str, Any]:
filepath=self._filepath,
load_args=self._load_args,
save_args=self._save_args,
layer=self._layer,
)

def _load(self) -> dd.DataFrame:
Expand Down
6 changes: 6 additions & 0 deletions kedro/extras/datasets/matplotlib/matplotlib_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ class MatplotlibWriter(AbstractDataSet):
"""

# pylint: disable=too-many-arguments
def __init__(
self,
filepath: str,
fs_args: Dict[str, Any] = None,
credentials: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``MatplotlibWriter``.
Expand All @@ -108,10 +110,13 @@ def __init__(
`{'client_kwargs': {'aws_access_key_id': '<id>', 'aws_secret_access_key': '<key>'}}`
save_args: Save args passed to `plt.savefig`. See
https://matplotlib.org/api/_as_gen/matplotlib.pyplot.savefig.html
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_credentials = copy.deepcopy(credentials) or {}
self._fs_args = copy.deepcopy(fs_args) or {}
self._save_args = save_args or {}
self._layer = layer

protocol, path = get_protocol_and_path(filepath)

Expand All @@ -125,6 +130,7 @@ def _describe(self) -> Dict[str, Any]:
protocol=self._protocol,
fs_args=self._fs_args,
save_args=self._save_args,
layer=self._layer,
)

def _load(self) -> None:
Expand Down
5 changes: 5 additions & 0 deletions kedro/extras/datasets/networkx/networkx_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``NetworkXDataSet``.
Expand All @@ -97,12 +98,15 @@ def __init__(
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class.
E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}

protocol, path = get_protocol_and_path(filepath, version)

self._layer = layer
self._protocol = protocol
self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)

Expand Down Expand Up @@ -149,6 +153,7 @@ def _describe(self) -> Dict[str, Any]:
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
layer=self._layer,
)

def _release(self) -> None:
Expand Down
6 changes: 5 additions & 1 deletion kedro/extras/datasets/pandas/csv_blob_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
version: Version = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``CSVBlobDataSet`` pointing to a
concrete CSV file on Azure Blob Storage.
Expand Down Expand Up @@ -105,7 +106,8 @@ def __init__(
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_credentials = deepcopy(credentials) or {}
_blob_service = BlockBlobService(**_credentials)
Expand All @@ -128,6 +130,7 @@ def __init__(
glob_function=glob_function,
)

self._layer = layer
self._blob_service = _blob_service
self._container_name = _container_name

Expand All @@ -151,6 +154,7 @@ def _describe(self) -> Dict[str, Any]:
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
layer=self._layer,
)

def _load(self) -> pd.DataFrame:
Expand Down
6 changes: 6 additions & 0 deletions kedro/extras/datasets/pandas/csv_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``CSVDataSet`` pointing to a concrete CSV file
on a specific filesystem.
Expand All @@ -103,6 +104,8 @@ def __init__(
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class.
E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`.
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}
Expand All @@ -119,6 +122,8 @@ def __init__(
glob_function=self._fs.glob,
)

self._layer = layer

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
Expand All @@ -134,6 +139,7 @@ def _describe(self) -> Dict[str, Any]:
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
layer=self._layer,
)

def _load(self) -> pd.DataFrame:
Expand Down
6 changes: 6 additions & 0 deletions kedro/extras/datasets/pandas/excel_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``ExcelDataSet`` pointing to a concrete Excel file
on a specific filesystem.
Expand Down Expand Up @@ -111,6 +112,8 @@ def __init__(
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class.
E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`.
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}
Expand All @@ -127,6 +130,8 @@ def __init__(
glob_function=self._fs.glob,
)

self._layer = layer

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
Expand All @@ -147,6 +152,7 @@ def _describe(self) -> Dict[str, Any]:
save_args=self._save_args,
writer_args=self._writer_args,
version=self._version,
layer=self._layer,
)

def _load(self) -> pd.DataFrame:
Expand Down
6 changes: 6 additions & 0 deletions kedro/extras/datasets/pandas/feather_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``FeatherDataSet`` pointing to a concrete
filepath.
Expand All @@ -103,6 +104,8 @@ def __init__(
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class.
E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`.
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}
Expand All @@ -119,6 +122,8 @@ def __init__(
glob_function=self._fs.glob,
)

self._layer = layer

# Handle default load argument
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
Expand All @@ -130,6 +135,7 @@ def _describe(self) -> Dict[str, Any]:
protocol=self._protocol,
load_args=self._load_args,
version=self._version,
layer=self._layer,
)

def _load(self) -> pd.DataFrame:
Expand Down
6 changes: 6 additions & 0 deletions kedro/extras/datasets/pandas/gbq_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from kedro.io.core import AbstractDataSet, DataSetError, validate_on_forbidden_chars


# pylint: disable=too-many-instance-attributes
class GBQTableDataSet(AbstractDataSet):
"""``GBQTableDataSet`` loads and saves data from/to Google BigQuery.
It uses pandas-gbq to read and write from/to BigQuery table.
Expand Down Expand Up @@ -76,6 +77,7 @@ def __init__(
credentials: Union[Dict[str, Any], Credentials] = None,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``GBQTableDataSet``.
Expand All @@ -98,6 +100,8 @@ def __init__(
Here you can find all available arguments:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_gbq.html
All defaults are preserved, except "progress_bar", which is set to False.
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
Raises:
DataSetError: When ``load_args['location']`` and ``save_args['location']``
Expand All @@ -117,6 +121,7 @@ def __init__(
if isinstance(credentials, dict):
credentials = Credentials(**credentials)

self._layer = layer
self._dataset = dataset
self._table_name = table_name
self._project_id = project
Expand All @@ -133,6 +138,7 @@ def _describe(self) -> Dict[str, Any]:
table_name=self._table_name,
load_args=self._load_args,
save_args=self._save_args,
layer=self._layer,
)

def _load(self) -> pd.DataFrame:
Expand Down
8 changes: 7 additions & 1 deletion kedro/extras/datasets/pandas/hdf_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``HDFDataSet`` pointing to a concrete hdf file
on a specific filesystem.
Expand All @@ -111,10 +112,11 @@ def __init__(
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class.
E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}
self._key = key

protocol, path = get_protocol_and_path(filepath, version)

Expand All @@ -128,6 +130,9 @@ def __init__(
glob_function=self._fs.glob,
)

self._layer = layer
self._key = key

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
Expand All @@ -144,6 +149,7 @@ def _describe(self) -> Dict[str, Any]:
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
layer=self._layer,
)

def _load(self) -> pd.DataFrame:
Expand Down
6 changes: 5 additions & 1 deletion kedro/extras/datasets/pandas/json_blob_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(
blob_to_bytes_args: Dict[str, Any] = None,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
layer: str = None,
) -> None:
"""Creates a new instance of ``JSONBlobDataSet`` pointing to a
concrete JSON file on Azure Blob Storage.
Expand All @@ -103,7 +104,8 @@ def __init__(
Here you can find all available arguments:
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_json.html
All defaults are preserved, except "index", which is set to False.
layer: The data layer according to the data engineering convention:
https://kedro.readthedocs.io/en/stable/06_resources/01_faq.html#what-is-data-engineering-convention
"""
_credentials = deepcopy(credentials) or {}
self._filepath = filepath
Expand All @@ -112,6 +114,7 @@ def __init__(
self._blob_to_bytes_args = deepcopy(blob_to_bytes_args) or {}
self._blob_from_bytes_args = deepcopy(blob_from_bytes_args) or {}
self._blob_service = BlockBlobService(**_credentials)
self._layer = layer

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
Expand All @@ -129,6 +132,7 @@ def _describe(self) -> Dict[str, Any]:
blob_from_bytes_args=self._blob_from_bytes_args,
load_args=self._load_args,
save_args=self._save_args,
layer=self._layer,
)

def _load(self) -> pd.DataFrame:
Expand Down
Loading

0 comments on commit 01335c7

Please sign in to comment.