Unable to append to delta table without datafusion feature #2204

Closed
@jhoekx

Description

Environment

Delta-rs version:
0.17

Binding:
rust

Environment:

  • Cloud provider: -
  • OS: Linux
  • Other: -

Bug

What happened:

First, create a Delta table in Rust and write a RecordBatch to it.
Then, open the same table and try to write to it again.

This fails with:

Error: Transaction { source: UnsupportedWriterFeatures([Invariants]) }

What you expected to happen:

I can append new record batches to the table I just created and already wrote to.

How to reproduce it:

Given Cargo.toml:

[package]
name = "delta-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deltalake = { version = "0.17" }
tokio = "1"

Run src/main.rs to write an empty record batch twice:

use std::{future::IntoFuture, sync::Arc};

use deltalake::{
    arrow::array::{ArrayRef, Float64Builder, RecordBatch, TimestampMicrosecondBuilder},
    kernel::{DataType, PrimitiveType, StructField},
    open_table,
    operations::create::CreateBuilder,
    writer::{DeltaWriter, RecordBatchWriter},
    DeltaTable, DeltaTableError,
};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let ts: ArrayRef = Arc::new(TimestampMicrosecondBuilder::new().finish());
    let value: ArrayRef = Arc::new(Float64Builder::new().finish());
    let batch = RecordBatch::try_from_iter(vec![("ts", ts), ("value", value)]).unwrap();

    let mut table = create_or_get_table("./data/write").await?;

    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch.clone()).await?;
    writer.flush_and_commit(&mut table).await?;

    let mut table = create_or_get_table("./data/write").await?;

    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch.clone()).await?;
    writer.flush_and_commit(&mut table).await?;

    Ok(())
}

async fn create_or_get_table(table_uri: &str) -> Result<DeltaTable, DeltaTableError> {
    let table = match open_table(table_uri).await {
        Ok(table) => table,
        Err(err) => {
            if let DeltaTableError::NotATable(_) = err {
                let schema = vec![
                    StructField::new(
                        "ts".to_string(),
                        DataType::Primitive(PrimitiveType::Timestamp),
                        false,
                    ),
                    StructField::new(
                        "value".to_string(),
                        DataType::Primitive(PrimitiveType::Double),
                        false,
                    ),
                ];
                CreateBuilder::new()
                    .with_location(table_uri)
                    .with_columns(schema)
                    .into_future()
                    .await?
            } else {
                // `err` is already a DeltaTableError, so no conversion is needed.
                return Err(err);
            }
        }
    };
    Ok(table)
}

More details:

The reproduction case passes with features = ["datafusion"].

I would expect that I can perform the basic operation of adding a new record batch to the table without needing to pull in datafusion. This was possible in 0.16.
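
To narrow this down, it may help to check which protocol the created table declares, since the error names a writer feature. The following is a minimal sketch, assuming the standard Delta log layout (`_delta_log/00000000000000000000.json`, one JSON action per line) and the `./data/write` path from the reproduction. `protocol_actions` is a hypothetical helper, not part of deltalake, and it uses plain string matching instead of JSON parsing.

```rust
use std::fs;

/// Hypothetical helper: returns the lines of a Delta commit file that hold a
/// `protocol` action. Assumes one JSON action per line with the action name
/// as the first key; this is string matching, not JSON parsing.
fn protocol_actions(commit: &str) -> Vec<&str> {
    commit
        .lines()
        .map(str::trim)
        .filter(|line| line.starts_with("{\"protocol\""))
        .collect()
}

fn main() {
    // Path assumed from the reproduction; version 0 is the commit written
    // when the table is created.
    let path = "./data/write/_delta_log/00000000000000000000.json";
    match fs::read_to_string(path) {
        Ok(commit) => {
            for action in protocol_actions(&commit) {
                println!("{action}");
            }
        }
        Err(err) => eprintln!("could not read {path}: {err}"),
    }
}
```

The printed line should show the `minReaderVersion` and `minWriterVersion` the table was created with, which can then be compared against what the writer accepts when the `datafusion` feature is disabled.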

Labels

  • binding/rust: Issues for the Rust crate
  • bug: Something isn't working
