Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/.ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tests/data
test_atomic*
3 changes: 3 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ rusoto_sts = { version = "0.46", optional = true }
rusoto_dynamodb = { version = "0.46", optional = true }
maplit = { version = "1", optional = true }

# High-level writer
parquet-format = "~2.6.1"

arrow = { version = "4" }
datafusion = { version = "4", optional = true }
parquet = { version = "4" }
Expand Down
103 changes: 98 additions & 5 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ pub enum DeltaTableError {
/// Error returned when no partition was found in the DeltaTable.
#[error("No partitions found, please make sure table is partitioned.")]
LoadPartitions,

/// Error returned when writes are attempted with data that doesn't match the schema of the
/// table
#[error("Data does not match the schema or partitions of the table: {}", msg)]
SchemaMismatch {
/// Information about the mismatch
msg: String,
},

/// Error returned when a partition is not formatted as a Hive Partition.
#[error("This partition is not formatted with key=value: {}", .partition)]
PartitionError {
Expand Down Expand Up @@ -1041,6 +1050,7 @@ impl Default for DeltaTransactionOptions {
#[derive(Debug)]
pub struct DeltaTransaction<'a> {
delta_table: &'a mut DeltaTable,
actions: Vec<Action>,
options: DeltaTransactionOptions,
}

Expand All @@ -1051,15 +1061,99 @@ impl<'a> DeltaTransaction<'a> {
pub fn new(delta_table: &'a mut DeltaTable, options: Option<DeltaTransactionOptions>) -> Self {
DeltaTransaction {
delta_table,
actions: vec![],
options: options.unwrap_or_else(DeltaTransactionOptions::default),
}
}

/// Add an arbitrary "action" to the actions associated with this transaction
pub fn add_action(&mut self, action: action::Action) {
self.actions.push(action);
}

/// Add an arbitrary number of actions to the actions associated with this transaction
pub fn add_actions(&mut self, actions: Vec<action::Action>) {
for action in actions.into_iter() {
self.actions.push(action);
}
}

/// Create a new add action and write the given bytes to the storage backend as a fully formed
/// Parquet file
///
/// add_file accepts two optional parameters:
///
/// partitions: an ordered vec of WritablePartitionValues for the file to be added
/// actions: an ordered list of Actions to be inserted into the log file _ahead_ of the Add
/// action for the file added. This should typically be used for txn type actions
pub async fn add_file(
&mut self,
bytes: &[u8],
partitions: Option<Vec<(String, String)>>,
) -> Result<(), DeltaTransactionError> {
let mut partition_values = HashMap::new();
if let Some(partitions) = &partitions {
for (key, value) in partitions {
partition_values.insert(key.clone(), value.clone());
}
}

let path = self.generate_parquet_filename(partitions);
let storage_path = self
.delta_table
.storage
.join_path(&self.delta_table.table_path, &path);

debug!("Writing a parquet file to {}", &storage_path);
self.delta_table
.storage
.put_obj(&storage_path, &bytes)
.await
.map_err(|source| DeltaTransactionError::Storage { source })?;

// Determine the modification timestamp to include in the add action - milliseconds since epoch
// Err should be impossible in this case since `SystemTime::now()` is always greater than `UNIX_EPOCH`
let modification_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let modification_time = modification_time.as_millis() as i64;

self.actions.push(Action::add(action::Add {
path,
size: bytes.len() as i64,
partitionValues: partition_values,
partitionValues_parsed: None,
modificationTime: modification_time,
dataChange: true,
stats: None,
stats_parsed: None,
tags: None,
}));

Ok(())
}

fn generate_parquet_filename(&self, partitions: Option<Vec<(String, String)>>) -> String {
/*
* The specific file naming for parquet is not well documented including the preceding five
* zeros and the trailing c000 string
*
*/
let mut path_parts = vec![format!("part-00000-{}-c000.snappy.parquet", Uuid::new_v4())];

if let Some(partitions) = partitions {
for partition in partitions {
path_parts.push(format!("{}={}", partition.0, partition.1));
}
}

self.delta_table
.storage
.join_paths(&path_parts.iter().map(|s| s.as_str()).collect::<Vec<&str>>())
}

/// Commits the given actions to the delta log.
/// This method will retry the transaction commit based on the value of `max_retry_commit_attempts` set in `DeltaTransactionOptions`.
pub async fn commit_with(
pub async fn commit(
&mut self,
additional_actions: &[Action],
_operation: Option<DeltaOperation>,
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
// TODO: stubbing `operation` parameter (which will be necessary for writing the CommitInfo action), but leaving it unused for now.
Expand All @@ -1083,7 +1177,7 @@ impl<'a> DeltaTransaction<'a> {
// TODO: create a CommitInfo action and prepend it to actions.

// Serialize all actions that are part of this log entry.
let log_entry = log_entry_from_actions(additional_actions)?;
let log_entry = log_entry_from_actions(&self.actions)?;

// try to commit in a loop in case other writers write the next version first
let version = self.try_commit_loop(log_entry.as_bytes()).await?;
Expand All @@ -1100,12 +1194,11 @@ impl<'a> DeltaTransaction<'a> {
pub async fn commit_version(
&mut self,
version: DeltaDataTypeVersion,
additional_actions: &[Action],
_operation: Option<DeltaOperation>,
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
// TODO: create a CommitInfo action and prepend it to actions.

let log_entry = log_entry_from_actions(additional_actions)?;
let log_entry = log_entry_from_actions(&self.actions)?;
let tmp_log_path = self.prepare_commit(log_entry.as_bytes()).await?;
let version = self.try_commit(&tmp_log_path, version).await?;

Expand Down
3 changes: 3 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ extern crate lazy_static;
extern crate parquet;
extern crate regex;
extern crate serde;
#[cfg(test)]
#[macro_use]
extern crate serde_json;
extern crate thiserror;

Expand All @@ -61,6 +63,7 @@ pub mod delta_arrow;
pub mod partitions;
mod schema;
pub mod storage;
pub mod writer;

#[cfg(feature = "datafusion-ext")]
pub mod delta_datafusion;
Expand Down
Loading