[KED-2891] Implement spark.DeltaTable dataset #964
Conversation
I am going to update the PR, but I can foresee that the YAML would take a similar shape. Please let me know what you think about that.

```yaml
_anchor: &anchor
  type: spark.DeltaTableDataset

delta_overwrite_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/overwrite/dataset
  delta_options:
    replaceWhere: "start_date >= '2017-01-01' AND end_date <= '2021-01-01'"
    overwriteSchema: true
  save_args:
    mode: overwrite

delta_append_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/append/dataset
  save_args:
    mode: append

delta_update_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/update/dataset
  delta_options:
    timestampAsOf: '2019-01-01'
    versionAsOf: 2
  save_args:
    mode: update
    update_predicates:
      - 'col = "foo"'
    update_values:
      'event': 'baz'
      'another': 'foobar'

delta_upsert_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/upsert/dataset
  save_args:
    mode: upsert
    key: 'a.id = b.id'
    whenMatchedUpdate:
      'a.data': 'b.new_data'
    whenNotMatchedInsert:
      'a.date': 'b.new_date'
      'a.id': 'b.id'

delta_delete_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/delete/dataset
  save_args:
    mode: delete
    delete_predicates:
      - 'date < "2017-01-01"'
```
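To make the mapping concrete, the `update` entry above would presumably resolve into a plain delta-spark call along these lines inside the dataset's `_save`. This is only a sketch; the path, predicate and literal values are copied from the illustrative catalog entry.

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
# illustrative path taken from the catalog entry above
table = DeltaTable.forPath(spark, "/dbfs/path/to/update/dataset")
# update_predicates / update_values map onto the condition / set arguments
table.update(
    condition=f.col("col") == "foo",
    set={"event": f.lit("baz"), "another": f.lit("foobar")},
)
```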
So I have a few thoughts - this is very related to the SQL conversation I had with @idanov and @MerelTheisenQB a while back, which ended with the conclusion that we don't want to encourage many transformation side effects outside of the Kedro DAG. The central question here is what sort of declarative CRUD functionality we allow in the catalog. Of the two options - (1) catalog definition, (2) within the node - option (2) remains possible either way. I don't know if that means we shouldn't do (1), but it highlights the fact that it adds implementation complexity which we could simply delegate to the existing PySpark API. Upsert example - the syntax doesn't make it especially clear to the reader where exactly A and B come from. The more I write on this comment, the less convinced I am that we should expose this sort of functionality in the catalog definition.
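For reference, option (2) - delegating to the existing delta-spark API inside a node - would look roughly like the sketch below, which also shows where "a" and "b" come from. The path, the aliases and the merge condition are illustrative only, not part of this PR.

```python
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession


def upsert_node(updates: DataFrame) -> None:
    spark = SparkSession.builder.getOrCreate()
    # "a" is the existing Delta table on disk, "b" is the incoming DataFrame
    target = DeltaTable.forPath(spark, "/dbfs/path/to/upsert/dataset")  # illustrative path
    (
        target.alias("a")
        .merge(updates.alias("b"), "a.id = b.id")
        .whenMatchedUpdate(set={"data": "b.new_data"})
        .whenNotMatchedInsert(values={"id": "b.id", "date": "b.new_date"})
        .execute()
    )
```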
In my opinion, this fully aligns with not allowing IO ops outside of the node. Therefore the only place where this could be is the
THAT BEING SAID - we could restrict the Similar with
I think this is reasonable for an upsert, but I'm not sure what the implementation would look like
I also like this restriction, but the dataset isn't aware of where it sits in the pipeline - I guess the runner is, but that's not a great help. Would this be enforced or just encouraged?
```python
self._update_options = save_args.pop(
    "insert_options"
)  # TBD - maybe better solution?
self._insert_options = save_args.pop(
    "update_options"
)  # TBD - maybe better solution?
```
Are these the wrong way round?
Yes 😬 it was testing whether reviewers are paying attention 😂
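For the record, the corrected pairing would presumably read as follows (a quick sketch, assuming the same `save_args` keys):

```python
# pop each option set under its matching attribute name
self._insert_options = save_args.pop("insert_options")  # TBD - maybe better solution?
self._update_options = save_args.pop("update_options")  # TBD - maybe better solution?
```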
Initial comments from someone with no understanding of what a delta table is, what upsert is, or really what Spark is... Would the strategy pattern work similarly to
The dates in replaceWhere are static. That would defeat the purpose of replaceWhere, because we'd need it to be updated whenever we're doing an incremental load, with the dates being set based on whatever incremental-load strategy the user is applying.
I see what you mean @IamSandeepGunda - that being said, in the The individual
Well, I gave up finding an elegant solution and hacked it a little bit.

```yaml
delta_overwrite_dataset:
  type: "${datasets.spark}" ## That's just my custom spark dataset
  filepath: /dbfs/path/to/overwrite/dataset
  file_format: delta
  save_args:
    mode: overwrite
    option:
      overwriteSchema: true
```

Now inside my custom _save() method:

```python
from pyspark.sql.functions import col, min as min_

partition_col = self._save_args.get("partitionBy")
# derive the lower bound of the incoming data for replaceWhere
start_date = data.agg(min_(col(partition_col))).first()[0]
data.write \
    .option("replaceWhere", f"{partition_col} >= '{start_date}'") \
    .save(
        path=save_path,
        format=self._file_format,
        mode=self._save_args.get("mode"),
        partitionBy=partition_col)
```

This is how I'm getting around setting replaceWhere dynamically. I'd love to have a decent implementation of a spark delta dataset where maybe the node can pass the dates to the save method.
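One hypothetical way to let the node pass the dates to the save method - purely a sketch, where `DeltaWritePayload` and the column name `start_date` are invented for illustration - would be to have the node return the DataFrame together with the computed bound and let the custom `_save` unpack it:

```python
from dataclasses import dataclass

from pyspark.sql import DataFrame
import pyspark.sql.functions as f


@dataclass
class DeltaWritePayload:
    data: DataFrame
    replace_where: str


def node_func(data: DataFrame) -> DeltaWritePayload:
    # compute the incremental-load bound inside the node...
    start_date = data.agg(f.min("start_date")).first()[0]
    # ...and hand it to the dataset alongside the data
    return DeltaWritePayload(data=data, replace_where=f"start_date >= '{start_date}'")


# inside the custom dataset's _save (sketch):
# if isinstance(io, DeltaWritePayload):
#     io.data.write.format("delta") \
#         .option("replaceWhere", io.replace_where) \
#         .mode("overwrite").save(save_path)
```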
Summary

@datajoely @AntonyMilneQB @limdauto @idanov @lorenabalan So after our discussions, I believe the best use-case for this dataset is to smooth over the non-self-consistent DeltaTable API. For each of the write modes, this would look as follows.

First of all, the `_save` method aligns the Spark and DeltaTable APIs:

```python
def _save(self, io: Union[DataFrame, NodeStatus]):
    # align the Spark and DeltaTable APIs
    if isinstance(io, DataFrame):
        super()._save(data=io)  # there is still `_strip_dbfs_prefix` -- will this work?
    # allow the user to handle DeltaTable IO in node and return success status
    elif isinstance(io, NodeStatus):
        if io != NodeStatus.SUCCESS:
            raise DataSetError("`NodeStatus` returned something other than SUCCESS")
    else:
        raise DataSetError("Incorrect return from node func.")
```

Append & Overwrite

Consistent with the Kedro Principles & DAG; requires the user to return a `DataFrame` from the node function.

Leverages: https://github.com/delta-io/delta/blob/master/python/delta/tables.py#L43

The node function would look like this:

```python
from delta.tables import DeltaTable
from pyspark.sql import DataFrame


def node_func(input: DeltaTable) -> DataFrame:
    # processing happens here
    return input.toDF()
```

Update, Upsert/Merge, Delete

These are not directly consistent with the Kedro Principles & DAG, as the DeltaTable is modified in place inside the node rather than returned for Kedro to save.
Therefore, the node function would look like this:

```python
# in kedro.io.core
from enum import Enum


class NodeStatus(Enum):
    SUCCESS = 0
    FAILURE = 1


# in node func module
from delta.tables import DeltaTable
from kedro.io.core import NodeStatus
import pyspark.sql.functions as f


def node_func(input: DeltaTable) -> NodeStatus:
    # processing happens here, such as
    input.update(f.col("col_1") == "foo", {"col_1": f.lit("bar")})
    return NodeStatus.SUCCESS
```

This implementation has the following benefits:
Constraints, disadvantages and questions:
YAML

```yaml
_anchor: &anchor
  type: spark.DeltaTableDataset

delta_overwrite_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/overwrite/dataset
  delta_options:  # this may need to be dynamic
    replaceWhere: "start_date >= '2017-01-01' AND end_date <= '2021-01-01'"
    overwriteSchema: true
  save_args:
    mode: overwrite

delta_append_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/append/dataset
  save_args:
    mode: append

delta_update_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/update/dataset
  delta_options:  # this may need to be dynamic
    timestampAsOf: '2019-01-01'
    versionAsOf: 2
  save_args:
    mode: update  # the only point of this is to check or log

delta_upsert_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/upsert/dataset
  save_args:
    mode: upsert  # the only point of this is to check or log

delta_delete_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/delete/dataset
  save_args:
    mode: delete  # the only point of this is to check or log
```

Alternatives
Thank you for this @jiriklein - the graphic is very helpful, but I need more time to think about the prose. Could you also think about transcoding from
@datajoely @idanov @limdauto @lorenabalan @SajidAlamQB

Further discussion

After yesterday's discussion, we came to the following:

Transcribing and dataset proposal
One last point - perhaps it's worth adding a note under the image saying something like:
Great work everyone! Thanks @lorenabalan for persevering 👏 🏆 The docs look really good as well!
Description

This PR was created to add the functionality of Databricks `DeltaTable` in a `kedro` pipeline. Unlike a regular dataset, which has the concept of a `save` that fully overwrites (or creates) the underlying data, a `DeltaTable` is mutable underneath - this means it can be appended to, updated, merged or deleted, as a regular SQL table can be (along with commit logs etc.).

This means that, unlike with other datasets, `kedro` needs to be extremely careful in defining what the `save` method does. In my opinion, in the case of `DeltaTable`, this means modifying an underlying dataset. The actual implementation is described below.

Development notes

In my opinion, the `DeltaTable` method calls are best abstracted away from the users. The actual API then comes in through a well-designed conf/catalog entry and a strategy pattern (or similar) that resolves into the right call in the `_save` method. Happy to take onboard any design contributions on this.
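As a sketch of what such a strategy pattern could look like - the class and handler names below are hypothetical, not part of this PR - the configured `mode` could simply be mapped to a handler that wraps the corresponding DeltaTable or DataFrameWriter call:

```python
from pyspark.sql import DataFrame


class _DeltaTableDataSetSketch:
    """Illustrative dispatch only; real handlers would wrap the DeltaTable API."""

    def __init__(self, save_args: dict):
        self._mode = (save_args or {}).get("mode", "overwrite")
        # strategy pattern: configured mode -> handler
        self._strategies = {
            "overwrite": self._save_overwrite,
            "append": self._save_append,
            "update": self._save_update,
            "upsert": self._save_upsert,
            "delete": self._save_delete,
        }

    def _save(self, data: DataFrame) -> None:
        try:
            self._strategies[self._mode](data)
        except KeyError:
            raise ValueError(f"Unsupported save mode: {self._mode}")

    def _save_overwrite(self, data: DataFrame) -> None:
        ...  # e.g. data.write.format("delta").mode("overwrite").save(...)

    def _save_append(self, data: DataFrame) -> None:
        ...  # e.g. data.write.format("delta").mode("append").save(...)

    def _save_update(self, data: DataFrame) -> None:
        ...  # would resolve into DeltaTable.update(...)

    def _save_upsert(self, data: DataFrame) -> None:
        ...  # would resolve into DeltaTable.merge(...).execute()

    def _save_delete(self, data: DataFrame) -> None:
        ...  # would resolve into DeltaTable.delete(...)
```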
Checklist

RELEASE.md file

Notice
I acknowledge and agree that, by checking this box and clicking "Submit Pull Request":
I submit this contribution under the Apache 2.0 license and represent that I am entitled to do so on behalf of myself, my employer, or relevant third parties, as applicable.
I certify that (a) this contribution is my original creation and / or (b) to the extent it is not my original creation, I am authorised to submit this contribution on behalf of the original creator(s) or their licensees.
I certify that the use of this contribution as authorised by the Apache 2.0 license does not violate the intellectual property rights of anyone else.