Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f14129b
migrate code and move json logic into separate message handler
roeap Dec 4, 2021
baa9506
scaffolding for record batch handler
roeap Dec 5, 2021
a94fe68
improve tests
roeap Dec 5, 2021
8541c8a
first draft create command
roeap Dec 5, 2021
834618f
partition record batches
roeap Dec 10, 2021
95171ce
separate stats
roeap Dec 10, 2021
36eec02
cleanup
roeap Dec 10, 2021
27d541a
flatten
roeap Dec 10, 2021
d49cfaf
add tests for partitioning
roeap Dec 10, 2021
99f311c
move data writer to separate file
roeap Dec 10, 2021
b5a0855
write and commit tests
roeap Dec 10, 2021
f32ac36
put commands behind datafusion feature gate
roeap Dec 11, 2021
fc12753
rename and cleanup
roeap Dec 11, 2021
2a430a4
renames and cleanup
roeap Dec 11, 2021
8948498
fmt & clippy
roeap Dec 11, 2021
e9c0bb6
clean up writer api
roeap Dec 11, 2021
85d84bb
pass only partition data to partition writers
roeap Dec 11, 2021
30f18f0
some api cleanup
roeap Dec 11, 2021
8b54c94
move partition writer to writer
roeap Dec 12, 2021
9c94e42
add null count test
roeap Dec 12, 2021
c73e078
housekeeping
roeap Dec 12, 2021
40d4622
commands as datafusion physical plans
roeap Dec 12, 2021
3f40c5b
partitioned write exploration
roeap Dec 13, 2021
d982f3d
fmt
roeap Dec 13, 2021
f754d55
clippy
roeap Dec 13, 2021
bd922c3
remove dead code
roeap Dec 13, 2021
370b422
factor out execution plan for wrapping transactions
roeap Dec 13, 2021
ac8fa7f
extend commit info / operation handling
roeap Dec 13, 2021
69614bc
generalize transaction plan
roeap Dec 17, 2021
8203f50
fmt + clippy
roeap Dec 17, 2021
ecf368a
one more fmt
roeap Dec 17, 2021
02b66d9
add operation parameters to commit info
roeap Dec 17, 2021
abb7f50
set required features for example
roeap Dec 17, 2021
6f98587
start handling save modes
roeap Dec 17, 2021
0dc5a6d
fmt & clippy
roeap Dec 17, 2021
f03774d
cleaner command execution
roeap Dec 17, 2021
ba95574
update create tests
roeap Dec 17, 2021
b941bfc
create tables when writing to empty location
roeap Dec 17, 2021
02ae5dc
update example
roeap Dec 17, 2021
f800b1b
add append test
roeap Dec 17, 2021
bdfebe3
initial handling of overwrite
roeap Dec 18, 2021
c16a656
use delta operations to initialize commands
roeap Dec 18, 2021
b286e2e
Update rust/src/commands/transaction.rs
roeap Dec 27, 2021
cfa4283
address some PR comments
roeap Dec 29, 2021
989e218
skip serialization of null values for command parameters
roeap Dec 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ utime = "0.3"
serial_test = "0"
pretty_assertions = "0"
tempdir = "0"
tempfile = "3"
maplit = { version = "1" }

[[example]]
name="write_delta_table"
required-features=["datafusion-ext"]
76 changes: 76 additions & 0 deletions rust/examples/write_delta_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
extern crate anyhow;
extern crate deltalake;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
use deltalake::{commands::DeltaCommands, DeltaTable, DeltaTableConfig};
use std::sync::Arc;

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    // Write a sample record batch to a Delta table under ./data,
    // partitioned by the "modified" column, then print the resulting
    // table version.
    //
    // Errors from table creation, the write command, and the final
    // update are propagated via `?` instead of panicking with
    // `unwrap()`, so failures surface as a proper error exit.
    let table_path = std::env::current_dir()?.join("data");
    let table_uri = table_path
        .to_str()
        .ok_or_else(|| anyhow::anyhow!("table path is not valid UTF-8"))?;

    let backend = Box::new(deltalake::storage::file::FileStorageBackend::new(table_uri));

    let mut dt = DeltaTable::new(table_uri, backend, DeltaTableConfig::default())?;

    let mut commands = DeltaCommands::try_from_uri(dt.table_uri.to_string()).await?;

    let batch = get_record_batch();
    commands
        .write(vec![batch], None, Some(vec!["modified".to_string()]))
        .await?;

    // Refresh our handle so it reflects the commit we just wrote.
    dt.update().await?;
    println!("{}", dt.version);

    Ok(())
}

/// Build the sample batch written by this example: eleven rows with an
/// `id` (Utf8), a `value` (Int32), and a `modified` date (Utf8) that
/// splits the rows into two partitions (8x "2021-02-01", 3x "2021-02-02").
fn get_record_batch() -> RecordBatch {
    let ids = StringArray::from(vec![
        "A", "B", "C", "D", "E", "F", "G", "H", "D", "E", "F",
    ]);
    let values = Int32Array::from(vec![42, 44, 46, 48, 50, 52, 54, 56, 148, 150, 152]);

    // First eight rows fall in the 2021-02-01 partition, the last
    // three in 2021-02-02.
    let dates: Vec<&str> = std::iter::repeat("2021-02-01")
        .take(8)
        .chain(std::iter::repeat("2021-02-02").take(3))
        .collect();
    let modified = StringArray::from(dates);

    let schema = ArrowSchema::new(vec![
        Field::new("id", DataType::Utf8, true),
        Field::new("value", DataType::Int32, true),
        Field::new("modified", DataType::Utf8, true),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(ids), Arc::new(values), Arc::new(modified)],
    )
    .unwrap()
}
98 changes: 88 additions & 10 deletions rust/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#![allow(non_snake_case, non_camel_case_types)]

use crate::DeltaTableMetaData;
use parquet::record::{ListAccessor, MapAccessor, RowAccessor};
use percent_encoding::percent_decode;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -847,32 +848,109 @@ impl Action {

/// Operation performed when creating a new log entry with one or more actions.
/// This is a key element of the `CommitInfo` action.
// NOTE(review): this span is a GitHub diff render that interleaves
// pre-change and post-change lines (e.g. both `partitionBy` and
// `partition_by` appear); it is NOT compilable as-is. The post-change
// version keeps the snake_case fields with `#[serde(rename_all =
// "camelCase")]` on the variants — confirm against the merged file.
#[derive(Serialize, Deserialize, Debug)] // NOTE(review): pre-change derive line from the diff
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum DeltaOperation {
/// Represents a Delta `Create` operation.
/// Command for creating a new delta table
Create {
/// Location where table is stored
location: String,
/// Metadata for creating table
metadata: DeltaTableMetaData,
/// The save mode used during creation.
mode: SaveMode,
/// Reader / writer protocols requirements for table
protocol: Protocol,
},
/// Represents a Delta `Write` operation.
/// Write operations will typically only include `Add` actions.
#[serde(rename_all = "camelCase")]
Write {
/// The save mode used during the write.
mode: SaveMode,
/// The columns the write is partitioned by.
partitionBy: Option<Vec<String>>, // NOTE(review): pre-change field from the diff; superseded by `partition_by`
#[serde(skip_serializing_if = "Option::is_none")]
partition_by: Option<Vec<String>>,
/// The predicate used during the write.
#[serde(skip_serializing_if = "Option::is_none")]
predicate: Option<String>,
},
#[serde(rename_all = "camelCase")]
/// Represents a Delta `StreamingUpdate` operation.
StreamingUpdate {
/// The output mode the streaming writer is using.
outputMode: OutputMode, // NOTE(review): pre-change field from the diff; superseded by `output_mode`
output_mode: OutputMode,
/// The query id of the streaming writer.
queryId: String, // NOTE(review): pre-change field from the diff; superseded by `query_id`
query_id: String,
/// The epoch id of the written micro-batch.
epochId: i64, // NOTE(review): pre-change field from the diff; superseded by `epoch_id`
epoch_id: i64,
},
/// Represents a Delta `Update` operation.
Update {
/// Query string
query: String,
},
// TODO: Add more operations
/// Represents a Delta `Delete` operation.
Delete {
/// The predicate used during the write.
#[serde(skip_serializing_if = "Option::is_none")]
predicate: Option<String>,
}, // TODO: Add more operations
}

impl DeltaOperation {
    /// Retrieve basic commit information to be added to Delta commits.
    ///
    /// The returned map contains the operation name under the
    /// `"operation"` key (prefixed with `delta-rs.`) and, when the
    /// operation serializes to a JSON object, its parameters under the
    /// `"operationParameters"` key.
    pub fn get_commit_info(&self) -> Map<String, Value> {
        let mut commit_info = Map::<String, Value>::new();

        // Select the operation name once instead of duplicating the
        // insert in every arm. The match is exhaustive on purpose:
        // adding a variant without updating it is a compile error.
        let operation = match self {
            DeltaOperation::Create { .. } => "delta-rs.Create",
            DeltaOperation::Write { .. } => "delta-rs.Write",
            DeltaOperation::Delete { .. } => "delta-rs.Delete",
            DeltaOperation::Update { .. } => "delta-rs.Update",
            DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
        };
        commit_info.insert(
            "operation".to_string(),
            serde_json::Value::String(operation.to_string()),
        );

        // `DeltaOperation` uses serde's default externally-tagged enum
        // representation, so serialization yields a single-entry object
        // whose value holds the operation's parameters. Guard with
        // `if let` instead of `unwrap()` so a future unit variant (which
        // would serialize to an empty-valued form) cannot panic here.
        if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) {
            if let Some(parameters) = map.values().next() {
                commit_info.insert("operationParameters".to_string(), parameters.clone());
            }
        }

        commit_info
    }
}

/// The SaveMode used when performing a DeltaOperation
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SaveMode {
/// Files will be appended to the target location.
Append,
Expand All @@ -885,7 +963,7 @@ pub enum SaveMode {
}

/// The OutputMode used in streaming operations.
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum OutputMode {
/// Only new rows will be written when new data is available.
Append,
Expand All @@ -904,9 +982,9 @@ mod tests {
#[test]
fn test_add_action_without_partition_values_and_stats() {
let path = "./tests/data/delta-0.2.0/_delta_log/00000000000000000003.checkpoint.parquet";
let preader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();
let reader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();

let mut iter = preader.get_row_iter(None).unwrap();
let mut iter = reader.get_row_iter(None).unwrap();
let record = iter.nth(9).unwrap();
let add_record = record.get_group(1).unwrap();
let add_action = Add::from_parquet_record(&add_record).unwrap();
Expand Down
Loading