Skip to content

Commit

Permalink
use remote_metrics_collector to collect metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri committed Nov 16, 2023
1 parent a35f088 commit 8051eec
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 124 deletions.
28 changes: 28 additions & 0 deletions components/trace_metric/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,34 @@ impl CollectorVisitor for FormatCollectorVisitor {
}
}

/// A collector for metrics of remote plan.
///
/// Metrics are accumulated as plain text in a buffer held behind an
/// `Arc<Mutex<_>>`, so every clone of a collector appends to, and reads
/// from, the same buffer. It can be cloned and shared among threads.
#[derive(Clone, Debug, Default)]
pub struct RemoteMetricsCollector {
    /// Label printed as the header line by [`RemoteMetricsCollector::display`].
    name: String,
    /// Text of every metric collected so far; shared by all clones.
    metrics: Arc<Mutex<String>>,
}

impl RemoteMetricsCollector {
    /// Creates a collector labelled `name` with an empty metrics buffer.
    pub fn new(name: String) -> Self {
        Self {
            name,
            metrics: Arc::new(Mutex::new(String::new())),
        }
    }

    /// Appends `metric` verbatim to the shared buffer.
    ///
    /// No separator is inserted between successive calls, so callers must
    /// include any delimiter (e.g. a trailing newline) in `metric` itself.
    pub fn collect(&self, metric: String) {
        self.lock_metrics().push_str(&metric);
    }

    /// Renders the collected metrics as `"{name}:\n{metrics}"`.
    pub fn display(&self) -> String {
        let metrics = self.lock_metrics();
        format!("{}:\n{}", self.name, metrics)
    }

    /// Locks the shared buffer, recovering the guard if the mutex is poisoned.
    ///
    /// The buffer is a plain `String` that is only ever appended to, so a
    /// panic in another thread while it held the lock cannot leave the data
    /// in an invalid state. Recovering here keeps metric collection from
    /// turning one thread's panic into a panic in every other user.
    fn lock_metrics(&self) -> std::sync::MutexGuard<'_, String> {
        self.metrics
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
4 changes: 2 additions & 2 deletions df_engine_extensions/src/dist_sql_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::{
use futures::future::BoxFuture;
use generic_error::BoxError;
use table_engine::{predicate::PredicateRef, remote::model::TableIdentifier, table::TableRef};
use trace_metric::MetricsCollector;
use trace_metric::collector::RemoteMetricsCollector;

pub mod codec;
pub mod physical_plan;
Expand All @@ -39,7 +39,7 @@ pub trait RemotePhysicalPlanExecutor: fmt::Debug + Send + Sync + 'static {
table: TableIdentifier,
task_context: &TaskContext,
plan: Arc<dyn ExecutionPlan>,
metrics_collector: MetricsCollector,
remote_metrics_collector: RemoteMetricsCollector,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>>;
}

Expand Down
79 changes: 54 additions & 25 deletions df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::{
any::Any,
collections::HashMap,
fmt,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -43,7 +44,10 @@ use datafusion::{
};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, TraceMetricWhenDrop};
use trace_metric::{
collector::{FormatCollectorVisitor, RemoteMetricsCollector},
MetricsCollector, TraceMetricWhenDrop,
};

use crate::dist_sql_query::{RemotePhysicalPlanExecutor, TableScanContext};

Expand Down Expand Up @@ -146,32 +150,46 @@ impl DisplayAs for UnresolvedPartitionedScan {
pub(crate) struct ResolvedPartitionedScan {
pub remote_exec_ctx: Arc<RemoteExecContext>,
pub pushdown_continue: bool,
pub metrics_collector: Option<MetricsCollector>,
pub metrics_collector: MetricsCollector,
pub remote_metrics_collector: HashMap<String, RemoteMetricsCollector>,
pub is_analyze: bool,
}

impl ResolvedPartitionedScan {
pub fn new(
remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
sut_table_plan_ctxs: Vec<SubTablePlanContext>,
metrics_collector: Option<MetricsCollector>,
metrics_collector: MetricsCollector,
remote_metrics_collector: HashMap<String, RemoteMetricsCollector>,
is_analyze: bool,
) -> Self {
let remote_exec_ctx = Arc::new(RemoteExecContext {
executor: remote_executor,
plan_ctxs: sut_table_plan_ctxs,
});

Self::new_with_details(remote_exec_ctx, true, metrics_collector)
Self::new_with_details(
remote_exec_ctx,
true,
metrics_collector,
remote_metrics_collector,
is_analyze,
)
}

pub fn new_with_details(
remote_exec_ctx: Arc<RemoteExecContext>,
pushdown_continue: bool,
metrics_collector: Option<MetricsCollector>,
metrics_collector: MetricsCollector,
remote_metrics_collector: HashMap<String, RemoteMetricsCollector>,
is_analyze: bool,
) -> Self {
Self {
remote_exec_ctx,
pushdown_continue,
metrics_collector,
remote_metrics_collector,
is_analyze,
}
}

Expand All @@ -180,6 +198,8 @@ impl ResolvedPartitionedScan {
remote_exec_ctx: self.remote_exec_ctx.clone(),
pushdown_continue: false,
metrics_collector: self.metrics_collector.clone(),
remote_metrics_collector: self.remote_metrics_collector.clone(),
is_analyze: self.is_analyze,
})
}

Expand Down Expand Up @@ -227,6 +247,8 @@ impl ResolvedPartitionedScan {
remote_exec_ctx,
can_push_down_more,
self.metrics_collector.clone(),
self.remote_metrics_collector.clone(),
self.is_analyze,
);

Ok(Arc::new(plan))
Expand Down Expand Up @@ -297,7 +319,7 @@ impl ExecutionPlan for ResolvedPartitionedScan {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// If this is an analyze plan, we should not collect metrics of children
// which have been sent to remote, so we just return empty children.
if self.metrics_collector.is_some() {
if self.is_analyze {
return vec![];
}

Expand Down Expand Up @@ -334,13 +356,14 @@ impl ExecutionPlan for ResolvedPartitionedScan {
plan,
metrics_collector,
} = &self.remote_exec_ctx.plan_ctxs[partition];
let remote_metrics_collector = self.remote_metrics_collector.get(&sub_table.table).unwrap();

// Send plan for remote execution.
let stream_future = self.remote_exec_ctx.executor.execute(
sub_table.clone(),
&context,
plan.clone(),
metrics_collector.clone(),
remote_metrics_collector.clone(),
)?;
let record_stream =
PartitionedScanStream::new(stream_future, plan.schema(), metrics_collector.clone());
Expand All @@ -353,25 +376,31 @@ impl ExecutionPlan for ResolvedPartitionedScan {
}

fn metrics(&self) -> Option<MetricsSet> {
match &self.metrics_collector {
None => None,
Some(metrics_collector) => {
let mut metric_set = MetricsSet::new();

let mut format_visitor = FormatCollectorVisitor::default();
metrics_collector.visit(&mut format_visitor);
let metrics_desc = format_visitor.into_string();
metric_set.push(Arc::new(Metric::new(
MetricValue::Count {
name: format!("\n{metrics_desc}").into(),
count: Count::new(),
},
None,
)));

Some(metric_set)
}
let mut metric_set = MetricsSet::new();

for remote_metrics_collector in self.remote_metrics_collector.values() {
let metrics_desc = remote_metrics_collector.display();
metric_set.push(Arc::new(Metric::new(
MetricValue::Count {
name: format!("\n{metrics_desc}").into(),
count: Count::new(),
},
None,
)));
}

let mut format_visitor = FormatCollectorVisitor::default();
self.metrics_collector.visit(&mut format_visitor);
let metrics_desc = format_visitor.into_string();
metric_set.push(Arc::new(Metric::new(
MetricValue::Count {
name: format!("\n{metrics_desc}").into(),
count: Count::new(),
},
None,
)));

Some(metric_set)
}
}

Expand Down
41 changes: 23 additions & 18 deletions df_engine_extensions/src/dist_sql_query/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use async_recursion::async_recursion;
use catalog::manager::ManagerRef as CatalogManagerRef;
Expand All @@ -21,6 +21,7 @@ use datafusion::{
physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
};
use table_engine::{remote::model::TableIdentifier, table::TableRef};
use trace_metric::collector::RemoteMetricsCollector;

use crate::{
dist_sql_query::{
Expand All @@ -45,7 +46,6 @@ pub struct Resolver {
remote_executor: RemotePhysicalPlanExecutorRef,
catalog_manager: CatalogManagerRef,
scan_builder: ExecutableScanBuilderRef,
is_analyze: bool,
}

impl Resolver {
Expand All @@ -58,7 +58,6 @@ impl Resolver {
remote_executor,
catalog_manager,
scan_builder,
is_analyze: false,
}
}

Expand Down Expand Up @@ -98,10 +97,13 @@ impl Resolver {
/// UnresolvedSubTableScan (send to remote node)
/// ```
pub fn resolve_partitioned_scan(
&mut self,
&self,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let resolved_plan = self.resolve_partitioned_scan_internal(plan)?;
// Check if this plan is `AnalyzeExec`, if it is, we should collect metrics.
let is_analyze = plan.as_any().downcast_ref::<AnalyzeExec>().is_some();

let resolved_plan = self.resolve_partitioned_scan_internal(plan, is_analyze)?;
PUSH_DOWN_PLAN_COUNTER
.with_label_values(&["remote_scan"])
.inc();
Expand All @@ -117,18 +119,15 @@ impl Resolver {
}

pub fn resolve_partitioned_scan_internal(
&mut self,
&self,
plan: Arc<dyn ExecutionPlan>,
is_analyze: bool,
) -> DfResult<Arc<dyn ExecutionPlan>> {
// Check if this plan is `AnalyzeExec`, if it is, we should collect metrics.
if plan.as_any().downcast_ref::<AnalyzeExec>().is_some() {
self.is_analyze = true;
}

// Leaf node, let's resolve it and return.
if let Some(unresolved) = plan.as_any().downcast_ref::<UnresolvedPartitionedScan>() {
let metrics_collector = unresolved.metrics_collector.clone();
let sub_tables = unresolved.sub_tables.clone();
let mut remote_metrics_collector = HashMap::with_capacity(sub_tables.len());
let remote_plans = sub_tables
.into_iter()
.map(|table| {
Expand All @@ -137,6 +136,10 @@ impl Resolver {
table_scan_ctx: unresolved.table_scan_ctx.clone(),
});
let sub_metrics_collect = metrics_collector.span(table.table.clone());
remote_metrics_collector.insert(
table.table.clone(),
RemoteMetricsCollector::new(table.table.clone()),
);

SubTablePlanContext::new(table, plan, sub_metrics_collect)
})
Expand All @@ -145,7 +148,9 @@ impl Resolver {
return Ok(Arc::new(ResolvedPartitionedScan::new(
self.remote_executor.clone(),
remote_plans,
self.is_analyze.then_some(metrics_collector),
metrics_collector,
remote_metrics_collector,
is_analyze,
)));
}

Expand All @@ -158,7 +163,7 @@ impl Resolver {
// Resolve children if exist.
let mut new_children = Vec::with_capacity(children.len());
for child in children {
let child = self.resolve_partitioned_scan_internal(child)?;
let child = self.resolve_partitioned_scan_internal(child, is_analyze)?;

new_children.push(child);
}
Expand Down Expand Up @@ -266,7 +271,7 @@ mod test {
fn test_basic_partitioned_scan() {
let ctx = TestContext::new();
let plan = ctx.build_basic_partitioned_table_plan();
let mut resolver = ctx.resolver();
let resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand All @@ -288,7 +293,7 @@ mod test {
async fn test_unprocessed_plan() {
let ctx = TestContext::new();
let plan = ctx.build_unprocessed_plan();
let mut resolver = ctx.resolver();
let resolver = ctx.resolver();

let original_plan_display = displayable(plan.as_ref()).indent(true).to_string();

Expand All @@ -311,7 +316,7 @@ mod test {
fn test_aggr_push_down() {
let ctx = TestContext::new();
let plan = ctx.build_aggr_push_down_plan();
let mut resolver = ctx.resolver();
let resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand All @@ -322,7 +327,7 @@ mod test {
fn test_compounded_aggr_push_down() {
let ctx = TestContext::new();
let plan = ctx.build_compounded_aggr_push_down_plan();
let mut resolver = ctx.resolver();
let resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand All @@ -333,7 +338,7 @@ mod test {
fn test_node_with_multiple_partitioned_scan_children() {
let ctx = TestContext::new();
let plan = ctx.build_union_plan();
let mut resolver = ctx.resolver();
let resolver = ctx.resolver();
let new_plan = displayable(resolver.resolve_partitioned_scan(plan).unwrap().as_ref())
.indent(true)
.to_string();
Expand Down
4 changes: 2 additions & 2 deletions df_engine_extensions/src/dist_sql_query/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use table_engine::{
table::{ReadOptions, ReadRequest, TableId, TableRef},
ANALYTIC_ENGINE_TYPE,
};
use trace_metric::MetricsCollector;
use trace_metric::{collector::RemoteMetricsCollector, MetricsCollector};

use crate::dist_sql_query::{
physical_plan::{PartitionedScanStream, UnresolvedPartitionedScan, UnresolvedSubTableScan},
Expand Down Expand Up @@ -507,7 +507,7 @@ impl RemotePhysicalPlanExecutor for MockRemotePhysicalPlanExecutor {
_table: TableIdentifier,
_task_context: &TaskContext,
_plan: Arc<dyn ExecutionPlan>,
_metrics_collector: MetricsCollector,
_remote_metrics_collector: RemoteMetricsCollector,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
unimplemented!()
}
Expand Down
Loading

0 comments on commit 8051eec

Please sign in to comment.