Skip to content

Commit 5e4edde

Browse files
committed
Reorganized add_file to DeltaTransaction to make it a bit more stable, and finish up BufferedJSONWriter
There are some optimizations that come to mind for this work, but at this point I think it's ready for simple high-level JSON writer usage
1 parent 60b70ae commit 5e4edde

File tree

6 files changed

+283
-184
lines changed

6 files changed

+283
-184
lines changed

rust/src/delta.rs

Lines changed: 92 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,15 @@ pub enum DeltaTableError {
137137
/// Error returned when no partition was found in the DeltaTable.
138138
#[error("No partitions found, please make sure table is partitioned.")]
139139
LoadPartitions,
140+
141+
/// Error returned when writes are attempted with data that doesn't match the schema of the
142+
/// table
143+
#[error("Data does not match the schema or partitions of the table: {}", msg)]
144+
SchemaMismatch {
145+
/// Information about the mismatch
146+
msg: String,
147+
},
148+
140149
/// Error returned when a partition is not formatted as a Hive Partition.
141150
#[error("This partition is not formatted with key=value: {}", .partition)]
142151
PartitionError {
@@ -813,80 +822,6 @@ impl DeltaTable {
813822
DeltaTransaction::new(self, options)
814823
}
815824

816-
/// Create a new add action and write the given bytes to the storage backend as a fully formed
817-
/// Parquet file
818-
///
819-
/// add_file accepts two optional parameters:
820-
///
821-
/// partitions: an ordered vec of WritablePartitionValues for the file to be added
822-
/// actions: an ordered list of Actions to be inserted into the log file _ahead_ of the Add
823-
/// action for the file added. This should typically be used for txn type actions
824-
pub async fn add_file(
825-
&mut self,
826-
bytes: &Vec<u8>,
827-
partitions: Option<Vec<WritablePartitionValue>>,
828-
actions: Option<Vec<action::Action>>,
829-
) -> Result<i64, DeltaTransactionError> {
830-
let path = self.generate_parquet_filename(partitions);
831-
let storage_path = self.storage.join_path(&self.table_path, &path);
832-
833-
debug!("Writing a parquet file to {}", &storage_path);
834-
self.storage
835-
.put_obj(&storage_path, &bytes)
836-
.await
837-
.map_err(|source| DeltaTransactionError::Storage { source })?;
838-
839-
// Determine the modification timestamp to include in the add action - milliseconds since epoch
840-
// Err should be impossible in this case since `SystemTime::now()` is always greater than `UNIX_EPOCH`
841-
let modification_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
842-
let modification_time = modification_time.as_millis() as i64;
843-
844-
let add = action::Add {
845-
path,
846-
size: bytes.len() as i64,
847-
partitionValues: HashMap::default(),
848-
partitionValues_parsed: None,
849-
modificationTime: modification_time,
850-
dataChange: true,
851-
stats: None,
852-
stats_parsed: None,
853-
tags: None,
854-
};
855-
856-
let mut tx = self.create_transaction(None);
857-
let mut commit_actions= vec![];
858-
if let Some(actions) = actions {
859-
for action in actions {
860-
commit_actions.insert(0, action);
861-
}
862-
}
863-
commit_actions.push(Action::add(add));
864-
let version = tx.commit_with(&commit_actions, None).await?;
865-
866-
debug!("Committed Delta version {}", version);
867-
868-
Ok(version)
869-
}
870-
871-
fn generate_parquet_filename(&self, partitions: Option<Vec<WritablePartitionValue>>) -> String {
872-
let mut path_parts = vec![];
873-
/*
874-
* The specific file naming for parquet is not well documented including the preceding five
875-
* zeros and the trailing c000 string
876-
*
877-
*/
878-
path_parts.push(format!("part-00000-{}-c000.snappy.parquet", Uuid::new_v4()));
879-
880-
if let Some(partitions) = partitions {
881-
for partition in partitions {
882-
path_parts.push(format!("{}={}", partition.name, partition.value));
883-
}
884-
}
885-
886-
self.storage
887-
.join_paths(&path_parts.iter().map(|s| s.as_str()).collect::<Vec<&str>>())
888-
}
889-
890825
/// Create a new Delta Table struct without loading any data from backing storage.
891826
///
892827
/// NOTE: This is for advanced users. If you don't know why you need to use this method, please
@@ -1112,6 +1047,7 @@ impl Default for DeltaTransactionOptions {
11121047
#[derive(Debug)]
11131048
pub struct DeltaTransaction<'a> {
11141049
delta_table: &'a mut DeltaTable,
1050+
actions: Vec<crate::action::Action>,
11151051
options: DeltaTransactionOptions,
11161052
}
11171053

@@ -1122,15 +1058,93 @@ impl<'a> DeltaTransaction<'a> {
11221058
pub fn new(delta_table: &'a mut DeltaTable, options: Option<DeltaTransactionOptions>) -> Self {
11231059
DeltaTransaction {
11241060
delta_table,
1061+
actions: vec![],
11251062
options: options.unwrap_or_else(DeltaTransactionOptions::default),
11261063
}
11271064
}
11281065

1066+
/// Add an arbitrary "action" to the actions associated with this transaction
1067+
pub fn add_action(&mut self, action: action::Action) {
1068+
self.actions.push(action);
1069+
}
1070+
1071+
/// Add an arbitrary number of actions to the actions associated with this transaction
1072+
pub fn add_actions(&mut self, actions: Vec<action::Action>) {
1073+
for action in actions.into_iter() {
1074+
self.actions.push(action);
1075+
}
1076+
}
1077+
1078+
/// Create a new add action and write the given bytes to the storage backend as a fully formed
1079+
/// Parquet file
1080+
///
1081+
/// add_file accepts two optional parameters:
1082+
///
1083+
/// partitions: an ordered vec of WritablePartitionValues for the file to be added
1084+
/// actions: an ordered list of Actions to be inserted into the log file _ahead_ of the Add
1085+
/// action for the file added. This should typically be used for txn type actions
1086+
pub async fn add_file(
1087+
&mut self,
1088+
bytes: &Vec<u8>,
1089+
partitions: Option<Vec<(String, String)>>,
1090+
) -> Result<(), DeltaTransactionError> {
1091+
let path = self.generate_parquet_filename(partitions);
1092+
let storage_path = self
1093+
.delta_table
1094+
.storage
1095+
.join_path(&self.delta_table.table_path, &path);
1096+
1097+
debug!("Writing a parquet file to {}", &storage_path);
1098+
self.delta_table
1099+
.storage
1100+
.put_obj(&storage_path, &bytes)
1101+
.await
1102+
.map_err(|source| DeltaTransactionError::Storage { source })?;
1103+
1104+
// Determine the modification timestamp to include in the add action - milliseconds since epoch
1105+
// Err should be impossible in this case since `SystemTime::now()` is always greater than `UNIX_EPOCH`
1106+
let modification_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
1107+
let modification_time = modification_time.as_millis() as i64;
1108+
1109+
self.actions.push(Action::add(action::Add {
1110+
path,
1111+
size: bytes.len() as i64,
1112+
partitionValues: HashMap::default(),
1113+
partitionValues_parsed: None,
1114+
modificationTime: modification_time,
1115+
dataChange: true,
1116+
stats: None,
1117+
stats_parsed: None,
1118+
tags: None,
1119+
}));
1120+
1121+
Ok(())
1122+
}
1123+
1124+
fn generate_parquet_filename(&self, partitions: Option<Vec<(String, String)>>) -> String {
1125+
let mut path_parts = vec![];
1126+
/*
1127+
* The specific file naming for parquet is not well documented including the preceding five
1128+
* zeros and the trailing c000 string
1129+
*
1130+
*/
1131+
path_parts.push(format!("part-00000-{}-c000.snappy.parquet", Uuid::new_v4()));
1132+
1133+
if let Some(partitions) = partitions {
1134+
for partition in partitions {
1135+
path_parts.push(format!("{}={}", partition.0, partition.1));
1136+
}
1137+
}
1138+
1139+
self.delta_table
1140+
.storage
1141+
.join_paths(&path_parts.iter().map(|s| s.as_str()).collect::<Vec<&str>>())
1142+
}
1143+
11291144
/// Commits the given actions to the delta log.
11301145
/// This method will retry the transaction commit based on the value of `max_retry_commit_attempts` set in `DeltaTransactionOptions`.
1131-
pub async fn commit_with(
1146+
pub async fn commit(
11321147
&mut self,
1133-
additional_actions: &[Action],
11341148
_operation: Option<DeltaOperation>,
11351149
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
11361150
// TODO: stubbing `operation` parameter (which will be necessary for writing the CommitInfo action), but leaving it unused for now.
@@ -1154,7 +1168,7 @@ impl<'a> DeltaTransaction<'a> {
11541168
// TODO: create a CommitInfo action and prepend it to actions.
11551169

11561170
// Serialize all actions that are part of this log entry.
1157-
let log_entry = log_entry_from_actions(additional_actions)?;
1171+
let log_entry = log_entry_from_actions(&self.actions)?;
11581172

11591173
// try to commit in a loop in case other writers write the next version first
11601174
let version = self.try_commit_loop(log_entry.as_bytes()).await?;
@@ -1171,12 +1185,11 @@ impl<'a> DeltaTransaction<'a> {
11711185
pub async fn commit_version(
11721186
&mut self,
11731187
version: DeltaDataTypeVersion,
1174-
additional_actions: &[Action],
11751188
_operation: Option<DeltaOperation>,
11761189
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
11771190
// TODO: create a CommitInfo action and prepend it to actions.
11781191

1179-
let log_entry = log_entry_from_actions(additional_actions)?;
1192+
let log_entry = log_entry_from_actions(&self.actions)?;
11801193
let tmp_log_path = self.prepare_commit(log_entry.as_bytes()).await?;
11811194
let version = self.try_commit(&tmp_log_path, version).await?;
11821195

@@ -1356,12 +1369,6 @@ pub fn crate_version() -> &'static str {
13561369
env!("CARGO_PKG_VERSION")
13571370
}
13581371

1359-
/// A partition value for writing files
1360-
pub struct WritablePartitionValue {
1361-
name: String,
1362-
value: String,
1363-
}
1364-
13651372
#[cfg(test)]
13661373
mod tests {
13671374
use super::action;

rust/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ extern crate lazy_static;
5252
extern crate parquet;
5353
extern crate regex;
5454
extern crate serde;
55+
#[cfg(test)]
56+
#[macro_use]
5557
extern crate serde_json;
5658
extern crate thiserror;
5759

0 commit comments

Comments
 (0)