DataFusion 42 compat
adragomir committed Oct 10, 2024
1 parent 496bdb3 commit 7a6094b
Showing 18 changed files with 132 additions and 46 deletions.
68 changes: 44 additions & 24 deletions Cargo.toml
@@ -26,34 +26,34 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "=0.3.0" }
delta_kernel = { version = "0.3.1" }
# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }

# arrow
arrow = { version = "52" }
arrow-arith = { version = "52" }
arrow-array = { version = "52", features = ["chrono-tz"] }
arrow-buffer = { version = "52" }
arrow-cast = { version = "52" }
arrow-ipc = { version = "52" }
arrow-json = { version = "52" }
arrow-ord = { version = "52" }
arrow-row = { version = "52" }
arrow-schema = { version = "52" }
arrow-select = { version = "52" }
object_store = { version = "0.10.1" }
parquet = { version = "52" }
arrow = { version = "53" }
arrow-arith = { version = "53" }
arrow-array = { version = "53", features = ["chrono-tz"] }
arrow-buffer = { version = "53" }
arrow-cast = { version = "53" }
arrow-ipc = { version = "53" }
arrow-json = { version = "53" }
arrow-ord = { version = "53" }
arrow-row = { version = "53" }
arrow-schema = { version = "53" }
arrow-select = { version = "53" }
object_store = { version = "0.11.0" }
parquet = { version = "53" }

# datafusion
datafusion = { version = "41" }
datafusion-expr = { version = "41" }
datafusion-common = { version = "41" }
datafusion-proto = { version = "41" }
datafusion-sql = { version = "41" }
datafusion-physical-expr = { version = "41" }
datafusion-physical-plan = { version = "41" }
datafusion-functions = { version = "41" }
datafusion-functions-aggregate = { version = "41" }
datafusion = { version = "42" }
datafusion-expr = { version = "42" }
datafusion-common = { version = "42" }
datafusion-proto = { version = "42" }
datafusion-sql = { version = "42" }
datafusion-physical-expr = { version = "42" }
datafusion-physical-plan = { version = "42" }
datafusion-functions = { version = "42" }
datafusion-functions-aggregate = { version = "42" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
@@ -91,10 +91,30 @@ datafusion-functions-window = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-physical-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-physical-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-physical-expr-functions-aggregate = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-physical-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-physical-plan = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-proto = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-proto-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-sql = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
datafusion-substrait = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
# datafusion = {path = "../arrow-datafusion-42/datafusion/core" }
# datafusion-catalog = {path = "../arrow-datafusion-42/datafusion/catalog" }
# datafusion-common = {path = "../arrow-datafusion-42/datafusion/common" }
# datafusion-common-runtime = {path = "../arrow-datafusion-42/datafusion/common-runtime" }
# datafusion-execution = {path = "../arrow-datafusion-42/datafusion/execution" }
# datafusion-expr = {path = "../arrow-datafusion-42/datafusion/expr" }
# datafusion-expr-common = {path = "../arrow-datafusion-42/datafusion/expr-common" }
# datafusion-functions = {path = "../arrow-datafusion-42/datafusion/functions" }
# datafusion-functions-aggregate = {path = "../arrow-datafusion-42/datafusion/functions-aggregate" }
# datafusion-functions-aggregate-common = {path = "../arrow-datafusion-42/datafusion/functions-aggregate-common" }
# datafusion-functions-nested = {path = "../arrow-datafusion-42/datafusion/functions-nested" }
# datafusion-functions-window = {path = "../arrow-datafusion-42/datafusion/functions-window" }
# datafusion-optimizer = {path = "../arrow-datafusion-42/datafusion/optimizer" }
# datafusion-physical-expr = {path = "../arrow-datafusion-42/datafusion/physical-expr" }
# datafusion-physical-expr-common = {path = "../arrow-datafusion-42/datafusion/physical-expr-common" }
# datafusion-physical-optimizer = {path = "../arrow-datafusion-42/datafusion/physical-optimizer" }
# datafusion-physical-plan = {path = "../arrow-datafusion-42/datafusion/physical-plan" }
# datafusion-proto = {path = "../arrow-datafusion-42/datafusion/proto" }
# datafusion-proto-common = {path = "../arrow-datafusion-42/datafusion/proto-common" }
# datafusion-sql = {path = "../arrow-datafusion-42/datafusion/sql" }
# datafusion-substrait = {path = "../arrow-datafusion-42/datafusion/substrait" }
8 changes: 7 additions & 1 deletion crates/core/src/data_catalog/storage/mod.rs
@@ -30,6 +30,7 @@ const DELTA_LOG_FOLDER: &str = "_delta_log";
///
/// assuming it contains valid deltalake data, i.e. a `_delta_log` folder:
/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
#[derive(Debug)]
pub struct ListingSchemaProvider {
authority: String,
/// Underlying object store
@@ -60,7 +61,12 @@ impl ListingSchemaProvider {

/// Reload table information from ObjectStore
pub async fn refresh(&self) -> datafusion_common::Result<()> {
let entries: Vec<_> = self.store.list(None).try_collect().await?;
let entries: Vec<ObjectMeta> = self.store.list(None)
.try_collect()
.await
.map_err(DataFusionError::ObjectStore)?;
let mut tables = HashSet::new();
for file in entries.iter() {
let mut parent = Path::new(file.location.as_ref());
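
The refresh hunk above is the recurring migration pattern in this commit: collect the listing stream, then map object_store's error into DataFusionError by hand rather than leaning on a From conversion. A minimal standalone sketch of that pattern (the helper name list_all is hypothetical):

use datafusion_common::DataFusionError;
use futures::TryStreamExt;
use object_store::{ObjectMeta, ObjectStore};

// Collect every object under the store root, converting
// object_store::Error into DataFusionError explicitly.
async fn list_all(store: &dyn ObjectStore) -> Result<Vec<ObjectMeta>, DataFusionError> {
    store
        .list(None) // stream of Result<ObjectMeta, object_store::Error>
        .try_collect::<Vec<_>>()
        .await
        .map_err(DataFusionError::ObjectStore)
}
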
3 changes: 3 additions & 0 deletions crates/core/src/data_catalog/unity/datafusion.rs
@@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse;
use crate::DeltaTableBuilder;

/// In-memory list of catalogs populated by unity catalog
#[derive(Debug)]
pub struct UnityCatalogList {
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
@@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList {
}

/// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnityCatalogProvider {
/// Parent catalog for schemas of interest.
pub schemas: DashMap<String, Arc<dyn SchemaProvider>>,
@@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider {
}

/// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnitySchemaProvider {
/// UnityCatalog Api client
client: Arc<UnityCatalog>,
7 changes: 7 additions & 0 deletions crates/core/src/delta_datafusion/find_files/logical.rs
@@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

@@ -63,6 +64,12 @@ impl Hash for FindFilesNode {
}
}

impl PartialOrd for FindFilesNode {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl UserDefinedLogicalNodeCore for FindFilesNode {
fn name(&self) -> &str {
"FindFiles"
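
The empty PartialOrd impl above, and the PartialOrd derives added to MetricObserver and MergeBarrier below, all serve the same bound: DataFusion 42 expects user-defined logical nodes to be comparable. A minimal sketch of the opt-out form, assuming only that bound (ExampleNode is hypothetical):

use std::cmp::Ordering;

#[derive(Debug, Hash, Eq, PartialEq)]
struct ExampleNode {
    id: String,
}

// A node with no meaningful ordering reports "not comparable" for every pair.
impl PartialOrd for ExampleNode {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
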
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/find_files/mod.rs
@@ -44,7 +44,7 @@ lazy_static! {
#[derive(Default)]
struct FindFilesPlannerExtension {}

#[derive(Default)]
#[derive(Default, Debug)]
struct FindFilesPlanner {}

#[async_trait]
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/logical.rs
@@ -7,7 +7,7 @@ use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
// Metric Observer is used to update DataFusion metrics from a record batch.
// See MetricObserverExec for the physical implementation

#[derive(Debug, Hash, Eq, PartialEq)]
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)]
pub(crate) struct MetricObserver {
// id is preserved during conversion to physical node
pub id: String,
11 changes: 8 additions & 3 deletions crates/core/src/delta_datafusion/mod.rs
@@ -21,6 +21,7 @@
//! ```
use std::any::Any;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug};
use std::sync::Arc;
@@ -42,6 +43,8 @@ use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetExec,
};

use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -69,7 +72,7 @@ use datafusion_sql::planner::ParserOptions;
use either::Either;
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectMeta;
use object_store::{Error, ObjectMeta};
use serde::{Deserialize, Serialize};
use url::Url;

@@ -727,7 +730,7 @@ impl TableProvider for DeltaTable {
None
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

@@ -791,6 +794,7 @@ }
}

/// A Delta table provider that enables additional metadata columns to be included during the scan
#[derive(Debug)]
pub struct DeltaTableProvider {
snapshot: DeltaTableState,
log_store: LogStoreRef,
@@ -840,7 +844,7 @@ impl TableProvider for DeltaTableProvider {
None
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

@@ -1442,6 +1446,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {
}

/// Responsible for creating deltatables
#[derive(Debug)]
pub struct DeltaTableFactory {}

#[async_trait]
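
Both get_logical_plan changes above track the DataFusion 42 signature, which returns Option<Cow<LogicalPlan>> so a provider can hand back either a borrowed or an owned plan. A sketch of the borrowed case, assuming that signature (PlanHolder is hypothetical):

use std::borrow::Cow;

use datafusion_expr::LogicalPlan;

struct PlanHolder {
    cached: Option<LogicalPlan>,
}

impl PlanHolder {
    // Lend the cached plan without cloning it; callers that need ownership
    // can still call .into_owned() on the Cow.
    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
        self.cached.as_ref().map(Cow::Borrowed)
    }
}
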
10 changes: 9 additions & 1 deletion crates/core/src/delta_datafusion/planner.rs
@@ -22,6 +22,8 @@
//! };
//!
//! let state = state.with_query_planner(Arc::new(merge_planner));
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use async_trait::async_trait;
@@ -32,7 +34,7 @@ use datafusion::{
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner},
};
use datafusion_expr::LogicalPlan;

use serde::Serializer;
use crate::delta_datafusion::DataFusionResult;

/// Deltaplanner
@@ -41,6 +43,12 @@ pub struct DeltaPlanner<T: ExtensionPlanner> {
pub extension_planner: T,
}

impl<T: ExtensionPlanner> Debug for DeltaPlanner<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DeltaPlanner")
}
}

#[async_trait]
impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner for DeltaPlanner<T> {
async fn create_physical_plan(
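
The hand-written Debug above exists because DataFusion 42 expects planners to be Debug, while DeltaPlanner's type parameter carries no Debug bound. The general shape, as a sketch (Wrapper is hypothetical):

use std::fmt::{Debug, Formatter};

struct Wrapper<T> {
    inner: T,
}

// Implementing Debug manually avoids forcing a `T: Debug` bound onto users.
impl<T> Debug for Wrapper<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Wrapper").finish_non_exhaustive()
    }
}
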
19 changes: 12 additions & 7 deletions crates/core/src/delta_datafusion/schema_adapter.rs
@@ -1,10 +1,11 @@
use std::fmt::Debug;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow_cast::{can_cast_types, cast};
use arrow_schema::{Field, Schema, SchemaRef};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};

use datafusion_common::plan_err;
use crate::operations::cast::cast_record_batch;

/// A Schema Adapter Factory which provides casting record batches from parquet to meet
@@ -13,21 +14,23 @@ use crate::operations::cast::cast_record_batch;
pub(crate) struct DeltaSchemaAdapterFactory {}

impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
fn create(&self, projected_table_schema: SchemaRef, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
Box::new(DeltaSchemaAdapter {
table_schema: schema,
projected_table_schema,
table_schema,
})
}
}

pub(crate) struct DeltaSchemaAdapter {
projected_table_schema: SchemaRef,
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter for DeltaSchemaAdapter {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
let field = self.projected_table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}

@@ -45,6 +48,7 @@

Ok((
Arc::new(SchemaMapping {
projected_table_schema: self.projected_table_schema.clone(),
table_schema: self.table_schema.clone(),
}),
projection,
@@ -54,12 +58,13 @@

#[derive(Debug)]
pub(crate) struct SchemaMapping {
projected_table_schema: SchemaRef,
table_schema: SchemaRef,
}

impl SchemaMapper for SchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
let record_batch = cast_record_batch(&batch, self.projected_table_schema.clone(), false, true)?;
Ok(record_batch)
}

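
The factory change above follows DataFusion 42's SchemaAdapterFactory, whose create method now receives the projected table schema alongside the full table schema, and the mapper casts batches to the projected schema rather than the full one. A sketch of the new call shape against the built-in factory (the schemas are made up, and constructing the factory via Default is an assumption):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};

fn demo() {
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));
    // The scan projects only `id`, so the adapter maps file batches to this shape.
    let projected = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let _adapter = DefaultSchemaAdapterFactory::default().create(projected, table_schema);
}
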
2 changes: 2 additions & 0 deletions crates/core/src/kernel/scalars.rs
@@ -73,6 +73,7 @@ impl ScalarExt for Scalar {
Self::Binary(val) => create_escaped_binary_string(val.as_slice()),
Self::Null(_) => "null".to_string(),
Self::Struct(_) => unimplemented!(),
Self::Array(_) => unimplemented!()
}
}

@@ -269,6 +270,7 @@ impl ScalarExt for Scalar {
Self::Binary(val) => Value::String(create_escaped_binary_string(val.as_slice())),
Self::Null(_) => Value::Null,
Self::Struct(_) => unimplemented!(),
Self::Array(_) => unimplemented!(),
}
}
}
1 change: 1 addition & 0 deletions crates/core/src/kernel/snapshot/log_segment.rs
@@ -365,6 +365,7 @@ impl LogSegment {
config: &DeltaTableConfig,
) -> DeltaResult<impl Iterator<Item = Result<RecordBatch, DeltaTableError>> + '_> {
let log_path = table_root.child("_delta_log");
let mut decoder = json::get_decoder(Arc::new(read_schema.try_into()?), config)?;

let mut commit_data = Vec::new();
2 changes: 1 addition & 1 deletion crates/core/src/operations/merge/barrier.rs
@@ -393,7 +393,7 @@ impl RecordBatchStream for MergeBarrierStream {
}
}

#[derive(Debug, Hash, Eq, PartialEq)]
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)]
pub(crate) struct MergeBarrier {
pub input: LogicalPlan,
pub expr: Expr,
2 changes: 1 addition & 1 deletion crates/core/src/writer/stats.rs
@@ -613,7 +613,7 @@ mod tests {
Some($value),
Some($value),
None,
0,
Some(0),
false,
))
};
2 changes: 1 addition & 1 deletion crates/hdfs/Cargo.toml
@@ -13,7 +13,7 @@ rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.21.0", path = "../core" }
hdfs-native-object-store = "0.11"
hdfs-native-object-store = "0.12.1"

# workspace dependencies
object_store = { workspace = true }
4 changes: 2 additions & 2 deletions crates/hdfs/src/lib.rs
@@ -4,7 +4,7 @@ use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFa
use deltalake_core::storage::{
factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use deltalake_core::{DeltaResult, DeltaTableError, Path};
use hdfs_native_object_store::HdfsObjectStore;
use url::Url;

@@ -20,7 +20,7 @@ impl ObjectStoreFactory for HdfsFactory {
let store: ObjectStoreRef = Arc::new(HdfsObjectStore::with_config(
url.as_str(),
options.0.clone(),
)?);
).map_err(|e| DeltaTableError::ObjectStore { source: e })?;
let prefix = Path::parse(url.path())?;
Ok((url_prefix_handler(store, prefix.clone()), prefix))
}
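
Same idea as the catalog refresh earlier, but mapped into delta-rs's own error enum: hdfs-native-object-store 0.12 returns object_store::Error, which the factory wraps into DeltaTableError's struct variant. A one-liner sketch of the conversion (wrap_store_err is hypothetical):

use deltalake_core::DeltaTableError;

// Struct-variant wrapping, matching the map_err closure in the hunk above.
fn wrap_store_err(e: object_store::Error) -> DeltaTableError {
    DeltaTableError::ObjectStore { source: e }
}
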