Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions datafusion/catalog/src/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{any::Any, borrow::Cow};
use crate::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::{assert_or_internal_err, DataFusionError};
use datafusion_physical_plan::work_table::WorkTableExec;

use datafusion_physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -86,15 +87,20 @@ impl TableProvider for CteWorkTable {
async fn scan(
&self,
_state: &dyn Session,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// TODO: pushdown filters and limits
assert_or_internal_err!(
filters.is_empty(),
"CteWorkTable does not support pushing filters"
);
assert_or_internal_err!(limit.is_none(), "CteWorkTable pushing limit");
Ok(Arc::new(WorkTableExec::new(
self.name.clone(),
Arc::clone(&self.table_schema),
)))
projection.cloned(),
)?))
}

fn supports_filters_pushdown(
Expand Down
46 changes: 34 additions & 12 deletions datafusion/physical-plan/src/work_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,35 @@ pub struct WorkTableExec {
name: String,
/// The schema of the stream
schema: SchemaRef,
/// Projection to apply to build the output stream from the recursion state
projection: Option<Vec<usize>>,
/// The work table
work_table: Arc<WorkTable>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
plan_properties: PlanProperties,
}

impl WorkTableExec {
/// Create a new execution plan for a worktable exec.
pub fn new(name: String, schema: SchemaRef) -> Self {
let cache = Self::compute_properties(Arc::clone(&schema));
Self {
pub fn new(
name: String,
mut schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Result<Self> {
if let Some(projection) = &projection {
schema = Arc::new(schema.project(projection)?);
}
let plan_properties = Self::compute_properties(Arc::clone(&schema));
Ok(Self {
name,
schema,
projection,
metrics: ExecutionPlanMetricsSet::new(),
work_table: Arc::new(WorkTable::new()),
cache,
}
plan_properties,
})
}

/// Ref to name
Expand Down Expand Up @@ -173,7 +183,7 @@ impl ExecutionPlan for WorkTableExec {
}

fn properties(&self) -> &PlanProperties {
&self.cache
&self.plan_properties
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
Expand All @@ -199,11 +209,22 @@ impl ExecutionPlan for WorkTableExec {
0,
"WorkTableExec got an invalid partition {partition} (expected 0)"
);
let batch = self.work_table.take()?;
let ReservedBatches {
mut batches,
reservation,
} = self.work_table.take()?;
if let Some(projection) = &self.projection {
// We apply the projection
// TODO: it would be better to apply it as soon as possible and not only here
// TODO: an aggressive projection makes the memory reservation smaller, even if we do not edit it
batches = batches
.into_iter()
.map(|b| b.project(projection))
.collect::<Result<Vec<_>, _>>()?;
}

let stream =
MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)?
.with_reservation(batch.reservation);
let stream = MemoryStream::try_new(batches, Arc::clone(&self.schema), None)?
.with_reservation(reservation);
Ok(Box::pin(cooperative(stream)))
}

Expand Down Expand Up @@ -236,9 +257,10 @@ impl ExecutionPlan for WorkTableExec {
Some(Arc::new(Self {
name: self.name.clone(),
schema: Arc::clone(&self.schema),
projection: self.projection.clone(),
metrics: ExecutionPlanMetricsSet::new(),
work_table,
cache: self.cache.clone(),
plan_properties: self.plan_properties.clone(),
}))
}
}
Expand Down