Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion iceberg_rust_ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iceberg_rust_ffi"
version = "0.7.9"
version = "0.7.10"
edition = "2021"

[lib]
Expand All @@ -20,6 +20,8 @@ futures = "0.3"
libc = "0.2"
anyhow = "1.0"
arrow-array = "57.1"
arrow-buffer = "57.1"
arrow-schema = "57.1"
arrow-ipc = "57.1"
parquet = "57.1"
tracing-subscriber = "0.3"
Expand Down
4 changes: 4 additions & 0 deletions iceberg_rust_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ mod transaction;
// Writer module
mod writer;

// Column-based writer module (zero-copy from Julia)
mod writer_columns;

// Response types module
mod response;

Expand All @@ -50,6 +53,7 @@ pub use transaction::{IcebergDataFiles, IcebergTransaction, IcebergTransactionRe
pub use writer::{
IcebergDataFileWriter, IcebergDataFileWriterResponse, IcebergWriterCloseResponse,
};
pub use writer_columns::ColumnDescriptor;

// We use `jl_adopt_thread` to ensure Rust can call into Julia when notifying
// the Base.Event that is waiting for the Rust result.
Expand Down
14 changes: 13 additions & 1 deletion iceberg_rust_ffi/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
/// for use with the Transaction API.
use std::ffi::{c_char, c_void};
use std::io::Cursor;
use std::sync::Arc;

use arrow_ipc::reader::StreamReader;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::DataFileFormat;
use iceberg::writer::base_writer::data_file_writer::{DataFileWriter, DataFileWriterBuilder};
use iceberg::writer::file_writer::location_generator::{
Expand Down Expand Up @@ -52,7 +55,9 @@ type ConcreteDataFileWriter =
/// Opaque writer handle for FFI
/// Holds the DataFileWriter which can write RecordBatches and produce DataFiles
pub struct IcebergDataFileWriter {
    // `Some` while the writer is open; taken/cleared on close so later write
    // calls can fail with a clear "writer has been closed" error.
    pub(crate) writer: Option<ConcreteDataFileWriter>,
    /// The Arrow schema for this table, used by write_columns to create RecordBatches
    pub(crate) arrow_schema: ArrowSchemaRef,
}

// SAFETY: NOTE(review) — this manual impl asserts the wrapped
// ConcreteDataFileWriter may be moved across threads even though the compiler
// could not derive it; confirm the underlying writer holds no thread-affine
// state (the FFI layer hands this handle between Julia and runtime threads).
unsafe impl Send for IcebergDataFileWriter {}
Expand Down Expand Up @@ -150,8 +155,15 @@ export_runtime_op!(
.await
.map_err(|e| anyhow::anyhow!("Failed to build data file writer: {}", e))?;

// Convert Iceberg schema to Arrow schema for use in write_columns
let arrow_schema = Arc::new(
schema_to_arrow_schema(table.metadata().current_schema().as_ref())
.map_err(|e| anyhow::anyhow!("Failed to convert schema to Arrow: {}", e))?
);

Ok::<IcebergDataFileWriter, anyhow::Error>(IcebergDataFileWriter {
writer: Some(writer),
arrow_schema,
})
},
table: *mut IcebergTable,
Expand Down
239 changes: 239 additions & 0 deletions iceberg_rust_ffi/src/writer_columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/// Column-based writer support for iceberg_rust_ffi
///
/// This module provides FFI bindings for writing raw column data directly to Parquet,
/// avoiding the overhead of Arrow IPC serialization. Julia passes raw column pointers
/// and metadata, and Rust builds Arrow arrays directly from them.
use std::ffi::c_void;
use std::sync::Arc;

use arrow_array::{
types::{Date32Type, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType},
ArrayRef, BooleanArray, PrimitiveArray, RecordBatch, StringArray,
};
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer};

use crate::writer::IcebergDataFileWriter;
use iceberg::writer::IcebergWriter;
use object_store_ffi::{
export_runtime_op, with_cancellation, CResult, NotifyGuard, ResponseGuard, RT,
};

/// Column type codes (must match Julia's ColumnType enum).
/// The numeric values are part of the FFI protocol — do not renumber.
pub const COLUMN_TYPE_INT32: i32 = 0;
pub const COLUMN_TYPE_INT64: i32 = 1;
pub const COLUMN_TYPE_FLOAT32: i32 = 2;
pub const COLUMN_TYPE_FLOAT64: i32 = 3;
/// Concatenated UTF-8 bytes in `data_ptr` plus `num_rows + 1` Int64 offsets in `offsets_ptr`.
pub const COLUMN_TYPE_STRING: i32 = 4;
/// Stored as Int32 days since the Unix epoch (Arrow Date32).
pub const COLUMN_TYPE_DATE: i32 = 5;
/// Stored as Int64 microseconds since the Unix epoch; written with a "UTC" timezone.
pub const COLUMN_TYPE_TIMESTAMP: i32 = 6;
/// Stored as one byte per row (non-zero == true); re-packed to Arrow bits on write.
pub const COLUMN_TYPE_BOOLEAN: i32 = 7;
/// Stored as 16 bytes per value (UInt128 in Julia); written as fixed-size binary.
pub const COLUMN_TYPE_UUID: i32 = 8;

/// Descriptor for a single column passed from Julia.
///
/// Field order and layout are fixed by `#[repr(C)]` and must match the Julia
/// struct byte-for-byte.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct ColumnDescriptor {
    /// Pointer to the raw data (interpretation depends on column_type)
    pub data_ptr: *const c_void,
    /// For string columns: pointer to offsets array (Int64, length = num_rows + 1).
    /// Null/ignored for every other column type.
    pub offsets_ptr: *const i64,
    /// Pointer to validity bitmap (only if is_nullable is true)
    /// Points to bit-packed data from Julia's BitVector.chunks (UInt64 array)
    /// Bit i is 1 if row i is valid, 0 if null
    pub validity_ptr: *const u8,
    /// Number of rows
    pub num_rows: usize,
    /// Column type (see COLUMN_TYPE_* constants)
    pub column_type: i32,
    /// Whether this column is nullable
    pub is_nullable: bool,
}

// SAFETY: ColumnDescriptor is a plain-old-data struct of raw pointers and
// scalars with no interior mutability; the impls only assert the *descriptor*
// may cross threads. The pointed-to memory is Julia-owned, and the FFI caller
// must keep it alive and unmutated while the descriptor is in use on the
// runtime thread (the documented contract of this module).
unsafe impl Send for ColumnDescriptor {}
unsafe impl Sync for ColumnDescriptor {}

/// Build an Arrow array from a ColumnDescriptor.
///
/// All data (primitives, booleans, strings, UUIDs and the validity bitmap) is
/// copied out of the Julia-owned buffers into freshly allocated Arrow buffers,
/// so the returned array does not alias the caller's memory.
///
/// Returns an error for unknown column-type codes, missing string offsets,
/// inconsistent offsets, or invalid UTF-8 in string data.
///
/// # Safety
///
/// For `desc.num_rows` rows of the declared `column_type`, the caller must
/// guarantee that:
/// - `data_ptr` points to appropriately sized, initialized memory;
/// - for string columns, `offsets_ptr` points to `num_rows + 1` Int64 offsets
///   whose final entry is the total byte length of the data buffer;
/// - for nullable columns, a non-null `validity_ptr` points to at least
///   `ceil(num_rows / 8)` bytes of bit-packed validity data;
/// - all of that memory stays valid for the duration of this call.
unsafe fn build_arrow_array(desc: &ColumnDescriptor) -> Result<ArrayRef, anyhow::Error> {
    let null_buffer = if desc.is_nullable && !desc.validity_ptr.is_null() {
        // Julia's BitVector stores bits packed in UInt64 chunks, which matches
        // Arrow's little-endian bit-packed validity layout (bit i == 1 means
        // row i is valid), so copying the covering bytes is sufficient.
        let num_bytes = (desc.num_rows + 7) / 8;
        let validity_slice = std::slice::from_raw_parts(desc.validity_ptr, num_bytes);
        Some(NullBuffer::new(BooleanBuffer::new(
            Buffer::from(validity_slice.to_vec()),
            0,
            desc.num_rows,
        )))
    } else {
        None
    };

    let array: ArrayRef = match desc.column_type {
        COLUMN_TYPE_INT32 => {
            let data = std::slice::from_raw_parts(desc.data_ptr as *const i32, desc.num_rows);
            let buffer = ScalarBuffer::from(data.to_vec());
            Arc::new(PrimitiveArray::<Int32Type>::new(buffer, null_buffer))
        }
        COLUMN_TYPE_INT64 => {
            let data = std::slice::from_raw_parts(desc.data_ptr as *const i64, desc.num_rows);
            let buffer = ScalarBuffer::from(data.to_vec());
            Arc::new(PrimitiveArray::<Int64Type>::new(buffer, null_buffer))
        }
        COLUMN_TYPE_FLOAT32 => {
            let data = std::slice::from_raw_parts(desc.data_ptr as *const f32, desc.num_rows);
            let buffer = ScalarBuffer::from(data.to_vec());
            Arc::new(PrimitiveArray::<Float32Type>::new(buffer, null_buffer))
        }
        COLUMN_TYPE_FLOAT64 => {
            let data = std::slice::from_raw_parts(desc.data_ptr as *const f64, desc.num_rows);
            let buffer = ScalarBuffer::from(data.to_vec());
            Arc::new(PrimitiveArray::<Float64Type>::new(buffer, null_buffer))
        }
        COLUMN_TYPE_DATE => {
            // Date is stored as Int32 (days since epoch).
            let data = std::slice::from_raw_parts(desc.data_ptr as *const i32, desc.num_rows);
            let buffer = ScalarBuffer::from(data.to_vec());
            Arc::new(PrimitiveArray::<Date32Type>::new(buffer, null_buffer))
        }
        COLUMN_TYPE_TIMESTAMP => {
            // Timestamp is stored as Int64 (microseconds since epoch).
            let data = std::slice::from_raw_parts(desc.data_ptr as *const i64, desc.num_rows);
            let buffer = ScalarBuffer::from(data.to_vec());
            Arc::new(
                PrimitiveArray::<TimestampMicrosecondType>::new(buffer, null_buffer)
                    .with_timezone("UTC"),
            )
        }
        COLUMN_TYPE_BOOLEAN => {
            // Booleans arrive as one byte per row (non-zero == true); re-pack
            // into Arrow's bit-packed representation.
            let data = std::slice::from_raw_parts(desc.data_ptr as *const u8, desc.num_rows);
            let mut bits = vec![0u8; (desc.num_rows + 7) / 8];
            for (i, &val) in data.iter().enumerate() {
                if val != 0 {
                    bits[i / 8] |= 1 << (i % 8);
                }
            }
            let values = BooleanBuffer::new(Buffer::from(bits), 0, desc.num_rows);
            Arc::new(BooleanArray::new(values, null_buffer))
        }
        COLUMN_TYPE_STRING => {
            // String data is passed as:
            // - data_ptr: pointer to concatenated UTF-8 bytes
            // - offsets_ptr: pointer to Int64 offsets array (length = num_rows + 1)
            if desc.offsets_ptr.is_null() {
                return Err(anyhow::anyhow!("String column requires offsets"));
            }
            let offsets = std::slice::from_raw_parts(desc.offsets_ptr, desc.num_rows + 1);
            // Validate the total length before trusting it as a slice length:
            // a negative Int64 would wrap to a huge usize and the subsequent
            // from_raw_parts would be undefined behavior.
            let last = offsets[desc.num_rows];
            if last < 0 {
                return Err(anyhow::anyhow!(
                    "Invalid string offsets: total byte length {} is negative",
                    last
                ));
            }
            let total_bytes = last as usize;
            let bytes = std::slice::from_raw_parts(desc.data_ptr as *const u8, total_bytes);

            // Build strings from offsets, honoring the validity bitmap.
            let mut strings: Vec<Option<&str>> = Vec::with_capacity(desc.num_rows);
            for i in 0..desc.num_rows {
                let is_null = null_buffer.as_ref().map_or(false, |nb| nb.is_null(i));
                if is_null {
                    strings.push(None);
                } else {
                    let start = offsets[i] as usize;
                    let end = offsets[i + 1] as usize;
                    // Checked slicing: reject decreasing or out-of-range
                    // offsets with an error instead of panicking across the
                    // FFI boundary.
                    let slice = bytes.get(start..end).ok_or_else(|| {
                        anyhow::anyhow!(
                            "Invalid string offsets at row {}: {}..{} (buffer length {})",
                            i,
                            start,
                            end,
                            total_bytes
                        )
                    })?;
                    let s = std::str::from_utf8(slice)
                        .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in string column: {}", e))?;
                    strings.push(Some(s));
                }
            }
            Arc::new(StringArray::from(strings))
        }
        COLUMN_TYPE_UUID => {
            // UUID is stored as 16 bytes per value (UInt128 in Julia) and
            // written as fixed-size binary. Build the array directly from the
            // packed byte buffer so the validity bitmap is preserved —
            // `try_from_iter` would silently drop `null_buffer` and mark every
            // row valid.
            let data = std::slice::from_raw_parts(desc.data_ptr as *const u8, desc.num_rows * 16);
            Arc::new(
                arrow_array::FixedSizeBinaryArray::try_new(
                    16,
                    Buffer::from(data.to_vec()),
                    null_buffer,
                )
                .map_err(|e| anyhow::anyhow!("Failed to create UUID array: {}", e))?,
            )
        }
        _ => {
            return Err(anyhow::anyhow!("Unknown column type: {}", desc.column_type));
        }
    };

    Ok(array)
}

// Write columns directly to the Parquet writer.
// Accepts an array of ColumnDescriptors and builds a RecordBatch from them,
// then writes to the underlying Parquet writer.
// The caller must ensure all pointers are valid and point to appropriately
// sized data — including the column data buffers themselves, which remain
// Julia-owned and must stay alive until this operation completes.
export_runtime_op!(
    iceberg_writer_write_columns,
    crate::IcebergResponse,
    || {
        // Synchronous validation phase: runs before the async work is
        // scheduled so obvious argument errors are reported immediately.
        if writer.is_null() {
            return Err(anyhow::anyhow!("Null writer pointer provided"));
        }
        if columns.is_null() || num_columns == 0 {
            return Err(anyhow::anyhow!("No columns provided"));
        }

        // Copy column descriptors for safe use across await.
        // NOTE(review): only the descriptor structs are copied; the pointers
        // inside them still reference Julia memory — see the contract above.
        // SAFETY: caller guarantees `columns` points to `num_columns` valid
        // ColumnDescriptor values.
        let cols: Vec<ColumnDescriptor> = unsafe {
            std::slice::from_raw_parts(columns, num_columns).to_vec()
        };

        // SAFETY: caller guarantees `writer` is a valid IcebergDataFileWriter
        // pointer with exclusive access for the duration of this operation.
        let writer_ref = unsafe { &mut *writer };
        Ok((writer_ref, cols))
    },
    result_tuple,
    async {
        let (writer_ref, cols) = result_tuple;

        // Get the writer's schema (stored when writer was created)
        let arrow_schema = writer_ref.arrow_schema.clone();

        // Validate column count matches schema
        if cols.len() != arrow_schema.fields().len() {
            return Err(anyhow::anyhow!(
                "Column count mismatch: got {} columns but schema has {} fields",
                cols.len(),
                arrow_schema.fields().len()
            ));
        }

        // Get the writer; `None` means the writer was already closed.
        let iceberg_writer = writer_ref
            .writer
            .as_mut()
            .ok_or_else(|| anyhow::anyhow!("Writer has been closed"))?;

        // Build Arrow arrays from column descriptors (this copies all data
        // out of the Julia-owned buffers).
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(cols.len());

        for desc in cols.iter() {
            // SAFETY: relies on the FFI contract above — each descriptor's
            // pointers are valid and sized for `num_rows` of its column type.
            let array = unsafe { build_arrow_array(desc)? };
            arrays.push(array);
        }

        // Create record batch using the table's Arrow schema (with proper field IDs)
        let batch = RecordBatch::try_new(arrow_schema, arrays)
            .map_err(|e| anyhow::anyhow!("Failed to create RecordBatch: {}", e))?;

        // Write the batch
        iceberg_writer
            .write(batch)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to write batch: {}", e))?;

        Ok::<(), anyhow::Error>(())
    },
    writer: *mut IcebergDataFileWriter,
    columns: *const ColumnDescriptor,
    num_columns: usize
);
6 changes: 5 additions & 1 deletion src/RustyIceberg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ export schema_to_json, partition_spec_to_json, sort_order_to_json
export iceberg_type_to_arrow_type, arrow_type, arrow_types
export Transaction, DataFiles, free_transaction!, free_data_files!, commit, transaction
export FastAppendAction, free_fast_append_action!, add_data_files, apply, with_fast_append
export DataFileWriter, free_writer!, close_writer
export DataFileWriter, free_writer!, close_writer, write_columns
export WriterConfig, CompressionCodec, UNCOMPRESSED, SNAPPY, GZIP, LZ4, ZSTD
export ColumnDescriptor, ColumnType
export COLUMN_TYPE_INT32, COLUMN_TYPE_INT64, COLUMN_TYPE_FLOAT32, COLUMN_TYPE_FLOAT64
export COLUMN_TYPE_STRING, COLUMN_TYPE_DATE, COLUMN_TYPE_TIMESTAMP, COLUMN_TYPE_BOOLEAN, COLUMN_TYPE_UUID
export julia_type_to_column_type

# Always use the JLL library - override via Preferences if needed for local development
# To use a local build, set the preference:
Expand Down
Loading