Environment
Delta-rs version: 0.17
Binding: rust
Environment:
- Cloud provider: -
- OS: Linux
- Other: -
Bug
What happened:
First, create a Delta table in Rust and write a RecordBatch to it.
Then, open the exact same table and try to write again.
This fails with:
Error: Transaction { source: UnsupportedWriterFeatures([Invariants]) }
What you expected to happen:
It should be possible to add new record batches to the table I just created and already wrote to.
How to reproduce it:
Given Cargo.toml:
[package]
name = "delta-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
deltalake = { version = "0.17" }
tokio = "1"
Run src/main.rs to write an empty record batch twice:
use std::{future::IntoFuture, sync::Arc};

use deltalake::{
    arrow::array::{ArrayRef, Float64Builder, RecordBatch, TimestampMicrosecondBuilder},
    kernel::{DataType, PrimitiveType, StructField},
    open_table,
    operations::create::CreateBuilder,
    writer::{DeltaWriter, RecordBatchWriter},
    DeltaTable, DeltaTableError,
};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let ts: ArrayRef = Arc::new(TimestampMicrosecondBuilder::new().finish());
    let value: ArrayRef = Arc::new(Float64Builder::new().finish());
    let batch = RecordBatch::try_from_iter(vec![("ts", ts), ("value", value)]).unwrap();

    let mut table = create_or_get_table("./data/write").await?;
    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch.clone()).await?;
    writer.flush_and_commit(&mut table).await?;

    let mut table = create_or_get_table("./data/write").await?;
    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch.clone()).await?;
    writer.flush_and_commit(&mut table).await?;

    Ok(())
}

async fn create_or_get_table(table_uri: &str) -> Result<DeltaTable, DeltaTableError> {
    let table = match open_table(table_uri).await {
        Ok(table) => table,
        Err(err) => {
            if let DeltaTableError::NotATable(_) = err {
                let schema = vec![
                    StructField::new(
                        "ts".to_string(),
                        DataType::Primitive(PrimitiveType::Timestamp),
                        false,
                    ),
                    StructField::new(
                        "value".to_string(),
                        DataType::Primitive(PrimitiveType::Double),
                        false,
                    ),
                ];
                CreateBuilder::new()
                    .with_location(table_uri)
                    .with_columns(schema)
                    .into_future()
                    .await?
            } else {
                return Err(err.into());
            }
        }
    };
    Ok(table)
}
More details:
The reproduction case passes with features = ["datafusion"].
I would expect that I can perform the basic operation of adding a new record batch to the table without needing to pull in datafusion. This was possible in 0.16.
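To make the observed behavior concrete, here is a minimal, self-contained sketch of how a feature-gated writer check could produce this error. It is a hypothetical model, not delta-rs code: `WriterFeature`, `WriteError`, `supported_features`, and `can_write` are names invented for illustration, and the `with_datafusion` flag stands in for the crate's `datafusion` Cargo feature. It also assumes that the table created above ends up requiring the `Invariants` writer feature, as the error message suggests.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for table features a writer may be asked to honor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum WriterFeature {
    AppendOnly,
    Invariants,
}

/// Illustrative error that mirrors the shape of the message in this report.
#[derive(Debug, PartialEq)]
enum WriteError {
    UnsupportedWriterFeatures(Vec<WriterFeature>),
}

/// Features this build can write. `with_datafusion` is an assumed stand-in
/// for compiling with the `datafusion` Cargo feature enabled.
fn supported_features(with_datafusion: bool) -> HashSet<WriterFeature> {
    let mut supported = HashSet::from([WriterFeature::AppendOnly]);
    if with_datafusion {
        supported.insert(WriterFeature::Invariants);
    }
    supported
}

/// Reject the write if the table requires a feature the build does not support.
fn can_write(
    required: &[WriterFeature],
    supported: &HashSet<WriterFeature>,
) -> Result<(), WriteError> {
    let missing: Vec<WriterFeature> = required
        .iter()
        .copied()
        .filter(|feature| !supported.contains(feature))
        .collect();
    if missing.is_empty() {
        Ok(())
    } else {
        Err(WriteError::UnsupportedWriterFeatures(missing))
    }
}

fn main() {
    // Assumption: the table requires `Invariants` once it has been created.
    let required = [WriterFeature::Invariants];
    println!("{:?}", can_write(&required, &supported_features(false)));
    println!("{:?}", can_write(&required, &supported_features(true)));
}
```

Under this model, the check fails without the flag and passes with it, which matches what I see when toggling features = ["datafusion"].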