Skip to content

Commit

Permalink
add testcase for remote prom handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Jan 30, 2023
1 parent a19097c commit 251f77f
Showing 1 changed file with 238 additions and 47 deletions.
285 changes: 238 additions & 47 deletions server/src/handlers/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use common_types::{
use interpreters::interpreter::Output;
use log::debug;
use prom_remote_api::types::{
label_matcher, Label, Query, QueryResult, RemoteStorage, Sample, TimeSeries, WriteRequest,
label_matcher, Label, LabelMatcher, Query, QueryResult, RemoteStorage, Sample, TimeSeries,
WriteRequest,
};
use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
Expand Down Expand Up @@ -110,7 +111,7 @@ impl reject::Reject for Error {}
const NAME_LABEL: &str = "__name__";
const VALUE_COLUMN: &str = "value";

pub struct CeresDBStorage<Q: QueryExecutor + 'static> {
/// Storage type for which this file implements the Prometheus `RemoteStorage`
/// trait.
///
/// The definition itself places no trait bound on `Q`; bounds are declared on
/// the `impl` blocks that need them. This lets the bound-free helper functions
/// be called with any `Q` (the unit tests use `CeresDBStorage::<()>`).
pub struct CeresDBStorage<Q> {
instance: InstanceRef<Q>,
schema_config_provider: SchemaConfigProviderRef,
}
Expand All @@ -124,39 +125,55 @@ impl<Q: QueryExecutor + 'static> CeresDBStorage<Q> {
}
}

impl<Q: QueryExecutor + 'static> CeresDBStorage<Q> {
/// This function will separate measurement from labels, and sort labels by
/// name.
fn normalize_labels(labels: Vec<Label>) -> Result<(String, Vec<Label>)> {
let mut new_labels = Vec::with_capacity(labels.len());
let mut measurement = None;
for label in labels {
if label.name == NAME_LABEL {
measurement = Some(label.value);
continue;
}
impl<Q> CeresDBStorage<Q> {
/// Separate metric from labels, and sort labels by name.
fn normalize_labels(mut labels: Vec<Label>) -> Result<(String, Vec<Label>)> {
let metric_idx = labels
.iter()
.position(|label| label.name == NAME_LABEL)
.context(MissingName)?;
let metric = labels.swap_remove(metric_idx).value;
labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));

new_labels.push(label);
}
let measurement = measurement.context(MissingName)?;
new_labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));
Ok((metric, labels))
}

/// Separates the metric name from the matchers and converts the remaining
/// matchers into SQL filter expressions.
///
/// The metric is the value of the first matcher named `__name__`
/// (`NAME_LABEL`). The remaining matchers are converted in the order they were
/// supplied.
///
/// Matcher values are embedded in single-quoted SQL string literals, so every
/// `'` inside a value is doubled to keep the value inside its literal.
///
/// # Errors
///
/// Returns a `MissingName` error when no matcher is named `__name__`.
fn normalize_matchers(mut matchers: Vec<LabelMatcher>) -> Result<(String, Vec<String>)> {
    let metric_idx = matchers
        .iter()
        .position(|m| m.name == NAME_LABEL)
        .context(MissingName)?;

    // `remove` (rather than `swap_remove`) keeps the remaining matchers in
    // their input order, so the generated SQL does not depend on where the
    // `__name__` matcher happened to be.
    // NOTE(review): the matcher type of `__name__` is not inspected, so a
    // non-`Eq` matcher on the metric name is treated like `Eq` — confirm this
    // is intended.
    let metric = matchers.remove(metric_idx).value;

    let filters = matchers
        .iter()
        .map(|m| {
            // Double single quotes so the value cannot terminate the literal.
            // NOTE(review): `m.name` is still used verbatim as a column name;
            // verify that label names are validated before reaching here.
            let value = m.value.replace('\'', "''");
            match m.r#type() {
                label_matcher::Type::Eq => format!("{} = '{}'", m.name, value),
                label_matcher::Type::Neq => format!("{} != '{}'", m.name, value),
                // Prometheus anchors regex matchers, see
                // https://github.com/prometheus/prometheus/blob/2ce94ac19673a3f7faf164e9e078a79d4d52b767/model/labels/regexp.go#L29
                label_matcher::Type::Re => format!("{} ~ '^(?:{})'", m.name, value),
                label_matcher::Type::Nre => format!("{} !~ '^(?:{})'", m.name, value),
            }
        })
        .collect();

    Ok((metric, filters))
}

fn convert_write_request(req: WriteRequest) -> Result<WriteRequestPb> {
let mut req_by_metric = HashMap::new();
for timeseries in req.timeseries {
let (measurement, labels) = Self::normalize_labels(timeseries.labels)?;
let (metric, labels) = Self::normalize_labels(timeseries.labels)?;
let (tag_names, tag_values): (Vec<_>, Vec<_>) = labels
.into_iter()
.map(|label| (label.name, label.value))
.unzip();

req_by_metric
.entry(measurement.to_string())
.entry(metric.to_string())
.or_insert_with(|| WriteMetric {
metric: measurement,
metric,
tag_names,
field_names: vec![VALUE_COLUMN.to_string()],
entries: Vec::new(),
Expand Down Expand Up @@ -194,7 +211,7 @@ impl<Q: QueryExecutor + 'static> CeresDBStorage<Q> {
})
}

fn convert_query_result(measurement: String, resp: Output) -> Result<QueryResult> {
fn convert_query_result(metric: String, resp: Output) -> Result<QueryResult> {
let record_batches = match resp {
Output::AffectedRows(_) => return ResponseMustRows {}.fail(),
Output::Records(v) => v,
Expand All @@ -207,7 +224,7 @@ impl<Q: QueryExecutor + 'static> CeresDBStorage<Q> {
Some(batch) => Converter::try_new(batch.schema())?,
};

converter.convert(measurement, record_batches)
converter.convert(metric, record_batches)
}
}

Expand Down Expand Up @@ -262,32 +279,15 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {

/// Process one query within ReadRequest.
async fn process_query(&self, ctx: &Self::Context, q: Query) -> Result<QueryResult> {
let mut filters = Vec::with_capacity(q.matchers.len());
let (metric, mut filters) = Self::normalize_matchers(q.matchers)?;
filters.push(format!(
"{} between {} AND {}",
TIMESTAMP_COLUMN, q.start_timestamp_ms, q.end_timestamp_ms
));
let mut measurement = None;
for m in &q.matchers {
if m.name == NAME_LABEL {
measurement = Some(m.value.to_string());
continue;
}

let filter = match m.r#type() {
label_matcher::Type::Eq => format!("{} = '{}'", m.name, m.value),
label_matcher::Type::Neq => format!("{} != '{}'", m.name, m.value),
// https://github.com/prometheus/prometheus/blob/2ce94ac19673a3f7faf164e9e078a79d4d52b767/model/labels/regexp.go#L29
label_matcher::Type::Re => format!("{} ~ '^(?:{})'", m.name, m.value),
label_matcher::Type::Nre => format!("{} !~ '^(?:{})'", m.name, m.value),
};
filters.push(filter)
}

let measurement = measurement.context(MissingName).unwrap();
let sql = format!(
"select * from {} where {} order by {}, {}",
measurement,
metric,
filters.join(" and "),
TSID_COLUMN,
TIMESTAMP_COLUMN
Expand All @@ -298,11 +298,11 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
.map_err(Box::new)
.context(SqlHandle)?;

Self::convert_query_result(measurement, result)
Self::convert_query_result(metric, result)
}
}

/// Converter convert Arrow's RecordBatch into Prometheus's QueryResult
/// Converter converts Arrow's RecordBatch into Prometheus's QueryResult
struct Converter {
tsid_idx: usize,
timestamp_idx: usize,
Expand Down Expand Up @@ -362,9 +362,8 @@ impl Converter {
})
}

fn convert(&self, measurement: String, record_batches: RecordBatchVec) -> Result<QueryResult> {
fn convert(&self, metric: String, record_batches: RecordBatchVec) -> Result<QueryResult> {
let mut series_by_tsid = HashMap::new();
debug!("convert query result, tags:{:?}.", self.tags);
for batch in record_batches {
let tsid_col = batch.column(self.tsid_idx);
let timestamp_col = batch.column(self.timestamp_idx);
Expand Down Expand Up @@ -397,7 +396,7 @@ impl Converter {
.collect::<Vec<_>>();
labels.push(Label {
name: NAME_LABEL.to_string(),
value: measurement.clone(),
value: metric.clone(),
});

TimeSeries {
Expand Down Expand Up @@ -425,3 +424,195 @@ impl Converter {
})
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow::{
array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array},
record_batch::RecordBatch as ArrowRecordBatch,
};
use common_types::{column_schema, record_batch::RecordBatch, schema};
use prom_remote_api::types::{label_matcher::Type, Label};

use super::*;

/// Builds owned `Label`s from `(name, value)` string pairs, preserving the
/// order of the input.
fn make_labels(tuples: Vec<(&str, &str)>) -> Vec<Label> {
    let mut labels = Vec::with_capacity(tuples.len());
    for (name, value) in tuples {
        labels.push(Label {
            name: name.to_owned(),
            value: value.to_owned(),
        });
    }
    labels
}

/// Builds `LabelMatcher`s from `(name, value, matcher type)` triples,
/// preserving the order of the input.
fn make_matchers(tuples: Vec<(&str, &str, Type)>) -> Vec<LabelMatcher> {
    let mut matchers = Vec::with_capacity(tuples.len());
    for (name, value, matcher_type) in tuples {
        matchers.push(LabelMatcher {
            name: name.to_owned(),
            value: value.to_owned(),
            // The matcher stores its type as the enum's integer value.
            r#type: matcher_type as i32,
        });
    }
    matchers
}

/// Builds `Sample`s from `(timestamp, value)` pairs, preserving the order of
/// the input.
fn make_samples(tuples: Vec<(i64, f64)>) -> Vec<Sample> {
    let mut samples = Vec::with_capacity(tuples.len());
    for (timestamp, value) in tuples {
        samples.push(Sample { timestamp, value });
    }
    samples
}

// NOTE(review): the function name misspells "normalize"; it is kept as-is so
// existing test filters keep matching.
#[test]
fn test_normailze_labels() {
    // `__name__` sits in the middle and the other labels are unsorted on purpose.
    let input = make_labels(vec![
        ("aa", "va"),
        ("zz", "vz"),
        (NAME_LABEL, "cpu"),
        ("yy", "vy"),
    ]);
    let expected_tags = make_labels(vec![("aa", "va"), ("yy", "vy"), ("zz", "vz")]);

    let (metric, tags) = CeresDBStorage::<()>::normalize_labels(input).unwrap();
    assert_eq!("cpu", metric);
    assert_eq!(expected_tags, tags);

    // Without a `__name__` label there is no metric to extract.
    let missing_name = CeresDBStorage::<()>::normalize_labels(vec![]);
    assert!(missing_name.is_err());
}

// NOTE(review): the function name misspells "normalize"; it is kept as-is so
// existing test filters keep matching.
#[test]
fn test_normailze_matchers() {
    // One matcher of every type, plus the `__name__` matcher carrying the metric.
    let input = make_matchers(vec![
        ("a", "1", Type::Eq),
        ("b", "2", Type::Neq),
        ("c", "3", Type::Re),
        ("d", "4", Type::Nre),
        (NAME_LABEL, "cpu", Type::Eq),
    ]);
    let expected_filters = vec!["a = '1'", "b != '2'", "c ~ '^(?:3)'", "d !~ '^(?:4)'"];

    let (metric, filters) = CeresDBStorage::<()>::normalize_matchers(input).unwrap();
    assert_eq!("cpu", metric);
    assert_eq!(expected_filters, filters);

    // Without a `__name__` matcher there is no metric to query.
    let missing_name = CeresDBStorage::<()>::normalize_matchers(vec![]);
    assert!(missing_name.is_err());
}

// Builds the schema used by the conversion test. Columns are added in this
// order:
// - 2 key columns (tsid, timestamp)
// - 1 field (value)
// - 2 tags (tag1, tag2)
fn build_schema() -> schema::Schema {
    let tsid_column = column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64)
        .build()
        .unwrap();
    let timestamp_column =
        column_schema::Builder::new(TIMESTAMP_COLUMN.to_string(), DatumKind::Timestamp)
            .build()
            .unwrap();
    let value_column = column_schema::Builder::new(VALUE_COLUMN.to_string(), DatumKind::Double)
        .build()
        .unwrap();
    // Both tags are string columns flagged as tags; only the name differs.
    let tag_column = |name: &str| {
        column_schema::Builder::new(name.to_string(), DatumKind::String)
            .is_tag(true)
            .build()
            .unwrap()
    };

    schema::Builder::new()
        .auto_increment_column_id(true)
        .add_key_column(tsid_column)
        .unwrap()
        .add_key_column(timestamp_column)
        .unwrap()
        .add_normal_column(value_column)
        .unwrap()
        .add_normal_column(tag_column("tag1"))
        .unwrap()
        .add_normal_column(tag_column("tag2"))
        .unwrap()
        .build()
        .unwrap()
}

/// Builds one record batch of five rows spread over three tsids (1, 2 and 3).
///
/// Columns are supplied in the order tsid, timestamp, value, tag1, tag2, so
/// the given `schema` is expected to declare them in that order.
fn build_record_batch(schema: &schema::Schema) -> RecordBatchVec {
    let tsid_col: ArrayRef = Arc::new(UInt64Array::from(vec![1, 1, 2, 3, 3]));
    let timestamp_col: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![
        11111111, 11111112, 11111113, 11111111, 11111112,
    ]));
    let value_col: ArrayRef =
        Arc::new(Float64Array::from(vec![100.0, 101.0, 200.0, 300.0, 301.0]));
    let tag1_col: ArrayRef = Arc::new(StringArray::from(vec!["a", "a", "b", "c", "c"]));
    let tag2_col: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "y", "z", "z"]));
    let columns = vec![tsid_col, timestamp_col, value_col, tag1_col, tag2_col];

    let arrow_batch =
        ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap();
    let batch = RecordBatch::try_from(arrow_batch).unwrap();

    vec![batch]
}

/// Converts the fixture record batch and checks that rows are grouped into one
/// time series per tsid, each carrying its tags plus the `__name__` label.
#[test]
fn test_convert_records_to_query_result() {
    let metric = "cpu";
    let schema = build_schema();
    let batches = build_record_batch(&schema);
    let record_schema = schema.to_record_schema();
    let converter = Converter::try_new(&record_schema).unwrap();

    let mut actual = converter.convert(metric.to_string(), batches).unwrap();
    // Sort the series by the first label's value (tag1 in this case) so the
    // comparison below does not depend on the order the converter emits them.
    actual
        .timeseries
        .sort_unstable_by(|a, b| a.labels[0].value.cmp(&b.labels[0].value));

    // Every expected series has the same label layout: tag1, tag2, then the
    // metric name.
    let series = |tag1: &str, tag2: &str, samples: Vec<(i64, f64)>| TimeSeries {
        labels: make_labels(vec![("tag1", tag1), ("tag2", tag2), (NAME_LABEL, metric)]),
        samples: make_samples(samples),
        ..Default::default()
    };
    let expected = QueryResult {
        timeseries: vec![
            series("a", "x", vec![(11111111, 100.0), (11111112, 101.0)]),
            series("b", "y", vec![(11111113, 200.0)]),
            series("c", "z", vec![(11111111, 300.0), (11111112, 301.0)]),
        ],
    };

    assert_eq!(expected, actual);
}
}

0 comments on commit 251f77f

Please sign in to comment.