Skip to content

Commit

Permalink
avoid collecting empty metrics of remote plans
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri committed Nov 15, 2023
1 parent 8d2955f commit a35f088
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 46 deletions.
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "04cfacf43818dd0fcbf31d872a284965357a7361" }
ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "9b05108ea41b323c7ef7e54bb570b59cca5e76ed" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand Down
45 changes: 28 additions & 17 deletions df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ impl DisplayAs for UnresolvedPartitionedScan {
pub(crate) struct ResolvedPartitionedScan {
pub remote_exec_ctx: Arc<RemoteExecContext>,
pub pushdown_continue: bool,
pub metrics_collector: MetricsCollector,
pub metrics_collector: Option<MetricsCollector>,
}

impl ResolvedPartitionedScan {
pub fn new(
remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
sut_table_plan_ctxs: Vec<SubTablePlanContext>,
metrics_collector: MetricsCollector,
metrics_collector: Option<MetricsCollector>,
) -> Self {
let remote_exec_ctx = Arc::new(RemoteExecContext {
executor: remote_executor,
Expand All @@ -166,7 +166,7 @@ impl ResolvedPartitionedScan {
pub fn new_with_details(
remote_exec_ctx: Arc<RemoteExecContext>,
pushdown_continue: bool,
metrics_collector: MetricsCollector,
metrics_collector: Option<MetricsCollector>,
) -> Self {
Self {
remote_exec_ctx,
Expand Down Expand Up @@ -295,6 +295,12 @@ impl ExecutionPlan for ResolvedPartitionedScan {
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// If this is an analyze plan, we should not collect metrics of children
// which have been sent to remote nodes, so we just return empty children.
if self.metrics_collector.is_some() {
return vec![];
}

self.remote_exec_ctx
.plan_ctxs
.iter()
Expand Down Expand Up @@ -347,20 +353,25 @@ impl ExecutionPlan for ResolvedPartitionedScan {
}

fn metrics(&self) -> Option<MetricsSet> {
let mut metric_set = MetricsSet::new();

let mut format_visitor = FormatCollectorVisitor::default();
self.metrics_collector.visit(&mut format_visitor);
let metrics_desc = format_visitor.into_string();
metric_set.push(Arc::new(Metric::new(
MetricValue::Count {
name: format!("\n{metrics_desc}").into(),
count: Count::new(),
},
None,
)));

Some(metric_set)
match &self.metrics_collector {
None => None,
Some(metrics_collector) => {
let mut metric_set = MetricsSet::new();

let mut format_visitor = FormatCollectorVisitor::default();
metrics_collector.visit(&mut format_visitor);
let metrics_desc = format_visitor.into_string();
metric_set.push(Arc::new(Metric::new(
MetricValue::Count {
name: format!("\n{metrics_desc}").into(),
count: Count::new(),
},
None,
)));

Some(metric_set)
}
}
}
}

Expand Down
25 changes: 16 additions & 9 deletions df_engine_extensions/src/dist_sql_query/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_recursion::async_recursion;
use catalog::manager::ManagerRef as CatalogManagerRef;
use datafusion::{
error::{DataFusionError, Result as DfResult},
physical_plan::ExecutionPlan,
physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
};
use table_engine::{remote::model::TableIdentifier, table::TableRef};

Expand All @@ -45,6 +45,7 @@ pub struct Resolver {
remote_executor: RemotePhysicalPlanExecutorRef,
catalog_manager: CatalogManagerRef,
scan_builder: ExecutableScanBuilderRef,
is_analyze: bool,
}

impl Resolver {
Expand All @@ -57,6 +58,7 @@ impl Resolver {
remote_executor,
catalog_manager,
scan_builder,
is_analyze: false,
}
}

Expand Down Expand Up @@ -96,7 +98,7 @@ impl Resolver {
/// UnresolvedSubTableScan (send to remote node)
/// ```
pub fn resolve_partitioned_scan(
&self,
&mut self,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let resolved_plan = self.resolve_partitioned_scan_internal(plan)?;
Expand All @@ -115,9 +117,14 @@ impl Resolver {
}

pub fn resolve_partitioned_scan_internal(
&self,
&mut self,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
// Check if this plan is `AnalyzeExec`, if it is, we should collect metrics.
if plan.as_any().downcast_ref::<AnalyzeExec>().is_some() {
self.is_analyze = true;
}

// Leave node, let's resolve it and return.
if let Some(unresolved) = plan.as_any().downcast_ref::<UnresolvedPartitionedScan>() {
let metrics_collector = unresolved.metrics_collector.clone();
Expand All @@ -138,7 +145,7 @@ impl Resolver {
return Ok(Arc::new(ResolvedPartitionedScan::new(
self.remote_executor.clone(),
remote_plans,
metrics_collector,
self.is_analyze.then_some(metrics_collector),
)));
}

Expand Down Expand Up @@ -259,7 +266,7 @@ mod test {
fn test_basic_partitioned_scan() {
let ctx = TestContext::new();
let plan = ctx.build_basic_partitioned_table_plan();
let resolver = ctx.resolver();
let mut resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand All @@ -281,7 +288,7 @@ mod test {
async fn test_unprocessed_plan() {
let ctx = TestContext::new();
let plan = ctx.build_unprocessed_plan();
let resolver = ctx.resolver();
let mut resolver = ctx.resolver();

let original_plan_display = displayable(plan.as_ref()).indent(true).to_string();

Expand All @@ -304,7 +311,7 @@ mod test {
fn test_aggr_push_down() {
let ctx = TestContext::new();
let plan = ctx.build_aggr_push_down_plan();
let resolver = ctx.resolver();
let mut resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand All @@ -315,7 +322,7 @@ mod test {
fn test_compounded_aggr_push_down() {
let ctx = TestContext::new();
let plan = ctx.build_compounded_aggr_push_down_plan();
let resolver = ctx.resolver();
let mut resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand All @@ -326,7 +333,7 @@ mod test {
fn test_node_with_multiple_partitioned_scan_children() {
let ctx = TestContext::new();
let plan = ctx.build_union_plan();
let resolver = ctx.resolver();
let mut resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand Down
2 changes: 1 addition & 1 deletion query_engine/src/datafusion_impl/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Preprocessor {
plan: &Arc<dyn ExecutionPlan>,
ctx: &Context,
) -> Result<Arc<dyn ExecutionPlan>> {
let resolver = self.dist_query_resolver_builder.build(ctx);
let mut resolver = self.dist_query_resolver_builder.build(ctx);
resolver
.resolve_partitioned_scan(plan.clone())
.box_err()
Expand Down

0 comments on commit a35f088

Please sign in to comment.