From a35f088db807e9902f72a00751ca3e700cf8adb2 Mon Sep 17 00:00:00 2001 From: baojinri Date: Mon, 13 Nov 2023 16:42:26 +0800 Subject: [PATCH] avoid collect empty metrics of remote plan --- Cargo.lock | 36 +++++++-------- Cargo.toml | 2 +- .../src/dist_sql_query/physical_plan.rs | 45 ++++++++++++------- .../src/dist_sql_query/resolver.rs | 25 +++++++---- .../src/datafusion_impl/task_context.rs | 2 +- 5 files changed, 64 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 70be3fa770..e2cfcbdcc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,7 @@ dependencies = [ "atomic_enum", "base64 0.13.1", "bytes_ext", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "codec", "common_types", "datafusion", @@ -1303,7 +1303,7 @@ checksum = "8ef195bacb1ca0eb02d6a0562b09852941d01de2b962c7066c922115fab7dcb7" dependencies = [ "arrow 38.0.0", "async-trait", - "ceresdbproto 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)", + "ceresdbproto 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)", "dashmap 5.4.0", "futures 0.3.28", "paste 1.0.12", @@ -1343,8 +1343,8 @@ dependencies = [ [[package]] name = "ceresdbproto" -version = "1.0.21" -source = "git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361#04cfacf43818dd0fcbf31d872a284965357a7361" +version = "1.0.22" +source = "git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed#9b05108ea41b323c7ef7e54bb570b59cca5e76ed" dependencies = [ "prost", "protoc-bin-vendored", @@ -1527,7 +1527,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 
(git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "common_types", "etcd-client", "future_ext", @@ -1605,7 +1605,7 @@ dependencies = [ "arrow 43.0.0", "arrow_ext", "bytes_ext", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "chrono", "datafusion", "hash_ext", @@ -2360,7 +2360,7 @@ dependencies = [ "async-recursion", "async-trait", "catalog", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "common_types", "datafusion", "datafusion-proto", @@ -3914,7 +3914,7 @@ name = "meta_client" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "common_types", "futures 0.3.28", "generic_error", @@ -4439,7 +4439,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "chrono", "clru", "crc", @@ -5316,7 +5316,7 @@ dependencies = [ "async-trait", "bytes", "catalog", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "clru", "cluster", "common_types", @@ -5443,7 +5443,7 @@ 
dependencies = [ "arrow 43.0.0", "async-trait", "catalog", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "cluster", "codec", "common_types", @@ -5754,7 +5754,7 @@ version = "1.2.6-alpha" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "common_types", "futures 0.3.28", "generic_error", @@ -5884,7 +5884,7 @@ name = "router" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "cluster", "common_types", "generic_error", @@ -6259,7 +6259,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "clru", "cluster", "common_types", @@ -6785,7 +6785,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "codec", "common_types", "futures 0.3.28", @@ -6807,7 +6807,7 @@ dependencies = [ "arrow_ext", "async-trait", "bytes_ext", - "ceresdbproto 1.0.21 
(git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "common_types", "datafusion", "datafusion-proto", @@ -7010,7 +7010,7 @@ dependencies = [ name = "time_ext" version = "1.2.6-alpha" dependencies = [ - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "chrono", "common_types", "macros", @@ -7662,7 +7662,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes_ext", - "ceresdbproto 1.0.21 (git+https://github.com/baojinri/ceresdbproto.git?rev=04cfacf43818dd0fcbf31d872a284965357a7361)", + "ceresdbproto 1.0.22 (git+https://github.com/baojinri/ceresdbproto.git?rev=9b05108ea41b323c7ef7e54bb570b59cca5e76ed)", "chrono", "codec", "common_types", diff --git a/Cargo.toml b/Cargo.toml index 3d7d6ec809..1f55f5c188 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,7 +93,7 @@ bytes = "1" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "04cfacf43818dd0fcbf31d872a284965357a7361" } +ceresdbproto = { git = "https://github.com/baojinri/ceresdbproto.git", rev = "9b05108ea41b323c7ef7e54bb570b59cca5e76ed" } codec = { path = "components/codec" } chrono = "0.4" clap = "3.0" diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/df_engine_extensions/src/dist_sql_query/physical_plan.rs index 44e22d646f..e9ac36d0b5 100644 --- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs +++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs @@ -146,14 +146,14 @@ impl DisplayAs for UnresolvedPartitionedScan { pub(crate) struct ResolvedPartitionedScan { pub 
remote_exec_ctx: Arc, pub pushdown_continue: bool, - pub metrics_collector: MetricsCollector, + pub metrics_collector: Option, } impl ResolvedPartitionedScan { pub fn new( remote_executor: Arc, sut_table_plan_ctxs: Vec, - metrics_collector: MetricsCollector, + metrics_collector: Option, ) -> Self { let remote_exec_ctx = Arc::new(RemoteExecContext { executor: remote_executor, @@ -166,7 +166,7 @@ impl ResolvedPartitionedScan { pub fn new_with_details( remote_exec_ctx: Arc, pushdown_continue: bool, - metrics_collector: MetricsCollector, + metrics_collector: Option, ) -> Self { Self { remote_exec_ctx, @@ -295,6 +295,12 @@ impl ExecutionPlan for ResolvedPartitionedScan { } fn children(&self) -> Vec> { + // If this is an analyze plan, we should not collect metrics of children + // which have been sent to remote, so we just return empty children. + if self.metrics_collector.is_some() { + return vec![]; + } + self.remote_exec_ctx .plan_ctxs .iter() @@ -347,20 +353,25 @@ impl ExecutionPlan for ResolvedPartitionedScan { } fn metrics(&self) -> Option { - let mut metric_set = MetricsSet::new(); - - let mut format_visitor = FormatCollectorVisitor::default(); - self.metrics_collector.visit(&mut format_visitor); - let metrics_desc = format_visitor.into_string(); - metric_set.push(Arc::new(Metric::new( - MetricValue::Count { - name: format!("\n{metrics_desc}").into(), - count: Count::new(), - }, - None, - ))); - - Some(metric_set) + match &self.metrics_collector { + None => None, + Some(metrics_collector) => { + let mut metric_set = MetricsSet::new(); + + let mut format_visitor = FormatCollectorVisitor::default(); + metrics_collector.visit(&mut format_visitor); + let metrics_desc = format_visitor.into_string(); + metric_set.push(Arc::new(Metric::new( + MetricValue::Count { + name: format!("\n{metrics_desc}").into(), + count: Count::new(), + }, + None, + ))); + + Some(metric_set) + } + } } } diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs 
b/df_engine_extensions/src/dist_sql_query/resolver.rs index 3120bcb171..0cd558d315 100644 --- a/df_engine_extensions/src/dist_sql_query/resolver.rs +++ b/df_engine_extensions/src/dist_sql_query/resolver.rs @@ -18,7 +18,7 @@ use async_recursion::async_recursion; use catalog::manager::ManagerRef as CatalogManagerRef; use datafusion::{ error::{DataFusionError, Result as DfResult}, - physical_plan::ExecutionPlan, + physical_plan::{analyze::AnalyzeExec, ExecutionPlan}, }; use table_engine::{remote::model::TableIdentifier, table::TableRef}; @@ -45,6 +45,7 @@ pub struct Resolver { remote_executor: RemotePhysicalPlanExecutorRef, catalog_manager: CatalogManagerRef, scan_builder: ExecutableScanBuilderRef, + is_analyze: bool, } impl Resolver { @@ -57,6 +58,7 @@ impl Resolver { remote_executor, catalog_manager, scan_builder, + is_analyze: false, } } @@ -96,7 +98,7 @@ impl Resolver { /// UnresolvedSubTableScan (send to remote node) /// ``` pub fn resolve_partitioned_scan( - &self, + &mut self, plan: Arc, ) -> DfResult> { let resolved_plan = self.resolve_partitioned_scan_internal(plan)?; @@ -115,9 +117,14 @@ impl Resolver { } pub fn resolve_partitioned_scan_internal( - &self, + &mut self, plan: Arc, ) -> DfResult> { + // Check if this plan is `AnalyzeExec`, if it is, we should collect metrics. + if plan.as_any().downcast_ref::().is_some() { + self.is_analyze = true; + } + // Leave node, let's resolve it and return. 
if let Some(unresolved) = plan.as_any().downcast_ref::() { let metrics_collector = unresolved.metrics_collector.clone(); @@ -138,7 +145,7 @@ impl Resolver { return Ok(Arc::new(ResolvedPartitionedScan::new( self.remote_executor.clone(), remote_plans, - metrics_collector, + self.is_analyze.then_some(metrics_collector), ))); } @@ -259,7 +266,7 @@ mod test { fn test_basic_partitioned_scan() { let ctx = TestContext::new(); let plan = ctx.build_basic_partitioned_table_plan(); - let resolver = ctx.resolver(); + let mut resolver = ctx.resolver(); let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref()) .indent(true) .to_string(); @@ -281,7 +288,7 @@ mod test { async fn test_unprocessed_plan() { let ctx = TestContext::new(); let plan = ctx.build_unprocessed_plan(); - let resolver = ctx.resolver(); + let mut resolver = ctx.resolver(); let original_plan_display = displayable(plan.as_ref()).indent(true).to_string(); @@ -304,7 +311,7 @@ mod test { fn test_aggr_push_down() { let ctx = TestContext::new(); let plan = ctx.build_aggr_push_down_plan(); - let resolver = ctx.resolver(); + let mut resolver = ctx.resolver(); let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref()) .indent(true) .to_string(); @@ -315,7 +322,7 @@ mod test { fn test_compounded_aggr_push_down() { let ctx = TestContext::new(); let plan = ctx.build_compounded_aggr_push_down_plan(); - let resolver = ctx.resolver(); + let mut resolver = ctx.resolver(); let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref()) .indent(true) .to_string(); @@ -326,7 +333,7 @@ mod test { fn test_node_with_multiple_partitioned_scan_children() { let ctx = TestContext::new(); let plan = ctx.build_union_plan(); - let resolver = ctx.resolver(); + let mut resolver = ctx.resolver(); let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref()) .indent(true) .to_string(); diff --git a/query_engine/src/datafusion_impl/task_context.rs 
b/query_engine/src/datafusion_impl/task_context.rs index 224b74cfe8..90436fafaf 100644 --- a/query_engine/src/datafusion_impl/task_context.rs +++ b/query_engine/src/datafusion_impl/task_context.rs @@ -143,7 +143,7 @@ impl Preprocessor { plan: &Arc, ctx: &Context, ) -> Result> { - let resolver = self.dist_query_resolver_builder.build(ctx); + let mut resolver = self.dist_query_resolver_builder.build(ctx); resolver .resolve_partitioned_scan(plan.clone()) .box_err()