Skip to content
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,6 @@ utime = "0.3"
serial_test = "0"
pretty_assertions = "0"
tempdir = "0"
tempfile = "3"
maplit = { version = "1" }
anyhow = "1"
2 changes: 1 addition & 1 deletion rust/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::open_table_with_version;
use super::schema::*;
use super::storage::{ObjectMeta, StorageBackend, StorageError};
use super::table_state::DeltaTableState;
use super::writer::time_utils;
use super::time_utils;
use super::{CheckPoint, DeltaTableError};
use crate::DeltaTable;

Expand Down
98 changes: 1 addition & 97 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
use std::io::{BufRead, BufReader, Cursor};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{cmp::max, cmp::Ordering, collections::HashSet};
use uuid::Uuid;

Expand Down Expand Up @@ -551,7 +550,7 @@ impl DeltaTableBuilder {
///
#[derive(Debug)]
pub enum PeekCommit {
/// The next commit version and assoicated actions
/// The next commit version and associated actions
New(DeltaDataTypeVersion, Vec<Action>),
/// Provided DeltaVersion is up to date
UpToDate,
Expand Down Expand Up @@ -1468,79 +1467,6 @@ impl<'a> DeltaTransaction<'a> {
}
}

/// Create a new add action and write the given bytes to the storage backend as a fully
/// formed Parquet file.
///
/// `partitions` is an optional, ordered list of `(column, value)` pairs for the file to
/// be added; they become both the partition directories in the generated file path and
/// the `partition_values` of the resulting `Add` action.
///
/// On success the `Add` action is queued on this transaction; it is only persisted to
/// the delta log when the transaction is committed.
pub async fn add_file(
    &mut self,
    bytes: &[u8],
    partitions: Option<Vec<(String, String)>>,
) -> Result<(), DeltaTableError> {
    // Build the partition_values map expected by the Add action. All provided
    // values are concrete, so each is wrapped in Some(..).
    let partition_values: HashMap<String, Option<String>> = partitions
        .iter()
        .flatten()
        .map(|(key, value)| (key.clone(), Some(value.clone())))
        .collect();

    let path = self.generate_parquet_filename(partitions);
    let parquet_uri = self
        .delta_table
        .storage
        .join_path(&self.delta_table.table_uri, &path);

    debug!("Writing a parquet file to {}", &parquet_uri);
    self.delta_table
        .storage
        .put_obj(&parquet_uri, bytes)
        .await?;

    // Modification timestamp for the add action — milliseconds since epoch.
    // `SystemTime::now()` is always later than UNIX_EPOCH, so this cannot fail.
    let modification_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_millis() as i64;

    self.actions.push(Action::add(action::Add {
        path,
        partition_values,
        modification_time,
        size: bytes.len() as i64,
        partition_values_parsed: None,
        data_change: true,
        stats: None,
        stats_parsed: None,
        tags: None,
    }));

    Ok(())
}

/// Build the storage-relative path for a new parquet part file, nesting it under one
/// directory per partition (`key=value`).
///
/// NOTE(review): the "part-00000-{uuid}-c000" naming mirrors what other delta writers
/// emit; the leading zeros and trailing "c000" are not formally documented by the
/// protocol.
fn generate_parquet_filename(&self, partitions: Option<Vec<(String, String)>>) -> String {
    // One path segment per partition, then the part-file name itself.
    let mut segments: Vec<String> = partitions
        .into_iter()
        .flatten()
        .map(|(key, value)| format!("{}={}", key, value))
        .collect();
    segments.push(format!("part-00000-{}-c000.snappy.parquet", Uuid::new_v4()));

    let segment_refs: Vec<&str> = segments.iter().map(String::as_str).collect();
    self.delta_table.storage.join_paths(&segment_refs)
}

/// Commits the given actions to the delta log.
/// This method will retry the transaction commit based on the value of `max_retry_commit_attempts` set in `DeltaTransactionOptions`.
pub async fn commit(
Expand Down Expand Up @@ -1744,28 +1670,6 @@ mod tests {
));
}

#[tokio::test]
async fn parquet_filename() {
    let mut table = open_table("./tests/data/simple_table").await.unwrap();

    let txn = DeltaTransaction {
        delta_table: &mut table,
        actions: vec![],
        options: DeltaTransactionOptions::default(),
    };

    let partitions = vec![
        (String::from("col1"), String::from("a")),
        (String::from("col2"), String::from("b")),
    ];
    let filename = txn.generate_parquet_filename(Some(partitions));

    // The separator differs per platform because join_paths delegates to the
    // OS-specific storage backend.
    let expected_prefix = if cfg!(windows) {
        "col1=a\\col2=b\\part-00000-"
    } else {
        "col1=a/col2=b/part-00000-"
    };
    assert!(filename.contains(expected_prefix));
}

#[tokio::test]
async fn test_create_delta_table() {
// Setup
Expand Down
13 changes: 11 additions & 2 deletions rust/src/delta_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

use crate::schema;
use arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, TimeUnit,
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef, TimeUnit,
};
use arrow::error::ArrowError;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -175,6 +176,14 @@ impl TryFrom<&ArrowSchema> for schema::Schema {
}
}

/// Convenience conversion from a shared (`Arc`-wrapped) arrow schema; borrows the inner
/// schema and delegates to the `TryFrom<&ArrowSchema>` implementation above.
impl TryFrom<ArrowSchemaRef> for schema::Schema {
    type Error = ArrowError;

    fn try_from(arrow_schema: ArrowSchemaRef) -> Result<Self, ArrowError> {
        arrow_schema.as_ref().try_into()
    }
}

impl TryFrom<&ArrowField> for schema::SchemaField {
type Error = ArrowError;
fn try_from(arrow_field: &ArrowField) -> Result<Self, ArrowError> {
Expand Down Expand Up @@ -268,7 +277,7 @@ pub(crate) fn delta_log_schema_for_table(
table_schema: ArrowSchema,
partition_columns: &[String],
use_extended_remove_schema: bool,
) -> SchemaRef {
) -> ArrowSchemaRef {
lazy_static! {
static ref SCHEMA_FIELDS: Vec<ArrowField> = vec![
ArrowField::new(
Expand Down
1 change: 1 addition & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub mod partitions;
pub mod schema;
pub mod storage;
mod table_state;
mod time_utils;
pub mod writer;

#[cfg(feature = "datafusion-ext")]
Expand Down
48 changes: 48 additions & 0 deletions rust/src/time_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use arrow::temporal_conversions;
use parquet_format::TimeUnit;

/// Parse an ISO-8601/RFC3339 timestamp string into microseconds since the Unix epoch.
///
/// Delta stats strings are written with millisecond precision (as described by the
/// delta protocol), so the returned microsecond value is always a whole number of
/// milliseconds scaled up by 1000.
pub fn timestamp_micros_from_stats_string(s: &str) -> Result<i64, chrono::format::ParseError> {
    let parsed = chrono::DateTime::parse_from_rfc3339(s)?;
    Ok(parsed.timestamp_millis() * 1000)
}

/// Render an epoch timestamp in the given parquet time unit as an ISO-8601 style
/// string (millisecond precision, trailing `Z`) suitable for JSON statistics.
pub fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> String {
    // Interpret the raw value according to its declared unit before formatting.
    let datetime = match time_unit {
        TimeUnit::MILLIS(_) => temporal_conversions::timestamp_ms_to_datetime(n),
        TimeUnit::MICROS(_) => temporal_conversions::timestamp_us_to_datetime(n),
        TimeUnit::NANOS(_) => temporal_conversions::timestamp_ns_to_datetime(n),
    };

    datetime.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
}

#[cfg(test)]
mod tests {
    use super::*;
    use parquet_format::{MicroSeconds, MilliSeconds, NanoSeconds, TimeUnit};

    #[test]
    fn test_timestamp_to_delta_stats_string() {
        // The same instant expressed in each supported unit must render identically.
        let expected = "2021-08-11T12:33:19.541Z";
        let cases = [
            (1628685199541i64, TimeUnit::MILLIS(MilliSeconds::new())),
            (1628685199541000, TimeUnit::MICROS(MicroSeconds::new())),
            (1628685199541000000, TimeUnit::NANOS(NanoSeconds::new())),
        ];
        for (timestamp, unit) in cases.iter() {
            assert_eq!(expected, timestamp_to_delta_stats_string(*timestamp, unit));
        }
    }

    #[test]
    fn test_timestamp_micros_from_stats_string() {
        let micros = timestamp_micros_from_stats_string("2021-08-11T12:33:19.541Z").unwrap();
        assert_eq!(1628685199541000i64, micros);
    }
}
Loading