Skip to content

Commit

Permalink
modify code
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri committed Oct 24, 2023
1 parent 63b3756 commit fe189f3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
7 changes: 1 addition & 6 deletions df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion::{
aggregates::{AggregateExec, AggregateMode},
analyze::AnalyzeExec,
coalesce_batches::CoalesceBatchesExec,
coalesce_partitions::CoalescePartitionsExec,
displayable,
filter::FilterExec,
metrics::{Count, MetricValue, MetricsSet},
Expand Down Expand Up @@ -618,17 +617,13 @@ impl PushDownEvent {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
if let Some(aggr) = plan.as_any().downcast_ref::<AggregateExec>() {
if *aggr.mode() == AggregateMode::Partial {
Self::Terminated(plan)
Self::Continue(plan)
} else {
Self::Unable
}
} else if plan.as_any().downcast_ref::<FilterExec>().is_some()
|| plan.as_any().downcast_ref::<ProjectionExec>().is_some()
|| plan.as_any().downcast_ref::<RepartitionExec>().is_some()
|| plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.is_some()
|| plan
.as_any()
.downcast_ref::<CoalesceBatchesExec>()
Expand Down
34 changes: 32 additions & 2 deletions df_engine_extensions/src/dist_sql_query/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use async_recursion::async_recursion;
use catalog::manager::ManagerRef as CatalogManagerRef;
use datafusion::{
error::{DataFusionError, Result as DfResult},
physical_plan::ExecutionPlan,
physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
};
use table_engine::{remote::model::TableIdentifier, table::TableRef};

use crate::{
dist_sql_query::{
physical_plan::{
ResolvedPartitionedScan, SubTablePlanContext, UnresolvedPartitionedScan,
PushDownEvent, ResolvedPartitionedScan, SubTablePlanContext, UnresolvedPartitionedScan,
UnresolvedSubTableScan,
},
ExecutableScanBuilderRef, RemotePhysicalPlanExecutorRef,
Expand Down Expand Up @@ -99,6 +99,7 @@ impl Resolver {
&self,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let plan = self.maybe_rewrite_analyze_plan(plan)?;
let resolved_plan = self.resolve_partitioned_scan_internal(plan)?;
PUSH_DOWN_PLAN_COUNTER
.with_label_values(&["remote_scan"])
Expand Down Expand Up @@ -246,6 +247,35 @@ impl Resolver {
.map_err(|e| DataFusionError::Internal(format!("failed to find table, err:{e}")))?
.ok_or(DataFusionError::Internal("table not found".to_string()))
}

fn maybe_rewrite_analyze_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
if let Some(analyze) = plan.as_any().downcast_ref::<AnalyzeExec>() {
let children = analyze.children();
if children.len() != 1 {
return Err(DataFusionError::Internal(
"analyze physical can only have one child plan".to_string(),
));
}
let child = children.first().unwrap().clone();
if Self::is_a_pushdown_node(child.clone()) {
return Ok(plan);
}
let plan = plan.with_new_children(child.children())?;
return self.maybe_rewrite_analyze_plan(plan);
}
Ok(plan)
}

#[inline]
fn is_a_pushdown_node(plan: Arc<dyn ExecutionPlan>) -> bool {
match PushDownEvent::new(plan) {
PushDownEvent::Unable => false,
_ => true,
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit fe189f3

Please sign in to comment.