Skip to content

Commit 63d45cf

Browse files
committed
Actually add_file in the flush for BufferedJSONWriter
This commit also addresses a number of other code review comments
1 parent 034eeb6 commit 63d45cf

File tree

3 files changed

+50
-46
lines changed

3 files changed

+50
-46
lines changed

rust/src/bin/delta-inspect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> {
5454
let files = if files_matches.is_present("full_path") {
5555
table.get_file_paths()
5656
} else {
57-
table.get_files().clone()
57+
table.get_files()
5858
};
5959

6060
files.iter().for_each(|f| println!("{}", f));

rust/src/delta.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,9 +1085,16 @@ impl<'a> DeltaTransaction<'a> {
10851085
/// action for the file added. This should typically be used for txn type actions
10861086
pub async fn add_file(
10871087
&mut self,
1088-
bytes: &Vec<u8>,
1088+
bytes: &[u8],
10891089
partitions: Option<Vec<(String, String)>>,
10901090
) -> Result<(), DeltaTransactionError> {
1091+
let mut partition_values = HashMap::new();
1092+
if let Some(partitions) = &partitions {
1093+
for (key, value) in partitions {
1094+
partition_values.insert(key.clone(), value.clone());
1095+
}
1096+
}
1097+
10911098
let path = self.generate_parquet_filename(partitions);
10921099
let storage_path = self
10931100
.delta_table
@@ -1109,7 +1116,7 @@ impl<'a> DeltaTransaction<'a> {
11091116
self.actions.push(Action::add(action::Add {
11101117
path,
11111118
size: bytes.len() as i64,
1112-
partitionValues: HashMap::default(),
1119+
partitionValues: partition_values,
11131120
partitionValues_parsed: None,
11141121
modificationTime: modification_time,
11151122
dataChange: true,
@@ -1122,13 +1129,12 @@ impl<'a> DeltaTransaction<'a> {
11221129
}
11231130

11241131
fn generate_parquet_filename(&self, partitions: Option<Vec<(String, String)>>) -> String {
1125-
let mut path_parts = vec![];
11261132
/*
11271133
* The specific file naming for parquet is not well documented including the preceding five
11281134
* zeros and the trailing c000 string
11291135
*
11301136
*/
1131-
path_parts.push(format!("part-00000-{}-c000.snappy.parquet", Uuid::new_v4()));
1137+
let mut path_parts = vec![format!("part-00000-{}-c000.snappy.parquet", Uuid::new_v4())];
11321138

11331139
if let Some(partitions) = partitions {
11341140
for partition in partitions {

rust/src/writer.rs

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use crate::action::Txn;
88
use crate::{DeltaTableError, DeltaTransactionError};
99
use arrow::record_batch::RecordBatch;
10+
use log::*;
1011
use parquet::arrow::ArrowWriter;
1112
use parquet::basic::Compression;
1213
use parquet::file::properties::WriterProperties;
@@ -16,23 +17,19 @@ use std::collections::HashMap;
1617
use std::convert::TryFrom;
1718
use std::sync::Arc;
1819

19-
/**
20-
* BufferedJSONWriter allows for buffering serde_json::Value rows before flushing to parquet files
21-
* and a Delta transaction
22-
*/
23-
pub struct BufferedJSONWriter {
20+
/// BufferedJsonWriter allows for buffering serde_json::Value rows before flushing to parquet files
21+
/// and a Delta transaction
22+
pub struct BufferedJsonWriter {
2423
table: crate::DeltaTable,
2524
buffer: HashMap<WriterPartition, Vec<Value>>,
2625
schema: arrow::datatypes::SchemaRef,
2726
partitions: Vec<String>,
2827
txns: Vec<Txn>,
2928
}
3029

31-
impl BufferedJSONWriter {
32-
/**
33-
* Attempt to construct the BufferedJSONWriter, will fail if the table's metadata is not
34-
* present
35-
*/
30+
impl BufferedJsonWriter {
31+
/// Attempt to construct the BufferedJsonWriter, will fail if the table's metadata is not
32+
/// present
3633
pub fn try_new(table: crate::DeltaTable) -> Result<Self, DeltaTableError> {
3734
let metadata = table.get_metadata()?.clone();
3835
let schema = metadata.schema;
@@ -49,38 +46,32 @@ impl BufferedJSONWriter {
4946
})
5047
}
5148

52-
/**
53-
* Return the total Values pending in the buffer
54-
*/
49+
/// Return the total Values pending in the buffer
5550
pub fn count(&self, partitions: &WriterPartition) -> Option<usize> {
5651
self.buffer.get(&partitions).map(|b| b.len())
5752
}
5853

59-
/**
60-
* Add a txn action to the buffer
61-
*/
54+
/// Add a txn action to the buffer
6255
pub fn record_txn(&mut self, txn: Txn) {
6356
self.txns.push(txn);
6457
}
6558

66-
/**
67-
* Write a new Value into the buffer
68-
*/
59+
/// Write a new Value into the buffer
6960
pub fn write(
7061
&mut self,
7162
value: Value,
7263
partitions: WriterPartition,
7364
) -> Result<(), DeltaTableError> {
7465
match partitions {
7566
WriterPartition::NoPartitions => {
76-
if self.partitions.len() > 0 {
67+
if !self.partitions.is_empty() {
7768
return Err(DeltaTableError::SchemaMismatch {
7869
msg: "Table has partitions but noone were supplied on write".to_string(),
7970
});
8071
}
8172
}
8273
WriterPartition::KeyValues { .. } => {
83-
if self.partitions.len() == 0 {
74+
if self.partitions.is_empty() {
8475
return Err(DeltaTableError::SchemaMismatch {
8576
msg: "Table has no partitions yet they were supplied on write".to_string(),
8677
});
@@ -96,12 +87,10 @@ impl BufferedJSONWriter {
9687
Ok(())
9788
}
9889

99-
/**
100-
* Flush the buffer, causing a write of parquet files for each set of partitioned information
101-
* as well as any buffered txn actions
102-
*
103-
* This will create a single transaction in the delta transaction log
104-
*/
90+
/// Flush the buffer, causing a write of parquet files for each set of partitioned information
91+
/// as well as any buffered txn actions
92+
///
93+
/// This will create a single transaction in the delta transaction log
10594
pub async fn flush(&mut self) -> Result<(), DeltaTransactionError> {
10695
use arrow::json::reader::Decoder;
10796

@@ -116,21 +105,32 @@ impl BufferedJSONWriter {
116105
.next_batch(&mut value_iter)
117106
.map_err(|source| DeltaTableError::ArrowError { source })?;
118107

119-
if record_batch.is_none() {
120-
return Ok(());
108+
if record_batch.is_some() {
109+
let mut pb = ParquetBuffer::try_new(self.schema.clone())?;
110+
pb.write_batch(&record_batch.unwrap())?;
111+
let _metadata = pb.close();
112+
parquet_bufs.push((partitions.clone(), pb.data()));
113+
} else {
114+
warn!("Attempted to flush an empty RecordBatch from the BufferedJsonWriter");
121115
}
122-
123-
let mut pb = ParquetBuffer::try_new(self.schema.clone())?;
124-
pb.write_batch(&record_batch.unwrap())?;
125-
let _metadata = pb.close();
126-
parquet_bufs.push(pb.data());
127116
}
128117

129118
let mut dtx = self.table.create_transaction(None);
119+
for (partitions, buf) in parquet_bufs {
120+
match partitions {
121+
WriterPartition::NoPartitions => {
122+
dtx.add_file(&buf, None).await?;
123+
}
124+
WriterPartition::KeyValues { partitions } => {
125+
dtx.add_file(&buf, Some(partitions)).await?;
126+
}
127+
}
128+
}
129+
130130
dtx.add_actions(
131131
self.txns
132132
.drain(0..)
133-
.map(|t| crate::action::Action::txn(t))
133+
.map(crate::action::Action::txn)
134134
.collect(),
135135
);
136136

@@ -200,9 +200,7 @@ impl<'a> Iterator for InMemValueIter<'a> {
200200
}
201201
}
202202

203-
/**
204-
* The type of partition for a row being written to a writer
205-
*/
203+
/// The type of partition for a row being written to a writer
206204
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
207205
pub enum WriterPartition {
208206
/// The row is not partitioned
@@ -221,7 +219,7 @@ mod tests {
221219
#[tokio::test]
222220
async fn test_writer_buffer_nopartition() {
223221
let table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap();
224-
let mut writer = BufferedJSONWriter::try_new(table).unwrap();
222+
let mut writer = BufferedJsonWriter::try_new(table).unwrap();
225223
assert_eq!(writer.count(&WriterPartition::NoPartitions), None);
226224
let res = writer.write(json!({"hello":"world"}), WriterPartition::NoPartitions);
227225
assert!(res.is_ok());
@@ -233,15 +231,15 @@ mod tests {
233231
let table = crate::open_table("./tests/data/delta-0.8.0-partitioned")
234232
.await
235233
.unwrap();
236-
let mut writer = BufferedJSONWriter::try_new(table).unwrap();
234+
let mut writer = BufferedJsonWriter::try_new(table).unwrap();
237235
let res = writer.write(json!({"hello":"world"}), WriterPartition::NoPartitions);
238236
assert!(res.is_err());
239237
}
240238

241239
#[tokio::test]
242240
async fn test_writer_write_partitions_to_nopartition() {
243241
let table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap();
244-
let mut writer = BufferedJSONWriter::try_new(table).unwrap();
242+
let mut writer = BufferedJsonWriter::try_new(table).unwrap();
245243
let partitions = WriterPartition::KeyValues {
246244
partitions: vec![("year".to_string(), "2021".to_string())],
247245
};

0 commit comments

Comments (0)