Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adopt kernel schema types #2495

Merged
merged 11 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: adopt more kernel
  • Loading branch information
roeap committed Jun 4, 2024
commit 7ed93efc8cb9a511d527399d2a0dcfb617da9738
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "312044f5759f4e38e634d2dbfb62bc9fcd95c471" }
delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "37a79a309c5405e21ce203fe4b550c39a24f287f" }
# delta_kernel = { path = "../delta-kernel-rs/kernel" }

# arrow
arrow = { version = "51" }
Expand Down
6 changes: 0 additions & 6 deletions crates/core/src/kernel/models/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ impl StructTypeExt for StructType {
}
}

/// The maximum precision for [PrimitiveType::Decimal] values
pub const DECIMAL_MAX_PRECISION: u8 = 38;

/// The maximum scale for [PrimitiveType::Decimal] values
pub const DECIMAL_MAX_SCALE: i8 = 38;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/kernel/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ impl ScalarExt for Scalar {
}
Ordering::Less => {
let mut s = value.to_string();
for _ in 0..(scale.abs()) {
for _ in 0..*scale {
s.push('0');
}
s
}
},
Self::Binary(val) => create_escaped_binary_string(val.as_slice()),
Self::Null(_) => "null".to_string(),
Self::Struct(_, _) => todo!(),
}
}

Expand Down Expand Up @@ -161,7 +162,7 @@ impl ScalarExt for Scalar {
Decimal128(precision, scale) => {
arr.as_any().downcast_ref::<Decimal128Array>().map(|v| {
let value = v.value(index);
Self::Decimal(value, *precision, *scale)
Self::Decimal(value, *precision, *scale as u8)
})
}
Date32 => arr
Expand Down
20 changes: 19 additions & 1 deletion crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use delta_kernel::schema::MetadataValue;
use futures::future::BoxFuture;
use maplit::hashset;
use serde_json::Value;
Expand Down Expand Up @@ -128,7 +129,24 @@ impl CreateBuilder {
) -> Self {
let mut field = StructField::new(name.into(), data_type, nullable);
if let Some(meta) = metadata {
field = field.with_metadata(meta);
field = field.with_metadata(meta.iter().map(|(k, v)| {
(
k,
if let Value::Number(n) = v {
n.as_i64().map_or_else(
|| MetadataValue::String(v.to_string()),
|i| {
i32::try_from(i)
.ok()
.map(MetadataValue::Number)
.unwrap_or_else(|| MetadataValue::String(v.to_string()))
},
)
} else {
MetadataValue::String(v.to_string())
},
)
}));
};
self.columns.push(field);
self
Expand Down
38 changes: 37 additions & 1 deletion crates/core/src/schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,42 @@ pub enum PartitionValue {
NotIn(Vec<String>),
}

#[derive(Clone, Debug, PartialEq)]
struct ScalarHelper<'a>(&'a Scalar);

impl PartialOrd for ScalarHelper<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
use Scalar::*;
match (self.0, other.0) {
(Null(_), Null(_)) => Some(Ordering::Equal),
(Integer(a), Integer(b)) => a.partial_cmp(b),
(Long(a), Long(b)) => a.partial_cmp(b),
(Short(a), Short(b)) => a.partial_cmp(b),
(Byte(a), Byte(b)) => a.partial_cmp(b),
(Float(a), Float(b)) => a.partial_cmp(b),
(Double(a), Double(b)) => a.partial_cmp(b),
(String(a), String(b)) => a.partial_cmp(b),
(Boolean(a), Boolean(b)) => a.partial_cmp(b),
(Timestamp(a), Timestamp(b)) => a.partial_cmp(b),
(TimestampNtz(a), TimestampNtz(b)) => a.partial_cmp(b),
(Date(a), Date(b)) => a.partial_cmp(b),
(Binary(a), Binary(b)) => a.partial_cmp(b),
(Decimal(a, p1, s1), Decimal(b, p2, s2)) => {
// TODO implement proper decimal comparison
if p1 != p2 || s1 != s2 {
return None;
};
a.partial_cmp(b)
}
// TODO should we make an assumption about the ordering of nulls?
// rigth now this is only used for internal purposes.
(Null(_), _) => Some(Ordering::Less),
(_, Null(_)) => Some(Ordering::Greater),
_ => None,
}
}
}

/// A Struct used for filtering a DeltaTable partition by key and value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PartitionFilter {
Expand All @@ -50,7 +86,7 @@ fn compare_typed_value(
match data_type {
DataType::Primitive(primitive_type) => {
let other = primitive_type.parse_scalar(filter_value).ok()?;
partition_value.partial_cmp(&other)
ScalarHelper(partition_value).partial_cmp(&ScalarHelper(&other))
}
// NOTE: complex types are not supported as partition columns
_ => None,
Expand Down
47 changes: 2 additions & 45 deletions crates/core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
use std::time::Duration;
use std::{collections::HashMap, str::FromStr};

use delta_kernel::column_mapping::ColumnMappingMode;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};

use crate::errors::DeltaTableError;

use super::Constraint;
use crate::errors::DeltaTableError;

/// Typed property keys that can be defined on a delta table
/// <https://docs.delta.io/latest/table-properties.html#delta-table-properties-reference>
Expand Down Expand Up @@ -463,49 +463,6 @@ impl FromStr for CheckpointPolicy {
}
}

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)]
/// The Column Mapping modes used for reading and writing data
#[serde(rename_all = "camelCase")]
pub enum ColumnMappingMode {
/// No column mapping is applied
None,
/// Columns are mapped by their field_id in parquet
Id,
/// Columns are mapped to a physical name
Name,
}

impl Default for ColumnMappingMode {
fn default() -> Self {
Self::None
}
}

impl AsRef<str> for ColumnMappingMode {
fn as_ref(&self) -> &str {
match self {
Self::None => "none",
Self::Id => "id",
Self::Name => "name",
}
}
}

impl FromStr for ColumnMappingMode {
type Err = DeltaTableError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"none" => Ok(Self::None),
"id" => Ok(Self::Id),
"name" => Ok(Self::Name),
_ => Err(DeltaTableError::Generic(
"Invalid string for ColumnMappingMode".into(),
)),
}
}
}

const SECONDS_PER_MINUTE: u64 = 60;
const SECONDS_PER_HOUR: u64 = 60 * SECONDS_PER_MINUTE;
const SECONDS_PER_DAY: u64 = 24 * SECONDS_PER_HOUR;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/table/state_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use arrow_array::{
StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
};
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use delta_kernel::column_mapping::ColumnMappingMode;
use itertools::Itertools;

use super::config::ColumnMappingMode;
use super::state::DeltaTableState;
use crate::errors::DeltaTableError;
use crate::kernel::{Add, DataType as DeltaDataType, StructType};
Expand Down Expand Up @@ -184,7 +184,7 @@ impl DeltaTableState {
"Invalid partition column {0}",
name
)))?
.physical_name()?
.physical_name(column_mapping_mode)?
.to_string();
Ok((physical_name, name.as_str()))
})
Expand Down
1 change: 1 addition & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,7 @@ fn scalar_to_py(value: &Scalar, py_date: &PyAny, py: Python) -> PyResult<PyObjec
date.to_object(py)
}
Decimal(_, _, _) => value.serialize().to_object(py),
Struct(_, _) => todo!(),
};

Ok(val)
Expand Down
47 changes: 20 additions & 27 deletions python/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use deltalake::arrow::datatypes::{
use deltalake::arrow::error::ArrowError;
use deltalake::arrow::pyarrow::PyArrowType;
use deltalake::kernel::{
ArrayType as DeltaArrayType, DataType, MapType as DeltaMapType, PrimitiveType as DeltaPrimitve,
StructField, StructType as DeltaStructType, StructTypeExt,
ArrayType as DeltaArrayType, DataType, MapType as DeltaMapType, MetadataValue,
PrimitiveType as DeltaPrimitve, StructField, StructType as DeltaStructType, StructTypeExt,
};
use pyo3::exceptions::{PyException, PyNotImplementedError, PyTypeError, PyValueError};
use pyo3::prelude::*;
Expand Down Expand Up @@ -98,30 +98,6 @@ impl PrimitiveType {
Ok(Self {
inner_type: data_type,
})

// if data_type.starts_with("decimal") {
// if try_parse_decimal_type(&data_type).is_none() {
// Err(PyValueError::new_err(format!(
// "invalid decimal type: {data_type}"
// )))
// } else {
// Ok(Self {
// inner_type: data_type,
// })
// }
// } else if !VALID_PRIMITIVE_TYPES
// .iter()
// .any(|&valid| data_type == valid)
// {
// Err(PyValueError::new_err(format!(
// "data_type must be one of decimal(<precision>, <scale>), {}.",
// VALID_PRIMITIVE_TYPES.join(", ")
// )))
// } else {
// Ok(Self {
// inner_type: data_type,
// })
// }
}

#[getter]
Expand Down Expand Up @@ -455,7 +431,24 @@ impl Field {
};

let mut inner = StructField::new(name, ty, nullable);
inner = inner.with_metadata(metadata);
inner = inner.with_metadata(metadata.iter().map(|(k, v)| {
(
k,
if let serde_json::Value::Number(n) = v {
n.as_i64().map_or_else(
|| MetadataValue::String(v.to_string()),
|i| {
i32::try_from(i)
.ok()
.map(MetadataValue::Number)
.unwrap_or_else(|| MetadataValue::String(v.to_string()))
},
)
} else {
MetadataValue::String(v.to_string())
},
)
}));

Ok(Self { inner })
}
Expand Down