Commit 908a289: Datafusion 43-hstack

ccciudatu committed Dec 11, 2024
1 parent 28162a5 commit 908a289
Showing 8 changed files with 18 additions and 7 deletions.

Cargo.toml: 2 changes (1 addition, 1 deletion)

@@ -84,6 +84,7 @@ datafusion-common-runtime = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-execution = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
+datafusion-ffi = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-functions = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-functions-aggregate = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-functions-aggregate-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
@@ -92,7 +93,6 @@ datafusion-functions-window = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
-datafusion-physical-expr-functions-aggregate = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-physical-plan = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
 datafusion-proto = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }

crates/core/src/data_catalog/storage/mod.rs: 7 changes (6 additions, 1 deletion)

@@ -61,7 +61,12 @@ impl ListingSchemaProvider {
 
     /// Reload table information from ObjectStore
    pub async fn refresh(&self) -> datafusion_common::Result<()> {
-        let entries: Vec<_> = self.store.list(None).try_collect().await?;
+        let tmp1 = self.store.list(None);
+
+        let entries: Vec<ObjectMeta> = self.store.list(None)
+            .try_collect()
+            .await
+            .map_err(|e| DataFusionError::ObjectStore(e))?;
         let mut tables = HashSet::new();
         for file in entries.iter() {
             let mut parent = Path::new(file.location.as_ref());
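
The rewritten refresh() makes the error conversion explicit: under DataFusion 43 the listing stream's object_store::Error is mapped into DataFusionError by hand instead of flowing through `?`. Below is a minimal, self-contained sketch of the same pattern, assuming the tokio runtime and object_store's in-memory backend; the helper name `list_all` is illustrative and not part of the commit.

```rust
use std::sync::Arc;

use datafusion_common::DataFusionError;
use futures::TryStreamExt;
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore, PutPayload};

/// Collect every object in a store, converting the object_store error into a
/// DataFusionError explicitly, as the refresh() above now does.
async fn list_all(store: Arc<dyn ObjectStore>) -> datafusion_common::Result<Vec<ObjectMeta>> {
    store
        .list(None) // streaming listing: yields Result<ObjectMeta, object_store::Error>
        .try_collect()
        .await
        .map_err(DataFusionError::ObjectStore)
}

#[tokio::main]
async fn main() -> datafusion_common::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    store
        .put(&Path::from("tables/t1/part-0.parquet"), PutPayload::from_static(b"x"))
        .await
        .map_err(DataFusionError::ObjectStore)?;
    for meta in list_all(store).await? {
        println!("{} ({} bytes)", meta.location, meta.size);
    }
    Ok(())
}
```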

crates/core/src/delta_datafusion/cdf/scan_utils.rs: 1 change (1 addition)

@@ -84,6 +84,7 @@ pub fn create_partition_values<F: FileAction>(
         extensions: None,
         range: None,
         statistics: None,
+        metadata_size_hint: None,
     };
 
     file_groups.entry(new_part_values).or_default().push(part);
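
The recurring `metadata_size_hint: None` additions (here and twice in crates/core/src/delta_datafusion/mod.rs below) track a new public field on DataFusion 43's PartitionedFile: an optional hint for how many trailing bytes to fetch when reading a Parquet footer. A minimal sketch of constructing the struct under the new layout, with illustrative values:

```rust
use chrono::Utc;
use datafusion::datasource::listing::PartitionedFile;
use object_store::{path::Path, ObjectMeta};

fn example_file() -> PartitionedFile {
    PartitionedFile {
        object_meta: ObjectMeta {
            location: Path::from("part-00000.parquet"),
            last_modified: Utc::now(),
            size: 4096,
            e_tag: None,
            version: None,
        },
        partition_values: vec![],
        range: None,
        statistics: None,
        extensions: None,
        // New in DataFusion 43: optional size hint for fetching Parquet
        // metadata; None falls back to the reader's default behavior.
        metadata_size_hint: None,
    }
}

fn main() {
    let file = example_file();
    println!("{} -> hint: {:?}", file.object_meta.location, file.metadata_size_hint);
}
```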

crates/core/src/delta_datafusion/mod.rs: 6 changes (5 additions, 1 deletion)

@@ -43,6 +43,8 @@ use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetExec,
 };
+
+use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -70,7 +72,7 @@ use datafusion_sql::planner::ParserOptions;
 use either::Either;
 use futures::TryStreamExt;
 use itertools::Itertools;
-use object_store::ObjectMeta;
+use object_store::{Error, ObjectMeta};
 use serde::{Deserialize, Serialize};
 use url::Url;
 
@@ -1121,6 +1123,7 @@ fn partitioned_file_from_action(
         range: None,
         extensions: None,
         statistics: None,
+        metadata_size_hint: None,
     }
 }
 
@@ -2024,6 +2027,7 @@ mod tests {
             range: None,
             extensions: None,
             statistics: None,
+            metadata_size_hint: None,
         };
         assert_eq!(file.partition_values, ref_file.partition_values)
     }

crates/core/src/delta_datafusion/planner.rs: 2 changes (1 addition, 1 deletion)

@@ -22,6 +22,7 @@
 //! };
 //!
 //! let state = state.with_query_planner(Arc::new(merge_planner));
+use std::fmt::Debug;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -32,7 +33,6 @@ use datafusion::{
     physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner},
 };
 use datafusion_expr::LogicalPlan;
-
 use crate::delta_datafusion::DataFusionResult;
 
 /// Deltaplanner

crates/core/src/delta_datafusion/schema_adapter.rs: 2 changes (1 addition, 1 deletion)

@@ -35,7 +35,7 @@ pub(crate) struct DeltaSchemaAdapter {
 
 impl SchemaAdapter for DeltaSchemaAdapter {
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.table_schema.field(index);
+        let field = self.projected_table_schema.field(index);
         Some(file_schema.fields.find(field.name())?.0)
     }
 
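
This one-line fix matters because map_column_index receives indices into the projected table schema, not the full table schema; once a projection drops or reorders columns, resolving the field name through table_schema can point at the wrong column. A reduced sketch of the corrected lookup, with the adapter trimmed to the two schema handles shown above (an inherent method stands in for the SchemaAdapter trait method):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

struct DeltaSchemaAdapter {
    /// Full table schema; no longer consulted by the lookup after the fix.
    #[allow(dead_code)]
    table_schema: SchemaRef,
    /// Schema after projection; `index` refers to positions in this one.
    projected_table_schema: SchemaRef,
}

impl DeltaSchemaAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }
}

fn main() {
    let table_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    // The projection keeps only "b", so projected index 0 must resolve to the
    // file schema's column 1, not column 0.
    let projected: SchemaRef = Arc::new(Schema::new(vec![Field::new("b", DataType::Utf8, true)]));
    let adapter = DeltaSchemaAdapter {
        table_schema,
        projected_table_schema: projected,
    };
    let file_schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]);
    assert_eq!(adapter.map_column_index(0, &file_schema), Some(1));
}
```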

crates/core/src/kernel/snapshot/log_segment.rs: 1 change (1 addition)

@@ -360,6 +360,7 @@ impl LogSegment {
         config: &DeltaTableConfig,
     ) -> DeltaResult<impl Iterator<Item = Result<RecordBatch, DeltaTableError>> + '_> {
         let log_path = table_root.child("_delta_log");
+        let tmp = Arc::new(read_schema.clone());
         let mut decoder = json::get_decoder(Arc::new(read_schema.try_into()?), config)?;
 
         let mut commit_data = Vec::new();

crates/hdfs/src/lib.rs: 4 changes (2 additions, 2 deletions)

@@ -4,7 +4,7 @@ use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
 use deltalake_core::storage::{
     factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
 };
-use deltalake_core::{DeltaResult, Path};
+use deltalake_core::{DeltaResult, DeltaTableError, Path};
 use hdfs_native_object_store::HdfsObjectStore;
 use url::Url;
 
@@ -20,7 +20,7 @@ impl ObjectStoreFactory for HdfsFactory {
         let store: ObjectStoreRef = Arc::new(HdfsObjectStore::with_config(
             url.as_str(),
             options.0.clone(),
-        )?);
+        ).map_err(|e| DeltaTableError::ObjectStore { source: e })?);
         let prefix = Path::parse(url.path())?;
         Ok((url_prefix_handler(store, prefix.clone()), prefix))
     }
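
The HDFS factory follows the same pattern as the catalog refresh above: an explicit conversion where `?` previously relied on an implicit From impl. A minimal illustration of the mapping into delta-rs's structured error variant; the helper name is hypothetical.

```rust
use deltalake_core::DeltaTableError;
use object_store::Error as ObjectStoreError;

/// Convert an object_store error into the DeltaTableError variant the
/// factory above constructs inline. Hypothetical helper, not in the commit.
fn to_delta_err(e: ObjectStoreError) -> DeltaTableError {
    DeltaTableError::ObjectStore { source: e }
}

fn main() {
    let err = to_delta_err(ObjectStoreError::NotImplemented);
    println!("{err}");
}
```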
