[KED-2891] Implement spark.DeltaTable dataset #964

Merged
merged 56 commits from feature/databricks-deltatable-dataset into master on Dec 3, 2021
Changes from 52 commits
Commits
56 commits
3c3e774
branch init
Oct 14, 2021
bcb7ed1
non-working draft that leads to strat pattern resolution on _save method
Oct 14, 2021
603f7bd
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 14, 2021
3d85fe3
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 15, 2021
462d26b
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 15, 2021
fd37c4d
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 20, 2021
cdbeabd
commenting and removal of redundancy
Oct 20, 2021
17ea2ac
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 22, 2021
e703a66
linting, fixes, removal
Oct 22, 2021
be6106c
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 22, 2021
beed054
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 25, 2021
763ab92
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 26, 2021
82ad4c3
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 26, 2021
8984385
added RELEASE.md
Oct 26, 2021
7b1cd03
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 29, 2021
566dd57
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 2, 2021
b6cbb25
Merge branch 'master' into feature/databricks-deltatable-dataset
jiriklein Nov 3, 2021
00d9e0f
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 3, 2021
65e0da0
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 4, 2021
3f6f449
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 5, 2021
88ab9af
reworking and cleanup - delegated decent amount of logic to SparkDataSet
Nov 5, 2021
dbea396
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 5, 2021
c92c883
logging message multiline
Nov 5, 2021
a1925af
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 8, 2021
b8a0282
removal of licencing
Nov 8, 2021
155bc64
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 11, 2021
aa92baf
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 12, 2021
058be40
Add requirements, remove confirms(), linting fixes
Nov 12, 2021
721e046
Add options
Nov 12, 2021
0747ad9
Add test file
Nov 12, 2021
01397a4
Fix tests
Nov 12, 2021
1ae97f3
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 16, 2021
abd1740
Fix tests properly
Nov 16, 2021
c6efb74
Write minimal unit test
Nov 17, 2021
66f17ac
Remove delta_options
Nov 17, 2021
2b697f5
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 17, 2021
7bf65cf
Add some docstrings
Nov 17, 2021
1ed7ed5
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 23, 2021
a2559aa
Make deltatable dataset test pass
Nov 23, 2021
2792538
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 23, 2021
f8005b6
Define delta spark session locally
Nov 23, 2021
cfb0f9e
More robust test
Nov 23, 2021
cfba1db
Add unit tests
Nov 23, 2021
799658d
Inherit from AbstractDataSet directly
Nov 23, 2021
7ca691a
Add more tests
Nov 24, 2021
d3f3e39
Use DataFrameWriter.options instead of .option
Nov 24, 2021
da00700
Fix linkchecker
Nov 24, 2021
b135a92
Move test
Nov 24, 2021
29d116f
Try fix tests that fail only on CCI
Nov 24, 2021
78789e6
Add some rough documentation
Nov 24, 2021
56ea747
Apply suggestions from code review
Nov 24, 2021
c837288
Try this on CI
Nov 29, 2021
55a9b2d
Run sparksession before
Dec 2, 2021
5bbd157
Merge branch 'master' into feature/databricks-deltatable-dataset
limdauto Dec 3, 2021
933feef
Fix tests (#1088)
limdauto Dec 3, 2021
75ee172
Apply suggestions from code review and general cleanup
Dec 3, 2021
80 changes: 40 additions & 40 deletions .circleci/config.yml
@@ -449,29 +449,29 @@ workflows:
regular:
jobs:
- unit_tests_36
- linters_36
- e2e_tests_36
- docs_36
- docs_linkcheck_37
# - linters_36
# - e2e_tests_36
# - docs_36
# - docs_linkcheck_37
- unit_tests_37
- linters_37
- e2e_tests_37
- docs_37
# - linters_37
# - e2e_tests_37
# - docs_37
- unit_tests_38
- linters_38
- e2e_tests_38
- pip_compile_36
- pip_compile_37
- pip_compile_38
- win_unit_tests_36
- win_unit_tests_37
- win_unit_tests_38
- win_pip_compile_36
- win_pip_compile_37
- win_pip_compile_38
- win_e2e_tests_36
- win_e2e_tests_37
- win_e2e_tests_38
# - linters_38
# - e2e_tests_38
# - pip_compile_36
# - pip_compile_37
# - pip_compile_38
# - win_unit_tests_36
# - win_unit_tests_37
# - win_unit_tests_38
# - win_pip_compile_36
# - win_pip_compile_37
# - win_pip_compile_38
# - win_e2e_tests_36
# - win_e2e_tests_37
# - win_e2e_tests_38
- run_kedro_viz:
filters:
branches:
@@ -480,28 +480,28 @@
- all_circleci_checks_succeeded:
requires:
- unit_tests_36
- linters_36
- e2e_tests_36
- docs_36
# - linters_36
# - e2e_tests_36
# - docs_36
- unit_tests_37
- linters_37
- e2e_tests_37
- docs_37
- docs_linkcheck_37
# - linters_37
# - e2e_tests_37
# - docs_37
# - docs_linkcheck_37
- unit_tests_38
- linters_38
- e2e_tests_38
- pip_compile_36
- pip_compile_37
- pip_compile_38
- win_pip_compile_36
- win_pip_compile_37
- win_pip_compile_38
- win_unit_tests_36
- win_unit_tests_37
# - linters_38
# - e2e_tests_38
# - pip_compile_36
# - pip_compile_37
# - pip_compile_38
# - win_pip_compile_36
# - win_pip_compile_37
# - win_pip_compile_38
# - win_unit_tests_36
# - win_unit_tests_37
# Skipped due to `pywin32 is in an unsupported or invalid wheel`
# - win_e2e_tests_36
# Skipped due to Windows fatal exception: stack overflow
# - win_unit_tests_38
- win_e2e_tests_37
- win_e2e_tests_38
# - win_e2e_tests_37
# - win_e2e_tests_38
3 changes: 2 additions & 1 deletion RELEASE.md
@@ -5,13 +5,14 @@
* Enabled overriding nested parameters with `params` in CLI, i.e. `kedro run --params="model.model_tuning.booster:gbtree"` updates parameters to `{"model": {"model_tuning": {"booster": "gbtree"}}}`.
* Added option to `pandas.SQLQueryDataSet` to specify a `filepath` with a SQL query, in addition to the current method of supplying the query itself in the `sql` argument.
* Extended `ExcelDataSet` to support saving Excel files with multiple sheets.
* Added the following new dataset (see ([Issue #839](https://github.com/quantumblacklabs/kedro/issues/839)):
* Added the following new datasets (see ([Issue #839](https://github.com/quantumblacklabs/kedro/issues/839)):

| Type | Description | Location |
| --------------------------- | ---------------------------------------------------- | --------------------------------- |
| `plotly.JSONDataSet` | Works with plotly graph object Figures (saves as json file) | `kedro.extras.datasets.plotly` |
| `pandas.GenericDataSet` | Provides a 'best effort' facility to read / write any format provided by the `pandas` library | `kedro.extras.datasets.pandas` |
| `pandas.GBQQueryDataSet` | Loads data from a Google Bigquery table using provided SQL query | `kedro.extras.datasets.pandas` |
| `spark.DeltaTableDataSet` | Dataset designed to handle Delta Lake Tables and their CRUD-style operations, including `update`, `merge` and `delete` | `kedro.extras.datasets.spark` |

## Bug fixes and other changes
* Fixed an issue where `kedro new --config config.yml` was ignoring the config file when `prompts.yml` didn't exist.
2 changes: 1 addition & 1 deletion docs/source/07_extend_kedro/04_plugins.md
@@ -175,7 +175,7 @@ See the full list of plugins using the GitHub tag [kedro-plugin](https://github.
- [kedro-great](https://github.com/tamsanh/kedro-great), by [Tam-Sanh Nguyen](https://github.com/tamsanh), integrates Kedro with [Great Expectations](https://greatexpectations.io), enabling catalog-based expectation generation and data validation on pipeline run
- [Kedro-Accelerator](https://github.com/deepyaman/kedro-accelerator), by [Deepyaman Datta](https://github.com/deepyaman), speeds up pipelines by parallelizing I/O in the background
- [kedro-dataframe-dropin](https://github.com/mzjp2/kedro-dataframe-dropin), by [Zain Patel](https://github.com/mzjp2), lets you swap out pandas datasets for modin or RAPIDs equivalents for specialised use to speed up your workflows (e.g on GPUs)
- [kedro-kubeflow](https://github.com/getindata/kedro-kubeflow), by [Mateusz Pytel](https://github.com/em-pe) and [Mariusz Strzelecki](https://github.com/szczeles), lets you run and schedule pipelines on Kubernetes clusters using [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/)
- [kedro-kubeflow](https://github.com/getindata/kedro-kubeflow), by [Mateusz Pytel](https://github.com/em-pe) and [Mariusz Strzelecki](https://github.com/szczeles), lets you run and schedule pipelines on Kubernetes clusters using [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/)
- [kedro-mlflow](https://github.com/Galileo-Galilei/kedro-mlflow), by [Yolan Honoré-Rougé](https://github.com/galileo-galilei), and [Takieddine Kadiri](https://github.com/takikadiri) facilitates [Mlflow](https://www.mlflow.org/) integration inside Kedro projects while enforcing [Kedro's principles](https://kedro.readthedocs.io/en/stable/12_faq/01_faq.html#what-are-the-primary-advantages-of-kedro). Its main features are modular configuration, automatic parameters tracking, datasets versioning, Kedro pipelines packaging and serving and automatic synchronization between training and inference pipelines for high reproducibility of machine learning experiments and ease of deployment. A tutorial is provided in the [kedro-mlflow-tutorial repo](https://github.com/Galileo-Galilei/kedro-mlflow-tutorial). You can find more information in [the documentation](https://kedro-mlflow.readthedocs.io/en/stable/).
- [kedro-neptune](https://github.com/neptune-ai/kedro-neptune), by [Jakub Czakon](https://github.com/jakubczakon) and [Rafał Jankowski](https://github.com/Raalsky), lets you have all the benefits of a nicely organized Kedro pipeline with Neptune: a powerful user interface built for ML metadata management. It lets you browse and filter pipeline executions, compare nodes and pipelines on metrics and parameters, and visualize pipeline metadata like learning curves, node outputs, and charts. For more information, tutorials and videos, go to [the documentation](https://docs.neptune.ai/integrations-and-supported-tools/automation-pipelines/kedro).
- [kedro-dolt](https://www.dolthub.com/blog/2021-06-16-kedro-dolt-plugin/), [Max Hoffman](https://github.com/max-hoffman) and [Oscar Batori](https://github.com/oscarbatori), a plugin that allows you to expand the data versioning abilities of data scientists and engineers.
76 changes: 76 additions & 0 deletions docs/source/11_tools_integration/01_pyspark.md
@@ -77,6 +77,7 @@ CONTEXT_CLASS = CustomContext

We recommend using Kedro's built-in Spark datasets to load raw data into Spark's [DataFrame](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html), as well as to write them back to storage. Some of our built-in Spark datasets include:

* [spark.DeltaTableDataSet](/kedro.extras.datasets.spark.DeltaTableDataSet)
* [spark.SparkDataSet](/kedro.extras.datasets.spark.SparkDataSet)
* [spark.SparkJDBCDataSet](/kedro.extras.datasets.spark.SparkJDBCDataSet)
* [spark.SparkHiveDataSet](/kedro.extras.datasets.spark.SparkHiveDataSet)
@@ -115,6 +116,81 @@ df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
```

## Spark and Delta Lake interaction

[Delta Lake](https://delta.io/) is an open-source project that enables building a Lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing storage such as S3, ADLS, GCS, and HDFS.
To set up PySpark with Delta Lake, see [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python).
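
As a rough illustration of that setup (a sketch adapted from the Delta Lake quick start; in a Kedro project the equivalent `spark.*` settings would typically live in your `spark.yml` and the hook that creates the `SparkSession`), the session can be configured along these lines:

```python
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Enable the Delta SQL extension and catalog on the SparkSession builder
builder = (
    SparkSession.builder.appName("kedro-delta-example")  # hypothetical app name
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip adds the matching delta-core package to
# spark.jars.packages before the session is created
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```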

We recommend the following workflow, which makes use of the [Transcoding](../05_data/01_data_catalog.md) feature in Kedro.

```yaml
temperature:
type: spark.SparkDataSet
filepath: data/01_raw/data.csv
file_format: csv
load_args:
header: True
inferSchema: True
save_args:
sep: '|'
header: True

weather@spark:
type: spark.SparkDataSet
filepath: s3a://my_bucket/03_primary/temperature
file_format: delta
save_args:
mode: "overwrite"
df_writer:
versionAsOf: 0

weather@delta:
type: spark.DeltaTableDataSet
filepath: s3a://my_bucket/03_primary/temperature
```

The `DeltaTableDataSet` does not support the `save()` operation; updates happen in place inside the node function, i.e. through `DeltaTable.update()`, `DeltaTable.delete()` or `DeltaTable.merge()`.


> Since the save operation happens within the `node` via the DeltaTable API, the Kedro `before_dataset_saved` hook will not be triggered.
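
For illustration, the `update_meterological_state` node used in the pipeline below might perform its in-place update roughly as follows; this is only a sketch, and the column names, update condition and returned flag are hypothetical:

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F


def update_meterological_state(delta_table: DeltaTable) -> str:
    # The update is applied to the underlying Delta table in place;
    # nothing is handed back to Kedro for saving.
    delta_table.update(
        condition=F.col("temperature") > 40,
        set={"status": F.lit("heatwave")},
    )
    return "update complete"  # dummy output that preserves the DAG shape
```

The pipeline definition then wires these operations together: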

```python
Pipeline(
[
node(
func=process_barometer_data, inputs="temperature", outputs="weather@spark"
),
node(
func=update_meterological_state,
inputs="weather@delta",
outputs="first_operation_complete",
),
node(
func=estimate_weather_trend,
inputs=["first_operation_complete", "weather@delta"],
outputs="second_operation_complete",
),
]
)
```

`first_operation_complete` is a `MemoryDataSet` and it signals that any Delta operations which occur "outside" the Kedro DAG are complete. This can be used as input to a downstream node, to preserve the shape of the DAG. Otherwise, if no downstream nodes need to run after this, the node can simply not return anything:

```python
Pipeline(
[
node(func=..., inputs="temperature", outputs="weather@spark"),
node(func=..., inputs="weather@delta", outputs=None),
]
)
```

The following diagram is the visual representation of the workflow explained above:

![Spark and Delta Lake workflow](../meta/images/spark_delta_workflow.png)

> Note: This pattern of creating "dummy" datasets to preserve the data flow also applies to other "out of DAG" execution operations such as SQL operations within a node.

## Use `MemoryDataSet` for intermediary `DataFrame`

For nodes operating on a `DataFrame` that don't need to perform Spark actions, such as writing the `DataFrame` to storage, we recommend using the default `MemoryDataSet` to hold the `DataFrame`. In other words, there is no need to specify it in the `DataCatalog` or `catalog.yml`. This allows you to take advantage of Spark's optimiser and lazy evaluation.
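
For example, in the pipeline sketch below (the node functions are hypothetical), `filtered_weather` is not declared in `catalog.yml`, so Kedro holds it in a `MemoryDataSet` and Spark's plan stays lazy until a downstream node triggers an action such as a save:

```python
from kedro.pipeline import Pipeline, node

Pipeline(
    [
        # "filtered_weather" is absent from catalog.yml, so the (lazy) Spark
        # DataFrame is simply kept in memory between the two nodes.
        node(func=filter_weather, inputs="temperature", outputs="filtered_weather"),
        node(func=summarise_weather, inputs="filtered_weather", outputs="weather@spark"),
    ]
)
```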
1 change: 1 addition & 0 deletions docs/source/15_api_docs/kedro.extras.datasets.rst
@@ -36,6 +36,7 @@ kedro.extras.datasets
kedro.extras.datasets.pillow.ImageDataSet
kedro.extras.datasets.plotly.JSONDataSet
kedro.extras.datasets.plotly.PlotlyDataSet
kedro.extras.datasets.spark.DeltaTableDataSet
kedro.extras.datasets.spark.SparkDataSet
kedro.extras.datasets.spark.SparkHiveDataSet
kedro.extras.datasets.spark.SparkJDBCDataSet
Binary file added docs/source/meta/images/spark_delta_workflow.png
4 changes: 3 additions & 1 deletion kedro/extras/datasets/spark/__init__.py
@@ -1,6 +1,6 @@
"""Provides I/O modules for Apache Spark."""

__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet"]
__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet", "DeltaTableDataSet"]

from contextlib import suppress

@@ -10,3 +10,5 @@
from .spark_hive_dataset import SparkHiveDataSet
with suppress(ImportError):
from .spark_jdbc_dataset import SparkJDBCDataSet
with suppress(ImportError):
from .deltatable_dataset import DeltaTableDataSet
118 changes: 118 additions & 0 deletions kedro/extras/datasets/spark/deltatable_dataset.py
@@ -0,0 +1,118 @@
"""``AbstractVersionedDataSet`` implementation to access DeltaTables using
``delta-spark``
"""
import logging
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

from kedro.extras.datasets.spark.spark_dataset import (
_split_filepath,
_strip_dbfs_prefix,
)
from kedro.io.core import AbstractDataSet

logger = logging.getLogger(__name__)


class DeltaTableDataSet(AbstractDataSet):
"""``DeltaTableDataSet`` loads data into DeltaTable objects.

Example adding a catalog entry with
`YAML API <https://kedro.readthedocs.io/en/stable/05_data/\
01_data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:

.. code-block:: yaml

>>> weather@spark:
>>> type: spark.SparkDataSet
>>> filepath: data/02_intermediate/data.parquet
>>> file_format: "delta"
>>>
>>> weather@delta:
>>> type: spark.DeltaTableDataSet
>>> filepath: data/02_intermediate/data.parquet

Example using Python API:
::

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (StructField, StringType,
>>> IntegerType, StructType)
>>>
>>> from kedro.extras.datasets.spark import DeltaTableDataSet, SparkDataSet
>>>
>>> schema = StructType([StructField("name", StringType(), True),
>>> StructField("age", IntegerType(), True)])
>>>
>>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
>>>
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>>
>>> data_set = SparkDataSet(filepath="test_data", file_format="delta")
>>> data_set.save(spark_df)
>>> deltatable_dataset = DeltaTableDataSet(filepath="test_data")
>>> delta_table = deltatable_dataset.load()
>>>
>>> delta_table.update()
"""

# this dataset cannot be used with ``ParallelRunner``,
# therefore it has the attribute ``_SINGLE_PROCESS = True``
# for parallelism within a Spark pipeline please consider
# ``ThreadRunner`` instead
_SINGLE_PROCESS = True

def __init__( # pylint: disable=too-many-arguments
self, filepath: str, credentials: Dict[str, Any] = None
) -> None:
"""Creates a new instance of ``DeltaTableDataSet``.

Args:
filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
and working with data written to mount path points,
specify ``filepath``s for (versioned) ``SparkDataSet``s
starting with ``/dbfs/mnt``.
credentials: Credentials to access the S3 bucket, such as
``key``, ``secret``, if ``filepath`` prefix is ``s3a://`` or ``s3n://``.
Optional keyword arguments passed to ``hdfs.client.InsecureClient``
if ``filepath`` prefix is ``hdfs://``. Ignored otherwise.
"""
credentials = deepcopy(credentials) or {} # do we need these anywhere??
Reviewer comment on lines +75 to +80 (Contributor): I see that we don't pass credentials to either SparkHiveDataSet or SparkJDBCDataSet. We only use them in SparkDataSet to be able to do versioning later (we instantiate the filesystem and get the glob_function and exist_function). Can I assume that configuration for DeltaTable will be done separately in a spark.yml?

fs_prefix, filepath = _split_filepath(filepath)

self._fs_prefix = fs_prefix
self._filepath = PurePosixPath(filepath)

@staticmethod
def _get_spark():
return SparkSession.builder.getOrCreate()

def _load(self) -> DeltaTable:
load_path = self._fs_prefix + str(self._filepath)
return DeltaTable.forPath(self._get_spark(), load_path)

def _save(self, data: Any) -> None:
logger.info(
"Saving was performed on `DeltaTable` object within the context of the node function"
)
# raise DataSetError(f"{self.__class__.__name__} is a read only dataset type")
Reviewer comment (Contributor): I'm tempted to turn this into an actual error instead of a log message. I think that's more consistent with what we have on other datasets (e.g. APIDataSet) and also signals to the users that saving doesn't actually do anything (the operation is done in the node already).


def _exists(self) -> bool:
load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))

try:
self._get_spark().read.load(path=load_path, format="delta")
except AnalysisException as exception:
if "is not a Delta table" in exception.desc:
return False
raise

return True

def _describe(self):
return dict(filepath=str(self._filepath), fs_prefix=self._fs_prefix)