diff --git a/docs/source/data/data_catalog_yaml_examples.md b/docs/source/data/data_catalog_yaml_examples.md
index 3b1f8e67dd..bfc309d2ab 100644
--- a/docs/source/data/data_catalog_yaml_examples.md
+++ b/docs/source/data/data_catalog_yaml_examples.md
@@ -365,11 +365,15 @@ airplanes:
 
 In this example, the default `csv` configuration is inserted into `airplanes` and then the `load_args` block is overridden. Normally, that would replace the whole dictionary. In order to extend `load_args`, the defaults for that block are then re-inserted.
 
-## Read the same file using two different datasets
+## Read the same file using different datasets with transcoding
 
-You might come across a situation where you would like to read the same file using two different dataset implementations (known as transcoding). For example, Parquet files can not only be loaded via the `ParquetDataset` using `pandas`, but also directly by `SparkDataset`. This conversion is typical when coordinating a `Spark` to `pandas` workflow.
+You might come across a situation where you would like to read the same file using two different `Dataset` implementations. You can achieve this by using transcoding to define separate `DataCatalog` entries that point to the same `filepath`.
 
-Define two `DataCatalog` entries for the same dataset in a common format (for example, Parquet, JSON, CSV) in your `conf/base/catalog.yml`:
+### How to use transcoding
+
+Consider an example with Parquet files: they can be loaded both with the `pandas.ParquetDataset` and directly with the `spark.SparkDataset`. This conversion is typical when coordinating a `Spark` to `pandas` workflow.
+
+To load the same file as both a `pandas.ParquetDataset` and a `spark.SparkDataset`, define two `DataCatalog` entries for the same dataset in your `conf/base/catalog.yml`:
 
 ```yaml
 my_dataframe@spark:
   type: spark.SparkDataset
   filepath: data/02_intermediate/data.parquet
   file_format: parquet
@@ -382,13 +386,13 @@ my_dataframe@pandas:
   type: pandas.ParquetDataset
   filepath: data/02_intermediate/data.parquet
 ```
 
-These entries are used in the pipeline like this:
+When using transcoding, you must ensure that the filepaths defined for each catalog entry share the same format (for example, CSV, JSON, or Parquet). These entries can then be used in the pipeline as follows:
 
 ```python
 pipeline(
     [
-        node(func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
-        node(func=my_func2, inputs="my_dataframe@pandas", outputs="pipeline_output"),
+        node(name="my_func1_node", func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
+        node(name="my_func2_node", func=my_func2, inputs="my_dataframe@pandas", outputs="pipeline_output"),
     ]
 )
 ```
@@ -398,6 +402,80 @@ In this example, Kedro understands that `my_dataframe` is the same dataset in it
 
 In the pipeline, Kedro uses the `spark.SparkDataset` implementation for saving and `pandas.ParquetDataset` for loading, so the first node outputs a `pyspark.sql.DataFrame`, while the second node receives a `pandas.DataFrame`.
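+
+The node functions themselves are not shown above; the following is a minimal sketch of what they could look like in a `Spark`-to-`pandas` workflow. The function bodies and the `value` column are hypothetical and only illustrate the types each node works with:
+
+```python
+import pandas as pd
+from pyspark.sql import DataFrame as SparkDataFrame
+
+
+def my_func1(spark_input: SparkDataFrame) -> SparkDataFrame:
+    # Runs on Spark; the returned DataFrame is saved by `spark.SparkDataset`.
+    return spark_input.filter("value IS NOT NULL")
+
+
+def my_func2(my_dataframe: pd.DataFrame) -> pd.DataFrame:
+    # Receives the same Parquet file, reloaded by `pandas.ParquetDataset`.
+    return my_dataframe.describe()
+```
+
+Neither function needs any explicit conversion code: Kedro performs the save with Spark and the load with pandas.
+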
+### How *not* to use transcoding
+
+Kedro pipelines automatically resolve the node execution order and check that there are no circular dependencies in the pipeline. It is during this process that transcoded datasets are resolved and the transcoding notation `@...` is stripped, which means that within the pipeline the datasets `my_dataframe@spark` and `my_dataframe@pandas` are considered to be one `my_dataframe` dataset. The `DataCatalog`, however, treats transcoded entries as separate datasets, because they are only resolved as part of the pipeline resolution process.
+This results in differences between the pipeline you define in `pipeline.py` and the resolved pipeline that Kedro runs, and these differences may lead to unintended behaviour, so it is important to be aware of them when using transcoding.
+
+```{caution}
+Below are some examples where transcoding may produce unwanted side effects or raise errors.
+```
+
+#### Defining a node with the same inputs and outputs
+
+Consider the following pipeline:
+
+```python
+pipeline(
+    [
+        node(name="my_func1_node", func=my_func1, inputs="my_dataframe@pandas", outputs="my_dataframe@spark"),
+    ]
+)
+```
+
+During pipeline resolution, the node above is defined as having the dataset `my_dataframe` as both its input and its output. Because a node cannot have the same inputs and outputs, trying to run this pipeline fails with the following error:
+
+```console
+ValueError: Failed to create node my_func1([my_dataframe@pandas]) -> [my_dataframe@spark].
+A node cannot have the same inputs and outputs even if they are transcoded: {'my_dataframe'}
+```
+
+#### Defining several nodes that share the same output
+
+Consider the following pipeline:
+
+```python
+pipeline(
+    [
+        node(name="my_func1_node", func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
+        node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe@pandas"),
+    ]
+)
+```
+
+When this pipeline is resolved, both nodes are defined as returning the same output `my_dataframe`, which is not allowed. Running the pipeline fails with the following error:
+
+```console
+kedro.pipeline.pipeline.OutputNotUniqueError: Output(s) ['my_dataframe'] are returned by more than one nodes. Node outputs must be unique.
+```
+
+#### Creating pipelines with hidden dependencies
+
+Consider the following pipeline:
+
+```python
+pipeline(
+    [
+        node(name="my_func1_node", func=my_func1, inputs="my_dataframe@spark", outputs="spark_output"),
+        node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe@pandas"),
+        node(name="my_func3_node", func=my_func3, inputs="my_dataframe@pandas", outputs="pandas_output"),
+    ]
+)
+```
+
+In this example, there is a single explicit dependency between the nodes: `my_func3_node` depends on `my_func2_node`. However, when this pipeline is resolved, there are hidden dependencies that restrict the node execution order. We can expose them by removing the transcoding notation:
+
+```python
+resolved_pipeline(
+    [
+        node(name="my_func1_node", func=my_func1, inputs="my_dataframe", outputs="spark_output"),
+        node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe"),
+        node(name="my_func3_node", func=my_func3, inputs="my_dataframe", outputs="pandas_output"),
+    ]
+)
+```
+
+When the node order is resolved, we can see that `my_func1_node` is treated as dependent on `my_func2_node`. This pipeline will still run without errors, but you should be careful about creating hidden dependencies, as they can decrease performance, for example when using the `ParallelRunner`.
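+
+One way to check how Kedro has resolved a pipeline is to inspect the resolved node order. The following is a minimal, self-contained sketch (it is not part of the example above and uses stub functions in place of real node logic) that rebuilds the hidden-dependency pipeline and prints the node order via `Pipeline.nodes`, which is topologically sorted:
+
+```python
+from kedro.pipeline import node, pipeline
+
+
+# Stub functions standing in for the real node logic.
+def my_func1(df):
+    return df
+
+
+def my_func2(df):
+    return df
+
+
+def my_func3(df):
+    return df
+
+
+hidden_dependency_pipeline = pipeline(
+    [
+        node(name="my_func1_node", func=my_func1, inputs="my_dataframe@spark", outputs="spark_output"),
+        node(name="my_func2_node", func=my_func2, inputs="pandas_input", outputs="my_dataframe@pandas"),
+        node(name="my_func3_node", func=my_func3, inputs="my_dataframe@pandas", outputs="pandas_output"),
+    ]
+)
+
+# Because of the hidden dependency, `my_func2_node` is listed before
+# `my_func1_node`, even though no explicit edge connects them in `pipeline.py`.
+print([n.name for n in hidden_dependency_pipeline.nodes])
+```
+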
 
 ## Create a Data Catalog YAML configuration file via the CLI
 
 You can use the [`kedro catalog create` command to create a Data Catalog YAML configuration](../development/commands_reference.md#create-a-data-catalog-yaml-configuration-file).
diff --git a/docs/source/integrations/pyspark_integration.md b/docs/source/integrations/pyspark_integration.md
index a9450ced4f..0298c14ef7 100644
--- a/docs/source/integrations/pyspark_integration.md
+++ b/docs/source/integrations/pyspark_integration.md
@@ -111,7 +111,7 @@ assert isinstance(df, pyspark.sql.DataFrame)
 
 [Delta Lake](https://delta.io/) is an open-source project that enables building a Lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS. To setup PySpark with Delta Lake, have a look at [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python).
 
-We recommend the following workflow, which makes use of the [transcoding feature in Kedro](../data/data_catalog_yaml_examples.md#read-the-same-file-using-two-different-datasets):
+We recommend the following workflow, which makes use of the [transcoding feature in Kedro](../data/data_catalog_yaml_examples.md#read-the-same-file-using-different-datasets-with-transcoding):
 
 * To create a Delta table, use a `SparkDataset` with `file_format="delta"`. You can also use this type of dataset to read from a Delta table or overwrite it.
 * To perform [Delta table deletes, updates, and merges](https://docs.delta.io/latest/delta-update.html#language-python), load the data using a `DeltaTableDataset` and perform the write operations within the node function.
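+
+As an illustration of the second recommendation above, a node performing an in-place Delta update might look like the sketch below. This is only a sketch: the function body, the `is_stale` column, and the `my_delta_table` catalog entry are assumptions, and it presumes the `spark.DeltaTableDataset` entry loads a `delta.tables.DeltaTable` object.
+
+```python
+from delta.tables import DeltaTable
+
+from kedro.pipeline import node
+
+
+def remove_stale_rows(delta_table: DeltaTable) -> None:
+    # The delete runs against the Delta table in place, so there is no
+    # output for Kedro to save.
+    delta_table.delete("is_stale = true")
+
+
+delta_update_node = node(
+    func=remove_stale_rows,
+    inputs="my_delta_table",  # assumed to be a `spark.DeltaTableDataset` catalog entry
+    outputs=None,
+    name="remove_stale_rows_node",
+)
+```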