[KED-2891] Implement spark.DeltaTable dataset (#964)
Signed-off-by: Yetunde Dada <yetudada@users.noreply.github.com>
jiriklein authored and yetudada committed Jan 28, 2022
1 parent c29c171 commit e8f5e22
Showing 16 changed files with 423 additions and 89 deletions.
11 changes: 11 additions & 0 deletions .circleci/config.yml
@@ -7,12 +7,15 @@ executors:
  py36:
    docker:
      - image: 350138855857.dkr.ecr.eu-west-2.amazonaws.com/kedro-builder:3.6
    resource_class: medium+
  py37:
    docker:
      - image: 350138855857.dkr.ecr.eu-west-2.amazonaws.com/kedro-builder:3.7
    resource_class: medium+
  py38:
    docker:
      - image: 350138855857.dkr.ecr.eu-west-2.amazonaws.com/kedro-builder:3.8
    resource_class: medium+

commands:
  setup_conda:
@@ -38,6 +41,14 @@ commands:
      - run:
          name: Install requirements and test requirements
          command: pip install --upgrade -r test_requirements.txt
      - run:
          # this is needed to fix java cacerts so
          # spark can automatically download packages from mvn
          # https://stackoverflow.com/a/50103533/1684058
          name: Fix cacerts
          command: |
            sudo rm /etc/ssl/certs/java/cacerts
            sudo update-ca-certificates -f
      - run:
          # Since recently Spark installation for some reason does not have enough permissions to execute
          # /home/circleci/miniconda/envs/kedro_builder/lib/python3.X/site-packages/pyspark/bin/spark-class.
3 changes: 2 additions & 1 deletion RELEASE.md
@@ -5,13 +5,14 @@
* Enabled overriding nested parameters with `params` in CLI, i.e. `kedro run --params="model.model_tuning.booster:gbtree"` updates parameters to `{"model": {"model_tuning": {"booster": "gbtree"}}}`.
* Added option to `pandas.SQLQueryDataSet` to specify a `filepath` with a SQL query, in addition to the current method of supplying the query itself in the `sql` argument.
* Extended `ExcelDataSet` to support saving Excel files with multiple sheets.
* Added the following new dataset (see ([Issue #839](https://github.com/quantumblacklabs/kedro/issues/839)):
* Added the following new datasets (see [Issue #839](https://github.com/quantumblacklabs/kedro/issues/839)):

| Type | Description | Location |
| --------------------------- | ---------------------------------------------------- | --------------------------------- |
| `plotly.JSONDataSet` | Works with plotly graph object Figures (saves as json file) | `kedro.extras.datasets.plotly` |
| `pandas.GenericDataSet` | Provides a 'best effort' facility to read / write any format provided by the `pandas` library | `kedro.extras.datasets.pandas` |
| `pandas.GBQQueryDataSet` | Loads data from a Google Bigquery table using provided SQL query | `kedro.extras.datasets.pandas` |
| `spark.DeltaTableDataSet` | Dataset designed to handle Delta Lake Tables and their CRUD-style operations, including `update`, `merge` and `delete` | `kedro.extras.datasets.spark` |

## Bug fixes and other changes
* Fixed an issue where `kedro new --config config.yml` was ignoring the config file when `prompts.yml` didn't exist.
4 changes: 2 additions & 2 deletions docs/conf.py
@@ -192,19 +192,19 @@
# too many requests, or forbidden URL
linkcheck_ignore = [
"https://datacamp.com/community/tutorials/docstrings-python", # "forbidden" url
"https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins",
"https://github.com/argoproj/argo/blob/master/README.md#quickstart",
"https://console.aws.amazon.com/batch/home#/jobs",
"https://github.com/EbookFoundation/free-programming-books/blob/master/books/free-programming-books-langs.md#python",
"https://github.com/jazzband/pip-tools#example-usage-for-pip-compile",
"https://www.astronomer.io/docs/cloud/stable/get-started/quickstart#",
"https://github.com/quantumblacklabs/private-kedro/blob/main/kedro/templates/project/*",
"https://eternallybored.org/misc/wget/",
"https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.from_pandas",
"https://www.oracle.com/java/technologies/javase-downloads.html", # "forbidden" url
"https://towardsdatascience.com/the-importance-of-layered-thinking-in-data-engineering-a09f685edc71",
"https://medium.com/quantumblack/beyond-the-notebook-and-into-the-data-science-framework-revolution-a7fd364ab9c4",
"https://www.java.com/en/download/help/download_options.html", # "403 Client Error: Forbidden for url"
# "anchor not found" but it's a valid selector for code examples
"https://docs.delta.io/latest/delta-update.html#language-python",
]

# retry before render a link broken (fix for "too many requests")
4 changes: 2 additions & 2 deletions docs/source/07_extend_kedro/04_plugins.md
@@ -4,7 +4,7 @@ Kedro plugins allow you to create new features for Kedro and inject additional c

## Overview

Kedro uses [`setuptools`](https://setuptools.readthedocs.io/en/latest/setuptools.html), which is a collection of enhancements to the Python `distutils` to allow developers to build and distribute Python packages. Kedro uses various entry points in [`pkg_resources`](https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins) to provide plugin functionality.
Kedro uses [`setuptools`](https://setuptools.readthedocs.io/en/latest/setuptools.html), which is a collection of enhancements to the Python `distutils` to allow developers to build and distribute Python packages. Kedro uses various entry points in [`pkg_resources`](https://setuptools.readthedocs.io/en/latest/setuptools.html) to provide plugin functionality.

## Example of a simple plugin

@@ -148,7 +148,7 @@ When you are ready to submit your code:
2. Choose a command approach: `global` and / or `project` commands:
- All `global` commands should be provided as a single `click` group
- All `project` commands should be provided as another `click` group
- The `click` groups are declared through the [`pkg_resources` entry_point system](https://setuptools.readthedocs.io/en/latest/setuptools.html#dynamic-discovery-of-services-and-plugins)
- The `click` groups are declared through the [`pkg_resources` entry_point system](https://setuptools.readthedocs.io/en/latest/setuptools.html)
3. Include a `README.md` describing your plugin's functionality and all dependencies that should be included
4. Use GitHub tagging to tag your plugin as a `kedro-plugin` so that we can find it

81 changes: 81 additions & 0 deletions docs/source/11_tools_integration/01_pyspark.md
@@ -77,6 +77,7 @@ CONTEXT_CLASS = CustomContext

We recommend using Kedro's built-in Spark datasets to load raw data into Spark's [DataFrame](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html), as well as to write them back to storage. Some of our built-in Spark datasets include:

* [spark.DeltaTableDataSet](/kedro.extras.datasets.spark.DeltaTableDataSet)
* [spark.SparkDataSet](/kedro.extras.datasets.spark.SparkDataSet)
* [spark.SparkJDBCDataSet](/kedro.extras.datasets.spark.SparkJDBCDataSet)
* [spark.SparkHiveDataSet](/kedro.extras.datasets.spark.SparkHiveDataSet)
@@ -115,6 +116,86 @@ df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
```

## Spark and Delta Lake interaction

[Delta Lake](https://delta.io/) is an open-source project that enables building a Lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.
To set up PySpark with Delta Lake, have a look at [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python).
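
For reference, enabling Delta Lake on a plain `SparkSession` typically comes down to adding the `delta-core` package and registering the Delta SQL extension, as in the sketch below. This is an illustrative sketch only: the application name and the `delta-core` version are assumptions, and the version should match the `delta-spark` release you install (e.g. `delta-spark~=1.0`). In a Kedro project you would normally pass these keys through whatever mechanism you already use to initialise the `SparkSession` (such as a `spark.yml` loaded in a hook or custom context), rather than building the session by hand.

```python
# Illustrative sketch of a Delta-enabled SparkSession (not Kedro-specific).
# The app name and the delta-core version below are assumptions; keep the
# version in line with the delta-spark package you have installed.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("delta-example")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)
```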

We recommend the following workflow, which makes use of the [Transcoding](../05_data/01_data_catalog.md) feature in Kedro:

* To create a Delta table, use a `SparkDataSet` with `file_format="delta"`. You can also use this type of dataset to read from a Delta table and/or overwrite it.
* To perform [Delta table deletes, updates, and merges](https://docs.delta.io/latest/delta-update.html#language-python), load the data using a `DeltaTableDataSet` and perform the write operations within the node function.

As a result, we end up with a catalog that looks like this:

```yaml
temperature:
  type: spark.SparkDataSet
  filepath: data/01_raw/data.csv
  file_format: "csv"
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather@spark:
  type: spark.SparkDataSet
  filepath: s3a://my_bucket/03_primary/weather
  file_format: "delta"
  save_args:
    mode: "overwrite"
    df_writer:
      versionAsOf: 0

weather@delta:
  type: spark.DeltaTableDataSet
  filepath: s3a://my_bucket/03_primary/weather
```

The `DeltaTableDataSet` does not support the `save()` operation, because the updates happen in place inside the node function, i.e. through `DeltaTable.update()`, `DeltaTable.delete()` or `DeltaTable.merge()`.


> Note: If you have defined an implementation for the Kedro `before_dataset_saved`/`after_dataset_saved` hook, the hook will not be triggered. This is because the save operation happens within the `node` itself, via the DeltaTable API.
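
To make this concrete, below is an illustrative sketch of such a node: it receives the `DeltaTable` loaded from the `weather@delta` entry and performs the "write" itself through the DeltaTable API. The column name, the update condition and the return value are hypothetical; the function name simply matches the pipeline example that follows.

```python
# Illustrative sketch only: the column name, condition and return token are
# hypothetical. The "save" happens here, inside the node, via the DeltaTable API.
from delta.tables import DeltaTable


def update_meterological_state(weather_table: DeltaTable) -> str:
    # Cap spurious temperature readings in place on the Delta table.
    weather_table.update(
        condition="temperature > 100",
        set={"temperature": "100"},
    )
    # Return a token so a downstream MemoryDataSet can record that the
    # in-place operation has completed.
    return "update_complete"
```

A pipeline wiring this node up via the transcoded catalog entries then looks like this: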

```python
Pipeline(
    [
        node(
            func=process_barometer_data, inputs="temperature", outputs="weather@spark"
        ),
        node(
            func=update_meterological_state,
            inputs="weather@delta",
            outputs="first_operation_complete",
        ),
        node(
            func=estimate_weather_trend,
            inputs=["first_operation_complete", "weather@delta"],
            outputs="second_operation_complete",
        ),
    ]
)
```

`first_operation_complete` is a `MemoryDataSet` and it signals that any Delta operations which occur "outside" the Kedro DAG are complete. This can be used as input to a downstream node, to preserve the shape of the DAG. Otherwise, if no downstream nodes need to run after this, the node can simply not return anything:

```python
Pipeline(
    [
        node(func=..., inputs="temperature", outputs="weather@spark"),
        node(func=..., inputs="weather@delta", outputs=None),
    ]
)
```

The following diagram is a visual representation of the workflow explained above:

![Spark and Delta Lake workflow](../meta/images/spark_delta_workflow.png)

> Note: This pattern of creating "dummy" datasets to preserve the data flow also applies to other "out of DAG" execution operations such as SQL operations within a node.
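
For example, a node that issues a SQL statement against tables managed outside the catalog can return a token in the same way. The sketch below is purely illustrative: the table name, the SQL statement and the returned value are all made up.

```python
# Hypothetical sketch of an "out of DAG" SQL operation wrapped in a node.
# The database, table and DELETE statement are invented for illustration.
from pyspark.sql import SparkSession


def prune_old_events(trigger_token: str) -> str:
    spark = SparkSession.builder.getOrCreate()
    spark.sql("DELETE FROM my_db.staging_events WHERE event_date < '2020-01-01'")
    # Dummy output so downstream nodes can depend on this operation.
    return "staging_events_pruned"
```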

## Use `MemoryDataSet` for intermediary `DataFrame`

For nodes that operate on a `DataFrame` and don't need to perform Spark actions such as writing the `DataFrame` to storage, we recommend using the default `MemoryDataSet` to hold the `DataFrame`. In other words, there is no need to specify it in the `DataCatalog` or `catalog.yml`. This allows you to take advantage of Spark's optimiser and lazy evaluation.
1 change: 1 addition & 0 deletions docs/source/15_api_docs/kedro.extras.datasets.rst
@@ -36,6 +36,7 @@ kedro.extras.datasets
kedro.extras.datasets.pillow.ImageDataSet
kedro.extras.datasets.plotly.JSONDataSet
kedro.extras.datasets.plotly.PlotlyDataSet
kedro.extras.datasets.spark.DeltaTableDataSet
kedro.extras.datasets.spark.SparkDataSet
kedro.extras.datasets.spark.SparkHiveDataSet
kedro.extras.datasets.spark.SparkJDBCDataSet
Binary file added docs/source/meta/images/spark_delta_workflow.png
4 changes: 3 additions & 1 deletion kedro/extras/datasets/spark/__init__.py
@@ -1,6 +1,6 @@
"""Provides I/O modules for Apache Spark."""

__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet"]
__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet", "DeltaTableDataSet"]

from contextlib import suppress

@@ -10,3 +10,5 @@
    from .spark_hive_dataset import SparkHiveDataSet
with suppress(ImportError):
    from .spark_jdbc_dataset import SparkJDBCDataSet
with suppress(ImportError):
    from .deltatable_dataset import DeltaTableDataSet
110 changes: 110 additions & 0 deletions kedro/extras/datasets/spark/deltatable_dataset.py
@@ -0,0 +1,110 @@
"""``AbstractVersionedDataSet`` implementation to access DeltaTables using
``delta-spark``
"""
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

from kedro.extras.datasets.spark.spark_dataset import (
_split_filepath,
_strip_dbfs_prefix,
)
from kedro.io.core import AbstractDataSet, DataSetError


class DeltaTableDataSet(AbstractDataSet):
"""``DeltaTableDataSet`` loads data into DeltaTable objects.
Example adding a catalog entry with
`YAML API <https://kedro.readthedocs.io/en/stable/05_data/\
01_data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
>>> weather@spark:
>>> type: spark.SparkDataSet
>>> filepath: data/02_intermediate/data.parquet
>>> file_format: "delta"
>>>
>>> weather@delta:
>>> type: spark.DeltaTableDataSet
>>> filepath: data/02_intermediate/data.parquet
Example using Python API:
::
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (StructField, StringType,
>>> IntegerType, StructType)
>>>
>>> from kedro.extras.datasets.spark import DeltaTableDataSet, SparkDataSet
>>>
>>> schema = StructType([StructField("name", StringType(), True),
>>> StructField("age", IntegerType(), True)])
>>>
>>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
>>>
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>>
>>> data_set = SparkDataSet(filepath="test_data", file_format="delta")
>>> data_set.save(spark_df)
>>> deltatable_dataset = DeltaTableDataSet(filepath="test_data")
>>> delta_table = deltatable_dataset.load()
>>>
>>> delta_table.update()
"""

# this dataset cannot be used with ``ParallelRunner``,
# therefore it has the attribute ``_SINGLE_PROCESS = True``
# for parallelism within a Spark pipeline please consider
# using ``ThreadRunner`` instead
_SINGLE_PROCESS = True

def __init__(self, filepath: str, credentials: Dict[str, Any] = None) -> None:
"""Creates a new instance of ``DeltaTableDataSet``.
Args:
filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
and working with data written to mount path points,
specify ``filepath``s for (versioned) ``SparkDataSet``s
starting with ``/dbfs/mnt``.
credentials: Credentials to access the S3 bucket, such as
``key``, ``secret``, if ``filepath`` prefix is ``s3a://`` or ``s3n://``.
Optional keyword arguments passed to ``hdfs.client.InsecureClient``
if ``filepath`` prefix is ``hdfs://``. Ignored otherwise.
"""
credentials = deepcopy(credentials) or {} # do we need these anywhere??
fs_prefix, filepath = _split_filepath(filepath)

self._fs_prefix = fs_prefix
self._filepath = PurePosixPath(filepath)

@staticmethod
def _get_spark():
return SparkSession.builder.getOrCreate()

def _load(self) -> DeltaTable:
load_path = self._fs_prefix + str(self._filepath)
return DeltaTable.forPath(self._get_spark(), load_path)

def _save(self, data: Any) -> None:
raise DataSetError(f"{self.__class__.__name__} is a read only dataset type")

def _exists(self) -> bool:
load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))

try:
self._get_spark().read.load(path=load_path, format="delta")
except AnalysisException as exception:
if "is not a Delta table" in exception.desc:
return False
raise

return True

def _describe(self):
return dict(filepath=str(self._filepath), fs_prefix=self._fs_prefix)
29 changes: 24 additions & 5 deletions kedro/extras/datasets/spark/spark_dataset.py
@@ -1,4 +1,4 @@
"""``AbstractDataSet`` implementation to access Spark dataframes using
"""``AbstractVersionedDataSet`` implementation to access Spark dataframes using
``pyspark``
"""
from copy import deepcopy
@@ -13,7 +13,7 @@
from pyspark.sql.utils import AnalysisException
from s3fs import S3FileSystem

from kedro.io.core import AbstractVersionedDataSet, Version
from kedro.io.core import AbstractVersionedDataSet, DataSetError, Version


def _parse_glob_pattern(pattern: str) -> str:
@@ -223,7 +223,7 @@ def __init__( # pylint: disable=too-many-arguments
                starting with ``/dbfs/mnt``.
            file_format: File format used during load and save
                operations. These are formats supported by the running
                SparkContext include parquet, csv. For a list of supported
                SparkContext include parquet, csv, delta. For a list of supported
                formats please refer to Apache Spark documentation at
                https://spark.apache.org/docs/latest/sql-programming-guide.html
            load_args: Load args passed to Spark DataFrameReader load method.
@@ -304,9 +304,13 @@ def __init__( # pylint: disable=too-many-arguments
        if save_args is not None:
            self._save_args.update(save_args)

        ### would they be relevant on load_args / on read as well?
        self._dfwriter_options = self._save_args.pop("dfwriter_options", {}) or {}
        self._file_format = file_format
        self._fs_prefix = fs_prefix

        self._handle_delta_format()

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self._fs_prefix + str(self._filepath),
@@ -329,15 +333,30 @@ def _load(self) -> DataFrame:

    def _save(self, data: DataFrame) -> None:
        save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_save_path()))
        data.write.save(save_path, self._file_format, **self._save_args)
        data.write.options(**self._dfwriter_options).save(
            save_path, self._file_format, **self._save_args
        )

    def _exists(self) -> bool:
        load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_load_path()))

        try:
            self._get_spark().read.load(load_path, self._file_format)
        except AnalysisException as exception:
            if exception.desc.startswith("Path does not exist:"):
            if (
                exception.desc.startswith("Path does not exist:")
                or "is not a Delta table" in exception.desc
            ):
                return False
            raise
        return True

    def _handle_delta_format(self) -> None:
        unsupported_modes = {"merge", "delete", "update"}
        write_mode = self._save_args.get("mode") or ""
        if self._file_format == "delta" and write_mode.lower() in unsupported_modes:
            raise DataSetError(
                f"It is not possible to perform `save()` for file format `delta` "
                f"with mode `{write_mode}` on `SparkDataSet`. "
                f"Please use `spark.DeltaTableDataSet` instead."
            )
1 change: 1 addition & 0 deletions setup.py
@@ -93,6 +93,7 @@ def _collect_requirements(requires):
"spark.SparkDataSet": [SPARK, HDFS, S3FS],
"spark.SparkHiveDataSet": [SPARK, HDFS, S3FS],
"spark.SparkJDBCDataSet": [SPARK, HDFS, S3FS],
"spark.DeltaTableDataSet": [SPARK, HDFS, S3FS, "delta-spark~=1.0"],
}
tensorflow_required = {
"tensorflow.TensorflowModelDataset": [