diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index f6b8399a4b..04ed6c2132 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -32,7 +32,6 @@ use datafusion::{
         aggregates::{AggregateExec, AggregateMode},
         analyze::AnalyzeExec,
         coalesce_batches::CoalesceBatchesExec,
-        coalesce_partitions::CoalescePartitionsExec,
         displayable,
         filter::FilterExec,
         metrics::{Count, MetricValue, MetricsSet},
@@ -618,17 +617,13 @@ impl PushDownEvent {
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
         if let Some(aggr) = plan.as_any().downcast_ref::<AggregateExec>() {
             if *aggr.mode() == AggregateMode::Partial {
-                Self::Terminated(plan)
+                Self::Continue(plan)
             } else {
                 Self::Unable
             }
         } else if plan.as_any().downcast_ref::<FilterExec>().is_some()
             || plan.as_any().downcast_ref::<ProjectionExec>().is_some()
             || plan.as_any().downcast_ref::<RepartitionExec>().is_some()
-            || plan
-                .as_any()
-                .downcast_ref::<CoalescePartitionsExec>()
-                .is_some()
             || plan
                 .as_any()
                 .downcast_ref::<CoalesceBatchesExec>()
diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs b/df_engine_extensions/src/dist_sql_query/resolver.rs
index 0d10bb74b6..713727b404 100644
--- a/df_engine_extensions/src/dist_sql_query/resolver.rs
+++ b/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -18,14 +18,14 @@ use async_recursion::async_recursion;
 use catalog::manager::ManagerRef as CatalogManagerRef;
 use datafusion::{
     error::{DataFusionError, Result as DfResult},
-    physical_plan::ExecutionPlan,
+    physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
 };
 use table_engine::{remote::model::TableIdentifier, table::TableRef};
 
 use crate::{
     dist_sql_query::{
         physical_plan::{
-            ResolvedPartitionedScan, SubTablePlanContext, UnresolvedPartitionedScan,
+            PushDownEvent, ResolvedPartitionedScan, SubTablePlanContext, UnresolvedPartitionedScan,
             UnresolvedSubTableScan,
         },
         ExecutableScanBuilderRef, RemotePhysicalPlanExecutorRef,
@@ -99,6 +99,7 @@ impl Resolver {
         &self,
         plan: Arc<dyn ExecutionPlan>,
     ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        let plan = self.maybe_rewrite_analyze_plan(plan)?;
         let resolved_plan = self.resolve_partitioned_scan_internal(plan)?;
         PUSH_DOWN_PLAN_COUNTER
             .with_label_values(&["remote_scan"])
@@ -246,6 +247,54 @@ impl Resolver {
             .map_err(|e| DataFusionError::Internal(format!("failed to find table, err:{e}")))?
             .ok_or(DataFusionError::Internal("table not found".to_string()))
     }
+
+    /// Re-parents an `AnalyzeExec` root onto the nearest node that can be pushed
+    /// down, dropping the non-pushdown nodes sitting in between.
+    ///
+    /// Plans whose root is not an `AnalyzeExec` are returned untouched.
+    ///
+    /// # Errors
+    ///
+    /// Returns `DataFusionError::Internal` when the analyze node does not have
+    /// exactly one child, and forwards any error from `with_new_children`.
+    fn maybe_rewrite_analyze_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        if let Some(analyze) = plan.as_any().downcast_ref::<AnalyzeExec>() {
+            let children = analyze.children();
+            let child = match children.as_slice() {
+                [child] => child.clone(),
+                _ => {
+                    return Err(DataFusionError::Internal(
+                        "analyze physical can only have one child plan".to_string(),
+                    ));
+                }
+            };
+            if Self::is_a_pushdown_node(child.clone()) {
+                return Ok(plan);
+            }
+
+            // The analyze node takes exactly one input, so the child can only be
+            // skipped when it has exactly one child of its own. For a leaf or a
+            // multi-input child keep the plan as it is instead of handing an
+            // invalid child list to `with_new_children`.
+            let grandchildren = child.children();
+            if grandchildren.len() != 1 {
+                return Ok(plan);
+            }
+            let plan = plan.with_new_children(grandchildren)?;
+            return self.maybe_rewrite_analyze_plan(plan);
+        }
+        Ok(plan)
+    }
+
+    /// Returns `true` unless `PushDownEvent::new` classifies `plan` as
+    /// `PushDownEvent::Unable`.
+    #[inline]
+    fn is_a_pushdown_node(plan: Arc<dyn ExecutionPlan>) -> bool {
+        !matches!(PushDownEvent::new(plan), PushDownEvent::Unable)
+    }
 }
 
 #[cfg(test)]