-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[spike] Clarify status of various Delta Table datasets #542
Comments
Hi! I took the opportunity to also add the from deltalake.writer import write_deltalake
from deltalake import DeltaTable
from kedro_datasets.pandas import DeltaTableDataset
from kedro.io.core import DatasetError
from typing import List, Union, Any
import pandas as pd
import pyarrow as pa
# from https://github.com/inigohidalgo/prefect-polygon-etl/blob/main/delta-rs-etl/src/delta_rs_etl/upsert.py
def upsert(new_data: pa.Table, target_table: DeltaTable, primary_key: Union[str, List[str]]) -> dict:
predicate = (
f"target.{primary_key} = source.{primary_key}"
if type(primary_key) == str
else " AND ".join([f"target.{col} = source.{col}" for col in primary_key])
)
return (
target_table
.merge(
source=new_data,
predicate=predicate,
source_alias="source",
target_alias="target"
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute()
)
class CustomDeltaTableDataset(DeltaTableDataset):
"""
This is a variation of pandas.DeltaTableDataset with support to upsert write mode
"""
def __init__(self, primary_key: Union[str, List[str], None] = None, **kargs) -> None:
self.primary_key = primary_key
if kargs.get('save_args', {}).get('mode', '') == 'upsert':
self.upsert_mode = True
kargs['save_args']['mode'] = 'overwrite'
if not self.primary_key:
raise DatasetError(
"To use upsert write mode, you need to set the primare_key argument!"
)
else:
self.upsert_mode = False
super().__init__(**kargs)
def _save(self, data: pd.DataFrame) -> None:
data = pa.Table.from_pandas(data, preserve_index=False)
if self.is_empty_dir:
# first time creation of delta table
write_deltalake(
self._filepath,
data,
storage_options=self.fs_args,
**self._save_args,
)
self.is_empty_dir = False
self._delta_table = DeltaTable(
table_uri=self._filepath,
storage_options=self.fs_args,
version=self._version,
)
elif self.upsert_mode:
upsert(
new_data=data,
target_table=self._delta_table,
primary_key=self.primary_key
)
else:
write_deltalake(
self._delta_table,
data,
storage_options=self.fs_args,
**self._save_args,
)
def _describe(self) -> dict[str, Any]:
desc = super()._describe()
desc['primary_key'] = self.primary_key
if self.upsert_mode:
desc['save_args']['mode'] = 'upsert'
return desc |
To add on top of that, just realized that months ago I had created my own Polars Mostly uninteresting, except for # HACK: If the table is empty, return an empty DataFrame
try:
return pl.read_delta(
load_path, storage_options=self._storage_options, **self._load_args
)
except TableNotFoundError:
return pl.DataFrame() (related: kedro-org/kedro#3578) |
Description
We have several Delta Table datasets and it's a bit hard to understand the differences and also choose from them. A spike should be done to get clarity on what each of these delta table datasets does and how we could potentially merge the datasets or make them more consistent.
Context
spark.DeltaTableDataset
was introduced in [KED-2891] Implementspark.DeltaTable
dataset kedro#964 (2021). Understanding that PR is not for the faint of heart:(related: kedro-org/kedro#3578)
Halfway through the implementation it was determined to not add a
_save
method and instead users are supposed to perform write operations inside the node themselves, introducing an undocumented exception to the Kedro philosophy.databricks.ManagedTableDataset
was introduced in feat: Add ManagedTableDataset for managed Delta Lake tables in Databricks #127 (2023). That one does have a_save
method: "When saving data, you can specify one of three modes: overwrite(default), append, or upsert." It supports Databricks Unity Catalog ("the name of the catalog in Unity"). In the words of @dannyrfar, "ManagedTable was made for Databricks while deltatable was made for general Delta implementations (not specific to databricks), not sure why save wasn’t implemented on delta table but would probably have to do with save location and additional permissions for different storage locations (s3, gcs, adls, etc)" https://linen-slack.kedro.org/t/16366189/tldr-is-there-a-suggested-pattern-for-converting-vanilla-par#d1d2bc1e-42f2-41f2-af15-d1b4ef50268bpandas.DeltaTableDataset
was introduced in feat(datasets): AddedDeltaTableDataSet
#243 (2023). Interestingly, it does have_save
capabilities and supports Databricks Unity Catalog as well as AWS Glue catalog, but writing capabilities are more restricted: "When saving data, you can specify one of two modes: overwrite(default), append." So it begs the question of whypandas.DeltaTableDataset
can_save
, butspark.DeltaTableDataset
can't.This has been an ongoing point of confusion, and the final straw was this Slack thread https://linen-slack.kedro.org/t/16366189/tldr-is-there-a-suggested-pattern-for-converting-vanilla-par#4c007d9d-68c7-46e3-bf86-b4b86755f7ca
The expected outcomes here are unclear, but for starters I hope at least folks agree that the current situation is a bit of a mess. Possible actions are:
_save
capabilities tospark.DeltaTableDataset
. Should that happen before or after [spike] Investigate suitability of Kedro for EL pipelines and incremental data loading kedro#3578? Good question.pandas.DeltaTableDataset
supports AWS Glue and Databricks Unity Catalog butspark.DeltaTableDataset
doesn't, if both are generic? Whydatabricks.ManagedTableDataset
has 3 write modes, butpandas.DeltaTableDataset
has 2?The text was updated successfully, but these errors were encountered: