Skip to content

Commit 80f08db

Browse files
zachschuermann, scovich, and nicklan
authored
[write stage0] add Transaction with commit info and commit implementation (#370)
This PR does 4 main things: 1. ~reorganize `transaction.rs` so that the transaction action is now moved to actions module~ **EDIT:** now in #386 1. new `Transaction` API which includes: a. `Table.new_transaction()` to create a new transaction from the latest snapshot of the table b. `Transaction.with_commit_info(engine_commit_info: Box<dyn EngineData>)` to add single-row commit info in the form of a `map<string, string>`. required to commit. c. `Transaction.with_operation(operation: String)` to set the operation name of the transaction (persisted in commit info) d. `Transaction.commit() // consumes transaction` to commit the transaction to the log (currently only supporting committing the commit info) 1. new engine API: `write_json_file(impl Iterator<Item = Box<dyn EngineData>>)` (and a default engine implementation for this) 1. new integration test suite `write.rs` to house many of our write tests as it's implemented resolves #378 --------- Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com> Co-authored-by: Nick Lanham <nicklan@users.noreply.github.com>
1 parent f5d0a42 commit 80f08db

File tree

12 files changed

+1109
-12
lines changed

12 files changed

+1109
-12
lines changed

ffi/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,9 @@ pub enum KernelError {
329329
InternalError,
330330
InvalidExpression,
331331
InvalidLogPath,
332+
InvalidCommitInfo,
333+
FileAlreadyExists,
334+
MissingCommitInfo,
332335
}
333336

334337
impl From<Error> for KernelError {
@@ -376,6 +379,9 @@ impl From<Error> for KernelError {
376379
} => Self::from(*source),
377380
Error::InvalidExpressionEvaluation(_) => KernelError::InvalidExpression,
378381
Error::InvalidLogPath(_) => KernelError::InvalidLogPath,
382+
Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo,
383+
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
384+
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
379385
}
380386
}
381387
}

kernel/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.4.0" }
3636
# used for developer-visibility
3737
visibility = "0.1.1"
3838

39+
# Used in the sync engine
40+
tempfile = { version = "3", optional = true }
3941
# Used in default engine
4042
arrow-buffer = { workspace = true, optional = true }
4143
arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] }
@@ -99,6 +101,7 @@ sync-engine = [
99101
"arrow-json",
100102
"arrow-select",
101103
"parquet",
104+
"tempfile",
102105
]
103106
integration-test = [
104107
"hdfs-native-object-store/integration-test",

kernel/src/actions/mod.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
4646
.into()
4747
});
4848

49+
static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
50+
StructType::new([Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME)]).into()
51+
});
52+
4953
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
5054
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
5155
fn get_log_schema() -> &'static SchemaRef {
@@ -58,6 +62,10 @@ fn get_log_add_schema() -> &'static SchemaRef {
5862
&LOG_ADD_SCHEMA
5963
}
6064

65+
pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
66+
&LOG_COMMIT_INFO_SCHEMA
67+
}
68+
6169
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
6270
pub struct Format {
6371
/// Name of the encoding for files in this table
@@ -147,8 +155,26 @@ impl Protocol {
147155
}
148156

149157
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
150-
pub struct CommitInfo {
151-
pub kernel_version: Option<String>,
158+
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
159+
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
160+
struct CommitInfo {
161+
/// The time this logical file was created, as milliseconds since the epoch.
162+
/// Read: optional, write: required (that is, kernel always writes).
163+
/// If in-commit timestamps are enabled, this is always required.
164+
pub(crate) timestamp: Option<i64>,
165+
/// An arbitrary string that identifies the operation associated with this commit. This is
166+
/// specified by the engine. Read: optional, write: required (that is, kernel always writes).
167+
pub(crate) operation: Option<String>,
168+
/// Map of arbitrary string key-value pairs that provide additional information about the
169+
/// operation. This is specified by the engine. For now this is always empty on write.
170+
pub(crate) operation_parameters: Option<HashMap<String, String>>,
171+
/// The version of the delta_kernel crate used to write this commit. The kernel will always
172+
/// write this field, but it is optional since many tables will not have this field (i.e. any
173+
/// tables not written by kernel).
174+
pub(crate) kernel_version: Option<String>,
175+
/// A place for the engine to store additional metadata associated with this commit encoded as
176+
/// a map of strings.
177+
pub(crate) engine_commit_info: Option<HashMap<String, String>>,
152178
}
153179

154180
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
@@ -427,4 +453,32 @@ mod tests {
427453
)]));
428454
assert_eq!(schema, expected);
429455
}
456+
457+
#[test]
458+
fn test_commit_info_schema() {
459+
let schema = get_log_schema()
460+
.project(&["commitInfo"])
461+
.expect("Couldn't get commitInfo field");
462+
463+
let expected = Arc::new(StructType::new(vec![StructField::new(
464+
"commitInfo",
465+
StructType::new(vec![
466+
StructField::new("timestamp", DataType::LONG, true),
467+
StructField::new("operation", DataType::STRING, true),
468+
StructField::new(
469+
"operationParameters",
470+
MapType::new(DataType::STRING, DataType::STRING, false),
471+
true,
472+
),
473+
StructField::new("kernelVersion", DataType::STRING, true),
474+
StructField::new(
475+
"engineCommitInfo",
476+
MapType::new(DataType::STRING, DataType::STRING, false),
477+
true,
478+
),
479+
]),
480+
true,
481+
)]));
482+
assert_eq!(schema, expected);
483+
}
430484
}

kernel/src/engine/arrow_utils.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use arrow_array::{
1616
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
1717
RecordBatch, StringArray, StructArray,
1818
};
19-
use arrow_json::ReaderBuilder;
19+
use arrow_json::{LineDelimitedWriter, ReaderBuilder};
2020
use arrow_schema::{
2121
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
2222
SchemaRef as ArrowSchemaRef,
@@ -662,6 +662,21 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
662662
Ok(concat_batches(&schema, output.iter())?)
663663
}
664664

665+
/// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
666+
// TODO (zach): this should stream data to the JSON writer and output an iterator.
667+
pub(crate) fn to_json_bytes(
668+
data: impl Iterator<Item = Box<dyn EngineData>> + Send,
669+
) -> DeltaResult<Vec<u8>> {
670+
let mut writer = LineDelimitedWriter::new(Vec::new());
671+
for chunk in data.into_iter() {
672+
let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
673+
let record_batch = arrow_data.record_batch();
674+
writer.write(record_batch)?;
675+
}
676+
writer.finish()?;
677+
Ok(writer.into_inner())
678+
}
679+
665680
#[cfg(test)]
666681
mod tests {
667682
use std::sync::Arc;
@@ -1408,4 +1423,24 @@ mod tests {
14081423
assert_eq!(mask_indices, expect_mask);
14091424
assert_eq!(reorder_indices, expect_reorder);
14101425
}
1426+
1427+
#[test]
1428+
fn test_write_json() -> DeltaResult<()> {
1429+
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1430+
"string",
1431+
ArrowDataType::Utf8,
1432+
true,
1433+
)]));
1434+
let data = RecordBatch::try_new(
1435+
schema.clone(),
1436+
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
1437+
)?;
1438+
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
1439+
let json = to_json_bytes(Box::new(std::iter::once(data)))?;
1440+
assert_eq!(
1441+
json,
1442+
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
1443+
);
1444+
Ok(())
1445+
}
14111446
}

kernel/src/engine/default/json.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ use bytes::{Buf, Bytes};
1111
use futures::{StreamExt, TryStreamExt};
1212
use object_store::path::Path;
1313
use object_store::{DynObjectStore, GetResultPayload};
14+
use url::Url;
1415

1516
use super::executor::TaskExecutor;
1617
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
1718
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
19+
use crate::engine::arrow_utils::to_json_bytes;
1820
use crate::schema::SchemaRef;
1921
use crate::{
2022
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
@@ -89,6 +91,31 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
8991
self.readahead,
9092
)
9193
}
94+
95+
// note: for now we just buffer all the data and write it out all at once
96+
fn write_json_file(
97+
&self,
98+
path: &Url,
99+
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
100+
_overwrite: bool,
101+
) -> DeltaResult<()> {
102+
let buffer = to_json_bytes(data)?;
103+
// Put if absent
104+
let store = self.store.clone(); // cheap Arc
105+
let path = Path::from(path.path());
106+
let path_str = path.to_string();
107+
self.task_executor
108+
.block_on(async move {
109+
store
110+
.put_opts(&path, buffer.into(), object_store::PutMode::Create.into())
111+
.await
112+
})
113+
.map_err(|e| match e {
114+
object_store::Error::AlreadyExists { .. } => Error::FileAlreadyExists(path_str),
115+
e => e.into(),
116+
})?;
117+
Ok(())
118+
}
92119
}
93120

94121
/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]

kernel/src/engine/sync/json.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
use std::{fs::File, io::BufReader};
1+
use std::{fs::File, io::BufReader, io::Write};
22

33
use arrow_schema::SchemaRef as ArrowSchemaRef;
4+
use tempfile::NamedTempFile;
5+
use url::Url;
46

57
use super::read_files;
68
use crate::engine::arrow_data::ArrowEngineData;
79
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
10+
use crate::engine::arrow_utils::to_json_bytes;
811
use crate::schema::SchemaRef;
912
use crate::{
10-
DeltaResult, EngineData, ExpressionRef, FileDataReadResultIterator, FileMeta, JsonHandler,
13+
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
14+
JsonHandler,
1115
};
1216

1317
pub(crate) struct SyncJsonHandler;
@@ -41,4 +45,98 @@ impl JsonHandler for SyncJsonHandler {
4145
) -> DeltaResult<Box<dyn EngineData>> {
4246
arrow_parse_json(json_strings, output_schema)
4347
}
48+
49+
// For sync writer we write data to a tmp file then atomically rename it to the final path.
50+
// This is highly OS-dependent and for now relies on the atomicity of tempfile's
51+
// `persist_noclobber`.
52+
fn write_json_file(
53+
&self,
54+
path: &Url,
55+
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
56+
_overwrite: bool,
57+
) -> DeltaResult<()> {
58+
let path = path
59+
.to_file_path()
60+
.map_err(|_| crate::Error::generic("sync client can only read local files"))?;
61+
let Some(parent) = path.parent() else {
62+
return Err(crate::Error::generic(format!(
63+
"no parent found for {:?}",
64+
path
65+
)));
66+
};
67+
68+
// write data to tmp file
69+
let mut tmp_file = NamedTempFile::new_in(parent)?;
70+
let buf = to_json_bytes(data)?;
71+
tmp_file.write_all(&buf)?;
72+
tmp_file.flush()?;
73+
74+
// use 'persist_noclobber' to atomically rename tmp file to final path
75+
tmp_file
76+
.persist_noclobber(path.clone())
77+
.map_err(|e| match e {
78+
tempfile::PersistError { error, .. }
79+
if error.kind() == std::io::ErrorKind::AlreadyExists =>
80+
{
81+
Error::FileAlreadyExists(path.to_string_lossy().to_string())
82+
}
83+
e => Error::IOError(e.into()),
84+
})?;
85+
Ok(())
86+
}
87+
}
88+
89+
#[cfg(test)]
90+
mod tests {
91+
use super::*;
92+
93+
use std::sync::Arc;
94+
95+
use arrow_array::{RecordBatch, StringArray};
96+
use arrow_schema::DataType as ArrowDataType;
97+
use arrow_schema::Field;
98+
use arrow_schema::Schema as ArrowSchema;
99+
use serde_json::json;
100+
use url::Url;
101+
102+
#[test]
103+
fn test_write_json_file() -> DeltaResult<()> {
104+
let test_dir = tempfile::tempdir().unwrap();
105+
let path = test_dir.path().join("00000000000000000001.json");
106+
let handler = SyncJsonHandler;
107+
108+
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
109+
"dog",
110+
ArrowDataType::Utf8,
111+
true,
112+
)]));
113+
let data = RecordBatch::try_new(
114+
schema.clone(),
115+
vec![Arc::new(StringArray::from(vec!["remi", "wilson"]))],
116+
)?;
117+
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
118+
let empty: Box<dyn EngineData> =
119+
Box::new(ArrowEngineData::new(RecordBatch::new_empty(schema)));
120+
121+
let url = Url::from_file_path(path.clone()).unwrap();
122+
handler
123+
.write_json_file(&url, Box::new(std::iter::once(data)), false)
124+
.expect("write json file");
125+
assert!(matches!(
126+
handler.write_json_file(&url, Box::new(std::iter::once(empty)), false),
127+
Err(Error::FileAlreadyExists(_))
128+
));
129+
130+
let file = std::fs::read_to_string(path)?;
131+
let json: Vec<_> = serde_json::Deserializer::from_str(&file)
132+
.into_iter::<serde_json::Value>()
133+
.flatten()
134+
.collect();
135+
assert_eq!(
136+
json,
137+
vec![json!({"dog": "remi"}), json!({"dog": "wilson"}),]
138+
);
139+
140+
Ok(())
141+
}
44142
}

kernel/src/error.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,18 @@ pub enum Error {
159159
/// Unable to parse the name of a log path
160160
#[error("Invalid log path: {0}")]
161161
InvalidLogPath(String),
162+
163+
/// Invalid commit info passed to the transaction
164+
#[error("Invalid commit info: {0}")]
165+
InvalidCommitInfo(String),
166+
167+
/// Commit info was not passed to the transaction
168+
#[error("Missing commit info")]
169+
MissingCommitInfo,
170+
171+
/// The file already exists at the path, prohibiting a non-overwrite write
172+
#[error("File already exists: {0}")]
173+
FileAlreadyExists(String),
162174
}
163175

164176
// Convenience constructors for Error types that take a String argument

0 commit comments

Comments (0)