Skip to content

Commit

Permalink
[HSTACK] Add support for deep projections
Browse files Browse the repository at this point in the history
  • Loading branch information
adragomir authored and aditanase committed Jan 9, 2025
1 parent 27b053b commit 5ad6370
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
57 changes: 57 additions & 0 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ pub(crate) struct DeltaScanBuilder<'a> {
filter: Option<Expr>,
session: &'a dyn Session,
projection: Option<&'a Vec<usize>>,
projection_deep: Option<&'a HashMap<usize, Vec<String>>>,
limit: Option<usize>,
files: Option<&'a [Add]>,
config: Option<DeltaScanConfig>,
Expand All @@ -478,6 +479,7 @@ impl<'a> DeltaScanBuilder<'a> {
filter: None,
session,
projection: None,
projection_deep: None,
limit: None,
files: None,
config: None,
Expand All @@ -499,6 +501,12 @@ impl<'a> DeltaScanBuilder<'a> {
self
}

pub fn with_projection_deep(mut self, projection_deep: Option<&'a HashMap<usize, Vec<String>>>) -> Self {
self.projection_deep = projection_deep;
self
}


pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
Expand Down Expand Up @@ -655,6 +663,7 @@ impl<'a> DeltaScanBuilder<'a> {
},
statistics: stats,
projection: self.projection.cloned(),
projection_deep: self.projection_deep.cloned(),
limit: self.limit,
table_partition_cols,
output_ordering: vec![],
Expand Down Expand Up @@ -732,6 +741,30 @@ impl TableProvider for DeltaTable {
Ok(Arc::new(scan))
}

async fn scan_deep(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
projection_deep: Option<&HashMap<usize, Vec<String>>>,
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();

register_store(self.log_store(), session_state.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session_state)
.with_projection(projection)
.with_projection_deep(projection_deep)
.with_limit(limit)
.with_filter(filter_expr)
.build()
.await?;

Ok(Arc::new(scan))
}

fn supports_filters_pushdown(
&self,
filter: &[&Expr],
Expand Down Expand Up @@ -824,6 +857,30 @@ impl TableProvider for DeltaTableProvider {
Ok(Arc::new(scan.build().await?))
}

async fn scan_deep(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
projection_deep: Option<&HashMap<usize, Vec<String>>>,
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
register_store(self.log_store.clone(), session_state.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session_state)
.with_projection(projection)
.with_projection_deep(projection_deep)
.with_limit(limit)
.with_filter(filter_expr)
.with_scan_config(self.config.clone())
.build()
.await?;

Ok(Arc::new(scan))
}

fn supports_filters_pushdown(
&self,
filter: &[&Expr],
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl CdfLoadBuilder {
file_groups: cdc_file_groups.into_values().collect(),
statistics: Statistics::new_unknown(&cdc_file_schema),
projection: None,
projection_deep: None,
limit: None,
table_partition_cols: cdc_partition_cols,
output_ordering: vec![],
Expand All @@ -381,6 +382,7 @@ impl CdfLoadBuilder {
file_groups: add_file_groups.into_values().collect(),
statistics: Statistics::new_unknown(&add_remove_file_schema.clone()),
projection: None,
projection_deep: None,
limit: None,
table_partition_cols: add_remove_partition_cols.clone(),
output_ordering: vec![],
Expand All @@ -398,6 +400,7 @@ impl CdfLoadBuilder {
file_groups: remove_file_groups.into_values().collect(),
statistics: Statistics::new_unknown(&add_remove_file_schema),
projection: None,
projection_deep: None,
limit: None,
table_partition_cols: add_remove_partition_cols,
output_ordering: vec![],
Expand Down

0 comments on commit 5ad6370

Please sign in to comment.