Conversation

@xianwill
Collaborator

@xianwill xianwill commented May 18, 2021

Description

This PR adds a commit_version method to DeltaTransaction which takes a version parameter from the caller.

Some clients (e.g. https://github.com/delta-io/kafka-delta-ingest) include txn actions when creating delta log entries, and these may require additional conflict resolution with external systems before the delta log version is incremented. The commit_version method propagates version conflict errors back to the caller immediately, so the caller can apply conflict resolution steps before attempting another commit.
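To make the caller-side flow concrete, here is a minimal sketch of how a client might react to an immediate version conflict. Everything in it is an assumption for illustration: `ToyLog`, `CommitError`, and `commit_with_resolution` are hypothetical names, the log is an in-memory map, and the real `DeltaTransaction::commit_version` is async and has a different signature.

```rust
use std::collections::BTreeMap;

/// Hypothetical error type standing in for a version conflict.
#[derive(Debug, PartialEq)]
enum CommitError {
    VersionAlreadyExists(i64),
}

/// Toy in-memory log keyed by version (not the real DeltaTransaction).
#[derive(Default)]
struct ToyLog {
    entries: BTreeMap<i64, String>,
}

impl ToyLog {
    /// Commit at exactly `version`; fail immediately on conflict, with no retry loop.
    fn commit_version(&mut self, version: i64, entry: &str) -> Result<i64, CommitError> {
        if self.entries.contains_key(&version) {
            return Err(CommitError::VersionAlreadyExists(version));
        }
        self.entries.insert(version, entry.to_string());
        Ok(version)
    }
}

/// Caller-side handling: on each conflict, run a resolution step
/// (e.g. re-checking state in an external system), then try the next version.
fn commit_with_resolution(
    log: &mut ToyLog,
    mut version: i64,
    entry: &str,
    mut resolve: impl FnMut(i64),
) -> i64 {
    loop {
        match log.commit_version(version, entry) {
            Ok(v) => return v,
            Err(CommitError::VersionAlreadyExists(v)) => {
                resolve(v);
                version = v + 1;
            }
        }
    }
}
```

The point of the sketch is only that the conflict surfaces to the caller, who decides what to do before the next attempt; the resolution step itself is left as a closure.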

This PR is complete, but I'm leaving it in draft status per @rtyler's notice that we may preemptively avoid some conflicts by incorporating this into his PR.

@xianwill xianwill requested review from houqp and mosyp May 18, 2021 12:57
}
}

fn log_bytes_from_actions(actions: &[Action]) -> Result<String, serde_json::Error> {
Member

I find it awkward to have a private function just floating in the top level namespace when it's really only something needed/used by the functions within the DeltaTransaction.

I suggest moving it up into there.

Collaborator Author

This is a super interesting style point. I always have an internal battle in cases like this. The rule I've been following (a private rule of my own) has been: "if my fn doesn't take a self ref, it should go into an outer scope, since it does not depend on self state." The implication is that all struct "methods" should be true methods with an actual dependency on self state.

I don't necessarily disagree with your opinion, but I'd love to find a hard rule to base placement decisions like this on, rather than a smell, which is a bit more ambiguous.

Collaborator Author

@xianwill xianwill May 18, 2021

I wonder whether it would be more palatable to both of us if we created a mod for delta_transaction and this fn lived in that mod, but still outside of the impl DeltaTransaction.
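For reference, the two placements being weighed can be sketched side by side. This is a simplified illustration, not the crate's code: the struct fields and the names `join_lines_assoc`, `join_lines_free`, `entry_via_associated_fn`, and `entry_via_module_fn` are hypothetical.

```rust
mod delta_transaction {
    /// Simplified, hypothetical stand-in for the real struct.
    pub struct DeltaTransaction {
        actions: Vec<String>,
    }

    impl DeltaTransaction {
        pub fn new(actions: Vec<String>) -> Self {
            Self { actions }
        }

        /// Placement A: an associated function inside the impl.
        /// It takes no `self`, so it is called as `Self::join_lines_assoc(..)`.
        fn join_lines_assoc(actions: &[String]) -> String {
            actions.join("\n")
        }

        pub fn entry_via_associated_fn(&self) -> String {
            Self::join_lines_assoc(&self.actions)
        }

        pub fn entry_via_module_fn(&self) -> String {
            join_lines_free(&self.actions)
        }
    }

    /// Placement B: a private free function in the same module, outside the impl.
    /// It is invisible outside `delta_transaction` but has no tie to the type.
    fn join_lines_free(actions: &[String]) -> String {
        actions.join("\n")
    }
}
```

Both helpers are private to the module and behave identically; the difference is purely one of namespace, which is why the choice comes down to convention rather than a hard rule.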

async fn try_commit(
    &mut self,
    tmp_log_path: &str,
    version: Option<DeltaDataTypeVersion>,
Member

I think it's worth making version non-optional here. That way, commit_with could just:

  • Fetch the next attempt version
  • Call commit_version

That way the bulk of the work goes into commit_version. That may also obviate the need for the log_bytes_from_actions function.

Collaborator Author

@xianwill xianwill May 19, 2021

FYI @rtyler - I implemented your suggestion (nice catch) to make version non-optional in this method, with the implication that commit_version doesn't need the optimistic concurrency loop at all. I also fixed the naming of log_bytes_from_actions (it doesn't return bytes lol); it is now log_entry_from_actions. However, rather than funnel commit_with down to a call of commit_version, I left the separation in place so that call stacks now look like:

  • commit_version
      • log_entry_from_actions
      • try_commit
  • commit_with
      • log_entry_from_actions
      • try_commit_loop
          • try_commit

My reasoning here is that try_commit_loop shouldn't have to serialize the same set of log actions on every iteration of the loop until it succeeds. So rather than re-use commit_version, I think it's better if both commit_version and commit_with serialize the log entry independently and funnel down separate paths to the try_commit call.
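The structure described above can be sketched roughly as follows. The method names mirror the ones in this thread, but the bodies are made up: `ToyTransaction`, `VersionExists`, the `last_known_version` field, the `max_attempts` parameter, and the `serializations` counter are assumptions for illustration, and the real methods are async and write to storage.

```rust
use std::collections::BTreeSet;

/// Hypothetical conflict error: the requested version is already taken.
#[derive(Debug, PartialEq, Clone, Copy)]
struct VersionExists(i64);

/// Toy transaction; the "log" is just a set of committed versions.
struct ToyTransaction {
    committed: BTreeSet<i64>,
    actions: Vec<String>,
    last_known_version: i64, // possibly stale view of the table version
    serializations: usize,   // counts calls to log_entry_from_actions
}

impl ToyTransaction {
    fn new(actions: Vec<String>, committed: &[i64], last_known_version: i64) -> Self {
        Self {
            committed: committed.iter().copied().collect(),
            actions,
            last_known_version,
            serializations: 0,
        }
    }

    /// Serialize all actions into one newline-delimited log entry.
    fn log_entry_from_actions(&mut self) -> String {
        self.serializations += 1;
        self.actions.join("\n")
    }

    /// Single attempt at a specific, non-optional version.
    fn try_commit(&mut self, _log_entry: &str, version: i64) -> Result<i64, VersionExists> {
        if self.committed.insert(version) {
            Ok(version)
        } else {
            Err(VersionExists(version))
        }
    }

    /// Caller picks the version; a conflict is returned immediately.
    fn commit_version(&mut self, version: i64) -> Result<i64, VersionExists> {
        let entry = self.log_entry_from_actions();
        self.try_commit(&entry, version)
    }

    /// Optimistic concurrency loop; reuses the already serialized entry.
    fn try_commit_loop(&mut self, log_entry: &str, max_attempts: u32) -> Result<i64, VersionExists> {
        let mut version = self.last_known_version + 1;
        let mut attempt = 0;
        loop {
            attempt += 1;
            match self.try_commit(log_entry, version) {
                Ok(v) => {
                    self.last_known_version = v;
                    return Ok(v);
                }
                Err(e) if attempt >= max_attempts => return Err(e),
                Err(VersionExists(v)) => version = v + 1,
            }
        }
    }

    /// Serialize once, then retry until a free version is found.
    fn commit_with(&mut self, max_attempts: u32) -> Result<i64, VersionExists> {
        let entry = self.log_entry_from_actions();
        self.try_commit_loop(&entry, max_attempts)
    }
}
```

In this sketch, a `commit_with` call that hits two conflicts still serializes only once, whereas a loop built on top of `commit_version` would serialize on every attempt.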

Contributor

@mosyp mosyp left a comment

LGTM. Is it still in draft mode, or are we ready to merge it?

@xianwill
Collaborator Author

> LGTM, is it still in draft mode or we're ready to merge it?

@rtyler will be incorporating the changes into #248. LMK if this becomes a blocker for your work on kafka-delta-ingest @mosyp - I think we're likely okay to wait a bit for now.

@rtyler
Member

rtyler commented May 19, 2021

I think this is fine to bring into main as is @xianwill

@xianwill xianwill marked this pull request as ready for review May 19, 2021 15:58
@rtyler rtyler merged commit 4d033d6 into delta-io:main May 19, 2021
@xianwill xianwill deleted the delta-tx-conflict-flag branch May 19, 2021 15:59
}

fn log_entry_from_actions(actions: &[Action]) -> Result<String, serde_json::Error> {
    let mut jsons = Vec::<String>::new();
Member

You can actually write this more efficiently by writing each serialized action's JSON directly into a string buffer; that way we avoid the vector join at the end.
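A rough sketch of the difference, under stated assumptions: `Action` here is a hypothetical two-field struct with hand-rolled JSON formatting so the example stays dependency-free, whereas the real function serializes the crate's `Action` type with serde_json. The function names `log_entry_with_join` and `log_entry_with_buffer` are also made up for the comparison.

```rust
use std::fmt::Write;

/// Hypothetical stand-in for a Delta action.
struct Action {
    name: &'static str,
    value: i64,
}

impl Action {
    /// Stand-in for serializing one action to a JSON string.
    fn to_json(&self) -> String {
        format!("{{\"{}\":{}}}", self.name, self.value)
    }
}

/// Collect one String per action, then join: allocates a Vec plus
/// an intermediate String for every action.
fn log_entry_with_join(actions: &[Action]) -> String {
    let mut jsons = Vec::<String>::new();
    for action in actions {
        jsons.push(action.to_json());
    }
    jsons.join("\n")
}

/// Write each action straight into a single buffer, separated by newlines.
fn log_entry_with_buffer(actions: &[Action]) -> String {
    let mut buf = String::new();
    for (i, action) in actions.iter().enumerate() {
        if i > 0 {
            buf.push('\n');
        }
        // `write!` appends formatted text to the String via std::fmt::Write.
        write!(buf, "{{\"{}\":{}}}", action.name, action.value)
            .expect("writing to a String does not fail");
    }
    buf
}
```

Both functions produce the same newline-delimited output; the buffer version just skips the intermediate vector and per-action strings.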
