
Commit 423aab8

chore: upgrade to datafusion 41
1 parent a0f9c18 commit 423aab8

27 files changed: +156 -130 lines changed

Cargo.toml (+8 -8)

@@ -45,14 +45,14 @@ object_store = { version = "0.10.1" }
 parquet = { version = "52" }

 # datafusion
-datafusion = { version = "40" }
-datafusion-expr = { version = "40" }
-datafusion-common = { version = "40" }
-datafusion-proto = { version = "40" }
-datafusion-sql = { version = "40" }
-datafusion-physical-expr = { version = "40" }
-datafusion-functions = { version = "40" }
-datafusion-functions-array = { version = "40" }
+datafusion = { version = "41" }
+datafusion-expr = { version = "41" }
+datafusion-common = { version = "41" }
+datafusion-proto = { version = "41" }
+datafusion-sql = { version = "41" }
+datafusion-physical-expr = { version = "41" }
+datafusion-functions = { version = "41" }
+datafusion-functions-aggregate = { version = "41" }

 # serde
 serde = { version = "1.0.194", features = ["derive"] }
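Note: alongside the version bump, the workspace swaps the standalone datafusion-functions-array crate for datafusion-functions-aggregate; the array/nested expression helpers are now reached through the main datafusion crate, as the updated test imports further down show. A minimal sketch of the new import path, with the column name "tags" being purely illustrative:

// Sketch only: the "tags" column is a hypothetical example.
use datafusion::functions_array::expr_fn::cardinality;
use datafusion_expr::{col, Expr};

fn tag_count_expr() -> Expr {
    // cardinality(tags): the helper now lives behind the main datafusion crate
    // instead of the separate datafusion-functions-array crate.
    cardinality(col("tags"))
}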

crates/core/Cargo.toml (+2 -2)

@@ -43,7 +43,7 @@ datafusion-proto = { workspace = true, optional = true }
 datafusion-sql = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
 datafusion-functions = { workspace = true, optional = true }
-datafusion-functions-array = { workspace = true, optional = true }
+datafusion-functions-aggregate = { workspace = true, optional = true }

 # serde
 serde = { workspace = true, features = ["derive"] }

@@ -129,7 +129,7 @@ datafusion = [
   "datafusion-physical-expr",
   "datafusion-sql",
   "datafusion-functions",
-  "datafusion-functions-array",
+  "datafusion-functions-aggregate",
   "sqlparser",
 ]
 datafusion-ext = ["datafusion"]

crates/core/src/data_catalog/storage/mod.rs (+3 -2)

@@ -6,7 +6,7 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use dashmap::DashMap;
-use datafusion::catalog::schema::SchemaProvider;
+use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
 use datafusion_common::DataFusionError;
 use futures::TryStreamExt;

@@ -147,7 +147,8 @@ impl SchemaProvider for ListingSchemaProvider {
 mod tests {
     use super::*;
     use datafusion::assert_batches_sorted_eq;
-    use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
+    use datafusion::catalog::CatalogProvider;
+    use datafusion::catalog_common::MemoryCatalogProvider;
     use datafusion::execution::context::SessionContext;

     #[test]
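In DataFusion 41 the catalog traits (CatalogProvider, SchemaProvider) are exposed directly under datafusion::catalog, while the in-memory implementations move to datafusion::catalog_common. A minimal sketch of the relocated type in use; the catalog name "delta" is only a placeholder for this example:

use std::sync::Arc;

use datafusion::catalog_common::MemoryCatalogProvider;
use datafusion::prelude::SessionContext;

// Registers an in-memory catalog via the new module path; "delta" is a
// placeholder name for this sketch.
fn register_memory_catalog() -> SessionContext {
    let ctx = SessionContext::new();
    ctx.register_catalog("delta", Arc::new(MemoryCatalogProvider::new()));
    ctx
}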

crates/core/src/data_catalog/unity/datafusion.rs (+1 -1)

@@ -5,7 +5,7 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use dashmap::DashMap;
-use datafusion::catalog::schema::SchemaProvider;
+use datafusion::catalog::SchemaProvider;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList};
 use datafusion::datasource::TableProvider;
 use datafusion_common::DataFusionError;

crates/core/src/data_catalog/unity/models.rs (+1 -1)

@@ -252,8 +252,8 @@ pub enum TableType {
     StreamingTable,
 }

-///
 #[derive(Deserialize)]
+/// Summary of the table
 pub struct TableSummary {
     /// The full name of the table.
     pub full_name: String,

crates/core/src/delta_datafusion/expr.rs (+41 -15)

@@ -29,11 +29,13 @@ use std::{
 use arrow_schema::DataType;
 use chrono::{DateTime, NaiveDate};
 use datafusion::execution::context::SessionState;
+use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::execution::FunctionRegistry;
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion_expr::{
-    expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource,
+    expr::InList, planner::ExprPlanner, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like,
+    TableSource,
 };
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use datafusion_sql::sqlparser::ast::escape_quoted_string;

@@ -46,14 +48,44 @@ use crate::{DeltaResult, DeltaTableError};
 use super::DeltaParserOptions;

 pub(crate) struct DeltaContextProvider<'a> {
-    state: &'a SessionState,
+    state: SessionState,
+    /// Keeping this around just to make use of the 'a lifetime
+    _original: &'a SessionState,
+    planners: Vec<Arc<dyn ExprPlanner>>,
+}
+
+impl<'a> DeltaContextProvider<'a> {
+    fn new(state: &'a SessionState) -> Self {
+        let planners = state.expr_planners();
+        DeltaContextProvider {
+            planners,
+            // Creating a new session state with overridden scalar_functions since
+            // the get_field() UDF was dropped from the default scalar functions upstream in
+            // `36660fe10d9c0cdff62e0da0b94bee28422d3419`
+            state: SessionStateBuilder::new_from_existing(state.clone())
+                .with_scalar_functions(
+                    state
+                        .scalar_functions()
+                        .values()
+                        .cloned()
+                        .chain(std::iter::once(datafusion::functions::core::get_field()))
+                        .collect(),
+                )
+                .build(),
+            _original: state,
+        }
+    }
 }

 impl<'a> ContextProvider for DeltaContextProvider<'a> {
     fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
         unimplemented!()
     }

+    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
+        self.planners.as_slice()
+    }
+
     fn get_function_meta(&self, name: &str) -> Option<Arc<datafusion_expr::ScalarUDF>> {
         self.state.scalar_functions().get(name).cloned()
     }

@@ -75,15 +107,15 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
     }

     fn udf_names(&self) -> Vec<String> {
-        unimplemented!()
+        self.state.scalar_functions().keys().cloned().collect()
     }

     fn udaf_names(&self) -> Vec<String> {
-        unimplemented!()
+        self.state.aggregate_functions().keys().cloned().collect()
     }

     fn udwf_names(&self) -> Vec<String> {
-        unimplemented!()
+        self.state.window_functions().keys().cloned().collect()
     }
 }

@@ -107,16 +139,10 @@ pub(crate) fn parse_predicate_expression(
         source: Box::new(err),
     })?;

-    let context_provider = DeltaContextProvider { state: df_state };
-    let mut sql_to_rel =
+    let context_provider = DeltaContextProvider::new(df_state);
+    let sql_to_rel =
         SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into());

-    // NOTE: This can be probably removed with Datafusion 41 once
-    // <https://github.com/apache/datafusion/pull/11485> is released
-    for planner in context_provider.state.expr_planners() {
-        sql_to_rel = sql_to_rel.with_user_defined_planner(planner.clone());
-    }
-
     Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?)
 }

@@ -401,6 +427,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
 #[cfg(test)]
 mod test {
     use arrow_schema::DataType as ArrowDataType;
+    use datafusion::functions_array::expr_fn::cardinality;
+    use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
     use datafusion::prelude::SessionContext;
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
     use datafusion_expr::expr::ScalarFunction;

@@ -409,8 +437,6 @@ mod test {
     use datafusion_functions::core::expr_ext::FieldAccessor;
     use datafusion_functions::encoding::expr_fn::decode;
     use datafusion_functions::expr_fn::substring;
-    use datafusion_functions_array::expr_ext::{IndexAccessor, SliceAccessor};
-    use datafusion_functions_array::expr_fn::cardinality;

     use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
     use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
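The behavioural point behind the DeltaContextProvider change: SessionStateBuilder::with_scalar_functions appears to replace the whole UDF registry rather than append to it, so the existing functions are cloned over and get_field() is re-added explicitly (it was dropped from the default scalar functions upstream, per the comment in the hunk). A standalone sketch of that pattern, factored into a helper whose name is hypothetical and used only for illustration:

use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;

// Rebuild a session state with get_field() restored. with_scalar_functions
// overwrites the registry, so the existing UDFs must be carried over explicitly.
// `with_get_field` is a hypothetical helper name for this sketch.
fn with_get_field(state: &SessionState) -> SessionState {
    SessionStateBuilder::new_from_existing(state.clone())
        .with_scalar_functions(
            state
                .scalar_functions()
                .values()
                .cloned()
                .chain(std::iter::once(datafusion::functions::core::get_field()))
                .collect(),
        )
        .build()
}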

crates/core/src/delta_datafusion/find_files/mod.rs (+6 -3)

@@ -41,8 +41,10 @@ lazy_static! {
         ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap();
 }

+#[derive(Default)]
 struct FindFilesPlannerExtension {}

+#[derive(Default)]
 struct FindFilesPlanner {}

 #[async_trait]

@@ -188,6 +190,7 @@ async fn scan_table_by_files(
 pub mod tests {
     use std::sync::Arc;

+    use datafusion::execution::session_state::SessionStateBuilder;
     use datafusion::prelude::{DataFrame, SessionContext};
     use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
     use datafusion_expr::{col, lit, Expr, Extension, LogicalPlan};

@@ -202,9 +205,9 @@ pub mod tests {
         expr: Expr,
     ) -> Result<Vec<arrow_array::RecordBatch>, DeltaTableError> {
         let ctx = SessionContext::new();
-        let state = ctx
-            .state()
-            .with_query_planner(Arc::new(FindFilesPlanner {}));
+        let state = SessionStateBuilder::new_from_existing(ctx.state())
+            .with_query_planner(Arc::new(FindFilesPlanner::default()))
+            .build();
         let find_files_node = LogicalPlan::Extension(Extension {
             node: Arc::new(FindFilesNode::new(
                 "my_cool_plan".into(),

crates/core/src/delta_datafusion/mod.rs (+8 -9)

@@ -39,12 +39,12 @@ use arrow_cast::display::array_value_to_string;
 use arrow_schema::Field;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
+use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
-use datafusion::datasource::provider::TableProviderFactory;
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;

@@ -465,12 +465,11 @@ pub struct DeltaScanConfig {
     pub schema: Option<SchemaRef>,
 }

-#[derive(Debug)]
 pub(crate) struct DeltaScanBuilder<'a> {
     snapshot: &'a DeltaTableState,
     log_store: LogStoreRef,
     filter: Option<Expr>,
-    state: &'a SessionState,
+    session: &'a dyn Session,
     projection: Option<&'a Vec<usize>>,
     limit: Option<usize>,
     files: Option<&'a [Add]>,

@@ -481,13 +480,13 @@ impl<'a> DeltaScanBuilder<'a> {
     pub fn new(
         snapshot: &'a DeltaTableState,
         log_store: LogStoreRef,
-        state: &'a SessionState,
+        session: &'a dyn Session,
     ) -> Self {
         DeltaScanBuilder {
             snapshot,
             log_store,
             filter: None,
-            state,
+            session,
             projection: None,
             limit: None,
             files: None,

@@ -648,7 +647,7 @@ impl<'a> DeltaScanBuilder<'a> {
             .unwrap_or(Statistics::new_unknown(&schema));

         let parquet_options = TableParquetOptions {
-            global: self.state.config().options().execution.parquet.clone(),
+            global: self.session.config().options().execution.parquet.clone(),
             ..Default::default()
         };

@@ -717,7 +716,7 @@ impl TableProvider for DeltaTable {

     async fn scan(
         &self,
-        session: &SessionState,
+        session: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,

@@ -806,7 +805,7 @@ impl TableProvider for DeltaTableProvider {

     async fn scan(
         &self,
-        session: &SessionState,
+        session: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,

@@ -1377,7 +1376,7 @@ pub struct DeltaTableFactory {}
 impl TableProviderFactory for DeltaTableFactory {
     async fn create(
         &self,
-        _ctx: &SessionState,
+        _ctx: &dyn Session,
         cmd: &CreateExternalTable,
     ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
         let provider = if cmd.options.is_empty() {
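With DataFusion 41, TableProvider::scan and TableProviderFactory::create receive &dyn Session (the trait behind SessionState) rather than the concrete state, but configuration is reached the same way through the trait. A small sketch mirroring how the updated DeltaScanBuilder reads its parquet options; the free-function name is purely illustrative:

use datafusion::catalog::Session;
use datafusion::config::TableParquetOptions;

// Reads the parquet execution options through the Session trait object,
// mirroring the updated DeltaScanBuilder code above.
fn parquet_options_from(session: &dyn Session) -> TableParquetOptions {
    TableParquetOptions {
        global: session.config().options().execution.parquet.clone(),
        ..Default::default()
    }
}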

crates/core/src/kernel/arrow/mod.rs (+1 -1)

@@ -250,7 +250,7 @@ pub(crate) fn delta_log_schema_for_table(
         .iter()
         .for_each(|f| max_min_schema_for_fields(&mut max_min_vec, f));

-    if max_min_vec.len() > 0 {
+    if !max_min_vec.is_empty() {
         stats_parsed_fields.extend(["minValues", "maxValues"].into_iter().map(|name| {
             ArrowField::new(
                 name,

crates/core/src/kernel/error.rs (-10)

@@ -71,13 +71,3 @@ pub enum Error {
     #[error("Failed to parse value '{0}' as '{1}'")]
     Parse(String, DataType),
 }
-
-#[cfg(feature = "object_store")]
-impl From<object_store::Error> for Error {
-    fn from(value: object_store::Error) -> Self {
-        match value {
-            object_store::Error::NotFound { path, .. } => Self::FileNotFound(path),
-            err => Self::ObjectStore(err),
-        }
-    }
-}
