use BTreeMap
baojinri committed Nov 22, 2023
1 parent d194a5c commit 55829df
Showing 1 changed file with 76 additions and 27 deletions.
103 changes: 76 additions & 27 deletions proxy/src/opentsdb/types.rs
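
The commit message doesn't state the motivation, but the visible effect of swapping HashMap for BTreeMap is deterministic ordering: a BTreeMap iterates (and therefore serializes) in sorted key order, while HashMap iteration order can change from run to run. A minimal standalone sketch of the difference (illustrative only, not code from this repository):

use std::collections::{BTreeMap, HashMap};

fn main() {
    let pairs = [("tag2", "y"), ("tag1", "x"), ("tag3", "z")];

    // HashMap iteration order depends on hasher state and may differ
    // between runs, which makes serialized output and test assertions flaky.
    let unordered: HashMap<_, _> = pairs.into_iter().collect();
    println!("{:?}", unordered.keys().collect::<Vec<_>>()); // order not guaranteed

    // BTreeMap always iterates in ascending key order, so anything built
    // from it is stable across runs.
    let ordered: BTreeMap<_, _> = pairs.into_iter().collect();
    println!("{:?}", ordered.keys().collect::<Vec<_>>()); // ["tag1", "tag2", "tag3"]
}

That determinism is what lets the updated test at the bottom of the diff assert an exact, multi-series expected value.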
@@ -13,7 +13,7 @@
 // limitations under the License.

 use std::{
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     default::Default,
     fmt::Debug,
 };
@@ -220,13 +220,13 @@ pub struct QueryResponse {
     /// A list of tags only returned when the results are for a single time
     /// series. If results are aggregated, this value may be null or an empty
     /// map
-    pub(crate) tags: HashMap<String, String>,
+    pub(crate) tags: BTreeMap<String, String>,
     /// If more than one timeseries were included in the result set, i.e. they
     /// were aggregated, this will display a list of tag names that were found
     /// in common across all time series.
     #[serde(rename = "aggregatedTags")]
     pub(crate) aggregated_tags: Vec<String>,
-    pub(crate) dps: HashMap<String, f64>,
+    pub(crate) dps: BTreeMap<String, f64>,
 }

 #[derive(Default)]
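
Since tags and dps are serialized directly into the OpenTSDB /api/query response body, the map swap also stabilizes the JSON key order. A self-contained sketch, assuming serde and serde_json are available as in the proxy crate, with a pared-down stand-in for the struct above (values invented for illustration):

use std::collections::BTreeMap;

use serde::Serialize;

// Minimal stand-in for the QueryResponse above, just to show the wire shape.
#[derive(Serialize)]
struct QueryResponse {
    metric: String,
    tags: BTreeMap<String, String>,
    #[serde(rename = "aggregatedTags")]
    aggregated_tags: Vec<String>,
    dps: BTreeMap<String, f64>,
}

fn main() {
    let resp = QueryResponse {
        metric: "metric".to_string(),
        // Inserted out of order on purpose; BTreeMap keeps keys sorted.
        tags: BTreeMap::from([
            ("tag2".to_string(), "x".to_string()),
            ("tag1".to_string(), "a".to_string()),
        ]),
        aggregated_tags: vec![],
        dps: BTreeMap::from([
            ("11111112".to_string(), 101.0),
            ("11111111".to_string(), 100.0),
        ]),
    };
    // Keys come out sorted regardless of insertion order:
    // {"metric":"metric","tags":{"tag1":"a","tag2":"x"},"aggregatedTags":[],"dps":{"11111111":100.0,"11111112":101.0}}
    println!("{}", serde_json::to_string(&resp).unwrap());
}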
@@ -237,10 +237,10 @@ struct QueryConverter {
     // (column_name, index)
     tags_idx: Vec<(String, usize)>,
     aggregated_tags: Vec<String>,
-    // (tags_key, (tagk, tagv))
-    tags: HashMap<String, HashMap<String, String>>,
-    // (tags_key, (timestamp, value))
-    values: HashMap<String, HashMap<String, f64>>,
+    // (time_series, (tagk, tagv))
+    tags: BTreeMap<String, BTreeMap<String, String>>,
+    // (time_series, (timestamp, value))
+    values: BTreeMap<String, BTreeMap<String, f64>>,

     resp: Vec<QueryResponse>,
 }
@@ -249,18 +249,18 @@ impl QueryConverter {
     fn try_new(
         schema: &RecordSchema,
         metric: String,
-        timestamp_col_name: &str,
-        field_col_name: &str,
+        timestamp_col_name: String,
+        field_col_name: String,
         tags: Vec<String>,
         aggregated_tags: Vec<String>,
     ) -> Result<Self> {
         let timestamp_idx = schema
-            .index_of(timestamp_col_name)
+            .index_of(&timestamp_col_name)
             .context(InternalNoCause {
                 msg: "Timestamp column is missing in query response",
             })?;

-        let value_idx = schema.index_of(field_col_name).context(InternalNoCause {
+        let value_idx = schema.index_of(&field_col_name).context(InternalNoCause {
             msg: "Value column is missing in query response",
         })?;

@@ -316,7 +316,8 @@ impl QueryConverter {
     fn add_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
         let row_num = record_batch.num_rows();
         for row_idx in 0..row_num {
-            let mut tags = HashMap::with_capacity(self.tags_idx.len());
+            let mut tags = BTreeMap::new();
+            // tags_key is used to identify a time series
             let mut tags_key = String::new();
             for (tag_key, idx) in &self.tags_idx {
                 let tag_value = record_batch
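
The rest of the loop body is collapsed in the diff, but the converter fields above show the shape of the conversion: each row's tag values are concatenated into a tags_key that identifies one time series, and the row's tags and data point are accumulated into the two nested maps. A simplified sketch of that grouping, with invented names and plain in-memory rows standing in for the RecordBatch access:

use std::collections::BTreeMap;

// (tag pairs, timestamp, value) per row; a stand-in for RecordBatch rows.
type Row = (Vec<(String, String)>, String, f64);

fn group_rows(
    rows: &[Row],
    tags: &mut BTreeMap<String, BTreeMap<String, String>>,
    values: &mut BTreeMap<String, BTreeMap<String, f64>>,
) {
    for (tag_pairs, timestamp, value) in rows {
        let mut row_tags = BTreeMap::new();
        // tags_key concatenates the tag values and identifies a time series.
        let mut tags_key = String::new();
        for (k, v) in tag_pairs {
            tags_key.push_str(v);
            row_tags.insert(k.clone(), v.clone());
        }
        tags.entry(tags_key.clone()).or_insert(row_tags);
        values
            .entry(tags_key)
            .or_default()
            .insert(timestamp.clone(), *value);
    }
}

With the test data below, rows tagged (a, x), (b, y), and (c, z) end up under three distinct keys, each with its own (timestamp, value) map.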
@@ -404,8 +405,8 @@ pub(crate) fn convert_output_to_response(
         QueryConverter::try_new(
             record_schema,
             metric,
-            &timestamp_col_name,
-            &field_col_name,
+            timestamp_col_name,
+            field_col_name,
             tags,
             aggregated_tags,
         )?
@@ -467,19 +468,30 @@ mod tests {
                     .unwrap(),
             )
             .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("tag2".to_string(), DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
             .build()
             .unwrap()
     }

     fn build_record_batch(schema: &Schema) -> RecordBatchVec {
-        let tsid: ArrayRef = Arc::new(UInt64Array::from(vec![1]));
-        let timestamp: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![11111111]));
-        let values: ArrayRef = Arc::new(Float64Array::from(vec![100.0]));
-        let tag1: ArrayRef = Arc::new(StringArray::from(vec!["a"]));
+        let tsid: ArrayRef = Arc::new(UInt64Array::from(vec![1, 1, 2, 3, 3]));
+        let timestamp: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![
+            11111111, 11111112, 11111113, 11111111, 11111112,
+        ]));
+        let values: ArrayRef =
+            Arc::new(Float64Array::from(vec![100.0, 101.0, 200.0, 300.0, 301.0]));
+        let tag1: ArrayRef = Arc::new(StringArray::from(vec!["a", "a", "b", "c", "c"]));
+        let tag2: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "y", "z", "z"]));

         let batch = ArrowRecordBatch::try_new(
             schema.to_arrow_schema_ref(),
-            vec![tsid, timestamp, values, tag1],
+            vec![tsid, timestamp, values, tag1, tag2],
         )
         .unwrap();

@@ -489,7 +501,7 @@ mod tests {
     #[test]
     fn test_convert_output_to_response() {
         let metric = "metric".to_string();
-        let tags = vec!["tag1".to_string()];
+        let tags = vec!["tag1".to_string(), "tag2".to_string()];
         let schema = build_schema();
         let record_batch = build_record_batch(&schema);
         let result = convert_output_to_response(
@@ -498,19 +510,56 @@
             DEFAULT_FIELD.to_string(),
             TIMESTAMP_COLUMN.to_string(),
             tags,
-            vec!["tag1".to_string()],
+            vec![],
         )
         .unwrap();

         assert_eq!(
-            vec![QueryResponse {
-                metric: metric.clone(),
-                tags: vec![("tag1".to_string(), "a".to_string()),]
-                    .into_iter()
-                    .collect(),
-                aggregated_tags: vec!["tag1".to_string()],
-                dps: vec![("11111111".to_string(), 100.0),].into_iter().collect(),
-            }],
+            vec![
+                QueryResponse {
+                    metric: metric.clone(),
+                    tags: vec![
+                        ("tag1".to_string(), "a".to_string()),
+                        ("tag2".to_string(), "x".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    aggregated_tags: vec![],
+                    dps: vec![
+                        ("11111111".to_string(), 100.0),
+                        ("11111112".to_string(), 101.0),
+                    ]
+                    .into_iter()
+                    .collect(),
+                },
+                QueryResponse {
+                    metric: metric.clone(),
+                    tags: vec![
+                        ("tag1".to_string(), "b".to_string()),
+                        ("tag2".to_string(), "y".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    aggregated_tags: vec![],
+                    dps: vec![("11111113".to_string(), 200.0),].into_iter().collect(),
+                },
+                QueryResponse {
+                    metric: metric.clone(),
+                    tags: vec![
+                        ("tag1".to_string(), "c".to_string()),
+                        ("tag2".to_string(), "z".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    aggregated_tags: vec![],
+                    dps: vec![
+                        ("11111111".to_string(), 300.0),
+                        ("11111112".to_string(), 301.0),
+                    ]
+                    .into_iter()
+                    .collect(),
+                },
+            ],
             result
         );
     }
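
The final assembly of resp isn't part of this diff, but once both maps are BTreeMaps keyed by the same tags_key, draining them pairs each series with its data points in sorted-key order, which is why the test can assert an exact Vec of three responses (series a, then b, then c). A hypothetical sketch of that step, with an invented function name and a pared-down response type:

use std::collections::BTreeMap;

// Pared-down mirror of the QueryResponse struct changed above.
struct QueryResponse {
    metric: String,
    tags: BTreeMap<String, String>,
    aggregated_tags: Vec<String>,
    dps: BTreeMap<String, f64>,
}

// Hypothetical: the real assembly code is not shown in this diff.
fn build_responses(
    metric: &str,
    aggregated_tags: &[String],
    tags: BTreeMap<String, BTreeMap<String, String>>,
    mut values: BTreeMap<String, BTreeMap<String, f64>>,
) -> Vec<QueryResponse> {
    // Iterating tags in sorted tags_key order and removing the matching
    // datapoints keeps series and values aligned deterministically.
    tags.into_iter()
        .map(|(tags_key, series_tags)| QueryResponse {
            metric: metric.to_string(),
            tags: series_tags,
            aggregated_tags: aggregated_tags.to_vec(),
            dps: values.remove(&tags_key).unwrap_or_default(),
        })
        .collect()
}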