9 changes: 8 additions & 1 deletion crates/sail-data-source/src/formats/delta.rs
@@ -13,7 +13,7 @@ use sail_common_datafusion::datasource::{
use sail_common_datafusion::streaming::event::schema::is_flow_event_schema;
use sail_delta_lake::create_delta_provider;
use sail_delta_lake::datasource::{parse_predicate_expression, DataFusionMixins};
use sail_delta_lake::options::TableDeltaOptions;
use sail_delta_lake::options::{ColumnMappingModeOption, TableDeltaOptions};
use sail_delta_lake::physical_plan::plan_builder::DeltaTableConfig;
use sail_delta_lake::physical_plan::{DeltaDeletePlanBuilder, DeltaPlanBuilder};
use sail_delta_lake::table::open_table_with_object_store;
@@ -250,6 +250,13 @@ fn apply_delta_write_options(from: DeltaWriteOptions, to: &mut TableDeltaOptions
if let Some(write_batch_size) = from.write_batch_size {
to.write_batch_size = write_batch_size;
}
if let Some(column_mapping_mode) = from.column_mapping_mode {
match column_mapping_mode.to_ascii_lowercase().as_str() {
"name" => to.column_mapping_mode = ColumnMappingModeOption::Name,
"id" => to.column_mapping_mode = ColumnMappingModeOption::Id,
_ => to.column_mapping_mode = ColumnMappingModeOption::None,
}
}
Ok(())
}

14 changes: 14 additions & 0 deletions crates/sail-data-source/src/options/data/delta_write.yaml
@@ -126,3 +126,17 @@
supported: true
rust_type: usize
rust_deserialize_with: crate::options::serde::deserialize_non_zero_usize

# Column mapping mode for new Delta tables
# Only "name" mode is supported (besides "none").
# If set to "name", new tables will be created using Delta Column Mapping (name mode).
- key: column_mapping_mode
aliases:
- columnMappingMode
- column_mapping
description: |
Column mapping mode for new Delta tables. Supported values: "none" and "name".
When set to "name", new tables will be created with Delta Column Mapping (name mode).
default: "none"
supported: true
rust_type: String
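
A minimal, self-contained sketch of how the handler in `delta.rs` above interprets this option; the enum is mirrored locally for illustration (in the PR it lives in `sail_delta_lake::options`). Matching is case-insensitive, and unrecognized values fall back to `None` rather than erroring:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum ColumnMappingModeOption {
    #[default]
    None,
    Name,
    Id,
}

/// Mirrors the match in `apply_delta_write_options`: lowercase the value,
/// map "name" and "id" to their variants, and treat anything else as None.
fn parse_column_mapping_mode(value: &str) -> ColumnMappingModeOption {
    match value.to_ascii_lowercase().as_str() {
        "name" => ColumnMappingModeOption::Name,
        "id" => ColumnMappingModeOption::Id,
        _ => ColumnMappingModeOption::None,
    }
}

fn main() {
    assert_eq!(parse_column_mapping_mode("NAME"), ColumnMappingModeOption::Name);
    assert_eq!(parse_column_mapping_mode("id"), ColumnMappingModeOption::Id);
    // Unrecognized values are silently treated as "none".
    assert_eq!(parse_column_mapping_mode("table"), ColumnMappingModeOption::None);
}
```

Note that the handler also accepts "id" even though the description above lists only "none" and "name" as supported; unknown strings do not raise an error.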
166 changes: 166 additions & 0 deletions crates/sail-delta-lake/src/column_mapping.rs
@@ -0,0 +1,166 @@
use std::sync::atomic::{AtomicI64, Ordering};

use datafusion::arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema};
use delta_kernel::schema::{ArrayType, DataType, MapType, MetadataValue, StructField, StructType};
use delta_kernel::table_features::ColumnMappingMode;

use crate::options::ColumnMappingModeOption;

/// Annotate a logical kernel schema with column mapping metadata (id + physicalName)
/// using a sequential id assignment. Intended only for new table creation (name mode).
pub fn annotate_schema_for_column_mapping(schema: &StructType) -> StructType {
let counter = AtomicI64::new(1);
let annotated_fields = schema
.fields()
.map(|f| -> Result<StructField, delta_kernel::Error> { Ok(annotate_field(f, &counter)) });
// Safe: we preserve existing names and structure
#[allow(clippy::expect_used)]
let result = StructType::try_new(annotated_fields).expect("failed to build annotated schema");
result
}

fn annotate_field(field: &StructField, counter: &AtomicI64) -> StructField {
match field.data_type() {
DataType::Struct(struct_type) => {
let next_id = counter.fetch_add(1, Ordering::Relaxed);
let physical_name = format!("col-{}", uuid::Uuid::new_v4());
let annotated = field.clone().add_metadata([
("delta.columnMapping.id", MetadataValue::Number(next_id)),
(
"delta.columnMapping.physicalName",
MetadataValue::String(physical_name),
),
]);
let nested = annotate_struct(struct_type.as_ref(), counter);
StructField {
name: annotated.name().clone(),
data_type: nested.into(),
nullable: annotated.is_nullable(),
metadata: annotated.metadata().clone(),
}
}
DataType::Array(array_type) => {
let next_id = counter.fetch_add(1, Ordering::Relaxed);
let physical_name = format!("col-{}", uuid::Uuid::new_v4());
let annotated = field.clone().add_metadata([
("delta.columnMapping.id", MetadataValue::Number(next_id)),
(
"delta.columnMapping.physicalName",
MetadataValue::String(physical_name),
),
]);
let new_element = match array_type.element_type() {
DataType::Struct(st) => annotate_struct(st.as_ref(), counter).into(),
other => other.clone(),
};
StructField {
name: annotated.name().clone(),
data_type: ArrayType::new(new_element, array_type.contains_null()).into(),
nullable: annotated.is_nullable(),
metadata: annotated.metadata().clone(),
}
}
DataType::Map(map_type) => {
let next_id = counter.fetch_add(1, Ordering::Relaxed);
let physical_name = format!("col-{}", uuid::Uuid::new_v4());
let annotated = field.clone().add_metadata([
("delta.columnMapping.id", MetadataValue::Number(next_id)),
(
"delta.columnMapping.physicalName",
MetadataValue::String(physical_name),
),
]);
let new_key = match map_type.key_type() {
DataType::Struct(st) => annotate_struct(st.as_ref(), counter).into(),
other => other.clone(),
};
let new_value = match map_type.value_type() {
DataType::Struct(st) => annotate_struct(st.as_ref(), counter).into(),
other => other.clone(),
};
StructField {
name: annotated.name().clone(),
data_type: MapType::new(new_key, new_value, map_type.value_contains_null()).into(),
nullable: annotated.is_nullable(),
metadata: annotated.metadata().clone(),
}
}
_ => {
let next_id = counter.fetch_add(1, Ordering::Relaxed);
let physical_name = format!("col-{}", uuid::Uuid::new_v4());
field.clone().add_metadata([
("delta.columnMapping.id", MetadataValue::Number(next_id)),
(
"delta.columnMapping.physicalName",
MetadataValue::String(physical_name),
),
])
}
}
}

fn annotate_struct(struct_type: &StructType, counter: &AtomicI64) -> StructType {
let fields = struct_type
.fields()
.map(|f| -> Result<StructField, delta_kernel::Error> { Ok(annotate_field(f, counter)) });
#[allow(clippy::expect_used)]
let result = StructType::try_new(fields).expect("failed to build nested annotated struct");
result
}

/// Build the physical schema used for file writes according to the column mapping mode.
/// - None: return unchanged.
/// - Name: rename fields to physicalName, remove id/parquet id metadata.
/// - Id: rename fields to physicalName, set parquet.field.id from delta.columnMapping.id.
pub fn make_physical_schema_for_writes(
logical_schema: &StructType,
mode: ColumnMappingModeOption,
) -> StructType {
let kernel_mode = match mode {
ColumnMappingModeOption::None => ColumnMappingMode::None,
ColumnMappingModeOption::Name => ColumnMappingMode::Name,
ColumnMappingModeOption::Id => ColumnMappingMode::Id,
};
logical_schema.make_physical(kernel_mode)
}

/// Enrich a physical Arrow schema with PARQUET:field_id metadata for each field whose
/// logical field in the kernel schema carries a `delta.columnMapping.id`.
///
/// This is needed so Parquet readers can resolve columns by field id when names are
/// physical (e.g., `col-<uuid>`) under column mapping Name/Id modes.
pub fn enrich_arrow_with_parquet_field_ids(
physical_arrow: &ArrowSchema,
logical_kernel: &StructType,
) -> ArrowSchema {
use delta_kernel::schema::ColumnMetadataKey;

// Build map: physical_name -> Option<id>
let mut physical_to_id: std::collections::HashMap<String, Option<i64>> =
std::collections::HashMap::new();
for kf in logical_kernel.fields() {
let id = match kf.get_config_value(&ColumnMetadataKey::ColumnMappingId) {
Some(MetadataValue::Number(fid)) => Some(*fid),
_ => None,
};
let phys = match kf.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) {
Some(MetadataValue::String(s)) => s.clone(),
_ => kf.name().clone(),
};
physical_to_id.insert(phys, id);
}

let new_fields: Vec<ArrowField> = physical_arrow
.fields()
.iter()
.map(|af| {
let mut meta = af.metadata().clone();
if let Some(Some(fid)) = physical_to_id.get(af.name()) {
meta.insert("PARQUET:field_id".to_string(), fid.to_string());
}
ArrowField::new(af.name(), af.data_type().clone(), af.is_nullable()).with_metadata(meta)
})
.collect();

ArrowSchema::new(new_fields)
}
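
To make the field-id enrichment concrete, here is a hedged sketch using only the `arrow` crate; the helper name, schema, and id map below are hypothetical, whereas in the PR `enrich_arrow_with_parquet_field_ids` derives the map from `delta.columnMapping.id` entries in the kernel schema:

```rust
use std::collections::HashMap;

use arrow::datatypes::{DataType, Field, Schema};

/// For every field whose physical name has a known column-mapping id,
/// attach `PARQUET:field_id` metadata so Parquet readers can resolve
/// `col-<uuid>` columns by id instead of by name.
fn with_parquet_field_ids(physical: &Schema, ids: &HashMap<String, i64>) -> Schema {
    let fields: Vec<Field> = physical
        .fields()
        .iter()
        .map(|f| {
            let mut meta = f.metadata().clone();
            if let Some(id) = ids.get(f.name()) {
                meta.insert("PARQUET:field_id".to_string(), id.to_string());
            }
            Field::new(f.name(), f.data_type().clone(), f.is_nullable()).with_metadata(meta)
        })
        .collect();
    Schema::new(fields)
}

fn main() {
    // Physical names shaped like the `col-<uuid>` names produced in name mode.
    let physical = Schema::new(vec![
        Field::new("col-9f4c0d2a", DataType::Int64, true),
        Field::new("col-1b7e33c5", DataType::Utf8, true),
    ]);
    let ids = HashMap::from([
        ("col-9f4c0d2a".to_string(), 1),
        ("col-1b7e33c5".to_string(), 2),
    ]);
    let enriched = with_parquet_field_ids(&physical, &ids);
    assert_eq!(
        enriched.field(0).metadata().get("PARQUET:field_id"),
        Some(&"1".to_string())
    );
}
```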
78 changes: 68 additions & 10 deletions crates/sail-delta-lake/src/datasource/provider.rs
@@ -12,10 +12,17 @@ use datafusion::datasource::{TableProvider, TableType};
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use delta_kernel::table_features::ColumnMappingMode;
use deltalake::errors::DeltaResult;
use deltalake::kernel::Add;
use deltalake::logstore::LogStoreRef;
use sail_common_datafusion::rename::physical_plan::rename_projected_physical_plan;

use crate::column_mapping::enrich_arrow_with_parquet_field_ids;
use crate::datasource::scan::FileScanParams;
use crate::datasource::{
build_file_scan_config, delta_to_datafusion_error, df_logical_schema, get_pushdown_filters,
@@ -189,16 +196,53 @@ impl TableProvider for DeltaTableProvider {
None
};

// Build file schema (non-partition columns)
// Build physical file schema (non-partition columns) using kernel make_physical
let table_partition_cols = self.snapshot.metadata().partition_columns();
let file_schema = Arc::new(ArrowSchema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect::<Vec<_>>(),
));
let kmode_explicit: ColumnMappingMode = self
.snapshot
.snapshot()
.table_configuration()
.column_mapping_mode();
let kschema_arc = self.snapshot.snapshot().table_configuration().schema();
// Fallback: if mode is None but schema contains column mapping annotations, treat as Name
let has_annotations = kschema_arc.fields().any(|f| {
f.metadata().contains_key(
delta_kernel::schema::ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
) && f
.metadata()
.contains_key(delta_kernel::schema::ColumnMetadataKey::ColumnMappingId.as_ref())
});
let kmode = if matches!(kmode_explicit, ColumnMappingMode::None) && has_annotations {
ColumnMappingMode::Name
} else {
kmode_explicit
};
let physical_kernel = kschema_arc.make_physical(kmode);
let physical_arrow: ArrowSchema =
deltalake::kernel::engine::arrow_conversion::TryIntoArrow::try_into_arrow(
&physical_kernel,
)?;
// If column mapping is enabled, ensure PARQUET:field_id is present in Arrow fields
let physical_arrow = match kmode {
ColumnMappingMode::Name | ColumnMappingMode::Id => {
enrich_arrow_with_parquet_field_ids(&physical_arrow, &kschema_arc)
}
ColumnMappingMode::None => physical_arrow,
};
log::trace!("read_kmode: {:?}", kmode);
let phys_field_names: Vec<String> = physical_arrow
.fields()
.iter()
.map(|f| f.name().clone())
.collect();
log::trace!("read_file_schema_fields: {:?}", &phys_field_names);
let file_fields = physical_arrow
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect::<Vec<_>>();
let file_schema = Arc::new(ArrowSchema::new(file_fields));

let file_scan_config = build_file_scan_config(
&self.snapshot,
@@ -219,7 +263,21 @@
// MetricBuilder::new(&metrics).global_counter("files_pruned").add(files_pruned);

// TODO: Properly expose these metrics
Ok(DataSourceExec::from_data_source(file_scan_config))
let scan_exec = DataSourceExec::from_data_source(file_scan_config);
// Rename columns from physical back to logical names expected by `schema`
let mut logical_names = schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.map(|f| f.name().clone())
.collect::<Vec<_>>();
// append partition column names in order
logical_names.extend(table_partition_cols.iter().cloned());
if let Some(file_col) = &config.file_column_name {
logical_names.push(file_col.clone());
}
let renamed = rename_projected_physical_plan(scan_exec, &logical_names, projection)?;
Ok(renamed)
}

fn supports_filters_pushdown(
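The rename at the end of `scan` assumes a fixed output column order: non-partition columns in schema order, then partition columns, then the optional file-path column. A standalone sketch of that ordering logic (plain strings, hypothetical column names):

```rust
/// Builds the logical column order the rename step expects:
/// non-partition columns first, then partition columns, then the
/// optional file-path column appended by the scan.
fn logical_column_order(
    schema_fields: &[&str],
    partition_cols: &[&str],
    file_column: Option<&str>,
) -> Vec<String> {
    let mut names: Vec<String> = schema_fields
        .iter()
        .filter(|f| !partition_cols.contains(*f))
        .map(|f| f.to_string())
        .collect();
    names.extend(partition_cols.iter().map(|c| c.to_string()));
    names.extend(file_column.map(|c| c.to_string()));
    names
}

fn main() {
    let order = logical_column_order(&["id", "value", "date"], &["date"], Some("__file"));
    assert_eq!(order, vec!["id", "value", "date", "__file"]);
}
```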
11 changes: 9 additions & 2 deletions crates/sail-delta-lake/src/kernel/snapshot/mod.rs
@@ -226,7 +226,11 @@ impl Snapshot {
let scan_iter = scan.scan_metadata_arrow(engine.as_ref())?;
for res in scan_iter {
let batch = res?.scan_files;
let batch = inner.parse_stats_column(&batch)?;
// Be tolerant of malformed or empty stats JSON
let batch = match inner.parse_stats_column(&batch) {
Ok(parsed) => parsed,
Err(_) => batch,
};
if tx.blocking_send(Ok(batch)).is_err() {
break;
}
@@ -434,7 +438,10 @@ impl EagerSnapshot {
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;

let files = concat_batches(&SCAN_ROW_ARROW_SCHEMA, &files)?;
let files = self.snapshot.inner.parse_stats_column(&files)?;
let files = match self.snapshot.inner.parse_stats_column(&files) {
Ok(parsed) => parsed,
Err(_) => files,
};

self.files = files;

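Both hunks apply the same best-effort pattern: attempt the stats parse and keep the original batch if it fails. A generic sketch of the shape (illustrative only; production code would likely log the swallowed error before falling back):

```rust
/// Best-effort transform: return the transformed value, or fall back
/// to the original input when the transform fails, as the snapshot
/// code now does for malformed or empty stats JSON.
fn best_effort<T, E>(original: T, transform: impl FnOnce(&T) -> Result<T, E>) -> T {
    match transform(&original) {
        Ok(transformed) => transformed,
        Err(_) => original,
    }
}

fn main() {
    // A failed parse leaves the input untouched instead of erroring out.
    let kept = best_effort("not json", |s| s.parse::<i64>().map(|_| "parsed"));
    assert_eq!(kept, "not json");
}
```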
4 changes: 2 additions & 2 deletions crates/sail-delta-lake/src/kernel/transaction/protocol.rs
@@ -221,7 +221,7 @@ impl ProtocolChecker {
pub static INSTANCE: LazyLock<ProtocolChecker> = LazyLock::new(|| {
let mut reader_features = HashSet::new();
reader_features.insert(ReaderFeature::TimestampWithoutTimezone);
// reader_features.insert(ReaderFeature::ColumnMapping);
reader_features.insert(ReaderFeature::ColumnMapping);

let mut writer_features = HashSet::new();
writer_features.insert(WriterFeature::AppendOnly);
@@ -232,7 +232,7 @@ pub static INSTANCE: LazyLock<ProtocolChecker> = LazyLock::new(|| {
writer_features.insert(WriterFeature::CheckConstraints);
writer_features.insert(WriterFeature::GeneratedColumns);
}
// writer_features.insert(WriterFeature::ColumnMapping);
writer_features.insert(WriterFeature::ColumnMapping);
// writer_features.insert(WriterFeature::IdentityColumns);

ProtocolChecker::new(reader_features, writer_features)
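Uncommenting these two lines widens the feature sets the checker will accept, so tables that require `ColumnMapping` now pass protocol validation. A toy model of that gating with hypothetical names (the real `ProtocolChecker` lives in this module):

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ReaderFeature {
    TimestampWithoutTimezone,
    ColumnMapping,
}

/// A table is readable only if every reader feature it requires is in
/// the set this engine declares support for.
fn can_read(supported: &HashSet<ReaderFeature>, required: &[ReaderFeature]) -> bool {
    required.iter().all(|f| supported.contains(f))
}

fn main() {
    let mut supported = HashSet::from([ReaderFeature::TimestampWithoutTimezone]);
    // Before this change, ColumnMapping was absent, so column-mapped tables were rejected.
    assert!(!can_read(&supported, &[ReaderFeature::ColumnMapping]));
    supported.insert(ReaderFeature::ColumnMapping);
    assert!(can_read(&supported, &[ReaderFeature::ColumnMapping]));
}
```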
2 changes: 2 additions & 0 deletions crates/sail-delta-lake/src/lib.rs
@@ -5,4 +5,6 @@ pub mod options;
pub mod physical_plan;
pub mod table;

mod column_mapping;

pub use table::create_delta_provider;
13 changes: 13 additions & 0 deletions crates/sail-delta-lake/src/options.rs
@@ -11,4 +11,17 @@ pub struct TableDeltaOptions {

pub version_as_of: Option<i64>,
pub timestamp_as_of: Option<String>,

/// Column mapping mode for new tables (dataframe API only)
#[serde(default)]
pub column_mapping_mode: ColumnMappingModeOption,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub enum ColumnMappingModeOption {
#[default]
None,
Name,
Id,
}
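
With `#[serde(rename_all = "camelCase")]`, the variants round-trip as "none", "name", and "id". A self-contained sketch with a locally mirrored enum (assumes `serde` with the `derive` feature and `serde_json` are available):

```rust
use serde::{Deserialize, Serialize};

// Local mirror of the enum added in `options.rs`, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
enum ColumnMappingModeOption {
    #[default]
    None,
    Name,
    Id,
}

fn main() -> serde_json::Result<()> {
    // Variants serialize in camelCase, matching the YAML option values.
    assert_eq!(serde_json::to_string(&ColumnMappingModeOption::Name)?, "\"name\"");
    let parsed: ColumnMappingModeOption = serde_json::from_str("\"id\"")?;
    assert_eq!(parsed, ColumnMappingModeOption::Id);
    Ok(())
}
```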