Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 68 additions & 21 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,12 +904,19 @@ impl std::fmt::Debug for DeltaTable {
/// Error returned by the DeltaTransaction struct
#[derive(thiserror::Error, Debug)]
pub enum DeltaTransactionError {
/// Error that indicates the number of optimistic concurrency retries has been exceeded and no further
/// attempts will be made.
#[error("Transaction commit exceeded max retries. Last error: {inner}")]
CommitRetriesExceeded {
/// Error that indicates the transaction commit attempt failed. The wrapped inner error
/// contains details.
#[error("Transaction commit attempt failed. Last error: {inner}")]
TransactionCommitAttempt {
/// The wrapped TransactionCommitAttemptError.
inner: TransactionCommitAttemptError,
},

/// Error that indicates a Delta version conflict, i.e. a writer tried to write version _N_ but
/// version _N_ already exists in the delta log.
#[error("Version already existed when writing transaction. Last error: {inner}")]
VersionAlreadyExists {
/// The wrapped TransactionCommitAttemptError.
#[from]
inner: TransactionCommitAttemptError,
},

Expand Down Expand Up @@ -973,6 +980,16 @@ pub enum TransactionCommitAttemptError {
},
}

impl From<TransactionCommitAttemptError> for DeltaTransactionError {
fn from(error: TransactionCommitAttemptError) -> Self {
match error {
TransactionCommitAttemptError::VersionExists { .. } => {
DeltaTransactionError::VersionAlreadyExists { inner: error }
}
_ => DeltaTransactionError::TransactionCommitAttempt { inner: error },
}
}
}
impl From<StorageError> for TransactionCommitAttemptError {
fn from(error: StorageError) -> Self {
match error {
Expand Down Expand Up @@ -1010,7 +1027,14 @@ impl Default for DeltaTransactionOptions {
}
}

/// Object representing a delta transaction
/// Object representing a delta transaction.
/// Clients that do not need to mutate action content in case a transaction conflict is encountered
/// may use the `commit_with` method and rely on optimistic concurrency to determine the
/// appropriate Delta version number for a commit. A good example of this type of client is an
/// append-only client that does not need to maintain transaction state with external systems.
/// Clients that may need to do conflict resolution if the Delta version changes should use the `commit_version`
/// method and manage the Delta version themselves so that they can resolve data conflicts that may
/// occur between Delta versions.
#[derive(Debug)]
pub struct DeltaTransaction<'a> {
delta_table: &'a mut DeltaTable,
Expand Down Expand Up @@ -1056,18 +1080,10 @@ impl<'a> DeltaTransaction<'a> {
// TODO: create a CommitInfo action and prepend it to actions.

// Serialize all actions that are part of this log entry.
let mut jsons = Vec::<String>::new();

for action in additional_actions {
let json = serde_json::to_string(action)?;
jsons.push(json);
}

let log_entry = jsons.join("\n");
let log_entry = log_entry.as_bytes();
let log_entry = log_entry_from_actions(additional_actions)?;

// try to commit in a loop in case other writers write the next version first
let version = self.try_commit_loop(log_entry).await?;
let version = self.try_commit_loop(log_entry.as_bytes()).await?;

// NOTE: since we have the log entry in memory already,
// we could optimize this further by merging the log entry instead of updating from storage.
Expand All @@ -1076,6 +1092,25 @@ impl<'a> DeltaTransaction<'a> {
Ok(version)
}

/// Commits the delta transaction at exactly the version supplied by the caller.
///
/// The actions are serialized into one log entry, handed to `prepare_commit` to obtain the
/// temporary log path, and then committed at `version` with a single `try_commit` call;
/// there is no retry loop here. Any error from these steps, including a version conflict, is
/// propagated back to the caller immediately. After a successful commit the table is
/// refreshed with `update()` and the committed version is returned.
///
/// `_operation` is accepted but not used yet (see the TODO below).
pub async fn commit_version(
    &mut self,
    version: DeltaDataTypeVersion,
    additional_actions: &[Action],
    _operation: Option<DeltaOperation>,
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
    // TODO: create a CommitInfo action and prepend it to actions.

    // Build the log entry payload and prepare the temporary commit file.
    let serialized = log_entry_from_actions(additional_actions)?;
    let staged_path = self.prepare_commit(serialized.as_bytes()).await?;

    // One attempt only, at the version the caller asked for.
    let committed = self.try_commit(&staged_path, version).await?;

    // Refresh the in-memory table now that the commit has landed.
    self.delta_table.update().await?;

    Ok(committed)
}

async fn try_commit_loop(
&mut self,
log_entry: &[u8],
Expand All @@ -1084,7 +1119,9 @@ impl<'a> DeltaTransaction<'a> {

let tmp_log_path = self.prepare_commit(log_entry).await?;
loop {
let commit_result = self.try_commit(&tmp_log_path).await;
let version = self.next_attempt_version().await?;

let commit_result = self.try_commit(&tmp_log_path, version).await;

match commit_result {
Ok(v) => {
Expand Down Expand Up @@ -1130,10 +1167,9 @@ impl<'a> DeltaTransaction<'a> {
async fn try_commit(
&mut self,
tmp_log_path: &str,
version: DeltaDataTypeVersion,
) -> Result<DeltaDataTypeVersion, TransactionCommitAttemptError> {
// get the next delta table version and the log path where it should be written
let attempt_version = self.next_attempt_version().await?;
let log_path = self.delta_table.version_to_log_path(attempt_version);
let log_path = self.delta_table.version_to_log_path(version);

// move temporary commit file to delta log directory
// rely on storage to fail if the file already exists -
Expand All @@ -1142,7 +1178,7 @@ impl<'a> DeltaTransaction<'a> {
.rename_obj(tmp_log_path, &log_path)
.await?;

Ok(attempt_version)
Ok(version)
}

async fn next_attempt_version(
Expand All @@ -1153,6 +1189,17 @@ impl<'a> DeltaTransaction<'a> {
}
}

fn log_entry_from_actions(actions: &[Action]) -> Result<String, serde_json::Error> {
let mut jsons = Vec::<String>::new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can write this more efficiently by serializing each action's JSON directly into a single string buffer; that way we avoid the vector join at the end.


for action in actions {
let json = serde_json::to_string(action)?;
jsons.push(json);
}

Ok(jsons.join("\n"))
}

fn process_action(
state: &mut DeltaTableState,
action: &Action,
Expand Down
182 changes: 133 additions & 49 deletions rust/tests/simple_commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,121 @@ use std::collections::HashMap;

use deltalake::{action, DeltaTransactionError};

#[tokio::test]
async fn test_two_commits_fs() {
prepare_fs();
test_two_commits("./tests/data/simple_commit")
.await
.unwrap();
}
mod simple_commit_s3 {
use super::*;

#[cfg(all(feature = "s3", feature = "dynamodb"))]
#[tokio::test]
async fn test_two_commits_s3() {
let path = "s3://deltars/simple_commit_rw1";
s3_common::setup_dynamodb("concurrent_writes");
prepare_s3(path).await;

test_two_commits(path).await.unwrap();
}

#[cfg(all(feature = "s3", feature = "dynamodb"))]
#[tokio::test]
async fn test_two_commits_s3() {
let path = "s3://deltars/simple_commit_rw1";
s3_common::setup_dynamodb("concurrent_writes");
prepare_s3(path).await;
#[cfg(all(feature = "s3", not(feature = "dynamodb")))]
#[tokio::test]
async fn test_two_commits_s3_fails_with_no_lock() {
use deltalake::{StorageError, TransactionCommitAttemptError};

let path = "s3://deltars/simple_commit_rw2";
prepare_s3(path).await;

let result = test_two_commits(path).await;
if let Err(DeltaTransactionError::TransactionCommitAttempt { ref inner }) = result {
if let TransactionCommitAttemptError::Storage { source } = inner {
if let StorageError::S3Generic(err) = source {
assert_eq!(err, "dynamodb locking is not enabled");
return;
}
}
}

test_two_commits(path).await.unwrap();
result.unwrap();

panic!("S3 commit without dynamodb locking is expected to fail")
}
}

#[cfg(all(feature = "s3", not(feature = "dynamodb")))]
#[tokio::test]
async fn test_two_commits_s3_fails_with_no_lock() {
use deltalake::{StorageError, TransactionCommitAttemptError};
mod simple_commit_fs {
// Tests are run serially to allow usage of the same local fs directory.
use serial_test::serial;

let path = "s3://deltars/simple_commit_rw2";
prepare_s3(path).await;
use super::*;

let result = test_two_commits(path).await;
if let Err(DeltaTransactionError::CommitRetriesExceeded { ref inner }) = result {
if let TransactionCommitAttemptError::Storage { source } = inner {
if let StorageError::S3Generic(err) = source {
assert_eq!(err, "dynamodb locking is not enabled");
return;
}
}
/// Runs the shared two-commit scenario against the local filesystem table.
#[tokio::test]
#[serial]
async fn test_two_commits_fs() {
    prepare_fs();

    let outcome = test_two_commits("./tests/data/simple_commit").await;
    outcome.unwrap();
}

/// Committing at an explicit version that is not yet present in the log should succeed.
#[tokio::test]
#[serial]
async fn test_commit_version_succeeds_if_version_does_not_exist() {
    prepare_fs();

    let mut table = deltalake::open_table("./tests/data/simple_commit")
        .await
        .unwrap();

    // Starting point: version 0 with no files.
    assert_eq!(0, table.version);
    assert_eq!(0, table.get_files().len());

    let actions = tx1_actions();
    let mut txn = table.create_transaction(None);
    let committed = txn.commit_version(1, &actions, None).await.unwrap();

    // The requested version is returned and the table now reports it, with two files.
    assert_eq!(1, committed);
    assert_eq!(1, table.version);
    assert_eq!(2, table.get_files().len());
}

result.unwrap();
#[tokio::test]
#[serial]
async fn test_commit_version_fails_if_version_exists() {
prepare_fs();

let table_path = "./tests/data/simple_commit";
let mut table = deltalake::open_table(table_path).await.unwrap();

assert_eq!(0, table.version);
assert_eq!(0, table.get_files().len());

let tx1_actions = tx1_actions();

let mut tx1 = table.create_transaction(None);
let _ = tx1
.commit_version(1, tx1_actions.as_slice(), None)
.await
.unwrap();

let tx2_actions = tx2_actions();

let mut tx2 = table.create_transaction(None);

// we already committed version 1 - this should fail and return error for caller to handle.
let result = tx2.commit_version(1, tx2_actions.as_slice(), None).await;

panic!("S3 commit without dynamodb locking is expected to fail")
match result {
Err(deltalake::DeltaTransactionError::VersionAlreadyExists { .. }) => {
assert!(true, "Delta version already exists.");
}
_ => {
assert!(false, "Delta version should already exist.");
}
}

assert!(result.is_err());
assert_eq!(1, table.version);
assert_eq!(2, table.get_files().len());
}
}

async fn test_two_commits(table_path: &str) -> Result<(), DeltaTransactionError> {
Expand All @@ -60,7 +136,28 @@ async fn test_two_commits(table_path: &str) -> Result<(), DeltaTransactionError>
assert_eq!(0, table.version);
assert_eq!(0, table.get_files().len());

let tx1_actions = vec![
let tx1_actions = tx1_actions();

let mut tx1 = table.create_transaction(None);
let version = tx1.commit_with(tx1_actions.as_slice(), None).await?;

assert_eq!(1, version);
assert_eq!(version, table.version);
assert_eq!(2, table.get_files().len());

let tx2_actions = tx2_actions();

let mut tx2 = table.create_transaction(None);
let version = tx2.commit_with(tx2_actions.as_slice(), None).await.unwrap();

assert_eq!(2, version);
assert_eq!(version, table.version);
assert_eq!(4, table.get_files().len());
Ok(())
}

fn tx1_actions() -> Vec<action::Action> {
vec![
action::Action::add(action::Add {
path: String::from(
"part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
Expand All @@ -87,16 +184,11 @@ async fn test_two_commits(table_path: &str) -> Result<(), DeltaTransactionError>
stats_parsed: None,
tags: None,
}),
];

let mut tx1 = table.create_transaction(None);
let version = tx1.commit_with(tx1_actions.as_slice(), None).await?;

assert_eq!(1, version);
assert_eq!(version, table.version);
assert_eq!(2, table.get_files().len());
]
}

let tx2_actions = vec![
fn tx2_actions() -> Vec<action::Action> {
vec![
action::Action::add(action::Add {
path: String::from(
"part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet",
Expand All @@ -123,15 +215,7 @@ async fn test_two_commits(table_path: &str) -> Result<(), DeltaTransactionError>
stats_parsed: None,
tags: None,
}),
];

let mut tx2 = table.create_transaction(None);
let version = tx2.commit_with(tx2_actions.as_slice(), None).await.unwrap();

assert_eq!(2, version);
assert_eq!(version, table.version);
assert_eq!(4, table.get_files().len());
Ok(())
]
}

fn prepare_fs() {
Expand Down