[WIP] Towards high level writer #523
Conversation
@houqp, this is probably the most involved thing I ever did in Rust, so any feedback is greatly appreciated :)
Thank you @roeap, this is an epic set of changes! Really cool to see it being hooked up with the datafusion execution plan :)
I only quickly skimmed through the PR. I probably won't be able to go through it in detail until this weekend due to the size of the change. But on a high level, it looks pretty good to me.
> first and foremost, is this going in the right direction

I think so, the abstractions look very clear to me.

> what to look out for to have as little data copies as possible

Could you elaborate what you mean by data copies? Did you mean arrow record batches?

> should we do automatic schema updates on the table?

Eventually yes, we should support schema evolution as part of the transaction :) But it's a lot of work, so I think it's better to handle it as a follow-up.

> should there be defaults / base entries for commit infos in operations

I think so, it will be useful metadata for auditing purposes.

> how to best handle DeltaTable instances between different structs and specifically when we distribute via datafusion.

I don't have a good answer to this without going through the implementation in detail. I think we might need to introduce changes in datafusion as well to better support this.
```rust
/// Write data to internal transaction buffers
pub async fn write_files(&mut self, batches: Vec<RecordBatch>) -> Result<(), DeltaTableError> {
```
nit: files seems like an implementation detail here. Perhaps write_batches or append_batches would be a better name?
```rust
) -> Result<String, DeltaWriterError> {
    // TODO: what does 00000 mean?
    // TODO (roeap): my understanding is, that the values are used as a counter - i.e. if a single batch of
    // data written to one partition needs to be split due to desired file size constraints.
```
just FYI, some prior discussions around this topic: #248 (comment)
```rust
        partition_values.insert(key.clone(), value);
    }

    let batch_data = arrow_schema
```
I'm not entirely sure how to interpret this line from the Delta protocol:

> Values for all partition columns present in the schema MUST be present for all files in the table.

Does this mean the partition values must not be omitted from the data files? We are writing them in the log, so they are somehow available, but right now they are not written to the data files. The previous implementation did write the data.
Interesting, I was under the impression that we won't need to duplicate partition column values in the data files. Let me double check this with the reference implementation tomorrow.
I double checked and can confirm that partition values should not be written out into the data file. This sentence in the spec is a little bit misleading, to me at least. Worth filing an upstream ticket to further clarify it.
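A minimal sketch of what that could look like in the writer, assuming `arrow_schema`, `partition_columns` and an `ArrowSchema` alias for arrow's `Schema` are in scope (these names, and the arrow version, are assumptions, not the exact code in this PR):

```rust
// Sketch only: drop the partition columns from the schema used for the parquet
// file, since their values are recorded in the Add action's partitionValues.
// Assumes Field::name() returns &String (arrow 5/6 era) and
// partition_columns: Vec<String>.
let write_schema = Arc::new(ArrowSchema::new(
    arrow_schema
        .fields()
        .iter()
        .filter(|f| !partition_columns.contains(f.name()))
        .cloned()
        .collect::<Vec<_>>(),
));
```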
```rust
    );
    assert_eq!(modified_action, expected)
}
// Action::commitInfo(action) => {
```
what's the reason for commenting out these lines?
Currently that test fails since in this PR we now create basic commit info from the DeltaOperation enum. If we stick with that, the test needs to be refactored and maybe moved.
```rust
        return Ok(());
    }
    let schema = data[0].schema();
    let current_part = match self.table.update().await {
```
Since we are doing optimistic commits, I think we can skip the update here and just check against self.table.get_metadata() directly. If the table is out of date, the subsequent commit will fail and we will simply retry the write with the latest version.
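A minimal sketch of that suggestion, assuming the surrounding code only needs the partition columns from the already loaded metadata (variable names are hypothetical):

```rust
// Sketch only: rely on the locally cached table state instead of refreshing it;
// a stale view will surface as a commit conflict and the write is retried then.
let metadata = self.table.get_metadata()?;
let current_partition_columns = metadata.partition_columns.clone();
```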
```rust
let mut table =
    get_table_from_uri_without_update(self.table_uri.clone()).map_err(to_datafusion_err)?;
```
Something to refactor in the future. In my mind, a full table load is not needed, and we should be able to perform a commit given just the list of actions in the commit and the commit version. To do this, we need to decouple the delta table struct from DeltaTransaction.
When implementing this, I also often thought that decoupling DeltaTransaction from the actual table instance would be beneficial. Haven't really looked at it more closely, but is this something we should try in this PR?
Yeah, I think it's something we can handle as a follow-up since the scope of this PR is already quite large. We should leave a comment and link to a tracking issue though :)
```rust
let mut txn = table.create_transaction(None);
txn.add_actions(actions);
let committed_version = txn
    .commit_with_info(Some(self.operation.clone()), self.app_metadata.clone())
```
We probably need to rename commit_with_info to something more explicit since it has an internal retry loop which should only be applied to append only transactions. I think whether a write is append only can be decided by matching on self.operation.
The retry loop in commit_with_info doesn't check for schema change on retry, so I think it's probably not even safe for append only writes 🤔 Maybe we should just stick with try_commit_transaction in here for now.
The commit_with_info method was actually not meant to live very long. The main reason it exists right now was to propose a way of creating basic commit info from the operation, while still giving callers a way to pass in custom application data to be included in the commit. When I saw that kafka-delta-ingest uses the commit function, I wanted to avoid breaking the current API for now, but the hope was to change the function signature of the current commit method. The current create method on the delta table also does lower level commit logic. If we decide to use the operations enum as a basis for creating commit infos, create should - I think - use that route as well. And if the call sites in kafka-delta-ingest can be adjusted, my hope was to move back to a single commit function on the transaction.
Sounds good, we can perform the operation check within commit_with_info. My understanding is that the current create method does not need the operation enum because there is no commit loop within it, i.e. it only tries to perform a commit for version 0.
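A minimal sketch of such an operation check, assuming a `DeltaOperation::Write` variant that carries a `SaveMode` (the exact shape of the enum here is an assumption):

```rust
// Sketch only: allow the internal retry loop only for append-only writes;
// otherwise attempt the commit once and surface any conflict to the caller.
let append_only = matches!(
    &self.operation,
    Some(DeltaOperation::Write { mode: SaveMode::Append, .. })
);
if !append_only {
    // fall back to a single try_commit_transaction attempt without retries
}
```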
```rust
    operation.clone(),
)?);

self.execute(operation, plan).await
```
perhaps wrapping the create command in a transaction is overkill here and we can just call table.create() directly?
On its own, definitely. The main reason it exists is that it made it much easier to create a table from within other commands, and to have a consistent way in which individual commands are implemented. I was also unsure about cases where we create the table as part of e.g. a write command: calling table.create() would generate a version, and then the write would create another version. In the current implementation a single version / commit is created if the table is created as part of a write command. Not sure how this is handled in the reference implementation, but since the table metadata is derived from the written data, it felt more natural to me for this to be a single commit. Given that, I thought moving the actions around consistently was worth the overhead of having an execution plan. I guess it all depends on how we want to handle wrapping things inside a transaction in general.
I agree with you and think we should keep the CreateCommand plan. For writes to a non-existent table, I would prefer us to combine the create table actions and add actions into a single commit too.
For this particular call site, I was thinking we can avoid all the datafusion overhead and create the table directly since we are not writing extra data.
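A minimal sketch of the single-commit idea for writes to a not-yet-existing table, assuming the `Action` variant names used elsewhere in the crate and that `protocol`, `meta_data` and `add_actions` have already been prepared (hypothetical variables):

```rust
// Sketch only: bundle the table-creation actions and the add actions for the
// written files into one transaction, so create + write land as a single commit.
let mut actions = vec![Action::protocol(protocol), Action::metaData(meta_data)];
actions.extend(add_actions.into_iter().map(Action::add));

let mut txn = table.create_transaction(None);
txn.add_actions(actions);
// ...followed by a commit carrying the corresponding DeltaOperation
```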
```rust
#[derive(Debug)]
pub struct DeltaTransactionPlan {
    table_uri: String,
    input: Arc<dyn ExecutionPlan>,
```
One problem I noticed with DeltaTransactionPlan is that transaction actions need to be prepared for a specific version of a table so we can detect and resolve transaction conflicts. However, within its execute method we always load the latest version of the table, which might be newer than the version we used to generate the actions.
For example, consider overwrite-table write actions generated for version 1; then another writer writes a bunch of new files into version 2 before we commit the overwrite. When we try to commit the overwrite transaction as version 3, we will miss the new files that were added in version 2, resulting in incorrect table state.
One way to solve this problem is to add a table version attribute to DeltaTransactionPlan. Alternatively, it might be better to get rid of DeltaTransactionPlan entirely and let individual command execution plans manage the transaction struct internally. A table transaction is just a simple json file write, so there is not much value in managing this write as an execution plan. Execution plans are especially useful for operations that need to be executed in a distributed fashion, for example writing out partitioned parquet files for each add action.
Writing this I was also a bit conflicted about how to best handle wrapping things in a transaction. Making sure we commit against a fixed version seems reasonable. For my understanding though, in the example above we would have to update the log to the latest version before retrying, right?
The reason this found its way into an execution plan is twofold. For one, I tried to mimic what Spark does as best I could. There they seem to have something that conceptually looks a bit like a context manager in Python:

```scala
override def run(sparkSession: SparkSession): Seq[Row] = {
  deltaLog.withNewTransaction { txn =>
    val actions = write(txn, sparkSession)
    val operation = DeltaOperations.Write(mode, Option(partitionColumns),
      options.replaceWhere, options.userMetadata)
    txn.commit(actions, operation)
  }
  Seq.empty
}
```

While I may have misunderstood what's going on there, I tried to come up with an implementation that could be used in a similar fashion, i.e. collect all actions from child plans and commit them based on the operation. Of course we can achieve this without all the overhead of an execution plan. However, if we wanted to support the Delta-specific SQL syntax from Spark in the future based on datafusion / ballista, I thought wrapping things in an execution plan would be the most straightforward way to go. But again I may have missed something :)
> For my understanding though, in the example above we would have to update the log to the latest version before retrying, right?
The core of optimistic concurrency control is that we always assume we already have the latest version, or latest view of the world, when performing a commit. Optimistic concurrency control is optimized for use-cases with a low rate of concurrent write transactions, so the chance of us not being on the latest version is low. A lot of the time there is only a single writer to a delta table, in which case the writer will always have the latest view of the table without needing to do explicit state refreshes. On top of this, refreshing the state here doesn't really help us get the latest view of the table; it only gets us a fairly recent view, since other writes could still happen right after the refresh and before we commit the write. Therefore it's not buying us much here.
But the more important issue this leads to is a correctness compromise. For some writes like upsert and delete, the prepared actions are only valid for a specific version of the table. For example, let's say you have a transaction to delete file A, which only exists in version 1. Then subsequently another writer deletes the same file in version 2. When we try to commit the same delete action as version 3, we should abort and return a conflict error to the user instead of letting the commit go through. If we record the targeted version with the commit, the writer will know to commit on version 2 and error out with an object-already-exists error. If we always refresh before the commit, it will fetch the latest version (2), then increment to 3 and commit without a way to perform conflict resolution.
I think leveraging execution plans is the right move here, we just need to make sure there is a way to provide the targeted commit version as well.
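A minimal sketch of how the targeted version could be carried by the plan, extending the struct shown in the diff above (the field name is an assumption):

```rust
// Sketch only: record which table version the wrapped actions were prepared
// against, so the commit can detect conflicts with versions written in between.
#[derive(Debug)]
pub struct DeltaTransactionPlan {
    table_uri: String,
    /// version of the table the input actions were generated against
    table_version: DeltaDataTypeVersion,
    input: Arc<dyn ExecutionPlan>,
    // ...
}
```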
```rust
    &mut self,
    operation: Option<DeltaOperation>,
    app_metadata: Option<Map<String, Value>>,
) -> Result<DeltaDataTypeVersion, DeltaTableError> {
```
Could you add a TODO here as a reminder to check the operation and decide whether the commit is safe to retry in a loop?
@houqp - sorry for taking a while to respond, work got in the way :). I finally wanted to get back to this PR and get some of it merged. From your perspective, does it make more sense to try and get this merged as a whole, or to split it up into smaller pieces? The datafusion part is currently likely no longer compatible with some of the changes in the latest datafusion versions. One way could be to remove the query plans from this PR and submit them in a later one. This will likely be the part we need to iterate on most to get to a good solution. Alternatively, I could of course merge current master and update the query plans as well. Do you have a preference for how to proceed?
Sorry @roeap for the late reply, still catching up on all my GitHub notifications :P I don't have a preference, so I recommend you pick the route that requires the least amount of work for you. My guess is splitting it up into smaller pieces will be less work for you because it reduces the chances of merge conflicts.
Description
This PR tries to consolidate existing efforts and lay the groundwork for advancing the high-level writer APIs. When preparing this PR, I learned a lot about Rust and the internals of this crate, so I assume there is lots to improve. The main part of the low-level internals for generating stats etc. is taken from this crate or kafka-delta-ingest. There is still a lot to do before everything works and is tested as required, but I aimed for getting things to work end to end to collect some feedback on the general direction.
Related Issue(s)
Documentation
Roughly, there are three levels. The writer API handles buffering data, writing the physical files, and generating actions with the respective statistics. The transaction API wraps the low-level writer API and handles the actions internally. The command API handles complete operations around transactions. Below are some simple code snippets showing how one might write data at the different levels.
Writer API
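A minimal sketch of writing at this level, loosely modeled on the writer adapted from kafka-delta-ingest; `DeltaWriter::for_table`, `write` and `flush` are illustrative names, not necessarily the exact API in this PR:

```rust
// Sketch only: the writer buffers record batches, writes the parquet files and
// returns the Add actions (including statistics) for a later commit.
let mut writer = DeltaWriter::for_table(&table, HashMap::new())?;
writer.write(batch).await?;
let add_actions = writer.flush().await?;
```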
Transaction API
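A minimal sketch at the transaction level, assuming the write_files method shown earlier lives on the transaction (argument values are illustrative):

```rust
// Sketch only: buffer batches on the transaction and commit the resulting
// actions together with commit info derived from the operation.
let mut txn = table.create_transaction(None);
txn.write_files(batches).await?;
let version = txn
    .commit_with_info(Some(operation.clone()), None)
    .await?;
```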
Command API
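A minimal sketch at the command level; `DeltaCommands`, `try_from_uri` and the `write` signature are assumptions about the shape of the API rather than a confirmed excerpt:

```rust
// Sketch only: a command bundles planning (via DataFusion execution plans) and
// the transactional commit into a single high-level call.
let mut commands = DeltaCommands::try_from_uri(table_uri).await?;
commands.write(batches, SaveMode::Append, None).await?;
```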
DataFusion crate ExecutionPlans

Questions
- first and foremost, is this going in the right direction
- what to look out for to have as little data copies as possible
- should we do automatic schema updates on the table?
- should there be defaults / base entries for commit infos in operations
- how to best handle DeltaTable instances between different structs and specifically when we distribute via datafusion.