Apply suggestions from code review
Co-authored-by: Joel Schwarzmann <35801847+datajoely@users.noreply.github.com>
Lorena Bălan and datajoely committed Nov 25, 2021
1 parent 78789e6 commit 56ea747
Showing 2 changed files with 20 additions and 7 deletions.
docs/source/11_tools_integration/01_pyspark.md (27 changes: 20 additions & 7 deletions)

```yaml
weather@spark:
  type: spark.SparkDataSet
  filepath: s3a://my_bucket/03_primary/temperature
  file_format: delta
  save_args:
    mode: "overwrite"

weather@delta:
  type: spark.DeltaTableDataSet
  filepath: s3a://my_bucket/03_primary/weather
```
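
For reference, the two entries above roughly correspond to the following dataset objects (a sketch assuming the `kedro.extras.datasets.spark` module layout used by Kedro 0.17.x; only the class names and arguments shown in the YAML come from the docs themselves):

```python
from kedro.extras.datasets.spark import DeltaTableDataSet, SparkDataSet

# "weather@spark" reads and writes a pyspark.sql.DataFrame stored in Delta format...
weather_spark = SparkDataSet(
    filepath="s3a://my_bucket/03_primary/temperature",
    file_format="delta",
    save_args={"mode": "overwrite"},
)

# ...while "weather@delta" loads its path as a delta.tables.DeltaTable, which exposes
# the in-place update()/delete()/merge() operations used inside node functions.
weather_delta = DeltaTableDataSet(filepath="s3a://my_bucket/03_primary/weather")
```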

The `DeltaTableDataSet` does not support the `save()` operation, as the updates happen in place inside the node function, i.e. through `DeltaTable.update()`, `DeltaTable.delete()` and `DeltaTable.merge()`.

> Since the save operation happens within the `node` via the DeltaTable API, the Kedro `before_dataset_saved` hook will not be triggered.
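
As a rough sketch of what such a node can look like (purely illustrative: the predicate and column names are invented, and only the `DeltaTable.update()` call itself comes from the Delta Lake Python API), the update is issued directly against the loaded `DeltaTable` and a plain flag is returned as the node's output:

```python
from delta.tables import DeltaTable


def update_meteorological_state(weather_table: DeltaTable) -> bool:
    # The "save" happens right here, in place, through the DeltaTable API,
    # which is why Kedro's `before_dataset_saved` hook never fires for it.
    weather_table.update(
        condition="temperature > 50",          # hypothetical predicate
        set={"condition_label": "'extreme'"},  # hypothetical column and value
    )
    return True  # dummy flag that becomes the `first_operation_complete` MemoryDataSet
```

The pipeline below wires this kind of node together with the other steps: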

```python
Pipeline(
    [
        node(
            func=process_barometer_data, inputs="temperature", outputs="weather@spark"
        ),
        node(
            func=update_meteorological_state,
            inputs="weather@delta",
            outputs="first_operation_complete",
        ),
        node(
            func=estimate_weather_trend,
            inputs=["first_operation_complete", "weather@delta"],
            outputs="second_operation_complete",
        ),
    ]
)
```

`first_operation_complete` is a `MemoryDataSet` and it signals that any Delta operations which occur "outside" the Kedro DAG are complete. It can be used as an input to a downstream node to preserve the shape of the DAG. Otherwise, if no downstream nodes need to run after this, the node can simply not return anything:

```python
Pipeline(
Expand All @@ -178,6 +185,12 @@ Pipeline(
)
```

The following diagram is a visual representation of the workflow described above:

![Spark and Delta Lake workflow](../meta/images/spark_delta_workflow.png)

> Note: This pattern of creating "dummy" datasets to preserve the data flow also applies to other operations that run "outside" the Kedro DAG, such as SQL statements executed within a node.
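
As an illustration of that pattern (a hypothetical sketch: the table names and the way the Spark session is obtained are assumptions rather than anything prescribed by Kedro), a node that performs its write through SQL can return a simple flag so that downstream nodes still have something to depend on:

```python
from pyspark.sql import SparkSession


def refresh_weather_summary(first_operation_complete: bool) -> bool:
    # The write below happens "outside" the Kedro DAG, so Kedro never calls save() for it.
    spark = SparkSession.builder.getOrCreate()
    spark.sql(
        "INSERT OVERWRITE TABLE reporting.weather_summary "
        "SELECT station, avg(temperature) FROM primary_weather GROUP BY station"
    )
    return True  # dummy output that preserves the shape of the DAG
```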

## Use `MemoryDataSet` for intermediary `DataFrame`

For nodes operating on a `DataFrame` that don't need to perform Spark actions, such as writing the `DataFrame` to storage, we recommend using the default `MemoryDataSet` to hold the `DataFrame`. In other words, there is no need to specify it in the `DataCatalog` or `catalog.yml`. This allows you to take advantage of Spark's optimiser and lazy evaluation.
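
A minimal sketch (the node functions and dataset names here are hypothetical): only `weather@spark` and `weather_summary` would need entries in `catalog.yml`, while `cleaned_weather` is deliberately left out and is therefore held as a `MemoryDataSet`, letting Spark plan both transformations lazily:

```python
from kedro.pipeline import Pipeline, node


def clean_weather(df):
    # No Spark action is triggered here; this only extends the lazy query plan.
    return df.dropna(subset=["temperature"])


def summarise_weather(df):
    # The Spark action only runs when "weather_summary" is saved to storage.
    return df.groupBy("station").avg("temperature")


pipeline = Pipeline(
    [
        node(clean_weather, inputs="weather@spark", outputs="cleaned_weather"),
        node(summarise_weather, inputs="cleaned_weather", outputs="weather_summary"),
    ]
)
```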
Binary file added docs/source/meta/images/spark_delta_workflow.png
