Commit 3da8fe4

use the trace_metric crate

ShiKaiWi committed Mar 10, 2023
1 parent 7c41bde

Showing 16 changed files with 70 additions and 188 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
@@ -46,6 +46,7 @@ table_engine = { workspace = true }
 table_kv = { workspace = true }
 tempfile = { workspace = true, optional = true }
 tokio = { workspace = true }
+trace_metric = { workspace = true }
 wal = { workspace = true }
 xorfilter-rs = { workspace = true }
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/flush_compaction.rs
@@ -28,6 +28,7 @@ use log::{debug, error, info};
 use snafu::{Backtrace, ResultExt, Snafu};
 use table_engine::{predicate::Predicate, table::Result as TableResult};
 use tokio::sync::oneshot;
+use trace_metric::Collector;
 use wal::manager::WalLocation;

 use crate::{
@@ -855,7 +856,7 @@ impl SpaceStore {
         let sequence = table_data.last_sequence();
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
-            metrics_collector: None,
+            metrics_collector: Collector::new("compaction".to_string()),
             // no need to set deadline for compaction
             deadline: None,
             space_id,
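For orientation, here is a minimal sketch of the trace_metric surface this commit leans on, inferred only from the call sites visible in this diff: Collector::new(String), Clone, and a collect(Metric) method mirroring the removed ReadMetricsCollector usage. This is a hypothetical illustration, not the crate's actual source.

// Hypothetical sketch of the trace_metric API used in this commit, inferred
// from the diff's call sites; the real crate may differ.
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Clone, Debug)]
pub enum Metric {
    Counter { name: String, value: usize },
    Elapsed { name: String, value: Duration },
}

impl Metric {
    pub fn counter(name: String, value: usize) -> Self {
        Metric::Counter { name, value }
    }

    pub fn elapsed(name: String, value: Duration) -> Self {
        Metric::Elapsed { name, value }
    }
}

/// A named, cheaply cloneable sink; clones share one buffer, which is why
/// MergeConfig can now hold a Collector by value instead of an Option.
#[derive(Clone, Debug)]
pub struct Collector {
    name: String,
    metrics: Arc<Mutex<Vec<Metric>>>,
}

impl Collector {
    pub fn new(name: String) -> Self {
        Self { name, metrics: Arc::default() }
    }

    pub fn collect(&self, metric: Metric) {
        self.metrics.lock().unwrap().push(metric);
    }
}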
5 changes: 3 additions & 2 deletions analytic_engine/src/instance/read.rs
@@ -20,9 +20,10 @@ use table_engine::{
     stream::{
         self, ErrWithSource, PartitionedStreams, RecordBatchStream, SendableRecordBatchStream,
     },
-    table::{Metric, ReadRequest},
+    table::ReadRequest,
 };
 use tokio::sync::mpsc::{self, Receiver};
+use trace_metric::Metric;

 use crate::{
     instance::Instance,
@@ -179,7 +180,7 @@ impl Instance {
         for read_view in read_views {
             let merge_config = MergeConfig {
                 request_id: request.request_id,
-                metrics_collector: Some(request.metrics_collector.clone()),
+                metrics_collector: request.metrics_collector.clone(),
                 deadline: request.opts.deadline,
                 space_id: table_data.space_id,
                 table_id: table_data.id,
90 changes: 30 additions & 60 deletions analytic_engine/src/row_iter/merge.rs
@@ -20,12 +20,10 @@ use common_types::{
 };
 use common_util::{define_result, error::GenericError};
 use futures::{future::try_join_all, StreamExt};
-use log::{debug, info, trace};
+use log::{debug, trace};
 use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_engine::{
-    predicate::PredicateRef,
-    table::{Metric, ReadMetricsCollector, TableId},
-};
+use table_engine::{predicate::PredicateRef, table::TableId};
+use trace_metric::{Collector, TracedMetrics};

 use crate::{
     row_iter::{
@@ -86,7 +84,7 @@ define_result!(Error);
 #[derive(Debug)]
 pub struct MergeConfig<'a> {
     pub request_id: RequestId,
-    pub metrics_collector: Option<ReadMetricsCollector>,
+    pub metrics_collector: Collector,
     /// None for background jobs, such as: compaction
     pub deadline: Option<Instant>,
     pub space_id: SpaceId,
@@ -231,8 +229,12 @@ impl<'a> MergeBuilder<'a> {
             self.ssts,
             self.config.merge_iter_options,
             self.config.reverse,
-            Metrics::new(self.memtables.len(), sst_streams_num, sst_ids),
-            self.config.metrics_collector,
+            Metrics::new(
+                self.memtables.len(),
+                sst_streams_num,
+                sst_ids,
+                self.config.metrics_collector.clone(),
+            ),
         ))
     }
 }
@@ -562,28 +564,44 @@ impl Ord for HeapBufferedStream {
 }

 /// Metrics for merge iterator.
+#[derive(TracedMetrics)]
 pub struct Metrics {
+    #[metric(counter)]
     num_memtables: usize,
+    #[metric(counter)]
     num_ssts: usize,
     sst_ids: Vec<FileId>,
     /// Total rows collected using fetch_rows_from_one_stream().
+    #[metric(counter)]
     total_rows_fetch_from_one: usize,
     /// Times to fetch rows from one stream.
+    #[metric(counter)]
     times_fetch_rows_from_one: usize,
     /// Times to fetch one row from multiple stream.
+    #[metric(counter)]
     times_fetch_row_from_multiple: usize,
     /// Create time of the metrics.
     create_at: Instant,
     /// Init time cost of the metrics.
+    #[metric(elapsed)]
     init_duration: Duration,
     /// Scan time cost of the metrics.
+    #[metric(elapsed)]
     scan_duration: Duration,
     /// Scan count
+    #[metric(counter)]
     scan_count: usize,
+    #[metric(collector)]
+    metrics_collector: Collector,
 }

 impl Metrics {
-    fn new(num_memtables: usize, num_ssts: usize, sst_ids: Vec<FileId>) -> Self {
+    fn new(
+        num_memtables: usize,
+        num_ssts: usize,
+        sst_ids: Vec<FileId>,
+        collector: Collector,
+    ) -> Self {
         Self {
             num_memtables,
             num_ssts,
@@ -595,39 +613,9 @@ impl Metrics {
             init_duration: Duration::default(),
             scan_duration: Duration::default(),
             scan_count: 0,
+            metrics_collector: collector,
         }
     }
-
-    fn collect(&self, collector: &ReadMetricsCollector) {
-        // TODO: maybe we can define a macro to generate the code.
-        collector.collect(Metric::counter(
-            "num_memtables".to_string(),
-            self.num_memtables,
-        ));
-
-        collector.collect(Metric::counter("num_ssts".to_string(), self.num_ssts));
-        collector.collect(Metric::counter(
-            "times_fetch_rows_from_one".to_string(),
-            self.times_fetch_rows_from_one,
-        ));
-        collector.collect(Metric::counter(
-            "times_rows_fetch_from_one".to_string(),
-            self.times_fetch_row_from_multiple,
-        ));
-        collector.collect(Metric::counter(
-            "total_rows_fetch_from_one".to_string(),
-            self.total_rows_fetch_from_one,
-        ));
-        collector.collect(Metric::elapsed(
-            "init_duration".to_string(),
-            self.init_duration,
-        ));
-        collector.collect(Metric::elapsed(
-            "scan_duration".to_string(),
-            self.scan_duration,
-        ));
-        collector.collect(Metric::counter("scan_count".to_string(), self.scan_count));
-    }
 }

 impl fmt::Debug for Metrics {
@@ -667,7 +655,6 @@ pub struct MergeIterator {
     iter_options: IterOptions,
     reverse: bool,
     metrics: Metrics,
-    metrics_collector: Option<ReadMetricsCollector>,
 }

 impl MergeIterator {
@@ -681,7 +668,6 @@ impl MergeIterator {
         iter_options: IterOptions,
         reverse: bool,
         metrics: Metrics,
-        metrics_collector: Option<ReadMetricsCollector>,
     ) -> Self {
         let heap_cap = streams.len();
         let record_batch_builder =
@@ -699,7 +685,6 @@
             iter_options,
             reverse,
             metrics,
-            metrics_collector,
         }
     }

@@ -893,19 +878,6 @@
     }
 }

-impl Drop for MergeIterator {
-    fn drop(&mut self) {
-        if let Some(collector) = &self.metrics_collector {
-            self.metrics.collect(collector);
-        }
-
-        info!(
-            "Merge iterator dropped, table_id:{:?}, request_id:{}, metrics:{:?}, iter_options:{:?},",
-            self.table_id, self.request_id, self.metrics, self.iter_options,
-        );
-    }
-}
-
 #[async_trait]
 impl RecordBatchWithKeyIterator for MergeIterator {
     type Error = Error;
@@ -968,8 +940,7 @@ mod tests {
             Vec::new(),
             IterOptions::default(),
             false,
-            Metrics::new(1, 1, vec![]),
-            None,
+            Metrics::new(1, 1, vec![], Collector::new("".to_string())),
         );

         check_iterator(
@@ -1022,8 +993,7 @@
             Vec::new(),
             IterOptions::default(),
             true,
-            Metrics::new(1, 1, vec![]),
-            None,
+            Metrics::new(1, 1, vec![], Collector::new("".to_string())),
         );

         check_iterator(
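The hand-written collect() method and the Drop impl for MergeIterator are deleted above because #[derive(TracedMetrics)] now generates the reporting boilerplate. The macro's expansion is not part of this diff; what follows is a hedged sketch of what it plausibly emits for the Metrics struct, assuming collection happens when the metrics value is dropped.

// Hypothetical expansion of #[derive(TracedMetrics)] for the Metrics struct
// above -- an assumption for illustration only; the real macro output and
// collection point may differ.
impl Drop for Metrics {
    fn drop(&mut self) {
        // Each #[metric(counter)] field is reported under its own field name...
        self.metrics_collector
            .collect(Metric::counter("num_memtables".to_string(), self.num_memtables));
        self.metrics_collector
            .collect(Metric::counter("num_ssts".to_string(), self.num_ssts));
        self.metrics_collector
            .collect(Metric::counter("scan_count".to_string(), self.scan_count));
        // ...and each #[metric(elapsed)] field as a duration metric.
        self.metrics_collector
            .collect(Metric::elapsed("init_duration".to_string(), self.init_duration));
        self.metrics_collector
            .collect(Metric::elapsed("scan_duration".to_string(), self.scan_duration));
    }
}

One incidental fix this buys: the removed hand-written version reported times_fetch_row_from_multiple under the mislabeled key "times_rows_fetch_from_one"; deriving names from the field identifiers eliminates that class of copy-paste bug.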
7 changes: 4 additions & 3 deletions analytic_engine/src/table/mod.rs
@@ -17,11 +17,12 @@ use table_engine::{
     stream::{PartitionedStreams, SendableRecordBatchStream},
     table::{
         AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get,
-        GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, ReadMetricsCollector, ReadOptions,
-        ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, Write, WriteRequest,
+        GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, ReadOptions, ReadOrder, ReadRequest,
+        Result, Scan, Table, TableId, TableStats, Write, WriteRequest,
     },
 };
 use tokio::sync::oneshot;
+use trace_metric::Collector;

 use self::data::TableDataRef;
 use crate::{
@@ -179,7 +180,7 @@ impl Table for TableImpl {
             projected_schema: request.projected_schema,
             predicate,
             order: ReadOrder::None,
-            metrics_collector: ReadMetricsCollector::new(),
+            metrics_collector: Collector::new("".to_string()),
         };
         let mut batch_stream = self
             .read(read_request)
8 changes: 3 additions & 5 deletions analytic_engine/src/tests/table.rs
@@ -20,11 +20,9 @@ use table_engine::{
     self,
     engine::{CreateTableRequest, TableState},
     predicate::Predicate,
-    table::{
-        GetRequest, ReadMetricsCollector, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId,
-        TableSeq,
-    },
+    table::{GetRequest, ReadOptions, ReadOrder, ReadRequest, SchemaId, TableId, TableSeq},
 };
+use trace_metric::Collector;

 use crate::{table_options, tests::row_util};

@@ -188,7 +186,7 @@ pub fn new_read_all_request_with_order(
         projected_schema: ProjectedSchema::no_projection(schema),
         predicate: Arc::new(Predicate::empty()),
         order,
-        metrics_collector: ReadMetricsCollector::new(),
+        metrics_collector: Collector::new("".to_string()),
     }
 }
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -32,6 +32,7 @@ snafu = { workspace = true }
 table_engine = { workspace = true }
 table_kv = { workspace = true }
 tokio = { workspace = true }
+trace_metric = { workspace = true }
 wal = { workspace = true }
 zstd = { workspace = true }
3 changes: 2 additions & 1 deletion benchmarks/src/merge_memtable_bench.rs
@@ -35,6 +35,7 @@ use common_util::runtime::Runtime;
 use log::info;
 use object_store::{LocalFileSystem, ObjectStoreRef};
 use table_engine::{predicate::Predicate, table::TableId};
+use trace_metric::Collector;

 use crate::{config::MergeMemTableBenchConfig, util};

@@ -142,7 +143,7 @@ impl MergeMemTableBench {
         let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
-            metrics_collector: None,
+            metrics_collector: Collector::new("".to_string()),
             deadline: None,
             space_id,
             table_id,
3 changes: 2 additions & 1 deletion benchmarks/src/merge_sst_bench.rs
@@ -29,6 +29,7 @@ use log::info;
 use object_store::{LocalFileSystem, ObjectStoreRef};
 use table_engine::{predicate::Predicate, table::TableId};
 use tokio::sync::mpsc::{self, UnboundedReceiver};
+use trace_metric::Collector;

 use crate::{config::MergeSstBenchConfig, util};

@@ -125,7 +126,7 @@ impl MergeSstBench {
         let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
         let mut builder = MergeBuilder::new(MergeConfig {
             request_id,
-            metrics_collector: None,
+            metrics_collector: Collector::new("".to_string()),
             deadline: None,
             space_id,
             table_id,
3 changes: 2 additions & 1 deletion benchmarks/src/sst_tools.rs
@@ -33,6 +33,7 @@ use object_store::{LocalFileSystem, ObjectStoreRef, Path};
 use serde::Deserialize;
 use table_engine::{predicate::Predicate, table::TableId};
 use tokio::sync::mpsc;
+use trace_metric::Collector;

 use crate::{config::BenchPredicate, util};

@@ -220,7 +221,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {

     let mut builder = MergeBuilder::new(MergeConfig {
         request_id,
-        metrics_collector: None,
+        metrics_collector: Collector::new("".to_string()),
         deadline: None,
         space_id,
         table_id,
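With metrics_collector no longer an Option, every MergeBuilder call site must supply a Collector: the compaction path in flush_compaction.rs labels its collector "compaction", while the benchmarks above all pass an empty name for metrics they never read back. A small hypothetical helper, not part of this commit, could make that throwaway case self-documenting:

// Hypothetical convenience constructor -- not in this commit; shown only to
// illustrate why the benchmark call sites pass an empty name.
fn noop_collector() -> Collector {
    // An anonymous collector whose gathered metrics are never inspected.
    Collector::new("".to_string())
}

// Usage at a benchmark call site:
//     metrics_collector: noop_collector(),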
1 change: 1 addition & 0 deletions system_catalog/Cargo.toml
@@ -24,3 +24,4 @@ prost = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
 tokio = { workspace = true }
+trace_metric = { workspace = true }