From a256e72899b8db1dd59538fb2da3c4bf27508e2c Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 8 Mar 2023 21:31:04 +0800 Subject: [PATCH 01/19] feat: add ReadMetricsCollector for collecting metrics in read procedure --- analytic_engine/src/instance/mod.rs | 1 + analytic_engine/src/instance/read_metrics.rs | 2 + analytic_engine/src/table/mod.rs | 5 +- analytic_engine/src/tests/table.rs | 6 +- system_catalog/src/sys_catalog_table.rs | 5 +- table_engine/src/provider.rs | 11 +- table_engine/src/table.rs | 120 ++++++++++++++++++- 7 files changed, 139 insertions(+), 11 deletions(-) create mode 100644 analytic_engine/src/instance/read_metrics.rs diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 7575ecd41f..d6cacb2fc1 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -14,6 +14,7 @@ pub mod flush_compaction; pub(crate) mod mem_collector; pub mod open; mod read; +pub(crate) mod read_metrics; pub(crate) mod write; pub mod write_worker; diff --git a/analytic_engine/src/instance/read_metrics.rs b/analytic_engine/src/instance/read_metrics.rs new file mode 100644 index 0000000000..668e3995cf --- /dev/null +++ b/analytic_engine/src/instance/read_metrics.rs @@ -0,0 +1,2 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index d2bffd3bc6..6e0ce5d3f4 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -17,8 +17,8 @@ use table_engine::{ stream::{PartitionedStreams, SendableRecordBatchStream}, table::{ AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get, - GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, ReadOptions, ReadOrder, ReadRequest, - Result, Scan, Table, TableId, TableStats, Write, WriteRequest, + GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, ReadMetricsCollector, ReadOptions, + ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, Write, WriteRequest, }, }; use tokio::sync::oneshot; @@ -179,6 +179,7 @@ impl Table for TableImpl { projected_schema: request.projected_schema, predicate, order: ReadOrder::None, + metrics_collector: ReadMetricsCollector::new(), }; let mut batch_stream = self .read(read_request) diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs index f7baf84b69..ff99fdd312 100644 --- a/analytic_engine/src/tests/table.rs +++ b/analytic_engine/src/tests/table.rs @@ -20,7 +20,10 @@ use table_engine::{ self, engine::{CreateTableRequest, TableState}, predicate::Predicate, - table::{GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableSeq}, + table::{ + GetRequest, ReadMetricsCollector, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, + TableSeq, + }, }; use crate::{table_options, tests::row_util}; @@ -185,6 +188,7 @@ pub fn new_read_all_request_with_order( projected_schema: ProjectedSchema::no_projection(schema), predicate: Arc::new(Predicate::empty()), order, + metrics_collector: ReadMetricsCollector::new(), } } diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index d2410ed033..10cac95133 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -35,8 +35,8 @@ use table_engine::{ }, predicate::PredicateBuilder, table::{ - GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableInfo, TableRef, - WriteRequest, + GetRequest, ReadMetricsCollector, 
ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId,
+        TableInfo, TableRef, WriteRequest,
     },
 };
 use tokio::sync::Mutex;
@@ -533,6 +533,7 @@ impl SysCatalogTable {
             projected_schema: ProjectedSchema::no_projection(self.table.schema()),
             predicate: PredicateBuilder::default().build(),
             order: ReadOrder::None,
+            metrics_collector: ReadMetricsCollector::new(),
         };
 
         let mut batch_stream = self.table.read(read_request).await.context(ReadTable)?;
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index d966017da5..7d1ce80a84 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -19,7 +19,7 @@ use datafusion::{
     execution::context::{SessionState, TaskContext},
     physical_expr::PhysicalSortExpr,
     physical_plan::{
-        DisplayFormatType, ExecutionPlan, Partitioning,
+        metrics::MetricsSet, DisplayFormatType, ExecutionPlan, Partitioning,
         SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
     },
 };
@@ -30,7 +30,7 @@ use log::debug;
 use crate::{
     predicate::{PredicateBuilder, PredicateRef},
     stream::{SendableRecordBatchStream, ToDfStream},
-    table::{self, ReadOptions, ReadOrder, ReadRequest, TableRef},
+    table::{self, ReadMetricsCollector, ReadOptions, ReadOrder, ReadRequest, TableRef},
 };
 
 #[derive(Clone, Debug)]
@@ -169,6 +169,7 @@ impl TableProviderAdapter {
             predicate,
             deadline,
             stream_state: Mutex::new(ScanStreamState::default()),
+            metrics_collector: ReadMetricsCollector::new(),
         };
         scan_table.maybe_init_stream(state).await?;
 
@@ -295,6 +296,7 @@ struct ScanTable {
     read_parallelism: usize,
     predicate: PredicateRef,
     deadline: Option<Instant>,
+    metrics_collector: ReadMetricsCollector,
     stream_state: Mutex<ScanStreamState>,
 }
 
@@ -311,6 +313,7 @@ impl ScanTable {
             projected_schema: self.projected_schema.clone(),
             predicate: self.predicate.clone(),
             order: self.read_order,
+            metrics_collector: self.metrics_collector.clone(),
         };
 
         let read_res = self.table.partitioned_read(req).await;
@@ -387,6 +390,10 @@ impl ExecutionPlan for ScanTable {
         )
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics_collector.take_as_df_metrics())
+    }
+
     fn statistics(&self) -> Statistics {
         // TODO(yingwen): Implement this
         Statistics::default()
diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs
index 9f2e8356c2..2a37280644 100644
--- a/table_engine/src/table.rs
+++ b/table_engine/src/table.rs
@@ -7,7 +7,7 @@ use std::{
     fmt,
     sync::{
         atomic::{AtomicU32, AtomicU64, Ordering},
-        Arc,
+        Arc, Mutex,
     },
     time::{Duration, Instant},
 };
@@ -23,6 +23,9 @@ use common_types::{
     schema::{RecordSchemaWithKey, Schema, Version},
 };
 use common_util::error::{BoxError, GenericError};
+use datafusion::physical_plan::metrics::{
+    Count, Metric as DfMetric, MetricValue as DfMetricValue, MetricsSet, Time,
+};
 use serde::Deserialize;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 
@@ -375,6 +378,115 @@ impl ReadOrder {
     }
 }
 
+#[derive(Clone, Debug)]
+pub struct MetricValue<T> {
+    pub name: String,
+    pub val: T,
+    pub partition: Option<usize>,
+}
+
+#[derive(Clone, Debug)]
+pub enum Metric {
+    Counter(MetricValue<usize>),
+    Elapsed(MetricValue<Duration>),
+}
+
+impl Metric {
+    #[inline]
+    pub fn counter(name: String, val: usize, partition: Option<usize>) -> Self {
+        Metric::Counter(MetricValue {
+            name,
+            val,
+            partition,
+        })
+    }
+
+    #[inline]
+    pub fn elapsed(name: String, val: Duration, partition: Option<usize>) -> Self {
+        Metric::Elapsed(MetricValue {
+            name,
+            val,
+            partition,
+        })
+    }
+}
+
+impl From<Metric> for DfMetric {
+    fn from(metric: Metric) -> Self {
+        let (df_metric_val, partition) = match metric {
+            Metric::Counter(MetricValue {
+                name,
+                val,
+                partition,
+            }) => {
+                let count = Count::new();
+                count.add(val);
+                (
+                    DfMetricValue::Count {
+                        name: name.into(),
+                        count,
+                    },
+                    partition,
+                )
+            }
+            Metric::Elapsed(MetricValue {
+                name,
+                val,
+                partition,
+            }) => {
+                let time = Time::new();
+                time.add_duration(val);
+                (
+                    DfMetricValue::Time {
+                        name: name.into(),
+                        time,
+                    },
+                    partition,
+                )
+            }
+        };
+
+        DfMetric::new(df_metric_val, partition)
+    }
+}
+
+/// A collector for metrics of a single read request.
+///
+/// It can be cloned and shared among threads.
+#[derive(Clone, Debug)]
+pub struct ReadMetricsCollector {
+    pub(crate) metrics: Arc<Mutex<Vec<Metric>>>,
+}
+
+impl Default for ReadMetricsCollector {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ReadMetricsCollector {
+    pub fn new() -> Self {
+        Self {
+            metrics: Arc::new(Mutex::new(Vec::new())),
+        }
+    }
+
+    pub fn collect(&self, metric: Metric) {
+        let mut metrics = self.metrics.lock().unwrap();
+        metrics.push(metric);
+    }
+
+    pub fn take_as_df_metrics(&self) -> MetricsSet {
+        let metrics: Vec<_> = std::mem::take(self.metrics.lock().unwrap().as_mut());
+        let mut metrics_set = MetricsSet::new();
+        for df_metric in metrics.into_iter().map(DfMetric::from) {
+            metrics_set.push(Arc::new(df_metric));
+        }
+
+        metrics_set
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct ReadRequest {
     /// Read request id.
@@ -388,6 +500,8 @@ pub struct ReadRequest {
     pub predicate: PredicateRef,
     /// Read the rows in reverse order.
     pub order: ReadOrder,
+    /// Collector for metrics of this read request.
+    pub metrics_collector: ReadMetricsCollector,
 }
 
 impl TryFrom<ReadRequest> for ceresdbproto::remote_engine::TableReadRequest {
@@ -448,6 +562,7 @@ impl TryFrom<ceresdbproto::remote_engine::TableReadRequest> for ReadRequest {
             projected_schema,
             predicate,
             order,
+            metrics_collector: ReadMetricsCollector::new(),
         })
     }
 }
@@ -513,9 +628,6 @@ pub trait Table: std::fmt::Debug {
     async fn read(&self, request: ReadRequest) -> Result<SendableRecordBatchStream>;
 
     /// Get the specific row according to the primary key.
-    /// TODO(xikai): object-safety is not ensured by now if the default
-    ///  implementation is provided. Actually it is better to use the read
-    ///  method to implement the get method.
     async fn get(&self, request: GetRequest) -> Result<Option<Row>>;
 
     /// Read multiple partition of the table in parallel.
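Taken together, patch 01 wires per-request metrics through to DataFusion: a `ReadMetricsCollector` is created alongside each `ReadRequest`, cloned into whatever code performs the read, and later drained through `ScanTable::metrics()`. A minimal usage sketch against the API added above (the `scan_rows`/`scan_cost` metric names are invented for illustration):

    use std::time::Instant;
    use table_engine::table::{Metric, ReadMetricsCollector};

    fn scan_partition(collector: &ReadMetricsCollector, partition: usize) {
        let start = Instant::now();
        let rows_scanned: usize = 1024; // stand-in for the real scan work
        // Metrics are buffered inside the collector; clones share the buffer.
        collector.collect(Metric::counter("scan_rows".to_string(), rows_scanned, Some(partition)));
        collector.collect(Metric::elapsed("scan_cost".to_string(), start.elapsed(), Some(partition)));
    }

`take_as_df_metrics()` converts each buffered `Metric` into a DataFusion `Count` or `Time` and empties the buffer (it uses `std::mem::take`), so repeated `ExecutionPlan::metrics()` calls do not double-report.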
From 6682360b13404181eeff9f0b1b64548d9fb5c742 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Thu, 9 Mar 2023 20:43:22 +0800 Subject: [PATCH 02/19] collect metrics during merge --- .../src/instance/flush_compaction.rs | 1 + analytic_engine/src/instance/read.rs | 29 +++++--- analytic_engine/src/row_iter/merge.rs | 52 +++++++++++++- benchmarks/src/merge_memtable_bench.rs | 1 + benchmarks/src/merge_sst_bench.rs | 1 + benchmarks/src/sst_tools.rs | 1 + table_engine/src/table.rs | 70 ++++++++----------- 7 files changed, 105 insertions(+), 50 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index b0f207c5bc..5250c21120 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -855,6 +855,7 @@ impl SpaceStore { let sequence = table_data.last_sequence(); let mut builder = MergeBuilder::new(MergeConfig { request_id, + metrics_collector: None, // no need to set deadline for compaction deadline: None, space_id, diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 3a47b8e5bc..f98a1f1938 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -20,7 +20,7 @@ use table_engine::{ stream::{ self, ErrWithSource, PartitionedStreams, RecordBatchStream, SendableRecordBatchStream, }, - table::ReadRequest, + table::{Metric, ReadRequest}, }; use tokio::sync::mpsc::{self, Receiver}; @@ -66,6 +66,8 @@ pub enum Error { define_result!(Error); const RECORD_BATCH_READ_BUF_SIZE: usize = 1000; +const READ_METRIC_MERGE_SORT: &str = "do_merge_sort"; +const READ_METRIC_ITER_NUM: &str = "iter_num"; /// Check whether it needs to apply merge sorting when reading the table with /// the `table_options` by the `read_request`. @@ -91,13 +93,18 @@ impl Instance { let table_data = space_table.table_data(); - // Collect metrics. - table_data.metrics.on_read_request_begin(); - let iter_options = self.iter_options.clone(); let table_options = table_data.table_options(); - if need_merge_sort_streams(&table_data.table_options(), &request) { + // Collect metrics. + table_data.metrics.on_read_request_begin(); + let need_merge_sort = need_merge_sort_streams(&table_options, &request); + request.metrics_collector.collect(Metric::boolean( + READ_METRIC_MERGE_SORT.to_string(), + need_merge_sort, + )); + + if need_merge_sort { let merge_iters = self .build_merge_iters(table_data, &request, iter_options, &table_options) .await?; @@ -123,16 +130,16 @@ impl Instance { }; // Split iterators into `read_parallelism` groups. 
-        let mut splited_iters: Vec<_> = std::iter::repeat_with(Vec::new)
+        let mut splitted_iters: Vec<_> = std::iter::repeat_with(Vec::new)
             .take(read_parallelism)
             .collect();
 
         for (i, time_aligned_iter) in partitioned_iters.into_iter().enumerate() {
-            splited_iters[i % read_parallelism].push(time_aligned_iter);
+            splitted_iters[i % read_parallelism].push(time_aligned_iter);
         }
 
         let mut streams = Vec::with_capacity(read_parallelism);
-        for iters in splited_iters {
+        for iters in splitted_iters {
             let stream = iters_to_stream(iters, self.read_runtime(), &request.projected_schema);
             streams.push(stream);
         }
@@ -172,6 +179,7 @@ impl Instance {
         for read_view in read_views {
             let merge_config = MergeConfig {
                 request_id: request.request_id,
+                metrics_collector: Some(request.metrics_collector.clone()),
                 deadline: request.opts.deadline,
                 space_id: table_data.space_id,
                 table_id: table_data.id,
@@ -201,6 +209,11 @@ impl Instance {
             iters.push(dedup_iter);
         }
 
+        request.metrics_collector.collect(Metric::counter(
+            READ_METRIC_ITER_NUM.to_string(),
+            iters.len(),
+        ));
+
         Ok(iters)
     }
 
diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs
index 638dc78b64..9fdbd8fc0b 100644
--- a/analytic_engine/src/row_iter/merge.rs
+++ b/analytic_engine/src/row_iter/merge.rs
@@ -22,7 +22,10 @@ use common_util::{define_result, error::GenericError};
 use futures::{future::try_join_all, StreamExt};
 use log::{debug, info, trace};
 use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_engine::{predicate::PredicateRef, table::TableId};
+use table_engine::{
+    predicate::PredicateRef,
+    table::{Metric, ReadMetricsCollector, TableId},
+};
 
 use crate::{
     row_iter::{
@@ -83,6 +86,7 @@ define_result!(Error);
 #[derive(Debug)]
 pub struct MergeConfig<'a> {
     pub request_id: RequestId,
+    pub metrics_collector: Option<ReadMetricsCollector>,
     /// None for background jobs, such as: compaction
     pub deadline: Option<Instant>,
     pub space_id: SpaceId,
@@ -228,6 +232,7 @@ impl<'a> MergeBuilder<'a> {
             self.config.merge_iter_options,
             self.config.reverse,
             Metrics::new(self.memtables.len(), sst_streams_num, sst_ids),
+            self.config.metrics_collector,
         ))
     }
 }
@@ -556,14 +561,15 @@ impl Ord for HeapBufferedStream {
     }
 }
 
+/// Metrics for merge iterator.
 pub struct Metrics {
     num_memtables: usize,
     num_ssts: usize,
     sst_ids: Vec<FileId>,
-    /// Times to fetch rows from one stream.
-    times_fetch_rows_from_one: usize,
     /// Total rows collected using fetch_rows_from_one_stream().
     total_rows_fetch_from_one: usize,
+    /// Times to fetch rows from one stream.
+    times_fetch_rows_from_one: usize,
     /// Times to fetch one row from multiple stream.
     times_fetch_row_from_multiple: usize,
     /// Create time of the metrics.
@@ -591,6 +597,37 @@ impl Metrics {
             scan_count: 0,
         }
     }
+
+    fn collect(&self, collector: &ReadMetricsCollector) {
+        // TODO: maybe we can define a macro to generate the code.
+        collector.collect(Metric::counter(
+            "num_memtables".to_string(),
+            self.num_memtables,
+        ));
+
+        collector.collect(Metric::counter("num_ssts".to_string(), self.num_ssts));
+        collector.collect(Metric::counter(
+            "times_fetch_rows_from_one".to_string(),
+            self.times_fetch_rows_from_one,
+        ));
+        collector.collect(Metric::counter(
+            "times_fetch_row_from_multiple".to_string(),
+            self.times_fetch_row_from_multiple,
+        ));
+        collector.collect(Metric::counter(
+            "total_rows_fetch_from_one".to_string(),
+            self.total_rows_fetch_from_one,
+        ));
+        collector.collect(Metric::elapsed(
+            "init_duration".to_string(),
+            self.init_duration,
+        ));
+        collector.collect(Metric::elapsed(
+            "scan_duration".to_string(),
+            self.scan_duration,
+        ));
+        collector.collect(Metric::counter("scan_count".to_string(), self.scan_count));
+    }
 }
 
 impl fmt::Debug for Metrics {
@@ -630,6 +667,7 @@ pub struct MergeIterator {
     iter_options: IterOptions,
     reverse: bool,
     metrics: Metrics,
+    metrics_collector: Option<ReadMetricsCollector>,
 }
 
 impl MergeIterator {
@@ -643,6 +681,7 @@ impl MergeIterator {
         iter_options: IterOptions,
         reverse: bool,
         metrics: Metrics,
+        metrics_collector: Option<ReadMetricsCollector>,
     ) -> Self {
         let heap_cap = streams.len();
         let record_batch_builder =
@@ -660,6 +699,7 @@ impl MergeIterator {
             iter_options,
             reverse,
             metrics,
+            metrics_collector,
         }
     }
 
@@ -855,6 +895,10 @@ impl MergeIterator {
 
 impl Drop for MergeIterator {
     fn drop(&mut self) {
+        if let Some(collector) = &self.metrics_collector {
+            self.metrics.collect(collector);
+        }
+
         info!(
             "Merge iterator dropped, table_id:{:?}, request_id:{}, metrics:{:?}, iter_options:{:?},",
             self.table_id, self.request_id, self.metrics, self.iter_options,
@@ -925,6 +969,7 @@ mod tests {
             IterOptions::default(),
             false,
             Metrics::new(1, 1, vec![]),
+            None,
         );
 
         check_iterator(
@@ -978,6 +1023,7 @@ mod tests {
             IterOptions::default(),
             true,
             Metrics::new(1, 1, vec![]),
+            None,
         );
 
         check_iterator(
diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs
index 5901f5fb22..06700a07aa 100644
--- a/benchmarks/src/merge_memtable_bench.rs
+++ b/benchmarks/src/merge_memtable_bench.rs
@@ -142,6 +142,7 @@ impl MergeMemTableBench {
         let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
+            metrics_collector: None,
             deadline: None,
             space_id,
             table_id,
diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs
index 3dd260bba2..8f65879b9b 100644
--- a/benchmarks/src/merge_sst_bench.rs
+++ b/benchmarks/src/merge_sst_bench.rs
@@ -125,6 +125,7 @@ impl MergeSstBench {
         let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
+            metrics_collector: None,
             deadline: None,
             space_id,
             table_id,
diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs
index f2dec57519..24387b0c51 100644
--- a/benchmarks/src/sst_tools.rs
+++ b/benchmarks/src/sst_tools.rs
@@ -220,6 +220,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {
 
     let mut builder = MergeBuilder::new(MergeConfig {
         request_id,
+        metrics_collector: None,
         deadline: None,
         space_id,
         table_id,
diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs
index 2a37280644..94390271ba 100644
--- a/table_engine/src/table.rs
+++ b/table_engine/src/table.rs
@@ -382,71 +382,63 @@ impl ReadOrder {
 pub struct MetricValue<T> {
     pub name: String,
     pub val: T,
-    pub partition: Option<usize>,
 }
 
 #[derive(Clone, Debug)]
 pub enum Metric {
+    Boolean(MetricValue<bool>),
     Counter(MetricValue<usize>),
     Elapsed(MetricValue<Duration>),
 }
 
 impl Metric {
     #[inline]
-    pub fn counter(name: String, val: usize, partition: Option<usize>) -> Self {
-        Metric::Counter(MetricValue {
-            name,
-            val,
-            partition,
-        })
+    pub fn counter(name: String, val: usize) -> Self {
+        Metric::Counter(MetricValue { name, val })
     }
 
     #[inline]
-    pub fn elapsed(name: String, val: Duration, partition: Option<usize>) -> Self {
-        Metric::Elapsed(MetricValue {
-            name,
-            val,
-            partition,
-        })
+    pub fn elapsed(name: String, val: Duration) -> Self {
+        Metric::Elapsed(MetricValue { name, val })
+    }
+
+    #[inline]
+    pub fn boolean(name: String, val: bool) -> Self {
+        Metric::Boolean(MetricValue { name, val })
     }
 }
 
 impl From<Metric> for DfMetric {
     fn from(metric: Metric) -> Self {
-        let (df_metric_val, partition) = match metric {
-            Metric::Counter(MetricValue {
-                name,
-                val,
-                partition,
-            }) => {
+        let df_metric_val = match metric {
+            Metric::Counter(MetricValue { name, val }) => {
                 let count = Count::new();
                 count.add(val);
-                (
-                    DfMetricValue::Count {
-                        name: name.into(),
-                        count,
-                    },
-                    partition,
-                )
+                DfMetricValue::Count {
+                    name: name.into(),
+                    count,
+                }
             }
-            Metric::Elapsed(MetricValue {
-                name,
-                val,
-                partition,
-            }) => {
+            Metric::Elapsed(MetricValue { name, val }) => {
                 let time = Time::new();
                 time.add_duration(val);
-                (
-                    DfMetricValue::Time {
-                        name: name.into(),
-                        time,
-                    },
-                    partition,
-                )
+                DfMetricValue::Time {
+                    name: name.into(),
+                    time,
+                }
+            }
+            Metric::Boolean(MetricValue { name, val }) => {
+                let count = Count::new();
+                // Use 0 for false, 1 for true.
+                count.add(val as usize);
+                DfMetricValue::Count {
+                    name: name.into(),
+                    count,
+                }
             }
         };
 
-        DfMetric::new(df_metric_val, partition)
+        DfMetric::new(df_metric_val, None)
     }
 }

From 5e1bf7da3c043dc0dd7d81b5fdadbb31306790bf Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Fri, 10 Mar 2023 11:53:09 +0800
Subject: [PATCH 03/19] add new component trace_metric

---
 Cargo.lock                               |  4 ++
 Cargo.toml                               |  1 +
 components/trace_metric/Cargo.toml       | 13 +++++
 components/trace_metric/src/collector.rs | 65 ++++++++++++++++++++++++
 components/trace_metric/src/lib.rs       |  6 +++
 components/trace_metric/src/metric.rs    | 41 +++++++++++++++
 6 files changed, 130 insertions(+)
 create mode 100644 components/trace_metric/Cargo.toml
 create mode 100644 components/trace_metric/src/collector.rs
 create mode 100644 components/trace_metric/src/lib.rs
 create mode 100644 components/trace_metric/src/metric.rs

diff --git a/Cargo.lock b/Cargo.lock
index add93025d3..1d11fc15ee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6341,6 +6341,10 @@ version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
 
+[[package]]
+name = "trace_metric"
+version = "1.0.0"
+
 [[package]]
 name = "tracing"
 version = "0.1.36"
diff --git a/Cargo.toml b/Cargo.toml
index ca08b1005b..7165289917 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ members = [
     "components/profile",
     "components/skiplist",
     "components/table_kv",
+    "components/trace_metric",
    "components/tracing_util",
     "df_operator",
     "integration_tests",
diff --git a/components/trace_metric/Cargo.toml b/components/trace_metric/Cargo.toml
new file mode 100644
index 0000000000..41c1806323
--- /dev/null
+++ b/components/trace_metric/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "trace_metric"
+
+[package.version]
+workspace = true
+
+[package.authors]
+workspace = true
+
+[package.edition]
+workspace = true
+
+[dependencies]
diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs
new file mode 100644
index 0000000000..95fafc3b59
--- /dev/null
+++ b/components/trace_metric/src/collector.rs
@@ -0,0 +1,65 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+use std::sync::{Arc, Mutex};
+
+use crate::metric::Metric;
+
+/// A collector for metrics of a single read request.
+///
+/// It can be cloned and shared among threads.
+#[derive(Clone, Debug)]
+pub struct Collector {
+    name: String,
+    metrics: Arc<Mutex<Vec<Metric>>>,
+    children: Arc<Mutex<Vec<Collector>>>,
+}
+
+impl Collector {
+    /// Create a new collector with the given name.
+    pub fn new(name: String) -> Self {
+        Self {
+            name,
+            metrics: Arc::new(Mutex::new(vec![])),
+            children: Arc::new(Mutex::new(vec![])),
+        }
+    }
+
+    /// Collect a metric.
+    pub fn collect(&self, metric: Metric) {
+        let mut metrics = self.metrics.lock().unwrap();
+        metrics.push(metric);
+    }
+
+    /// Span a child collector with a given name.
+    pub fn span(&self, name: String) -> Collector {
+        let mut children = self.children.lock().unwrap();
+        let child = Self::new(name);
+        children.push(child.clone());
+        child
+    }
+
+    #[inline]
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Visit all metrics in the collector, excluding the metrics belonging to
+    /// the children.
+    pub fn visit_metrics(&self, f: &mut dyn FnMut(&Metric)) {
+        let metrics = self.metrics.lock().unwrap();
+        for metric in metrics.iter() {
+            f(metric);
+        }
+    }
+
+    /// Visit all the collectors including itself and its children.
+    pub fn visit(&self, f: &mut dyn FnMut(&Collector)) {
+        f(self);
+        let children = self.children.lock().unwrap();
+        for child in children.iter() {
+            child.visit(f);
+        }
+    }
+}
diff --git a/components/trace_metric/src/lib.rs b/components/trace_metric/src/lib.rs
new file mode 100644
index 0000000000..4540763d06
--- /dev/null
+++ b/components/trace_metric/src/lib.rs
@@ -0,0 +1,6 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+pub mod collector;
+pub mod metric;
diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs
new file mode 100644
index 0000000000..07db95b8e4
--- /dev/null
+++ b/components/trace_metric/src/metric.rs
@@ -0,0 +1,41 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+use std::{fmt, time::Duration};
+
+#[derive(Clone)]
+pub struct MetricValue<T> {
+    pub name: String,
+    pub val: T,
+}
+
+#[derive(Clone, Debug)]
+pub enum Metric {
+    Boolean(MetricValue<bool>),
+    Counter(MetricValue<usize>),
+    Elapsed(MetricValue<Duration>),
+}
+
+impl Metric {
+    #[inline]
+    pub fn counter(name: String, val: usize) -> Self {
+        Metric::Counter(MetricValue { name, val })
+    }
+
+    #[inline]
+    pub fn elapsed(name: String, val: Duration) -> Self {
+        Metric::Elapsed(MetricValue { name, val })
+    }
+
+    #[inline]
+    pub fn boolean(name: String, val: bool) -> Self {
+        Metric::Boolean(MetricValue { name, val })
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for MetricValue<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}={:?}", self.name, self.val)
+    }
+}

From ff62722836502387524affbeec32411abfd79b72 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Fri, 10 Mar 2023 14:07:43 +0800
Subject: [PATCH 04/19] add derive crate

---
 components/trace_metric/Cargo.toml        |  2 --
 components/trace_metric/src/lib.rs        |  3 +++
 components/trace_metric_derive/Cargo.toml | 20 ++++++++++++++++++++
 components/trace_metric_derive/src/lib.rs |  2 ++
 4 files changed, 25 insertions(+), 2 deletions(-)
 create mode 100644 components/trace_metric_derive/Cargo.toml
 create mode 100644 components/trace_metric_derive/src/lib.rs

diff --git a/components/trace_metric/Cargo.toml b/components/trace_metric/Cargo.toml
index 41c1806323..899ffa37eb 100644
--- a/components/trace_metric/Cargo.toml
+++ b/components/trace_metric/Cargo.toml
@@ -9,5 +9,3 @@ workspace = true
 
 [package.edition]
 workspace = true
-
-[dependencies]
diff --git a/components/trace_metric/src/lib.rs b/components/trace_metric/src/lib.rs
index 4540763d06..b67dc6993e 100644
--- a/components/trace_metric/src/lib.rs
+++ b/components/trace_metric/src/lib.rs
@@ -4,3 +4,6 @@
 
 pub mod collector;
 pub mod metric;
+
+pub use collector::Collector;
+pub use metric::Metric;
diff --git a/components/trace_metric_derive/Cargo.toml b/components/trace_metric_derive/Cargo.toml
new file mode 100644
index 0000000000..102510424b
--- /dev/null
+++ b/components/trace_metric_derive/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "trace_metric_derive"
+
+[package.version]
+workspace = true
+
+[package.authors]
+workspace = true
+
+[package.edition]
+workspace = true
+
+[lib]
+proc-macro = true
+
+[dependencies]
+proc-macro2 = "1.0"
+quote = "1.0"
+syn = { version="1.0", features=["full"] }
+trace_metric = { workspace = true }
\ No newline at end of file
diff --git a/components/trace_metric_derive/src/lib.rs b/components/trace_metric_derive/src/lib.rs
new file mode 100644
index 0000000000..668e3995cf
--- /dev/null
+++ b/components/trace_metric_derive/src/lib.rs
@@ -0,0 +1,2 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+ From 7c41bde4f19f5c35c7b04aac3dfdd794c038a4b9 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Fri, 10 Mar 2023 17:14:26 +0800 Subject: [PATCH 05/19] impl proc macro for trace_metric --- Cargo.lock | 19 +++ Cargo.toml | 5 + components/trace_metric/Cargo.toml | 3 + components/trace_metric/src/collector.rs | 4 +- components/trace_metric/src/lib.rs | 1 + components/trace_metric_derive/Cargo.toml | 3 +- components/trace_metric_derive/src/builder.rs | 144 ++++++++++++++++++ components/trace_metric_derive/src/lib.rs | 12 ++ .../trace_metric_derive_tests/Cargo.toml | 14 ++ .../trace_metric_derive_tests/src/lib.rs | 45 ++++++ 10 files changed, 246 insertions(+), 4 deletions(-) create mode 100644 components/trace_metric_derive/src/builder.rs create mode 100644 components/trace_metric_derive_tests/Cargo.toml create mode 100644 components/trace_metric_derive_tests/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 1d11fc15ee..23dbdd15b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6344,6 +6344,25 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "trace_metric" version = "1.0.0" +dependencies = [ + "trace_metric_derive", +] + +[[package]] +name = "trace_metric_derive" +version = "1.0.0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "trace_metric_derive_tests" +version = "1.0.0" +dependencies = [ + "trace_metric", +] [[package]] name = "tracing" diff --git a/Cargo.toml b/Cargo.toml index 7165289917..184655d147 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,8 @@ members = [ "components/skiplist", "components/table_kv", "components/trace_metric", + "components/trace_metric_derive", + "components/trace_metric_derive_tests", "components/tracing_util", "df_operator", "integration_tests", @@ -113,6 +115,9 @@ table_engine = { path = "table_engine" } table_kv = { path = "components/table_kv" } tempfile = "3.1.0" tracing_util = { path = "components/tracing_util" } +trace_metric = { path = "components/trace_metric" } +trace_metric_derive = { path = "components/trace_metric_derive" } +trace_metric_derive_tests = { path = "components/trace_metric_derive_tests" } tonic = "0.8.1" tokio = { version = "1.25", features = ["full"] } wal = { path = "wal" } diff --git a/components/trace_metric/Cargo.toml b/components/trace_metric/Cargo.toml index 899ffa37eb..070a830879 100644 --- a/components/trace_metric/Cargo.toml +++ b/components/trace_metric/Cargo.toml @@ -9,3 +9,6 @@ workspace = true [package.edition] workspace = true + +[dependencies] +trace_metric_derive = { workspace = true } diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index 95fafc3b59..1f65481662 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -47,7 +47,7 @@ impl Collector { /// Visit all metrics in the collector, excluding the metrics belonging to /// the children. - pub fn visit_metrics(&self, f: &mut dyn FnMut(&Metric)) { + pub fn visit_metrics(&self, f: &mut impl FnMut(&Metric)) { let metrics = self.metrics.lock().unwrap(); for metric in metrics.iter() { f(metric); @@ -55,7 +55,7 @@ impl Collector { } /// Visit all the collectors including itself and its children. 
-    pub fn visit(&self, f: &mut dyn FnMut(&Collector)) {
+    pub fn visit(&self, f: &mut impl FnMut(&Collector)) {
         f(self);
         let children = self.children.lock().unwrap();
         for child in children.iter() {
diff --git a/components/trace_metric/src/lib.rs b/components/trace_metric/src/lib.rs
index b67dc6993e..68fa44372c 100644
--- a/components/trace_metric/src/lib.rs
+++ b/components/trace_metric/src/lib.rs
@@ -7,3 +7,4 @@ pub mod metric;
 
 pub use collector::Collector;
 pub use metric::Metric;
+pub use trace_metric_derive::TracedMetrics;
diff --git a/components/trace_metric_derive/Cargo.toml b/components/trace_metric_derive/Cargo.toml
index 102510424b..63be507837 100644
--- a/components/trace_metric_derive/Cargo.toml
+++ b/components/trace_metric_derive/Cargo.toml
@@ -16,5 +16,4 @@ proc-macro = true
 [dependencies]
 proc-macro2 = "1.0"
 quote = "1.0"
-syn = { version="1.0", features=["full"] }
-trace_metric = { workspace = true }
\ No newline at end of file
+syn = { version = "1.0", features = ["full"] }
diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs
new file mode 100644
index 0000000000..2d2c8c0dea
--- /dev/null
+++ b/components/trace_metric_derive/src/builder.rs
@@ -0,0 +1,144 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+use proc_macro::TokenStream;
+use quote::quote;
+use syn::{DeriveInput, Field, Generics, Ident};
+
+const COLLECTOR_FIELD_TOKENS: &str = "(collector)";
+const COUNTER_FIELD_TOKENS: &str = "(counter)";
+const ELAPSED_FIELD_TOKENS: &str = "(elapsed)";
+const BOOLEAN_FIELD_TOKENS: &str = "(boolean)";
+
+enum MetricType {
+    Counter,
+    Elapsed,
+    Boolean,
+}
+
+impl MetricType {
+    fn try_from_tokens(s: &str) -> Option<Self> {
+        if s == COUNTER_FIELD_TOKENS {
+            Some(Self::Counter)
+        } else if s == ELAPSED_FIELD_TOKENS {
+            Some(Self::Elapsed)
+        } else if s == BOOLEAN_FIELD_TOKENS {
+            Some(Self::Boolean)
+        } else {
+            None
+        }
+    }
+}
+
+struct MetricField {
+    metric_type: MetricType,
+    field_name: Ident,
+}
+
+impl MetricField {
+    fn try_from_field(field: Field) -> Option<Self> {
+        for attr in field.attrs.iter() {
+            if !attr.path.is_ident("metric") {
+                continue;
+            }
+
+            let field_name = field.ident.expect("Metric field must have a name");
+            let metric_type_tokens = attr.tokens.to_string();
+            let metric_type =
+                MetricType::try_from_tokens(&metric_type_tokens).expect("Unknown metric type");
+            return Some(Self {
+                metric_type,
+                field_name,
+            });
+        }
+
+        None
+    }
+}
+
+pub struct Builder {
+    struct_name: Ident,
+    metric_fields: Vec<MetricField>,
+    collector_field: Ident,
+    generics: Generics,
+}
+
+impl Builder {
+    pub fn parse_from_ast(ast: DeriveInput) -> Self {
+        let struct_name = ast.ident;
+        let (metric_fields, collector_field) = match ast.data {
+            syn::Data::Struct(syn::DataStruct {
+                fields: syn::Fields::Named(syn::FieldsNamed { named, .. }),
+                ..
+            }) => {
+                let mut metric_fields = Vec::new();
+                let mut collector_field = None;
+                for field in named {
+                    if Self::is_collector_field(&field) {
+                        collector_field = Some(field);
+                        continue;
+                    }
+                    if let Some(metric_field) = MetricField::try_from_field(field) {
+                        metric_fields.push(metric_field);
+                    }
+                }
+                (
+                    metric_fields,
+                    collector_field
+                        .expect("TracedMetrics must have a collector field")
+                        .ident
+                        .expect("TracedMetrics collector field must be named"),
+                )
+            }
+            _ => panic!("TracedMetrics only supports struct with named fields"),
+        };
+
+        Self {
+            struct_name,
+            metric_fields,
+            collector_field,
+            generics: ast.generics,
+        }
+    }
+
+    pub fn build(&self) -> TokenStream {
+        let mut collect_statements = Vec::with_capacity(self.metric_fields.len());
+        let collector_field = &self.collector_field;
+        for metric_field in self.metric_fields.iter() {
+            let field_name = &metric_field.field_name;
+            // `quote!` does not interpolate `#field_name` inside a string
+            // literal, so bind the metric name to a `String` first.
+            let metric_name = field_name.to_string();
+            let metric = match metric_field.metric_type {
+                MetricType::Counter => {
+                    quote! { trace_metric::Metric::counter(#metric_name.to_string(), self.#field_name) }
+                }
+                MetricType::Elapsed => {
+                    quote! { trace_metric::Metric::elapsed(#metric_name.to_string(), self.#field_name) }
+                }
+                MetricType::Boolean => {
+                    quote! { trace_metric::Metric::boolean(#metric_name.to_string(), self.#field_name) }
+                }
+            };
+            let statement = quote! {
+                self.#collector_field.collect(#metric);
+            };
+            collect_statements.push(statement);
+        }
+
+        let where_clause = &self.generics.where_clause;
+        let generics = &self.generics;
+        let struct_name = &self.struct_name;
+        quote! {
+            impl #generics Drop for #struct_name #generics #where_clause {
+                fn drop(&mut self) {
+                    #(#collect_statements)*
+                }
+            }
+        }
+        .into()
+    }
+
+    fn is_collector_field(field: &Field) -> bool {
+        field.attrs.iter().any(|attr| {
+            attr.path.is_ident("metric")
+                && attr.tokens.to_string().as_str() == COLLECTOR_FIELD_TOKENS
+        })
+    }
+}
diff --git a/components/trace_metric_derive/src/lib.rs b/components/trace_metric_derive/src/lib.rs
index 668e3995cf..7e04abed2e 100644
--- a/components/trace_metric_derive/src/lib.rs
+++ b/components/trace_metric_derive/src/lib.rs
@@ -1,2 +1,14 @@
 // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
 
+use proc_macro::TokenStream;
+use syn::{parse_macro_input, DeriveInput};
+
+mod builder;
+
+use builder::Builder;
+
+#[proc_macro_derive(TracedMetrics, attributes(metric))]
+pub fn derive(input: TokenStream) -> TokenStream {
+    let ast = parse_macro_input!(input as DeriveInput);
+    Builder::parse_from_ast(ast).build()
+}
diff --git a/components/trace_metric_derive_tests/Cargo.toml b/components/trace_metric_derive_tests/Cargo.toml
new file mode 100644
index 0000000000..435253c275
--- /dev/null
+++ b/components/trace_metric_derive_tests/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "trace_metric_derive_tests"
+
+[package.version]
+workspace = true
+
+[package.authors]
+workspace = true
+
+[package.edition]
+workspace = true
+
+[dependencies]
+trace_metric = { workspace = true }
diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs
new file mode 100644
index 0000000000..1617fa2085
--- /dev/null
+++ b/components/trace_metric_derive_tests/src/lib.rs
@@ -0,0 +1,45 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
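+//
+// Rough shape of what `#[derive(TracedMetrics)]` expands to for the
+// `ExampleMetrics` struct below (a sketch, not the exact macro output):
+// a `Drop` impl that reports one `trace_metric::Metric` per `#[metric(...)]`
+// field into the `#[metric(collector)]` field.
+//
+//   impl Drop for ExampleMetrics {
+//       fn drop(&mut self) {
+//           self.collector.collect(trace_metric::Metric::counter("counter".to_string(), self.counter));
+//           self.collector.collect(trace_metric::Metric::elapsed("elapsed".to_string(), self.elapsed));
+//           self.collector.collect(trace_metric::Metric::boolean("boolean".to_string(), self.boolean));
+//       }
+//   }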
+ +use std::time::Duration; + +use trace_metric::{Collector, TracedMetrics}; + +#[derive(Debug, Clone, TracedMetrics)] +pub struct ExampleMetrics { + #[metric(counter)] + pub counter: usize, + #[metric(elapsed)] + pub elapsed: Duration, + #[metric(boolean)] + pub boolean: bool, + pub foo: String, + + #[metric(collector)] + pub collector: Collector, +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn basic() { + let collector = Collector::new("test".to_string()); + { + let _ = ExampleMetrics { + counter: 1, + elapsed: Duration::from_secs(1), + boolean: true, + foo: "bar".to_owned(), + collector: collector.clone(), + }; + } + + let mut metric_num = 0; + collector.visit_metrics(&mut |_| { + metric_num += 1; + }); + + assert_eq!(metric_num, 3) + } +} From 3da8fe4dc9e02d683270ba2e683e3339fa8d5efd Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Fri, 10 Mar 2023 17:43:46 +0800 Subject: [PATCH 06/19] use the trace_metric crate --- Cargo.lock | 4 + analytic_engine/Cargo.toml | 1 + .../src/instance/flush_compaction.rs | 3 +- analytic_engine/src/instance/read.rs | 5 +- analytic_engine/src/row_iter/merge.rs | 90 +++++--------- analytic_engine/src/table/mod.rs | 7 +- analytic_engine/src/tests/table.rs | 8 +- benchmarks/Cargo.toml | 1 + benchmarks/src/merge_memtable_bench.rs | 3 +- benchmarks/src/merge_sst_bench.rs | 3 +- benchmarks/src/sst_tools.rs | 3 +- system_catalog/Cargo.toml | 1 + system_catalog/src/sys_catalog_table.rs | 7 +- table_engine/Cargo.toml | 1 + table_engine/src/provider.rs | 10 +- table_engine/src/table.rs | 111 +----------------- 16 files changed, 70 insertions(+), 188 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23dbdd15b0..7f624a3b07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,6 +111,7 @@ dependencies = [ "table_kv", "tempfile", "tokio", + "trace_metric", "wal", "xorfilter-rs", ] @@ -656,6 +657,7 @@ dependencies = [ "table_engine", "table_kv", "tokio", + "trace_metric", "wal", "zstd 0.12.1+zstd.1.5.2", ] @@ -5869,6 +5871,7 @@ dependencies = [ "snafu 0.6.10", "table_engine", "tokio", + "trace_metric", ] [[package]] @@ -5894,6 +5897,7 @@ dependencies = [ "smallvec", "snafu 0.6.10", "tokio", + "trace_metric", ] [[package]] diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml index 3047957eca..6aeb9fab3b 100644 --- a/analytic_engine/Cargo.toml +++ b/analytic_engine/Cargo.toml @@ -46,6 +46,7 @@ table_engine = { workspace = true } table_kv = { workspace = true } tempfile = { workspace = true, optional = true } tokio = { workspace = true } +trace_metric = { workspace = true } wal = { workspace = true } xorfilter-rs = { workspace = true } diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 5250c21120..df8f0d7317 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -28,6 +28,7 @@ use log::{debug, error, info}; use snafu::{Backtrace, ResultExt, Snafu}; use table_engine::{predicate::Predicate, table::Result as TableResult}; use tokio::sync::oneshot; +use trace_metric::Collector; use wal::manager::WalLocation; use crate::{ @@ -855,7 +856,7 @@ impl SpaceStore { let sequence = table_data.last_sequence(); let mut builder = MergeBuilder::new(MergeConfig { request_id, - metrics_collector: None, + metrics_collector: Collector::new("compaction".to_string()), // no need to set deadline for compaction deadline: None, space_id, diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 
f98a1f1938..6962d1d04d 100644
--- a/analytic_engine/src/instance/read.rs
+++ b/analytic_engine/src/instance/read.rs
@@ -20,9 +20,10 @@ use table_engine::{
     stream::{
         self, ErrWithSource, PartitionedStreams, RecordBatchStream, SendableRecordBatchStream,
     },
-    table::{Metric, ReadRequest},
+    table::ReadRequest,
 };
 use tokio::sync::mpsc::{self, Receiver};
+use trace_metric::Metric;
 
 use crate::{
     instance::Instance,
@@ -179,7 +180,7 @@ impl Instance {
         for read_view in read_views {
             let merge_config = MergeConfig {
                 request_id: request.request_id,
-                metrics_collector: Some(request.metrics_collector.clone()),
+                metrics_collector: request.metrics_collector.clone(),
                 deadline: request.opts.deadline,
                 space_id: table_data.space_id,
                 table_id: table_data.id,
diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs
index 9fdbd8fc0b..3949ef8f42 100644
--- a/analytic_engine/src/row_iter/merge.rs
+++ b/analytic_engine/src/row_iter/merge.rs
@@ -20,12 +20,10 @@ use common_types::{
 };
 use common_util::{define_result, error::GenericError};
 use futures::{future::try_join_all, StreamExt};
-use log::{debug, info, trace};
+use log::{debug, trace};
 use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_engine::{
-    predicate::PredicateRef,
-    table::{Metric, ReadMetricsCollector, TableId},
-};
+use table_engine::{predicate::PredicateRef, table::TableId};
+use trace_metric::{Collector, TracedMetrics};
 
 use crate::{
     row_iter::{
@@ -86,7 +84,7 @@ define_result!(Error);
 #[derive(Debug)]
 pub struct MergeConfig<'a> {
     pub request_id: RequestId,
-    pub metrics_collector: Option<ReadMetricsCollector>,
+    pub metrics_collector: Collector,
     /// None for background jobs, such as: compaction
     pub deadline: Option<Instant>,
     pub space_id: SpaceId,
@@ -231,8 +229,12 @@ impl<'a> MergeBuilder<'a> {
             self.ssts,
             self.config.merge_iter_options,
             self.config.reverse,
-            Metrics::new(self.memtables.len(), sst_streams_num, sst_ids),
-            self.config.metrics_collector,
+            Metrics::new(
+                self.memtables.len(),
+                sst_streams_num,
+                sst_ids,
+                self.config.metrics_collector.clone(),
+            ),
         ))
     }
 }
@@ -562,28 +564,44 @@ impl Ord for HeapBufferedStream {
 }
 
 /// Metrics for merge iterator.
+#[derive(TracedMetrics)]
 pub struct Metrics {
+    #[metric(counter)]
     num_memtables: usize,
+    #[metric(counter)]
     num_ssts: usize,
     sst_ids: Vec<FileId>,
     /// Total rows collected using fetch_rows_from_one_stream().
+    #[metric(counter)]
     total_rows_fetch_from_one: usize,
     /// Times to fetch rows from one stream.
+    #[metric(counter)]
     times_fetch_rows_from_one: usize,
     /// Times to fetch one row from multiple stream.
+    #[metric(counter)]
     times_fetch_row_from_multiple: usize,
     /// Create time of the metrics.
     create_at: Instant,
     /// Init time cost of the metrics.
+    #[metric(elapsed)]
     init_duration: Duration,
     /// Scan time cost of the metrics.
+    #[metric(elapsed)]
     scan_duration: Duration,
     /// Scan count
+    #[metric(counter)]
     scan_count: usize,
+    #[metric(collector)]
+    metrics_collector: Collector,
 }
 
 impl Metrics {
-    fn new(num_memtables: usize, num_ssts: usize, sst_ids: Vec<FileId>) -> Self {
+    fn new(
+        num_memtables: usize,
+        num_ssts: usize,
+        sst_ids: Vec<FileId>,
+        collector: Collector,
+    ) -> Self {
         Self {
             num_memtables,
             num_ssts,
@@ -595,39 +613,9 @@ impl Metrics {
             init_duration: Duration::default(),
             scan_duration: Duration::default(),
             scan_count: 0,
+            metrics_collector: collector,
         }
     }
-
-    fn collect(&self, collector: &ReadMetricsCollector) {
-        // TODO: maybe we can define a macro to generate the code.
-        collector.collect(Metric::counter(
-            "num_memtables".to_string(),
-            self.num_memtables,
-        ));
-
-        collector.collect(Metric::counter("num_ssts".to_string(), self.num_ssts));
-        collector.collect(Metric::counter(
-            "times_fetch_rows_from_one".to_string(),
-            self.times_fetch_rows_from_one,
-        ));
-        collector.collect(Metric::counter(
-            "times_fetch_row_from_multiple".to_string(),
-            self.times_fetch_row_from_multiple,
-        ));
-        collector.collect(Metric::counter(
-            "total_rows_fetch_from_one".to_string(),
-            self.total_rows_fetch_from_one,
-        ));
-        collector.collect(Metric::elapsed(
-            "init_duration".to_string(),
-            self.init_duration,
-        ));
-        collector.collect(Metric::elapsed(
-            "scan_duration".to_string(),
-            self.scan_duration,
-        ));
-        collector.collect(Metric::counter("scan_count".to_string(), self.scan_count));
-    }
 }
 
 impl fmt::Debug for Metrics {
@@ -667,7 +655,6 @@ pub struct MergeIterator {
     iter_options: IterOptions,
     reverse: bool,
     metrics: Metrics,
-    metrics_collector: Option<ReadMetricsCollector>,
 }
 
 impl MergeIterator {
@@ -681,7 +668,6 @@ impl MergeIterator {
         iter_options: IterOptions,
         reverse: bool,
         metrics: Metrics,
-        metrics_collector: Option<ReadMetricsCollector>,
     ) -> Self {
         let heap_cap = streams.len();
         let record_batch_builder =
@@ -699,7 +685,6 @@ impl MergeIterator {
             iter_options,
             reverse,
             metrics,
-            metrics_collector,
         }
     }
 
@@ -893,19 +878,6 @@ impl MergeIterator {
     }
 }
 
-impl Drop for MergeIterator {
-    fn drop(&mut self) {
-        if let Some(collector) = &self.metrics_collector {
-            self.metrics.collect(collector);
-        }
-
-        info!(
-            "Merge iterator dropped, table_id:{:?}, request_id:{}, metrics:{:?}, iter_options:{:?},",
-            self.table_id, self.request_id, self.metrics, self.iter_options,
-        );
-    }
-}
-
 #[async_trait]
 impl RecordBatchWithKeyIterator for MergeIterator {
     type Error = Error;
@@ -969,8 +940,7 @@ mod tests {
             IterOptions::default(),
             false,
-            Metrics::new(1, 1, vec![]),
-            None,
+            Metrics::new(1, 1, vec![], Collector::new("".to_string())),
         );
 
         check_iterator(
@@ -1023,8 +993,7 @@ mod tests {
             IterOptions::default(),
             true,
-            Metrics::new(1, 1, vec![]),
-            None,
+            Metrics::new(1, 1, vec![], Collector::new("".to_string())),
         );
 
         check_iterator(
diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs
index 6e0ce5d3f4..6de53d6e49 100644
--- a/analytic_engine/src/table/mod.rs
+++ b/analytic_engine/src/table/mod.rs
@@ -17,11 +17,12 @@ use table_engine::{
     stream::{PartitionedStreams, SendableRecordBatchStream},
     table::{
         AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get,
-        GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, ReadMetricsCollector, ReadOptions,
-        ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, Write, WriteRequest,
+        GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, ReadOptions, ReadOrder, ReadRequest,
+        Result, Scan, Table, TableId, TableStats, Write, WriteRequest,
     },
 };
 use tokio::sync::oneshot;
+use trace_metric::Collector;
 
 use self::data::TableDataRef;
 use crate::{
@@ -180,7 +180,7 @@ impl Table for TableImpl {
             projected_schema: request.projected_schema,
             predicate,
             order: ReadOrder::None,
-            metrics_collector: ReadMetricsCollector::new(),
+            metrics_collector: Collector::new("".to_string()),
         };
         let mut batch_stream = self
             .read(read_request)
diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs
index ff99fdd312..50db341a1d 100644
--- a/analytic_engine/src/tests/table.rs
+++ b/analytic_engine/src/tests/table.rs
@@ -20,11 +20,9 @@ use table_engine::{
     self,
     engine::{CreateTableRequest, TableState},
predicate::Predicate, - table::{ - GetRequest, ReadMetricsCollector, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, - TableSeq, - }, + table::{GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableSeq}, }; +use trace_metric::Collector; use crate::{table_options, tests::row_util}; @@ -188,7 +186,7 @@ pub fn new_read_all_request_with_order( projected_schema: ProjectedSchema::no_projection(schema), predicate: Arc::new(Predicate::empty()), order, - metrics_collector: ReadMetricsCollector::new(), + metrics_collector: Collector::new("".to_string()), } } diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index c8e63f99e9..ac1b59d295 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -32,6 +32,7 @@ snafu = { workspace = true } table_engine = { workspace = true } table_kv = { workspace = true } tokio = { workspace = true } +trace_metric = { workspace = true } wal = { workspace = true } zstd = { workspace = true } diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 06700a07aa..e5b0f9ced5 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -35,6 +35,7 @@ use common_util::runtime::Runtime; use log::info; use object_store::{LocalFileSystem, ObjectStoreRef}; use table_engine::{predicate::Predicate, table::TableId}; +use trace_metric::Collector; use crate::{config::MergeMemTableBenchConfig, util}; @@ -142,7 +143,7 @@ impl MergeMemTableBench { let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); let mut builder = MergeBuilder::new(MergeConfig { request_id, - metrics_collector: None, + metrics_collector: Collector::new("".to_string()), deadline: None, space_id, table_id, diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index 8f65879b9b..c2eaafa6e7 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -29,6 +29,7 @@ use log::info; use object_store::{LocalFileSystem, ObjectStoreRef}; use table_engine::{predicate::Predicate, table::TableId}; use tokio::sync::mpsc::{self, UnboundedReceiver}; +use trace_metric::Collector; use crate::{config::MergeSstBenchConfig, util}; @@ -125,7 +126,7 @@ impl MergeSstBench { let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); let mut builder = MergeBuilder::new(MergeConfig { request_id, - metrics_collector: None, + metrics_collector: Collector::new("".to_string()), deadline: None, space_id, table_id, diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 24387b0c51..ad97d0cf2d 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -33,6 +33,7 @@ use object_store::{LocalFileSystem, ObjectStoreRef, Path}; use serde::Deserialize; use table_engine::{predicate::Predicate, table::TableId}; use tokio::sync::mpsc; +use trace_metric::Collector; use crate::{config::BenchPredicate, util}; @@ -220,7 +221,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let mut builder = MergeBuilder::new(MergeConfig { request_id, - metrics_collector: None, + metrics_collector: Collector::new("".to_string()), deadline: None, space_id, table_id, diff --git a/system_catalog/Cargo.toml b/system_catalog/Cargo.toml index 9bd9cd69b4..ca58403126 100644 --- a/system_catalog/Cargo.toml +++ b/system_catalog/Cargo.toml @@ -24,3 +24,4 @@ prost = { workspace = true } snafu = { workspace = true } table_engine = { workspace = true } tokio = { workspace = true } +trace_metric = { workspace = true } diff --git 
a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs
index 10cac95133..24ea52cd2b 100644
--- a/system_catalog/src/sys_catalog_table.rs
+++ b/system_catalog/src/sys_catalog_table.rs
@@ -35,11 +35,12 @@ use table_engine::{
     },
     predicate::PredicateBuilder,
     table::{
-        GetRequest, ReadMetricsCollector, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId,
-        TableInfo, TableRef, WriteRequest,
+        GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableInfo, TableRef,
+        WriteRequest,
     },
 };
 use tokio::sync::Mutex;
+use trace_metric::Collector;
 
 use crate::{SYSTEM_SCHEMA_ID, SYS_CATALOG_TABLE_ID, SYS_CATALOG_TABLE_NAME};
 
@@ -533,7 +534,7 @@ impl SysCatalogTable {
             projected_schema: ProjectedSchema::no_projection(self.table.schema()),
             predicate: PredicateBuilder::default().build(),
             order: ReadOrder::None,
-            metrics_collector: ReadMetricsCollector::new(),
+            metrics_collector: Collector::new("open_sys_catalog_table".to_string()),
         };
 
         let mut batch_stream = self.table.read(read_request).await.context(ReadTable)?;
diff --git a/table_engine/Cargo.toml b/table_engine/Cargo.toml
index bbab342295..be51a9062f 100644
--- a/table_engine/Cargo.toml
+++ b/table_engine/Cargo.toml
@@ -30,6 +30,7 @@ serde = { workspace = true }
 smallvec = { workspace = true }
 snafu = { workspace = true }
 tokio = { workspace = true }
+trace_metric = { workspace = true }
 
 [dev-dependencies]
 env_logger = { workspace = true }
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index 7d1ce80a84..b16799c26b 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -26,11 +26,12 @@ use datafusion::{
 use datafusion_expr::{Expr, TableSource, TableType};
 use df_operator::visitor;
 use log::debug;
+use trace_metric::Collector;
 
 use crate::{
     predicate::{PredicateBuilder, PredicateRef},
     stream::{SendableRecordBatchStream, ToDfStream},
-    table::{self, ReadMetricsCollector, ReadOptions, ReadOrder, ReadRequest, TableRef},
+    table::{self, ReadOptions, ReadOrder, ReadRequest, TableRef},
 };
 
 #[derive(Clone, Debug)]
@@ -169,7 +170,7 @@ impl TableProviderAdapter {
             predicate,
             deadline,
             stream_state: Mutex::new(ScanStreamState::default()),
-            metrics_collector: ReadMetricsCollector::new(),
+            metrics_collector: Collector::new("scan_table".to_string()),
         };
         scan_table.maybe_init_stream(state).await?;
 
@@ -296,7 +297,7 @@ struct ScanTable {
     read_parallelism: usize,
     predicate: PredicateRef,
     deadline: Option<Instant>,
-    metrics_collector: ReadMetricsCollector,
+    metrics_collector: Collector,
     stream_state: Mutex<ScanStreamState>,
 }
 
@@ -391,7 +392,8 @@ impl ExecutionPlan for ScanTable {
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics_collector.take_as_df_metrics())
+        // TODO: Convert metrics_collector to MetricsSet.
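+        // (One way to implement this: walk the collector tree with
+        // `Collector::visit` and map each `trace_metric::Metric` onto a
+        // DataFusion `Count`/`Time`, much as the removed
+        // `take_as_df_metrics` conversion below did.)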
+        None
     }
 
     fn statistics(&self) -> Statistics {
diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs
index 94390271ba..abddbe2d7a 100644
--- a/table_engine/src/table.rs
+++ b/table_engine/src/table.rs
@@ -7,7 +7,7 @@ use std::{
     fmt,
     sync::{
         atomic::{AtomicU32, AtomicU64, Ordering},
-        Arc, Mutex,
+        Arc,
     },
     time::{Duration, Instant},
 };
@@ -23,11 +23,9 @@ use common_types::{
     schema::{RecordSchemaWithKey, Schema, Version},
 };
 use common_util::error::{BoxError, GenericError};
-use datafusion::physical_plan::metrics::{
-    Count, Metric as DfMetric, MetricValue as DfMetricValue, MetricsSet, Time,
-};
 use serde::Deserialize;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
+use trace_metric::Collector;
 
 use crate::{
     engine::TableState,
@@ -378,107 +376,6 @@ impl ReadOrder {
     }
 }
 
-#[derive(Clone, Debug)]
-pub struct MetricValue<T> {
-    pub name: String,
-    pub val: T,
-}
-
-#[derive(Clone, Debug)]
-pub enum Metric {
-    Boolean(MetricValue<bool>),
-    Counter(MetricValue<usize>),
-    Elapsed(MetricValue<Duration>),
-}
-
-impl Metric {
-    #[inline]
-    pub fn counter(name: String, val: usize) -> Self {
-        Metric::Counter(MetricValue { name, val })
-    }
-
-    #[inline]
-    pub fn elapsed(name: String, val: Duration) -> Self {
-        Metric::Elapsed(MetricValue { name, val })
-    }
-
-    #[inline]
-    pub fn boolean(name: String, val: bool) -> Self {
-        Metric::Boolean(MetricValue { name, val })
-    }
-}
-
-impl From<Metric> for DfMetric {
-    fn from(metric: Metric) -> Self {
-        let df_metric_val = match metric {
-            Metric::Counter(MetricValue { name, val }) => {
-                let count = Count::new();
-                count.add(val);
-                DfMetricValue::Count {
-                    name: name.into(),
-                    count,
-                }
-            }
-            Metric::Elapsed(MetricValue { name, val }) => {
-                let time = Time::new();
-                time.add_duration(val);
-                DfMetricValue::Time {
-                    name: name.into(),
-                    time,
-                }
-            }
-            Metric::Boolean(MetricValue { name, val }) => {
-                let count = Count::new();
-                // Use 0 for false, 1 for true.
-                count.add(val as usize);
-                DfMetricValue::Count {
-                    name: name.into(),
-                    count,
-                }
-            }
-        };
-
-        DfMetric::new(df_metric_val, None)
-    }
-}
-
-/// A collector for metrics of a single read request.
-///
-/// It can be cloned and shared among threads.
-#[derive(Clone, Debug)]
-pub struct ReadMetricsCollector {
-    pub(crate) metrics: Arc<Mutex<Vec<Metric>>>,
-}
-
-impl Default for ReadMetricsCollector {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl ReadMetricsCollector {
-    pub fn new() -> Self {
-        Self {
-            metrics: Arc::new(Mutex::new(Vec::new())),
-        }
-    }
-
-    pub fn collect(&self, metric: Metric) {
-        let mut metrics = self.metrics.lock().unwrap();
-        metrics.push(metric);
-    }
-
-    pub fn take_as_df_metrics(&self) -> MetricsSet {
-        let metrics: Vec<_> = std::mem::take(self.metrics.lock().unwrap().as_mut());
-        let mut metrics_set = MetricsSet::new();
-        for df_metric in metrics.into_iter().map(DfMetric::from) {
-            metrics_set.push(Arc::new(df_metric));
-        }
-
-        metrics_set
-    }
-}
-
 #[derive(Clone, Debug)]
 pub struct ReadRequest {
     /// Read request id.
@@ -493,7 +390,7 @@ pub struct ReadRequest {
     /// Read the rows in reverse order.
     pub order: ReadOrder,
     /// Collector for metrics of this read request.
-    pub metrics_collector: ReadMetricsCollector,
+    pub metrics_collector: Collector,
 }
 
 impl TryFrom<ReadRequest> for ceresdbproto::remote_engine::TableReadRequest {
@@ -554,7 +451,7 @@ impl TryFrom<ceresdbproto::remote_engine::TableReadRequest> for ReadRequest {
             projected_schema,
             predicate,
             order,
-            metrics_collector: ReadMetricsCollector::new(),
+            metrics_collector: Collector::new("".to_string()),
         })
     }
 }

From d85e5026e6a0cc2180d04759551ea98180d66053 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Fri, 10 Mar 2023 18:17:57 +0800
Subject: [PATCH 07/19] support optional collector

---
 components/trace_metric_derive/src/builder.rs | 77 +++++++++++++------
 1 file changed, 52 insertions(+), 25 deletions(-)

diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs
index 2d2c8c0dea..17fcb8c52c 100644
--- a/components/trace_metric_derive/src/builder.rs
+++ b/components/trace_metric_derive/src/builder.rs
@@ -1,7 +1,7 @@
 // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
 
 use proc_macro::TokenStream;
-use quote::quote;
+use quote::{quote, ToTokens};
 use syn::{DeriveInput, Field, Generics, Ident};
 
 const COLLECTOR_FIELD_TOKENS: &str = "(collector)";
@@ -55,10 +55,35 @@ impl MetricField {
     }
 }
 
+struct CollectorField {
+    field_name: Ident,
+    optional: bool,
+}
+
+impl CollectorField {
+    fn try_from_field(field: Field) -> Option<Self> {
+        let is_collector_field = field.attrs.iter().any(|attr| {
+            attr.path.is_ident("metric")
+                && attr.tokens.to_string().as_str() == COLLECTOR_FIELD_TOKENS
+        });
+
+        if !is_collector_field {
+            None
+        } else {
+            let ident = field.ident.expect("Collector field must be named");
+            let type_tokens = field.ty.into_token_stream().to_string();
+            Some(Self {
+                field_name: ident,
+                optional: type_tokens.starts_with("Option"),
+            })
+        }
+    }
+}
+
 pub struct Builder {
     struct_name: Ident,
     metric_fields: Vec<MetricField>,
-    collector_field: Ident,
+    collector_field: CollectorField,
     generics: Generics,
 }
 
@@ -73,20 +98,15 @@ impl Builder {
     pub fn parse_from_ast(ast: DeriveInput) -> Self {
         let struct_name = ast.ident;
         let (metric_fields, collector_field) = match ast.data {
                 let mut metric_fields = Vec::new();
                 let mut collector_field = None;
                 for field in named {
-                    if Self::is_collector_field(&field) {
-                        collector_field = Some(field);
-                        continue;
-                    }
-                    if let Some(metric_field) = MetricField::try_from_field(field) {
+                    if let Some(collector) = CollectorField::try_from_field(field.clone()) {
+                        collector_field = Some(collector);
+                    } else if let Some(metric_field) = MetricField::try_from_field(field) {
                         metric_fields.push(metric_field);
                     }
                 }
                 (
                     metric_fields,
-                    collector_field
-                        .expect("TracedMetrics must have a collector field")
-                        .ident
-                        .expect("TracedMetrics collector field must be named"),
+                    collector_field.expect("TracedMetrics must have a collector field"),
                 )
             }
             _ => panic!("TracedMetrics only supports struct with named fields"),
@@ -102,7 +122,6 @@ impl Builder {
     pub fn build(&self) -> TokenStream {
         let mut collect_statements = Vec::with_capacity(self.metric_fields.len());
-        let collector_field = &self.collector_field;
         for metric_field in self.metric_fields.iter() {
             let field_name = &metric_field.field_name;
             let metric_name = field_name.to_string();
@@ -117,7 +136,7 @@ impl Builder {
                 }
             };
             let statement = quote! {
-                self.#collector_field.collect(#metric);
+                collector.collect(#metric);
             };
             collect_statements.push(statement);
         }
 
         let where_clause = &self.generics.where_clause;
         let generics = &self.generics;
         let struct_name = &self.struct_name;
-        quote!
{ - impl #generics Drop for #struct_name #generics #where_clause { - fn drop(&mut self) { - #(#collect_statements)* + let collector_field_name = &self.collector_field.field_name; + let stream = if self.collector_field.optional { + quote! { + impl #generics Drop for #struct_name #generics #where_clause { + fn drop(&mut self) { + if let Some(collector) = &self.#collector_field_name { + #(#collect_statements)* + } + } } } - } - .into() - } + } else { + quote! { + impl #generics Drop for #struct_name #generics #where_clause { + fn drop(&mut self) { + let collector = &self.#collector_field_name; + #(#collect_statements)* + } + } + } + }; - fn is_collector_field(field: &Field) -> bool { - field.attrs.iter().any(|attr| { - attr.path.is_ident("metric") - && attr.tokens.to_string().as_str() == COLLECTOR_FIELD_TOKENS - }) + stream.into() } } From 204bb19e380e7a55f524da598aedab7b12acb27a Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 10:00:47 +0800 Subject: [PATCH 08/19] fix license --- components/trace_metric/src/collector.rs | 2 -- components/trace_metric/src/lib.rs | 2 -- components/trace_metric/src/metric.rs | 2 -- 3 files changed, 6 deletions(-) diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index 1f65481662..ff108bb611 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -1,7 +1,5 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. - use std::sync::{Arc, Mutex}; use crate::metric::Metric; diff --git a/components/trace_metric/src/lib.rs b/components/trace_metric/src/lib.rs index 68fa44372c..acc0e96ed9 100644 --- a/components/trace_metric/src/lib.rs +++ b/components/trace_metric/src/lib.rs @@ -1,7 +1,5 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. - pub mod collector; pub mod metric; diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index 07db95b8e4..42bcfaae83 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -1,7 +1,5 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
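For illustration, a minimal hand-written sketch of what this derive is expected to generate for a struct whose collector field is an `Option` (struct and field names here are hypothetical, and the metric names assume the field-name fix that lands later in this series rather than the literal `"#field_name"` emitted at this point):

    use std::time::Duration;
    use trace_metric::{Collector, Metric};

    // Hypothetical metrics struct; the comments mark where the
    // #[metric(...)] attributes would sit on the real input.
    pub struct ExampleScanMetrics {
        pub row_num: usize,               // #[metric(counter)]
        pub scan_duration: Duration,      // #[metric(elapsed)]
        pub collector: Option<Collector>, // #[metric(collector)]
    }

    // Approximate expansion for the `optional: true` branch above:
    // metrics are reported on drop, but only when a collector was
    // actually attached.
    impl Drop for ExampleScanMetrics {
        fn drop(&mut self) {
            if let Some(collector) = &self.collector {
                collector.collect(Metric::counter("row_num".to_string(), self.row_num));
                collector.collect(Metric::elapsed("scan_duration".to_string(), self.scan_duration));
            }
        }
    }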
-
 use std::{fmt, time::Duration};
 
 #[derive(Clone)]

From b69b50064f91eae0c718425ffd2c1f12c6ef75d2 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Mon, 13 Mar 2023 10:23:12 +0800
Subject: [PATCH 09/19] make collector optional

---
 .../src/instance/flush_compaction.rs   |  3 +-
 analytic_engine/src/instance/read.rs   |  2 +-
 analytic_engine/src/row_iter/merge.rs  | 47 ++++---------------
 benchmarks/src/merge_memtable_bench.rs |  3 +-
 benchmarks/src/merge_sst_bench.rs      |  3 +-
 benchmarks/src/sst_tools.rs            |  3 +-
 6 files changed, 13 insertions(+), 48 deletions(-)

diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index df8f0d7317..5250c21120 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -28,7 +28,6 @@ use log::{debug, error, info};
 use snafu::{Backtrace, ResultExt, Snafu};
 use table_engine::{predicate::Predicate, table::Result as TableResult};
 use tokio::sync::oneshot;
-use trace_metric::Collector;
 use wal::manager::WalLocation;
 
 use crate::{
@@ -856,7 +855,7 @@ impl SpaceStore {
         let sequence = table_data.last_sequence();
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
-            metrics_collector: Collector::new("compaction".to_string()),
+            metrics_collector: None,
             // no need to set deadline for compaction
             deadline: None,
             space_id,
diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs
index 6962d1d04d..e0fbf26904 100644
--- a/analytic_engine/src/instance/read.rs
+++ b/analytic_engine/src/instance/read.rs
@@ -180,7 +180,7 @@ impl Instance {
         for read_view in read_views {
             let merge_config = MergeConfig {
                 request_id: request.request_id,
-                metrics_collector: request.metrics_collector.clone(),
+                metrics_collector: Some(request.metrics_collector.clone()),
                 deadline: request.opts.deadline,
                 space_id: table_data.space_id,
                 table_id: table_data.id,
diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs
index 3949ef8f42..69d9154edc 100644
--- a/analytic_engine/src/row_iter/merge.rs
+++ b/analytic_engine/src/row_iter/merge.rs
@@ -4,7 +4,7 @@ use std::{
     cmp,
     cmp::Ordering,
     collections::BinaryHeap,
-    fmt, mem,
+    mem,
     ops::{Deref, DerefMut},
     time::{Duration, Instant},
 };
@@ -35,7 +35,7 @@ use crate::{
     sst::{
         factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReadOptions},
         file::FileHandle,
-        manager::{FileId, MAX_LEVEL},
+        manager::MAX_LEVEL,
     },
     table::version::{MemTableVec, SamplingMemTable},
 };
@@ -84,7 +84,7 @@ define_result!(Error);
 #[derive(Debug)]
 pub struct MergeConfig<'a> {
     pub request_id: RequestId,
-    pub metrics_collector: Collector,
+    pub metrics_collector: Option<Collector>,
     /// None for background jobs, such as: compaction
     pub deadline: Option<Instant>,
     pub space_id: SpaceId,
@@ -232,8 +232,7 @@ impl<'a> MergeBuilder<'a> {
             Metrics::new(
                 self.memtables.len(),
                 sst_streams_num,
-                sst_ids,
-                self.config.metrics_collector.clone(),
+                self.config.metrics_collector,
             ),
         ))
     }
@@ -570,7 +569,6 @@ pub struct Metrics {
     num_memtables: usize,
     #[metric(counter)]
     num_ssts: usize,
-    sst_ids: Vec<FileId>,
     /// Total rows collected using fetch_rows_from_one_stream().
     #[metric(counter)]
     total_rows_fetch_from_one: usize,
@@ -580,8 +578,6 @@ pub struct Metrics {
     /// Times to fetch one row from multiple stream.
     #[metric(counter)]
     times_fetch_row_from_multiple: usize,
-    /// Create time of the metrics.
-    create_at: Instant,
     /// Init time cost of the metrics.
#[metric(elapsed)] init_duration: Duration, @@ -592,24 +588,17 @@ pub struct Metrics { #[metric(counter)] scan_count: usize, #[metric(collector)] - metrics_collector: Collector, + metrics_collector: Option, } impl Metrics { - fn new( - num_memtables: usize, - num_ssts: usize, - sst_ids: Vec, - collector: Collector, - ) -> Self { + fn new(num_memtables: usize, num_ssts: usize, collector: Option) -> Self { Self { num_memtables, num_ssts, - sst_ids, times_fetch_rows_from_one: 0, total_rows_fetch_from_one: 0, times_fetch_row_from_multiple: 0, - create_at: Instant::now(), init_duration: Duration::default(), scan_duration: Duration::default(), scan_count: 0, @@ -618,26 +607,6 @@ impl Metrics { } } -impl fmt::Debug for Metrics { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Metrics") - .field("num_memtables", &self.num_memtables) - .field("num_ssts", &self.num_ssts) - .field("sst_ids", &self.sst_ids) - .field("times_fetch_rows_from_one", &self.times_fetch_rows_from_one) - .field("total_rows_fetch_from_one", &self.total_rows_fetch_from_one) - .field( - "times_fetch_row_from_multiple", - &self.times_fetch_row_from_multiple, - ) - .field("duration_since_create", &self.create_at.elapsed()) - .field("init_duration", &self.init_duration) - .field("scan_duration", &self.scan_duration) - .field("scan_count", &self.scan_count) - .finish() - } -} - pub struct MergeIterator { table_id: TableId, request_id: RequestId, @@ -940,7 +909,7 @@ mod tests { Vec::new(), IterOptions::default(), false, - Metrics::new(1, 1, vec![], Collector::new("".to_string())), + Metrics::new(1, 1, None), ); check_iterator( @@ -993,7 +962,7 @@ mod tests { Vec::new(), IterOptions::default(), true, - Metrics::new(1, 1, vec![], Collector::new("".to_string())), + Metrics::new(1, 1, None), ); check_iterator( diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index e5b0f9ced5..06700a07aa 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -35,7 +35,6 @@ use common_util::runtime::Runtime; use log::info; use object_store::{LocalFileSystem, ObjectStoreRef}; use table_engine::{predicate::Predicate, table::TableId}; -use trace_metric::Collector; use crate::{config::MergeMemTableBenchConfig, util}; @@ -143,7 +142,7 @@ impl MergeMemTableBench { let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); let mut builder = MergeBuilder::new(MergeConfig { request_id, - metrics_collector: Collector::new("".to_string()), + metrics_collector: None, deadline: None, space_id, table_id, diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index c2eaafa6e7..8f65879b9b 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -29,7 +29,6 @@ use log::info; use object_store::{LocalFileSystem, ObjectStoreRef}; use table_engine::{predicate::Predicate, table::TableId}; use tokio::sync::mpsc::{self, UnboundedReceiver}; -use trace_metric::Collector; use crate::{config::MergeSstBenchConfig, util}; @@ -126,7 +125,7 @@ impl MergeSstBench { let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); let mut builder = MergeBuilder::new(MergeConfig { request_id, - metrics_collector: Collector::new("".to_string()), + metrics_collector: None, deadline: None, space_id, table_id, diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index ad97d0cf2d..24387b0c51 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -33,7 +33,6 @@ 
use object_store::{LocalFileSystem, ObjectStoreRef, Path}; use serde::Deserialize; use table_engine::{predicate::Predicate, table::TableId}; use tokio::sync::mpsc; -use trace_metric::Collector; use crate::{config::BenchPredicate, util}; @@ -221,7 +220,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let mut builder = MergeBuilder::new(MergeConfig { request_id, - metrics_collector: Collector::new("".to_string()), + metrics_collector: None, deadline: None, space_id, table_id, From 28846b5534c14cca9c355fc31239225286823167 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 15:32:09 +0800 Subject: [PATCH 10/19] trace metrics in parquet reader --- .../src/instance/flush_compaction.rs | 1 + analytic_engine/src/memtable/mod.rs | 7 ++- analytic_engine/src/memtable/skiplist/mod.rs | 4 ++ analytic_engine/src/row_iter/chain.rs | 3 + analytic_engine/src/row_iter/merge.rs | 3 + .../src/row_iter/record_batch_stream.rs | 20 ++++++- analytic_engine/src/sst/factory.rs | 11 +++- analytic_engine/src/sst/meta_data/mod.rs | 2 +- .../src/sst/parquet/async_reader.rs | 57 ++++++++++++++++--- .../src/sst/parquet/row_group_pruner.rs | 41 ++++++++++++- analytic_engine/src/sst/parquet/writer.rs | 9 ++- benchmarks/src/merge_memtable_bench.rs | 4 +- benchmarks/src/scan_memtable_bench.rs | 1 + benchmarks/src/sst_bench.rs | 1 + benchmarks/src/sst_tools.rs | 1 + benchmarks/src/util.rs | 1 + tools/src/bin/sst-convert.rs | 1 + 17 files changed, 147 insertions(+), 20 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 5250c21120..997f94e1e9 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -1054,6 +1054,7 @@ fn build_mem_table_iter(memtable: MemTableRef, table_data: &TableData) -> Result projected_schema: ProjectedSchema::no_projection(table_data.schema()), need_dedup: table_data.dedup(), reverse: false, + metrics_collector: None, }; memtable .scan(scan_ctx, scan_req) diff --git a/analytic_engine/src/memtable/mod.rs b/analytic_engine/src/memtable/mod.rs index 93cca03268..567130644b 100644 --- a/analytic_engine/src/memtable/mod.rs +++ b/analytic_engine/src/memtable/mod.rs @@ -18,6 +18,7 @@ use common_types::{ }; use common_util::{define_result, error::GenericError}; use snafu::{Backtrace, Snafu}; +use trace_metric::Collector; use crate::memtable::key::KeySequence; @@ -129,14 +130,16 @@ pub struct ScanRequest { pub projected_schema: ProjectedSchema, pub need_dedup: bool, pub reverse: bool, + /// Collector for scan metrics. + pub metrics_collector: Option, } /// In memory storage for table's data. /// /// # Concurrency -/// The memtable is designed for single-writer and mutltiple-reader usage, so +/// The memtable is designed for single-writer and multiple-reader usage, so /// not all function supports concurrent writer, the caller should guarantee not -/// writing to the memtable concurrrently. +/// writing to the memtable concurrently. 
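For illustration, a minimal sketch of the span-per-source pattern these hunks introduce: each memtable or SST scan gets a child collector named after the source, so the metrics form a tree rooted at the read request. The helper below is hypothetical; the naming mirrors the `scan_sst_{id}` / `scan_memtable_{seq}` strings used in record_batch_stream.rs:

    use trace_metric::Collector;

    // Derive a child collector for one SST scan; `None` simply
    // propagates "no tracing" down to the scan.
    fn sst_scan_collector(parent: &Option<Collector>, file_id: u64) -> Option<Collector> {
        parent.as_ref().map(|c| c.span(format!("scan_sst_{file_id}")))
    }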
// All operation is done in memory, no need to use async trait pub trait MemTable { /// Schema of this memtable diff --git a/analytic_engine/src/memtable/skiplist/mod.rs b/analytic_engine/src/memtable/skiplist/mod.rs index dc99495459..4e3a1a8e27 100644 --- a/analytic_engine/src/memtable/skiplist/mod.rs +++ b/analytic_engine/src/memtable/skiplist/mod.rs @@ -204,6 +204,7 @@ mod tests { projected_schema: projected_schema.clone(), need_dedup: true, reverse: false, + metrics_collector: None, }, vec![ build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000), @@ -223,6 +224,7 @@ mod tests { projected_schema: projected_schema.clone(), need_dedup: true, reverse: false, + metrics_collector: None, }, vec![ build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000), @@ -241,6 +243,7 @@ mod tests { projected_schema, need_dedup: true, reverse: false, + metrics_collector: None, }, vec![ build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000), @@ -272,6 +275,7 @@ mod tests { projected_schema, need_dedup: true, reverse: false, + metrics_collector: None, }, vec![ build_row_for_two_column(b"a", 1), diff --git a/analytic_engine/src/row_iter/chain.rs b/analytic_engine/src/row_iter/chain.rs index 029fcbe264..ba918e4b9d 100644 --- a/analytic_engine/src/row_iter/chain.rs +++ b/analytic_engine/src/row_iter/chain.rs @@ -116,6 +116,7 @@ impl<'a> Builder<'a> { false, self.config.predicate.as_ref(), self.config.deadline, + None, ) .context(BuildStreamFromMemtable)?; streams.push(stream); @@ -131,6 +132,7 @@ impl<'a> Builder<'a> { false, self.config.predicate.as_ref(), self.config.deadline, + None, ) .context(BuildStreamFromMemtable)?; streams.push(stream); @@ -145,6 +147,7 @@ impl<'a> Builder<'a> { self.config.sst_factory, &self.config.sst_read_options, self.config.store_picker, + None, ) .await .context(BuildStreamFromSst)?; diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs index 69d9154edc..84e8203e10 100644 --- a/analytic_engine/src/row_iter/merge.rs +++ b/analytic_engine/src/row_iter/merge.rs @@ -184,6 +184,7 @@ impl<'a> MergeBuilder<'a> { self.config.reverse, self.config.predicate.as_ref(), self.config.deadline, + self.config.metrics_collector.clone(), ) .context(BuildStreamFromMemtable)?; streams.push(stream); @@ -197,6 +198,7 @@ impl<'a> MergeBuilder<'a> { self.config.reverse, self.config.predicate.as_ref(), self.config.deadline, + self.config.metrics_collector.clone(), ) .context(BuildStreamFromMemtable)?; streams.push(stream); @@ -212,6 +214,7 @@ impl<'a> MergeBuilder<'a> { self.config.sst_factory, &self.config.sst_read_options, self.config.store_picker, + self.config.metrics_collector.clone(), ) .await .context(BuildStreamFromSst)?; diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs b/analytic_engine/src/row_iter/record_batch_stream.rs index 50ec9443f9..f9603acbf4 100644 --- a/analytic_engine/src/row_iter/record_batch_stream.rs +++ b/analytic_engine/src/row_iter/record_batch_stream.rs @@ -27,6 +27,7 @@ use datafusion::{ use futures::stream::{self, Stream, StreamExt}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{predicate::Predicate, table::TableId}; +use trace_metric::Collector; use crate::{ memtable::{MemTableRef, ScanContext, ScanRequest}, @@ -208,6 +209,7 @@ pub fn filtered_stream_from_memtable( reverse: bool, predicate: &Predicate, deadline: Option, + metrics_collector: Option, ) -> Result { stream_from_memtable( projected_schema.clone(), @@ -215,6 +217,7 @@ pub fn filtered_stream_from_memtable( memtable, reverse, deadline, + metrics_collector, ) 
.and_then(|origin_stream| { filter_stream( @@ -234,12 +237,15 @@ pub fn stream_from_memtable( memtable: &MemTableRef, reverse: bool, deadline: Option, + metrics_collector: Option, ) -> Result { let scan_ctx = ScanContext { deadline, ..Default::default() }; let max_seq = memtable.last_sequence(); + let scan_memtable_desc = format!("scan_memtable_{max_seq}"); + let metrics_collector = metrics_collector.map(|v| v.span(scan_memtable_desc)); let scan_req = ScanRequest { start_user_key: Bound::Unbounded, end_user_key: Bound::Unbounded, @@ -247,6 +253,7 @@ pub fn stream_from_memtable( projected_schema, need_dedup, reverse, + metrics_collector, }; let iter = memtable.scan(scan_ctx, scan_req).context(ScanMemtable)?; @@ -270,6 +277,7 @@ pub async fn filtered_stream_from_sst_file( sst_factory: &SstFactoryRef, sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, + metrics_collector: Option, ) -> Result { stream_from_sst_file( space_id, @@ -278,6 +286,7 @@ pub async fn filtered_stream_from_sst_file( sst_factory, sst_read_options, store_picker, + metrics_collector, ) .await .and_then(|origin_stream| { @@ -300,6 +309,7 @@ pub async fn stream_from_sst_file( sst_factory: &SstFactoryRef, sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, + metrics_collector: Option, ) -> Result { sst_file.read_meter().mark(); let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id()); @@ -308,8 +318,16 @@ pub async fn stream_from_sst_file( file_size: Some(sst_file.size() as usize), file_format: Some(sst_file.storage_format()), }; + let scan_sst_desc = format!("scan_sst_{}", sst_file.id()); + let metrics_collector = metrics_collector.map(|v| v.span(scan_sst_desc)); let mut sst_reader = sst_factory - .create_reader(&path, sst_read_options, read_hint, store_picker) + .create_reader( + &path, + sst_read_options, + read_hint, + store_picker, + metrics_collector, + ) .await .context(CreateSstReader)?; let meta = sst_reader.meta_data().await.context(ReadSstMeta)?; diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 4318810033..ae5cd8c578 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -10,6 +10,7 @@ use common_util::{define_result, runtime::Runtime}; use object_store::{ObjectStoreRef, Path}; use snafu::{ResultExt, Snafu}; use table_engine::predicate::PredicateRef; +use trace_metric::Collector; use crate::{ sst::{ @@ -65,6 +66,7 @@ pub trait Factory: Send + Sync + Debug { options: &SstReadOptions, hint: SstReadHint, store_picker: &'a ObjectStorePickerRef, + metrics_collector: Option, ) -> Result>; async fn create_writer<'a>( @@ -127,6 +129,7 @@ impl Factory for FactoryImpl { options: &SstReadOptions, hint: SstReadHint, store_picker: &'a ObjectStorePickerRef, + metrics_collector: Option, ) -> Result> { let storage_format = match hint.file_format { Some(v) => v, @@ -138,7 +141,13 @@ impl Factory for FactoryImpl { match storage_format { StorageFormat::Columnar | StorageFormat::Hybrid => { - let reader = AsyncParquetReader::new(path, options, hint.file_size, store_picker); + let reader = AsyncParquetReader::new( + path, + options, + hint.file_size, + store_picker, + metrics_collector, + ); let reader = ThreadedReader::new( reader, options.runtime.clone(), diff --git a/analytic_engine/src/sst/meta_data/mod.rs b/analytic_engine/src/sst/meta_data/mod.rs index 9f29b04cbf..c1aade735f 100644 --- a/analytic_engine/src/sst/meta_data/mod.rs +++ b/analytic_engine/src/sst/meta_data/mod.rs @@ -149,7 +149,7 @@ 
impl SstMetaReader { }; let mut reader = self .factory - .create_reader(&path, &self.read_opts, read_hint, &self.store_picker) + .create_reader(&path, &self.read_opts, read_hint, &self.store_picker, None) .await .context(CreateSstReader)?; let meta_data = reader.meta_data().await.context(ReadMetaData)?; diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index b601bdbcac..f8a38c5d4e 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -7,7 +7,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch as ArrowRecordBatch}; @@ -34,6 +34,7 @@ use prometheus::local::LocalHistogram; use snafu::ResultExt; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; +use trace_metric::{Collector, TracedMetrics}; use crate::sst::{ factory::{ObjectStorePickerRef, ReadFrequency, SstReadOptions}, @@ -72,6 +73,19 @@ pub struct Reader<'a> { /// Options for `read_parallelly` parallelism_options: ParallelismOptions, + metrics: Metrics, +} + +#[derive(Debug, Clone, TracedMetrics)] +pub(crate) struct Metrics { + #[metric(boolean)] + pub meta_data_cache_hit: bool, + #[metric(elapsed)] + pub read_meta_data_duration: Duration, + #[metric(counter)] + pub parallelism: usize, + #[metric(collector)] + pub metrics_collector: Option, } impl<'a> Reader<'a> { @@ -80,12 +94,20 @@ impl<'a> Reader<'a> { options: &SstReadOptions, file_size_hint: Option, store_picker: &'a ObjectStorePickerRef, + metrics_collector: Option, ) -> Self { let batch_size = options.read_batch_row_num; let parallelism_options = ParallelismOptions::new(options.read_batch_row_num, options.num_rows_per_row_group); let store = store_picker.pick_by_freq(options.frequency); + let metrics = Metrics { + meta_data_cache_hit: false, + read_meta_data_duration: Duration::from_secs(0), + parallelism: 0, + metrics_collector, + }; + Self { path, store, @@ -98,6 +120,7 @@ impl<'a> Reader<'a> { meta_data: None, row_projector: None, parallelism_options, + metrics, } } @@ -149,8 +172,18 @@ impl<'a> Reader<'a> { row_groups: &[RowGroupMetaData], parquet_filter: Option<&ParquetFilter>, ) -> Result> { - let pruner = - RowGroupPruner::try_new(&schema, row_groups, parquet_filter, self.predicate.exprs())?; + let metrics_collector = self + .metrics + .metrics_collector + .as_ref() + .map(|v| v.span("prune_row_groups".to_string())); + let mut pruner = RowGroupPruner::try_new( + &schema, + row_groups, + parquet_filter, + self.predicate.exprs(), + metrics_collector, + )?; Ok(pruner.prune()) } @@ -191,10 +224,12 @@ impl<'a> Reader<'a> { // adjust it when supporting it other situations. 
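For illustration, a minimal sketch of how such a traced-metrics struct is used in practice: fields are filled in imperatively while the read proceeds, and the derived `Drop` impl reports them when the struct goes out of scope (struct and function names below are hypothetical):

    use std::time::{Duration, Instant};
    use trace_metric::{Collector, TracedMetrics};

    #[derive(TracedMetrics)]
    struct PhaseMetrics {
        #[metric(elapsed)]
        read_meta_data_duration: Duration,
        #[metric(collector)]
        collector: Option<Collector>,
    }

    fn read_with_metrics(collector: Option<Collector>) {
        let mut metrics = PhaseMetrics {
            read_meta_data_duration: Duration::default(),
            collector,
        };
        let start = Instant::now();
        // ... fetch and decode the SST meta data here ...
        metrics.read_meta_data_duration = start.elapsed();
        // `metrics` is dropped here; the elapsed value reaches the
        // collector, if one was attached.
    }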
let chunks_num = read_parallelism; let chunk_size = target_row_groups.len() / read_parallelism + 1; + self.metrics.parallelism = read_parallelism; info!( "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}", suggest_read_parallelism, read_parallelism, chunk_size ); + let mut target_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num]; for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() { let chunk_idx = row_group_idx % chunks_num; @@ -232,7 +267,12 @@ impl<'a> Reader<'a> { return Ok(()); } - let meta_data = self.read_sst_meta().await?; + let meta_data = { + let start = Instant::now(); + let meta_data = self.read_sst_meta().await?; + self.metrics.read_meta_data_duration = start.elapsed(); + meta_data + }; let row_projector = self .projected_schema @@ -280,9 +320,10 @@ impl<'a> Reader<'a> { } } - async fn read_sst_meta(&self) -> Result { + async fn read_sst_meta(&mut self) -> Result { if let Some(cache) = &self.meta_cache { if let Some(meta_data) = cache.get(self.path.as_ref()) { + self.metrics.meta_data_cache_hit = true; return Ok(meta_data); } } @@ -356,7 +397,7 @@ impl ParallelismOptions { } #[derive(Clone, Debug)] -struct ReaderMetrics { +struct ObjectReaderMetrics { bytes_scanned: usize, sst_get_range_length_histogram: LocalHistogram, } @@ -366,7 +407,7 @@ struct ObjectStoreReader { storage: ObjectStoreRef, path: Path, meta_data: MetaData, - metrics: ReaderMetrics, + metrics: ObjectReaderMetrics, } impl ObjectStoreReader { @@ -375,7 +416,7 @@ impl ObjectStoreReader { storage, path, meta_data, - metrics: ReaderMetrics { + metrics: ObjectReaderMetrics { bytes_scanned: 0, sst_get_range_length_histogram: metrics::SST_GET_RANGE_HISTOGRAM.local(), }, diff --git a/analytic_engine/src/sst/parquet/row_group_pruner.rs b/analytic_engine/src/sst/parquet/row_group_pruner.rs index b67a24d128..24919635c5 100644 --- a/analytic_engine/src/sst/parquet/row_group_pruner.rs +++ b/analytic_engine/src/sst/parquet/row_group_pruner.rs @@ -14,12 +14,29 @@ use parquet_ext::prune::{ min_max, }; use snafu::ensure; +use trace_metric::{Collector, TracedMetrics}; use crate::sst::{ parquet::meta_data::ParquetFilter, reader::error::{OtherNoCause, Result}, }; +#[derive(Debug, Clone, TracedMetrics)] +struct Metrics { + #[metric(boolean)] + use_bloom_filter: bool, + #[metric(counter)] + total_row_groups: usize, + #[metric(counter)] + row_groups_after_prune: usize, + #[metric(counter)] + pruned_by_bloom_filter: usize, + #[metric(counter)] + pruned_by_min_max: usize, + #[metric(collector)] + collector: Option, +} + /// RowGroupPruner is used to prune row groups according to the provided /// predicates and filters. 
/// @@ -30,6 +47,7 @@ pub struct RowGroupPruner<'a> { row_groups: &'a [RowGroupMetaData], parquet_filter: Option<&'a ParquetFilter>, predicates: &'a [Expr], + metrics: Metrics, } impl<'a> RowGroupPruner<'a> { @@ -38,6 +56,7 @@ impl<'a> RowGroupPruner<'a> { row_groups: &'a [RowGroupMetaData], parquet_filter: Option<&'a ParquetFilter>, predicates: &'a [Expr], + metrics_collector: Option, ) -> Result { if let Some(f) = parquet_filter { ensure!(f.len() == row_groups.len(), OtherNoCause { @@ -45,15 +64,25 @@ impl<'a> RowGroupPruner<'a> { }); } + let metrics = Metrics { + use_bloom_filter: parquet_filter.is_some(), + total_row_groups: row_groups.len(), + row_groups_after_prune: 0, + pruned_by_bloom_filter: 0, + pruned_by_min_max: 0, + collector: metrics_collector, + }; + Ok(Self { schema, row_groups, parquet_filter, predicates, + metrics, }) } - pub fn prune(&self) -> Vec { + pub fn prune(&mut self) -> Vec { debug!( "Begin to prune row groups, total_row_groups:{}, parquet_filter:{}, predicates:{:?}", self.row_groups.len(), @@ -62,13 +91,16 @@ impl<'a> RowGroupPruner<'a> { ); let pruned0 = self.prune_by_min_max(); - match self.parquet_filter { + self.metrics.pruned_by_min_max = self.row_groups.len() - pruned0.len(); + + let pruned = match self.parquet_filter { Some(v) => { // TODO: We can do continuous prune based on the `pruned0` to reduce the // filtering cost. let pruned1 = self.prune_by_filters(v); let pruned = Self::intersect_pruned_row_groups(&pruned0, &pruned1); + self.metrics.pruned_by_bloom_filter = self.row_groups.len() - pruned1.len(); debug!( "Finish pruning row groups by parquet_filter and min_max, total_row_groups:{}, pruned_by_min_max:{}, pruned_by_blooms:{}, pruned_by_both:{}", self.row_groups.len(), @@ -87,7 +119,10 @@ impl<'a> RowGroupPruner<'a> { ); pruned0 } - } + }; + + self.metrics.row_groups_after_prune = pruned.len(); + pruned } fn prune_by_min_max(&self) -> Vec { diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 63950ceab4..6b7af75886 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -381,8 +381,13 @@ mod tests { }; let mut reader: Box = { - let mut reader = - AsyncParquetReader::new(&sst_file_path, &sst_read_options, None, &store_picker); + let mut reader = AsyncParquetReader::new( + &sst_file_path, + &sst_read_options, + None, + &store_picker, + None, + ); let mut sst_meta_readback = reader .meta_data() .await diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 06700a07aa..47b378a08b 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -90,7 +90,7 @@ impl MergeMemTableBench { )); info!( - "\nMergeMemTableBench memtable loaded, memory used: {}", + "MergeMemTableBench memtable loaded, memory used:{}", memtable.approximate_memory_usage() ); @@ -182,7 +182,7 @@ impl MergeMemTableBench { } info!( - "\nMergeMemTableBench total rows of sst: {}, total batch num: {}, cost: {:?}", + "MergeMemTableBench total rows of sst:{}, total batch num:{}, cost:{:?}", total_rows, batch_num, begin_instant.elapsed(), diff --git a/benchmarks/src/scan_memtable_bench.rs b/benchmarks/src/scan_memtable_bench.rs index 6106e2b884..8aafedcc7b 100644 --- a/benchmarks/src/scan_memtable_bench.rs +++ b/benchmarks/src/scan_memtable_bench.rs @@ -86,6 +86,7 @@ impl ScanMemTableBench { projected_schema: self.projected_schema.clone(), need_dedup: true, reverse: false, + metrics_collector: None, }; let iter = 
self.memtable.scan(scan_ctx, scan_req).unwrap(); diff --git a/benchmarks/src/sst_bench.rs b/benchmarks/src/sst_bench.rs index 807ce81fa8..9cddf5a602 100644 --- a/benchmarks/src/sst_bench.rs +++ b/benchmarks/src/sst_bench.rs @@ -87,6 +87,7 @@ impl SstBench { &self.sst_read_options, SstReadHint::default(), &store_picker, + None, ) .await .unwrap(); diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 24387b0c51..13d85459d8 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -136,6 +136,7 @@ async fn sst_to_record_batch_stream( sst_read_options, SstReadHint::default(), &store_picker, + None, ) .await .unwrap(); diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs index 0e0b7fd6bc..7e0875b9ff 100644 --- a/benchmarks/src/util.rs +++ b/benchmarks/src/util.rs @@ -119,6 +119,7 @@ pub async fn load_sst_to_memtable( &sst_read_options, SstReadHint::default(), &store_picker, + None, ) .await .unwrap(); diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index cc1076deb3..e1009e48b4 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -92,6 +92,7 @@ async fn run(args: Args, runtime: Arc) -> Result<()> { &reader_opts, SstReadHint::default(), &store_picker, + None, ) .await .expect("no sst reader found"); From aa43958d4efeed07247750e4dd71dd11c4b23591 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 15:34:31 +0800 Subject: [PATCH 11/19] rename Collector to MetricsCollector --- analytic_engine/src/memtable/mod.rs | 4 ++-- analytic_engine/src/row_iter/merge.rs | 8 ++++---- analytic_engine/src/row_iter/record_batch_stream.rs | 10 +++++----- analytic_engine/src/sst/factory.rs | 6 +++--- analytic_engine/src/sst/parquet/async_reader.rs | 6 +++--- analytic_engine/src/sst/parquet/row_group_pruner.rs | 6 +++--- analytic_engine/src/table/mod.rs | 4 ++-- analytic_engine/src/tests/table.rs | 4 ++-- components/trace_metric/src/collector.rs | 10 +++++----- components/trace_metric/src/lib.rs | 2 +- components/trace_metric_derive_tests/src/lib.rs | 6 +++--- system_catalog/src/sys_catalog_table.rs | 4 ++-- table_engine/src/provider.rs | 6 +++--- table_engine/src/table.rs | 6 +++--- 14 files changed, 41 insertions(+), 41 deletions(-) diff --git a/analytic_engine/src/memtable/mod.rs b/analytic_engine/src/memtable/mod.rs index 567130644b..bc4e4a2743 100644 --- a/analytic_engine/src/memtable/mod.rs +++ b/analytic_engine/src/memtable/mod.rs @@ -18,7 +18,7 @@ use common_types::{ }; use common_util::{define_result, error::GenericError}; use snafu::{Backtrace, Snafu}; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use crate::memtable::key::KeySequence; @@ -131,7 +131,7 @@ pub struct ScanRequest { pub need_dedup: bool, pub reverse: bool, /// Collector for scan metrics. - pub metrics_collector: Option, + pub metrics_collector: Option, } /// In memory storage for table's data. 
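For illustration, a minimal sketch of the collector tree a single read request builds up under the new names (the collector and metric names below are examples, not the exact strings used by the engine):

    use std::time::Duration;
    use trace_metric::{Metric, MetricsCollector};

    fn build_example_tree() -> MetricsCollector {
        // One root collector per table scan...
        let root = MetricsCollector::new("scan_table".to_string());
        root.collect(Metric::counter("iter_num".to_string(), 2));

        // ...and one child span per scanned source.
        let sst = root.span("scan_sst_42".to_string());
        sst.collect(Metric::elapsed(
            "read_meta_data_duration".to_string(),
            Duration::from_millis(3),
        ));
        root
    }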
diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs index 84e8203e10..430288e4fb 100644 --- a/analytic_engine/src/row_iter/merge.rs +++ b/analytic_engine/src/row_iter/merge.rs @@ -23,7 +23,7 @@ use futures::{future::try_join_all, StreamExt}; use log::{debug, trace}; use snafu::{ensure, Backtrace, ResultExt, Snafu}; use table_engine::{predicate::PredicateRef, table::TableId}; -use trace_metric::{Collector, TracedMetrics}; +use trace_metric::{MetricsCollector, TracedMetrics}; use crate::{ row_iter::{ @@ -84,7 +84,7 @@ define_result!(Error); #[derive(Debug)] pub struct MergeConfig<'a> { pub request_id: RequestId, - pub metrics_collector: Option, + pub metrics_collector: Option, /// None for background jobs, such as: compaction pub deadline: Option, pub space_id: SpaceId, @@ -591,11 +591,11 @@ pub struct Metrics { #[metric(counter)] scan_count: usize, #[metric(collector)] - metrics_collector: Option, + metrics_collector: Option, } impl Metrics { - fn new(num_memtables: usize, num_ssts: usize, collector: Option) -> Self { + fn new(num_memtables: usize, num_ssts: usize, collector: Option) -> Self { Self { num_memtables, num_ssts, diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs b/analytic_engine/src/row_iter/record_batch_stream.rs index f9603acbf4..ec8f0b9a48 100644 --- a/analytic_engine/src/row_iter/record_batch_stream.rs +++ b/analytic_engine/src/row_iter/record_batch_stream.rs @@ -27,7 +27,7 @@ use datafusion::{ use futures::stream::{self, Stream, StreamExt}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{predicate::Predicate, table::TableId}; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use crate::{ memtable::{MemTableRef, ScanContext, ScanRequest}, @@ -209,7 +209,7 @@ pub fn filtered_stream_from_memtable( reverse: bool, predicate: &Predicate, deadline: Option, - metrics_collector: Option, + metrics_collector: Option, ) -> Result { stream_from_memtable( projected_schema.clone(), @@ -237,7 +237,7 @@ pub fn stream_from_memtable( memtable: &MemTableRef, reverse: bool, deadline: Option, - metrics_collector: Option, + metrics_collector: Option, ) -> Result { let scan_ctx = ScanContext { deadline, @@ -277,7 +277,7 @@ pub async fn filtered_stream_from_sst_file( sst_factory: &SstFactoryRef, sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, - metrics_collector: Option, + metrics_collector: Option, ) -> Result { stream_from_sst_file( space_id, @@ -309,7 +309,7 @@ pub async fn stream_from_sst_file( sst_factory: &SstFactoryRef, sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, - metrics_collector: Option, + metrics_collector: Option, ) -> Result { sst_file.read_meter().mark(); let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id()); diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index ae5cd8c578..a58abc8e5f 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -10,7 +10,7 @@ use common_util::{define_result, runtime::Runtime}; use object_store::{ObjectStoreRef, Path}; use snafu::{ResultExt, Snafu}; use table_engine::predicate::PredicateRef; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use crate::{ sst::{ @@ -66,7 +66,7 @@ pub trait Factory: Send + Sync + Debug { options: &SstReadOptions, hint: SstReadHint, store_picker: &'a ObjectStorePickerRef, - metrics_collector: Option, + metrics_collector: Option, ) -> Result>; async fn create_writer<'a>( @@ 
-129,7 +129,7 @@ impl Factory for FactoryImpl { options: &SstReadOptions, hint: SstReadHint, store_picker: &'a ObjectStorePickerRef, - metrics_collector: Option, + metrics_collector: Option, ) -> Result> { let storage_format = match hint.file_format { Some(v) => v, diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index f8a38c5d4e..15301f4317 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -34,7 +34,7 @@ use prometheus::local::LocalHistogram; use snafu::ResultExt; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; -use trace_metric::{Collector, TracedMetrics}; +use trace_metric::{MetricsCollector, TracedMetrics}; use crate::sst::{ factory::{ObjectStorePickerRef, ReadFrequency, SstReadOptions}, @@ -85,7 +85,7 @@ pub(crate) struct Metrics { #[metric(counter)] pub parallelism: usize, #[metric(collector)] - pub metrics_collector: Option, + pub metrics_collector: Option, } impl<'a> Reader<'a> { @@ -94,7 +94,7 @@ impl<'a> Reader<'a> { options: &SstReadOptions, file_size_hint: Option, store_picker: &'a ObjectStorePickerRef, - metrics_collector: Option, + metrics_collector: Option, ) -> Self { let batch_size = options.read_batch_row_num; let parallelism_options = diff --git a/analytic_engine/src/sst/parquet/row_group_pruner.rs b/analytic_engine/src/sst/parquet/row_group_pruner.rs index 24919635c5..975fcd5c11 100644 --- a/analytic_engine/src/sst/parquet/row_group_pruner.rs +++ b/analytic_engine/src/sst/parquet/row_group_pruner.rs @@ -14,7 +14,7 @@ use parquet_ext::prune::{ min_max, }; use snafu::ensure; -use trace_metric::{Collector, TracedMetrics}; +use trace_metric::{MetricsCollector, TracedMetrics}; use crate::sst::{ parquet::meta_data::ParquetFilter, @@ -34,7 +34,7 @@ struct Metrics { #[metric(counter)] pruned_by_min_max: usize, #[metric(collector)] - collector: Option, + collector: Option, } /// RowGroupPruner is used to prune row groups according to the provided @@ -56,7 +56,7 @@ impl<'a> RowGroupPruner<'a> { row_groups: &'a [RowGroupMetaData], parquet_filter: Option<&'a ParquetFilter>, predicates: &'a [Expr], - metrics_collector: Option, + metrics_collector: Option, ) -> Result { if let Some(f) = parquet_filter { ensure!(f.len() == row_groups.len(), OtherNoCause { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 6de53d6e49..405f8aa393 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -22,7 +22,7 @@ use table_engine::{ }, }; use tokio::sync::oneshot; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use self::data::TableDataRef; use crate::{ @@ -180,7 +180,7 @@ impl Table for TableImpl { projected_schema: request.projected_schema, predicate, order: ReadOrder::None, - metrics_collector: Collector::new("".to_string()), + metrics_collector: MetricsCollector::new("".to_string()), }; let mut batch_stream = self .read(read_request) diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs index 50db341a1d..0bcd3030f0 100644 --- a/analytic_engine/src/tests/table.rs +++ b/analytic_engine/src/tests/table.rs @@ -22,7 +22,7 @@ use table_engine::{ predicate::Predicate, table::{GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableSeq}, }; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use crate::{table_options, tests::row_util}; @@ -186,7 +186,7 @@ pub fn 
new_read_all_request_with_order( projected_schema: ProjectedSchema::no_projection(schema), predicate: Arc::new(Predicate::empty()), order, - metrics_collector: Collector::new("".to_string()), + metrics_collector: MetricsCollector::new("".to_string()), } } diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index ff108bb611..7b580fe044 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -8,13 +8,13 @@ use crate::metric::Metric; /// /// It can be cloned and shared among threads. #[derive(Clone, Debug)] -pub struct Collector { +pub struct MetricsCollector { name: String, metrics: Arc>>, - children: Arc>>, + children: Arc>>, } -impl Collector { +impl MetricsCollector { /// Create a new collector with the given name. pub fn new(name: String) -> Self { Self { @@ -31,7 +31,7 @@ impl Collector { } /// Span a child collector with a given name. - pub fn span(&self, name: String) -> Collector { + pub fn span(&self, name: String) -> MetricsCollector { let mut children = self.children.lock().unwrap(); let child = Self::new(name); children.push(child.clone()); @@ -53,7 +53,7 @@ impl Collector { } /// Visit all the collectors including itself and its children. - pub fn visit(&self, f: &mut impl FnMut(&Collector)) { + pub fn visit(&self, f: &mut impl FnMut(&MetricsCollector)) { f(self); let children = self.children.lock().unwrap(); for child in children.iter() { diff --git a/components/trace_metric/src/lib.rs b/components/trace_metric/src/lib.rs index acc0e96ed9..78f5ab7ce6 100644 --- a/components/trace_metric/src/lib.rs +++ b/components/trace_metric/src/lib.rs @@ -3,6 +3,6 @@ pub mod collector; pub mod metric; -pub use collector::Collector; +pub use collector::MetricsCollector; pub use metric::Metric; pub use trace_metric_derive::TracedMetrics; diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 1617fa2085..8446aa99dd 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -2,7 +2,7 @@ use std::time::Duration; -use trace_metric::{Collector, TracedMetrics}; +use trace_metric::{MetricsCollector, TracedMetrics}; #[derive(Debug, Clone, TracedMetrics)] pub struct ExampleMetrics { @@ -15,7 +15,7 @@ pub struct ExampleMetrics { pub foo: String, #[metric(collector)] - pub collector: Collector, + pub collector: MetricsCollector, } #[cfg(test)] @@ -24,7 +24,7 @@ mod test { #[test] fn basic() { - let collector = Collector::new("test".to_string()); + let collector = MetricsCollector::new("test".to_string()); { let _ = ExampleMetrics { counter: 1, diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index 24ea52cd2b..70a4c4dc0b 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -40,7 +40,7 @@ use table_engine::{ }, }; use tokio::sync::Mutex; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use crate::{SYSTEM_SCHEMA_ID, SYS_CATALOG_TABLE_ID, SYS_CATALOG_TABLE_NAME}; @@ -534,7 +534,7 @@ impl SysCatalogTable { projected_schema: ProjectedSchema::no_projection(self.table.schema()), predicate: PredicateBuilder::default().build(), order: ReadOrder::None, - metrics_collector: Collector::new("open_sys_catalog_table".to_string()), + metrics_collector: MetricsCollector::new("open_sys_catalog_table".to_string()), }; let mut batch_stream = self.table.read(read_request).await.context(ReadTable)?; diff 
--git a/table_engine/src/provider.rs b/table_engine/src/provider.rs index b16799c26b..537bca8f5f 100644 --- a/table_engine/src/provider.rs +++ b/table_engine/src/provider.rs @@ -26,7 +26,7 @@ use datafusion::{ use datafusion_expr::{Expr, TableSource, TableType}; use df_operator::visitor; use log::debug; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use crate::{ predicate::{PredicateBuilder, PredicateRef}, @@ -170,7 +170,7 @@ impl TableProviderAdapter { predicate, deadline, stream_state: Mutex::new(ScanStreamState::default()), - metrics_collector: Collector::new("scan_table".to_string()), + metrics_collector: MetricsCollector::new("scan_table".to_string()), }; scan_table.maybe_init_stream(state).await?; @@ -297,7 +297,7 @@ struct ScanTable { read_parallelism: usize, predicate: PredicateRef, deadline: Option, - metrics_collector: Collector, + metrics_collector: MetricsCollector, stream_state: Mutex, } diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index abddbe2d7a..c4c2d7c9be 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -25,7 +25,7 @@ use common_types::{ use common_util::error::{BoxError, GenericError}; use serde::Deserialize; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use trace_metric::Collector; +use trace_metric::MetricsCollector; use crate::{ engine::TableState, @@ -390,7 +390,7 @@ pub struct ReadRequest { /// Read the rows in reverse order. pub order: ReadOrder, /// Collector for metrics of this read request. - pub metrics_collector: Collector, + pub metrics_collector: MetricsCollector, } impl TryFrom for ceresdbproto::remote_engine::TableReadRequest { @@ -451,7 +451,7 @@ impl TryFrom for ReadRequest { projected_schema, predicate, order, - metrics_collector: Collector::new("".to_string()), + metrics_collector: MetricsCollector::new("".to_string()), }) } } From f85178f247b2ad8d1c232966ec60cc008929911d Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 15:38:36 +0800 Subject: [PATCH 12/19] rename TracedMetrics to TraceMetricWhenDrop --- analytic_engine/src/row_iter/merge.rs | 4 ++-- analytic_engine/src/sst/parquet/async_reader.rs | 4 ++-- analytic_engine/src/sst/parquet/row_group_pruner.rs | 4 ++-- components/trace_metric/src/lib.rs | 2 +- components/trace_metric_derive/src/builder.rs | 4 ++-- components/trace_metric_derive/src/lib.rs | 2 +- components/trace_metric_derive_tests/src/lib.rs | 4 ++-- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs index 430288e4fb..24ab2347a7 100644 --- a/analytic_engine/src/row_iter/merge.rs +++ b/analytic_engine/src/row_iter/merge.rs @@ -23,7 +23,7 @@ use futures::{future::try_join_all, StreamExt}; use log::{debug, trace}; use snafu::{ensure, Backtrace, ResultExt, Snafu}; use table_engine::{predicate::PredicateRef, table::TableId}; -use trace_metric::{MetricsCollector, TracedMetrics}; +use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; use crate::{ row_iter::{ @@ -566,7 +566,7 @@ impl Ord for HeapBufferedStream { } /// Metrics for merge iterator. 
-#[derive(TracedMetrics)] +#[derive(TraceMetricWhenDrop)] pub struct Metrics { #[metric(counter)] num_memtables: usize, diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 15301f4317..0384729f6a 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -34,7 +34,7 @@ use prometheus::local::LocalHistogram; use snafu::ResultExt; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; -use trace_metric::{MetricsCollector, TracedMetrics}; +use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; use crate::sst::{ factory::{ObjectStorePickerRef, ReadFrequency, SstReadOptions}, @@ -76,7 +76,7 @@ pub struct Reader<'a> { metrics: Metrics, } -#[derive(Debug, Clone, TracedMetrics)] +#[derive(Debug, Clone, TraceMetricWhenDrop)] pub(crate) struct Metrics { #[metric(boolean)] pub meta_data_cache_hit: bool, diff --git a/analytic_engine/src/sst/parquet/row_group_pruner.rs b/analytic_engine/src/sst/parquet/row_group_pruner.rs index 975fcd5c11..5eb23c3b84 100644 --- a/analytic_engine/src/sst/parquet/row_group_pruner.rs +++ b/analytic_engine/src/sst/parquet/row_group_pruner.rs @@ -14,14 +14,14 @@ use parquet_ext::prune::{ min_max, }; use snafu::ensure; -use trace_metric::{MetricsCollector, TracedMetrics}; +use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; use crate::sst::{ parquet::meta_data::ParquetFilter, reader::error::{OtherNoCause, Result}, }; -#[derive(Debug, Clone, TracedMetrics)] +#[derive(Debug, Clone, TraceMetricWhenDrop)] struct Metrics { #[metric(boolean)] use_bloom_filter: bool, diff --git a/components/trace_metric/src/lib.rs b/components/trace_metric/src/lib.rs index 78f5ab7ce6..2818d0ee89 100644 --- a/components/trace_metric/src/lib.rs +++ b/components/trace_metric/src/lib.rs @@ -5,4 +5,4 @@ pub mod metric; pub use collector::MetricsCollector; pub use metric::Metric; -pub use trace_metric_derive::TracedMetrics; +pub use trace_metric_derive::TraceMetricWhenDrop; diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs index 17fcb8c52c..889e13052f 100644 --- a/components/trace_metric_derive/src/builder.rs +++ b/components/trace_metric_derive/src/builder.rs @@ -106,10 +106,10 @@ impl Builder { } ( metric_fields, - collector_field.expect("TracedMetrics must have a collector field"), + collector_field.expect("TraceMetricWhenDrop must have a collector field"), ) } - _ => panic!("TracedMetrics only supports struct with named fields"), + _ => panic!("TraceMetricWhenDrop only supports struct with named fields"), }; Self { diff --git a/components/trace_metric_derive/src/lib.rs b/components/trace_metric_derive/src/lib.rs index 7e04abed2e..c3dbc29739 100644 --- a/components/trace_metric_derive/src/lib.rs +++ b/components/trace_metric_derive/src/lib.rs @@ -7,7 +7,7 @@ mod builder; use builder::Builder; -#[proc_macro_derive(TracedMetrics, attributes(metric))] +#[proc_macro_derive(TraceMetricWhenDrop, attributes(metric))] pub fn derive(input: TokenStream) -> TokenStream { let ast = parse_macro_input!(input as DeriveInput); Builder::parse_from_ast(ast).build() diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 8446aa99dd..7bb36b5cb4 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -2,9 +2,9 @@ use std::time::Duration; -use trace_metric::{MetricsCollector, 
TracedMetrics}; +use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; -#[derive(Debug, Clone, TracedMetrics)] +#[derive(Debug, Clone, TraceMetricWhenDrop)] pub struct ExampleMetrics { #[metric(counter)] pub counter: usize, From c9e00dd44ee2a84d4c81c0db906141f803a26dfc Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 19:49:10 +0800 Subject: [PATCH 13/19] support FormatCollectorVisitor --- components/trace_metric/src/collector.rs | 85 ++++++++++++++++++++++-- components/trace_metric/src/metric.rs | 12 +++- 2 files changed, 91 insertions(+), 6 deletions(-) diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index 7b580fe044..91b16b9759 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -53,11 +53,86 @@ impl MetricsCollector { } /// Visit all the collectors including itself and its children. - pub fn visit(&self, f: &mut impl FnMut(&MetricsCollector)) { - f(self); - let children = self.children.lock().unwrap(); - for child in children.iter() { - child.visit(f); + pub fn visit(&self, visitor: &mut impl CollectorVisitor) { + self.visit_with_level(0, visitor); + } + + /// Visit all the collectors including itself and its children. + fn visit_with_level(&self, level: usize, visitor: &mut impl CollectorVisitor) { + visitor.visit(level, self); + // Clone the children to avoid holding the lock, which may cause deadlocks + // because the lock order is not guaranteed. + let children = self.children.lock().unwrap().clone(); + for child in children { + child.visit_with_level(level + 1, visitor); } } } + +pub trait CollectorVisitor { + fn visit(&mut self, level: usize, collector: &MetricsCollector); +} + +#[derive(Default)] +pub struct FormatCollectorVisitor { + buffer: String, +} + +impl FormatCollectorVisitor { + pub fn into_string(self) -> String { + self.buffer + } + + fn indent(level: usize) -> String { + " ".repeat(level * 2) + } + + fn append_line(&mut self, indent: &str, line: &str) { + self.buffer.push_str(&format!("{indent}{line}\n")); + } +} + +impl CollectorVisitor for FormatCollectorVisitor { + fn visit(&mut self, level: usize, collector: &MetricsCollector) { + let collector_indent = Self::indent(level); + self.append_line(&collector_indent, &format!("<{}>", collector.name())); + let metric_indent = Self::indent(level + 1); + collector.visit_metrics(&mut |metric| { + self.append_line(&metric_indent, &format!("{metric:?}")); + }); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + #[test] + fn test_metrics_collector() { + let collector = MetricsCollector::new("root".to_string()); + collector.collect(Metric::counter("counter".to_string(), 1)); + collector.collect(Metric::elapsed( + "elapsed".to_string(), + Duration::from_millis(100), + )); + let child_1_0 = collector.span("child_1_0".to_string()); + child_1_0.collect(Metric::boolean("boolean".to_string(), false)); + + let child_2_0 = child_1_0.span("child_2_0".to_string()); + child_2_0.collect(Metric::counter("counter".to_string(), 1)); + child_2_0.collect(Metric::elapsed( + "elapsed".to_string(), + Duration::from_millis(100), + )); + + let child_1_1 = collector.span("child_1_1".to_string()); + child_1_1.collect(Metric::boolean("boolean".to_string(), false)); + let _child_1_2 = collector.span("child_1_2".to_string()); + + let mut visitor = FormatCollectorVisitor::default(); + collector.visit(&mut visitor); + println!("{}", visitor.into_string()); + } +} diff --git 
a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index 42bcfaae83..87027e4b32 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -8,7 +8,7 @@ pub struct MetricValue { pub val: T, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub enum Metric { Boolean(MetricValue), Counter(MetricValue), @@ -37,3 +37,13 @@ impl fmt::Debug for MetricValue { write!(f, "{}={:?}", self.name, self.val) } } + +impl fmt::Debug for Metric { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Metric::Boolean(v) => write!(f, "{}={:?}", v.name, v.val), + Metric::Counter(v) => write!(f, "{}={:?}", v.name, v.val), + Metric::Elapsed(v) => write!(f, "{}={:?}", v.name, v.val), + } + } +} From d67e3815abcb1db8ac1447fcaea91adf47929da1 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 20:01:43 +0800 Subject: [PATCH 14/19] set the metrics into datafusion --- table_engine/src/provider.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs index 537bca8f5f..ce3e7ffdab 100644 --- a/table_engine/src/provider.rs +++ b/table_engine/src/provider.rs @@ -19,14 +19,15 @@ use datafusion::{ execution::context::{SessionState, TaskContext}, physical_expr::PhysicalSortExpr, physical_plan::{ - metrics::MetricsSet, DisplayFormatType, ExecutionPlan, Partitioning, + metrics::{Count, MetricValue, MetricsSet}, + DisplayFormatType, ExecutionPlan, Metric, Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, }, }; use datafusion_expr::{Expr, TableSource, TableType}; use df_operator::visitor; use log::debug; -use trace_metric::MetricsCollector; +use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector}; use crate::{ predicate::{PredicateBuilder, PredicateRef}, @@ -392,8 +393,19 @@ impl ExecutionPlan for ScanTable { } fn metrics(&self) -> Option { - // TODO: Convert metrics_collector to MetricsSet. - None + let mut format_visitor = FormatCollectorVisitor::default(); + self.metrics_collector.visit(&mut format_visitor); + let metrics_desc = format_visitor.into_string(); + + let metric_value = MetricValue::Count { + name: metrics_desc.into(), + count: Count::new(), + }; + let metric = Metric::new(metric_value, None); + let mut metric_set = MetricsSet::new(); + metric_set.push(Arc::new(metric)); + + Some(metric_set) } fn statistics(&self) -> Statistics { From f8ccc20c9846d74d54cf59329a88c1262b7bbc09 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 20:10:07 +0800 Subject: [PATCH 15/19] remove useless file --- analytic_engine/src/instance/read_metrics.rs | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 analytic_engine/src/instance/read_metrics.rs diff --git a/analytic_engine/src/instance/read_metrics.rs b/analytic_engine/src/instance/read_metrics.rs deleted file mode 100644 index 668e3995cf..0000000000 --- a/analytic_engine/src/instance/read_metrics.rs +++ /dev/null @@ -1,2 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
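For illustration, a minimal sketch of a custom `CollectorVisitor` alongside the built-in `FormatCollectorVisitor` used by `ScanTable::metrics()` above (the counting visitor is hypothetical):

    use trace_metric::{
        collector::{CollectorVisitor, FormatCollectorVisitor},
        MetricsCollector,
    };

    // Counts how many collectors exist in the tree, children included.
    struct CountingVisitor {
        num_collectors: usize,
    }

    impl CollectorVisitor for CountingVisitor {
        fn visit(&mut self, _level: usize, _collector: &MetricsCollector) {
            self.num_collectors += 1;
        }
    }

    // The built-in formatter renders the same tree as indented text.
    fn render(root: &MetricsCollector) -> String {
        let mut visitor = FormatCollectorVisitor::default();
        root.visit(&mut visitor);
        visitor.into_string()
    }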
- From c41b0aaae94e4b54ed70e299730a12d1eb29b3b4 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 13 Mar 2023 20:21:05 +0800 Subject: [PATCH 16/19] remove useless module --- analytic_engine/src/instance/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index d6cacb2fc1..7575ecd41f 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -14,7 +14,6 @@ pub mod flush_compaction; pub(crate) mod mem_collector; pub mod open; mod read; -pub(crate) mod read_metrics; pub(crate) mod write; pub mod write_worker; From e2ad4a7a638d7305740e40d039a846779ee2a84f Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Tue, 14 Mar 2023 13:59:08 +0800 Subject: [PATCH 17/19] rename constant variable --- analytic_engine/src/instance/read.rs | 8 ++++---- analytic_engine/src/sst/parquet/async_reader.rs | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index e0fbf26904..0a0d954f90 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -67,8 +67,8 @@ pub enum Error { define_result!(Error); const RECORD_BATCH_READ_BUF_SIZE: usize = 1000; -const READ_METRIC_MERGE_SORT: &str = "do_merge_sort"; -const READ_METRIC_ITER_NUM: &str = "iter_num"; +const MERGE_SORT_METRIC_NAME: &str = "do_merge_sort"; +const ITER_NUM_METRIC_NAME: &str = "iter_num"; /// Check whether it needs to apply merge sorting when reading the table with /// the `table_options` by the `read_request`. @@ -101,7 +101,7 @@ impl Instance { table_data.metrics.on_read_request_begin(); let need_merge_sort = need_merge_sort_streams(&table_options, &request); request.metrics_collector.collect(Metric::boolean( - READ_METRIC_MERGE_SORT.to_string(), + MERGE_SORT_METRIC_NAME.to_string(), need_merge_sort, )); @@ -211,7 +211,7 @@ impl Instance { } request.metrics_collector.collect(Metric::counter( - READ_METRIC_ITER_NUM.to_string(), + ITER_NUM_METRIC_NAME.to_string(), iters.len(), )); diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 0384729f6a..5885ddaeab 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -51,6 +51,7 @@ use crate::sst::{ reader::{error::*, Result, SstReader}, }; +const PRUNE_ROW_GROUPS_COLLECTOR_NAME: &str = "prune_row_groups"; type SendableRecordBatchStream = Pin> + Send>>; pub struct Reader<'a> { @@ -176,7 +177,7 @@ impl<'a> Reader<'a> { .metrics .metrics_collector .as_ref() - .map(|v| v.span("prune_row_groups".to_string())); + .map(|v| v.span(PRUNE_ROW_GROUPS_COLLECTOR_NAME.to_string())); let mut pruner = RowGroupPruner::try_new( &schema, row_groups, From 281a7fea1be27fb2057284685baeb8243384b90d Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Tue, 14 Mar 2023 15:27:01 +0800 Subject: [PATCH 18/19] fix wrong field name in metrics --- components/trace_metric/src/collector.rs | 16 ++++++++++++++-- components/trace_metric_derive/src/builder.rs | 10 +++++----- .../trace_metric_derive_tests/src/lib.rs | 18 +++++++++++------- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index 91b16b9759..308b109fed 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -95,7 +95,7 @@ impl FormatCollectorVisitor { impl CollectorVisitor for 
     fn visit(&mut self, level: usize, collector: &MetricsCollector) {
         let collector_indent = Self::indent(level);
-        self.append_line(&collector_indent, &format!("<{}>", collector.name()));
+        self.append_line(&collector_indent, &format!("{}:", collector.name()));
         let metric_indent = Self::indent(level + 1);
         collector.visit_metrics(&mut |metric| {
             self.append_line(&metric_indent, &format!("{metric:?}"));
@@ -133,6 +133,18 @@ mod tests {
 
         let mut visitor = FormatCollectorVisitor::default();
         collector.visit(&mut visitor);
-        println!("{}", visitor.into_string());
+        let expect_output = r#"root:
+    counter=1
+    elapsed=100ms
+    child_1_0:
+        boolean=false
+        child_2_0:
+            counter=1
+            elapsed=100ms
+    child_1_1:
+        boolean=false
+    child_1_2:
+"#;
+        assert_eq!(expect_output, &visitor.into_string());
     }
 }
diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs
index 889e13052f..4cd5687ef5 100644
--- a/components/trace_metric_derive/src/builder.rs
+++ b/components/trace_metric_derive/src/builder.rs
@@ -126,13 +126,13 @@ impl Builder {
             let field_name = &metric_field.field_name;
             let metric = match metric_field.metric_type {
                 MetricType::Counter => {
-                    quote! { trace_metric::Metric::counter("#field_name".to_string(), self.#field_name) }
+                    quote! { ::trace_metric::Metric::counter(stringify!(#field_name).to_string(), self.#field_name) }
                 }
                 MetricType::Elapsed => {
-                    quote! { trace_metric::Metric::elapsed("#field_name".to_string(), self.#field_name) }
+                    quote! { ::trace_metric::Metric::elapsed(stringify!(#field_name).to_string(), self.#field_name) }
                 }
                 MetricType::Boolean => {
-                    quote! { trace_metric::Metric::boolean("#field_name".to_string(), self.#field_name) }
+                    quote! { ::trace_metric::Metric::boolean(stringify!(#field_name).to_string(), self.#field_name) }
                 }
             };
             let statement = quote! {
@@ -147,7 +147,7 @@ impl Builder {
         let collector_field_name = &self.collector_field.field_name;
         let stream = if self.collector_field.optional {
             quote! {
-                impl #generics Drop for #struct_name #generics #where_clause {
+                impl #generics ::core::ops::Drop for #struct_name #generics #where_clause {
                     fn drop(&mut self) {
                         if let Some(collector) = &self.#collector_field_name {
                             #(#collect_statements)*
                         }
                     }
                 }
             }
         } else {
             quote! {
-                impl #generics Drop for #struct_name #generics #where_clause {
+                impl #generics ::core::ops::Drop for #struct_name #generics #where_clause {
                     fn drop(&mut self) {
                         let collector = &self.#collector_field_name;
                         #(#collect_statements)*
diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs
index 7bb36b5cb4..48a6ddb9bd 100644
--- a/components/trace_metric_derive_tests/src/lib.rs
+++ b/components/trace_metric_derive_tests/src/lib.rs
@@ -20,6 +20,8 @@ pub struct ExampleMetrics {
 
 #[cfg(test)]
 mod test {
+    use trace_metric::collector::FormatCollectorVisitor;
+
     use super::*;
 
     #[test]
@@ -34,12 +36,14 @@ mod test {
                 collector: collector.clone(),
             };
         }
-
-        let mut metric_num = 0;
-        collector.visit_metrics(&mut |_| {
-            metric_num += 1;
-        });
-
-        assert_eq!(metric_num, 3)
+        let mut formatter = FormatCollectorVisitor::default();
+        collector.visit(&mut formatter);
+        let expect_output = r#"test:
+    counter=1
+    elapsed=1s
+    boolean=true
+"#;
+
+        assert_eq!(expect_output, &formatter.into_string());
     }
 }

From a5b82fd646d506caca3d83aaefccef234d40cc67 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Tue, 14 Mar 2023 15:45:34 +0800
Subject: [PATCH 19/19] address CR

---
 analytic_engine/src/sst/parquet/async_reader.rs |  6 ++----
 .../src/sst/parquet/row_group_pruner.rs         | 14 ++++++--------
 components/trace_metric/src/collector.rs        |  7 +++----
 3 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index 5885ddaeab..5703a7b668 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -77,7 +77,7 @@ pub struct Reader<'a> {
     metrics: Metrics,
 }
 
-#[derive(Debug, Clone, TraceMetricWhenDrop)]
+#[derive(Default, Debug, Clone, TraceMetricWhenDrop)]
 pub(crate) struct Metrics {
     #[metric(boolean)]
     pub meta_data_cache_hit: bool,
@@ -103,10 +103,8 @@ impl<'a> Reader<'a> {
         let store = store_picker.pick_by_freq(options.frequency);
 
         let metrics = Metrics {
-            meta_data_cache_hit: false,
-            read_meta_data_duration: Duration::from_secs(0),
-            parallelism: 0,
             metrics_collector,
+            ..Default::default()
         };
 
         Self {
diff --git a/analytic_engine/src/sst/parquet/row_group_pruner.rs b/analytic_engine/src/sst/parquet/row_group_pruner.rs
index 5eb23c3b84..46cd438933 100644
--- a/analytic_engine/src/sst/parquet/row_group_pruner.rs
+++ b/analytic_engine/src/sst/parquet/row_group_pruner.rs
@@ -21,16 +21,16 @@ use crate::sst::{
     reader::error::{OtherNoCause, Result},
 };
 
-#[derive(Debug, Clone, TraceMetricWhenDrop)]
+#[derive(Default, Debug, Clone, TraceMetricWhenDrop)]
 struct Metrics {
     #[metric(boolean)]
-    use_bloom_filter: bool,
+    use_custom_filter: bool,
     #[metric(counter)]
     total_row_groups: usize,
     #[metric(counter)]
     row_groups_after_prune: usize,
     #[metric(counter)]
-    pruned_by_bloom_filter: usize,
+    pruned_by_custom_filter: usize,
     #[metric(counter)]
     pruned_by_min_max: usize,
     #[metric(collector)]
@@ -65,12 +65,10 @@ impl<'a> RowGroupPruner<'a> {
         }
 
         let metrics = Metrics {
-            use_bloom_filter: parquet_filter.is_some(),
+            use_custom_filter: parquet_filter.is_some(),
             total_row_groups: row_groups.len(),
-            row_groups_after_prune: 0,
-            pruned_by_bloom_filter: 0,
-            pruned_by_min_max: 0,
             collector: metrics_collector,
+            ..Default::default()
         };
 
         Ok(Self {
@@ -100,7 +98,7 @@ impl<'a> RowGroupPruner<'a> {
                 let pruned1 = self.prune_by_filters(v);
                 let pruned = Self::intersect_pruned_row_groups(&pruned0, &pruned1);
 
-                self.metrics.pruned_by_bloom_filter = self.row_groups.len() - pruned1.len();
+                self.metrics.pruned_by_custom_filter = self.row_groups.len() - pruned1.len();
                 debug!(
                     "Finish pruning row groups by parquet_filter and min_max, total_row_groups:{}, pruned_by_min_max:{}, pruned_by_blooms:{}, pruned_by_both:{}",
                     self.row_groups.len(),
diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs
index 308b109fed..dfafd979ed 100644
--- a/components/trace_metric/src/collector.rs
+++ b/components/trace_metric/src/collector.rs
@@ -43,9 +43,8 @@ impl MetricsCollector {
         &self.name
     }
 
-    /// Visit all metrics in the collector, excluding the metrics belonging to
-    /// the children.
-    pub fn visit_metrics(&self, f: &mut impl FnMut(&Metric)) {
+    /// Calls a closure on each top-level metric of this collector.
+    pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) {
         let metrics = self.metrics.lock().unwrap();
         for metric in metrics.iter() {
             f(metric);
@@ -97,7 +96,7 @@ impl CollectorVisitor for FormatCollectorVisitor {
         let collector_indent = Self::indent(level);
         self.append_line(&collector_indent, &format!("{}:", collector.name()));
         let metric_indent = Self::indent(level + 1);
-        collector.visit_metrics(&mut |metric| {
+        collector.for_each_metric(&mut |metric| {
             self.append_line(&metric_indent, &format!("{metric:?}"));
         });
     }
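
Editor's note (appended, not part of the patch series): taken together, the patches wire tracing as follows. `Instance::read` and the SST reader record `Metric`s into a `MetricsCollector` tree, `FormatCollectorVisitor` renders that tree as indented `name=value` lines (patch 18 pins the exact format in tests), and `ScanTable::metrics` (patch 14) surfaces the rendered string through DataFusion by using it as the name of a single zero-valued `Count` in a `MetricsSet`, so the trace rides the standard operator-metrics plumbing (e.g. EXPLAIN ANALYZE output) without a custom DataFusion metric type. The sketch below is illustrative only: `collect`, `span`, `visit`, and `FormatCollectorVisitor` are used exactly as in the diffs above, while the `MetricsCollector::new(name)` constructor and the `prune_cost` metric name are assumptions for the example, not shown in this series.

    // Hedged sketch of the trace_metric API as exercised by the patches above.
    // Assumed: MetricsCollector::new(name) constructs a root collector.
    use std::time::Duration;

    use trace_metric::{collector::FormatCollectorVisitor, Metric, MetricsCollector};

    fn main() {
        let collector = MetricsCollector::new("scan_table".to_string());

        // Top-level metrics, mirroring what Instance::read records (patch 17).
        collector.collect(Metric::boolean("do_merge_sort".to_string(), true));
        collector.collect(Metric::counter("iter_num".to_string(), 3));

        // A named child span, as the SST reader creates for row-group pruning.
        let prune = collector.span("prune_row_groups".to_string());
        prune.collect(Metric::counter("total_row_groups".to_string(), 8));
        // "prune_cost" is a hypothetical metric name, for illustration only.
        prune.collect(Metric::elapsed(
            "prune_cost".to_string(),
            Duration::from_millis(5),
        ));

        // Render the tree; each metric prints as `name=value` thanks to the
        // custom fmt::Debug impls from patch 13.
        let mut visitor = FormatCollectorVisitor::default();
        collector.visit(&mut visitor);
        print!("{}", visitor.into_string());
        // Expected shape, per the format pinned in the patch 18 tests:
        // scan_table:
        //     do_merge_sort=true
        //     iter_num=3
        //     prune_row_groups:
        //         total_row_groups=8
        //         prune_cost=5ms
    }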