Skip to content

Commit

Permalink
refactor by CR
Browse files — browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 24, 2023
1 parent f73501c commit a507541
Showing 1 changed file with 165 additions and 134 deletions.
299 changes: 165 additions & 134 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ fn find_new_columns(
entries: write_entries,
} = write_table_req;

let mut columns: BTreeMap<_, ColumnSchema> = BTreeMap::new();
let mut columns: HashMap<_, ColumnSchema> = HashMap::new();
for write_entry in write_entries {
// Parse tags.
for tag in &write_entry.tags {
Expand Down Expand Up @@ -698,10 +698,8 @@ fn find_new_columns(
),
}
);
if (field.name_index as usize) < field_names.len() {
let field_name = &field_names[field.name_index as usize];
build_column(&mut columns, schema, field_name, &field.value, false)?;
}
let field_name = &field_names[field.name_index as usize];
build_column(&mut columns, schema, field_name, &field.value, false)?;
}
}
}
Expand All @@ -710,14 +708,14 @@ fn find_new_columns(
}

fn build_column<'a>(
columns: &mut BTreeMap<&'a str, ColumnSchema>,
columns: &mut HashMap<&'a str, ColumnSchema>,
schema: &Schema,
name: &'a str,
value: &Option<Value>,
is_tag: bool,
) -> Result<()> {
// Skip adding columns, the following cases:
// 1. Field already exists.
// 1. Column already exists.
// 2. The new column has been added.
if schema.index_of(name).is_some() || columns.get(name).is_some() {
return Ok(());
Expand Down Expand Up @@ -951,7 +949,7 @@ fn convert_proto_value_to_datum(
mod test {
use ceresdbproto::storage::{value, Field, FieldGroup, Tag, Value, WriteSeriesEntry};
use common_types::{
column_schema::{self, ColumnSchema},
column_schema::{self},
datum::{Datum, DatumKind},
row::Row,
schema::Builder,
Expand All @@ -961,45 +959,136 @@ mod test {

use super::*;

const TAG_K: &str = "tagk";
const TAG_V: &str = "tagv";
const TAG_K1: &str = "tagk1";
const TAG_V1: &str = "tagv1";
const FIELD_NAME: &str = "field";
const FIELD_NAME1: &str = "field1";
const FIELD_VALUE_STRING: &str = "stringValue";
const NAME_COL1: &str = "col1";
const NAME_NEW_COL1: &str = "new_col1";
const NAME_COL2: &str = "col2";
const NAME_COL3: &str = "col3";
const NAME_COL4: &str = "col4";
const NAME_COL5: &str = "col5";

// tag_names field_names write_entry
fn generate_write_entry() -> (Schema, Vec<String>, Vec<String>, WriteSeriesEntry) {
let tag_names = vec![TAG_K.to_string(), TAG_K1.to_string()];
let field_names = vec![FIELD_NAME.to_string(), FIELD_NAME1.to_string()];
#[test]
fn test_write_entry_to_row_group() {
let (schema, tag_names, field_names, write_entry) = generate_write_entry();
let rows =
write_entry_to_rows("test_table", &schema, &tag_names, &field_names, write_entry)
.unwrap();
let row0 = vec![
Datum::Timestamp(Timestamp::new(1000)),
Datum::String(NAME_COL1.into()),
Datum::String(NAME_COL2.into()),
Datum::Int64(100),
Datum::Null,
];
let row1 = vec![
Datum::Timestamp(Timestamp::new(2000)),
Datum::String(NAME_COL1.into()),
Datum::String(NAME_COL2.into()),
Datum::Null,
Datum::Int64(10),
];
let row2 = vec![
Datum::Timestamp(Timestamp::new(3000)),
Datum::String(NAME_COL1.into()),
Datum::String(NAME_COL2.into()),
Datum::Null,
Datum::Int64(10),
];

let tag = Tag {
name_index: 0,
value: Some(Value {
value: Some(value::Value::StringValue(TAG_V.to_string())),
}),
};
let tag1 = Tag {
name_index: 1,
let expect_rows = vec![
Row::from_datums(row0),
Row::from_datums(row1),
Row::from_datums(row2),
];
assert_eq!(rows, expect_rows);
}

#[test]
fn test_find_new_columns() {
    let write_table_request = generate_write_table_request();
    let schema = build_schema();
    // Index the discovered columns by name so assertions below are O(1)
    // lookups instead of scans.
    let new_columns = find_new_columns(&schema, &write_table_request)
        .unwrap()
        .into_iter()
        .map(|v| (v.name.clone(), v))
        .collect::<HashMap<_, _>>();

    // The request references exactly two columns absent from the schema:
    // one new tag and one new field.
    assert_eq!(new_columns.len(), 2);
    // `expect` (rather than get().is_some() + a second get().unwrap())
    // performs a single lookup and reports which column is missing on failure.
    let new_tag = new_columns
        .get(NAME_NEW_COL1)
        .expect("new tag column should be detected");
    assert!(new_tag.is_tag);
    let new_field = new_columns
        .get(NAME_COL5)
        .expect("new field column should be detected");
    assert!(!new_field.is_tag);
}

/// Builds the baseline test schema: a timestamp key column, two string tag
/// key columns (col1, col2) and two int64 field columns (col3, col4).
fn build_schema() -> Schema {
    // Small local factories keep the builder chain below readable and avoid
    // repeating the per-kind column settings.
    let tag_column = |name: &str| {
        column_schema::Builder::new(name.to_string(), DatumKind::String)
            .is_tag(true)
            .build()
            .unwrap()
    };
    let field_column = |name: &str| {
        column_schema::Builder::new(name.to_string(), DatumKind::Int64)
            .build()
            .unwrap()
    };
    let timestamp_column =
        column_schema::Builder::new(TIMESTAMP_COLUMN_NAME.to_string(), DatumKind::Timestamp)
            .build()
            .unwrap();

    Builder::new()
        .auto_increment_column_id(true)
        .add_key_column(timestamp_column)
        .unwrap()
        .add_key_column(tag_column(NAME_COL1))
        .unwrap()
        .add_key_column(tag_column(NAME_COL2))
        .unwrap()
        .add_normal_column(field_column(NAME_COL3))
        .unwrap()
        .add_normal_column(field_column(NAME_COL4))
        .unwrap()
        .build()
        .unwrap()
}

fn make_tag(name_index: u32, val: &str) -> Tag {
Tag {
name_index,
value: Some(Value {
value: Some(value::Value::StringValue(TAG_V1.to_string())),
value: Some(value::Value::StringValue(val.to_string())),
}),
};
}
}

/// Builds a `Field` carrying `val`; `name_index` points into the request's
/// `field_names` list.
fn make_field(name_index: u32, val: value::Value) -> Field {
    let wrapped = Value { value: Some(val) };
    Field {
        name_index,
        value: Some(wrapped),
    }
}

// tag_names field_names write_entry
fn generate_write_entry() -> (Schema, Vec<String>, Vec<String>, WriteSeriesEntry) {
let tag_names = vec![NAME_COL1.to_string(), NAME_COL2.to_string()];
let field_names = vec![NAME_COL3.to_string(), NAME_COL4.to_string()];

let tag = make_tag(0, NAME_COL1);
let tag1 = make_tag(1, NAME_COL2);
let tags = vec![tag, tag1];

let field = Field {
name_index: 0,
value: Some(Value {
value: Some(value::Value::Float64Value(100.0)),
}),
};
let field1 = Field {
name_index: 1,
value: Some(Value {
value: Some(value::Value::StringValue(FIELD_VALUE_STRING.to_string())),
}),
};
let field = make_field(0, value::Value::Int64Value(100));
let field1 = make_field(1, value::Value::Int64Value(10));

let field_group = FieldGroup {
timestamp: 1000,
fields: vec![field],
Expand All @@ -1018,102 +1107,44 @@ mod test {
field_groups: vec![field_group, field_group1, field_group2],
};

let schema_builder = Builder::new();
let schema = schema_builder
.auto_increment_column_id(true)
.add_key_column(ColumnSchema {
id: column_schema::COLUMN_ID_UNINIT,
name: TIMESTAMP_COLUMN_NAME.to_string(),
data_type: DatumKind::Timestamp,
is_nullable: false,
is_tag: false,
comment: String::new(),
escaped_name: TIMESTAMP_COLUMN_NAME.escape_debug().to_string(),
default_value: None,
})
.unwrap()
.add_key_column(ColumnSchema {
id: column_schema::COLUMN_ID_UNINIT,
name: TAG_K.to_string(),
data_type: DatumKind::String,
is_nullable: false,
is_tag: true,
comment: String::new(),
escaped_name: TAG_K.escape_debug().to_string(),
default_value: None,
})
.unwrap()
.add_normal_column(ColumnSchema {
id: column_schema::COLUMN_ID_UNINIT,
name: TAG_K1.to_string(),
data_type: DatumKind::String,
is_nullable: false,
is_tag: true,
comment: String::new(),
escaped_name: TAG_K1.escape_debug().to_string(),
default_value: None,
})
.unwrap()
.add_normal_column(ColumnSchema {
id: column_schema::COLUMN_ID_UNINIT,
name: FIELD_NAME.to_string(),
data_type: DatumKind::Double,
is_nullable: true,
is_tag: false,
comment: String::new(),
escaped_name: FIELD_NAME.escape_debug().to_string(),
default_value: None,
})
.unwrap()
.add_normal_column(ColumnSchema {
id: column_schema::COLUMN_ID_UNINIT,
name: FIELD_NAME1.to_string(),
data_type: DatumKind::String,
is_nullable: true,
is_tag: false,
comment: String::new(),
escaped_name: FIELD_NAME1.escape_debug().to_string(),
default_value: None,
})
.unwrap()
.build()
.unwrap();
let schema = build_schema();
(schema, tag_names, field_names, write_entry)
}

#[test]
fn test_write_entry_to_row_group() {
let (schema, tag_names, field_names, write_entry) = generate_write_entry();
let rows =
write_entry_to_rows("test_table", &schema, &tag_names, &field_names, write_entry)
.unwrap();
let row0 = vec![
Datum::Timestamp(Timestamp::new(1000)),
Datum::String(TAG_V.into()),
Datum::String(TAG_V1.into()),
Datum::Double(100.0),
Datum::Null,
];
let row1 = vec![
Datum::Timestamp(Timestamp::new(2000)),
Datum::String(TAG_V.into()),
Datum::String(TAG_V1.into()),
Datum::Null,
Datum::String(FIELD_VALUE_STRING.into()),
];
let row2 = vec![
Datum::Timestamp(Timestamp::new(3000)),
Datum::String(TAG_V.into()),
Datum::String(TAG_V1.into()),
Datum::Null,
Datum::String(FIELD_VALUE_STRING.into()),
];
/// Builds a `WriteTableRequest` referencing one tag and one field that are
/// absent from the schema produced by `build_schema` (`new_col1` as a tag,
/// `col5` as a field), so `find_new_columns` has exactly two columns to
/// detect.
fn generate_write_table_request() -> WriteTableRequest {
    let tags = vec![make_tag(0, NAME_NEW_COL1), make_tag(1, NAME_COL1)];

    let field_a = make_field(0, value::Value::Int64Value(100));
    let field_b = make_field(1, value::Value::Int64Value(10));

    // Three field groups at distinct timestamps; the first carries both
    // fields, the remaining two carry one field each.
    let field_groups = vec![
        FieldGroup {
            timestamp: 1000,
            fields: vec![field_a.clone(), field_b.clone()],
        },
        FieldGroup {
            timestamp: 2000,
            fields: vec![field_a],
        },
        FieldGroup {
            timestamp: 3000,
            fields: vec![field_b],
        },
    ];

    let entry = WriteSeriesEntry { tags, field_groups };

    WriteTableRequest {
        table: "test".to_string(),
        tag_names: vec![NAME_NEW_COL1.to_string(), NAME_COL1.to_string()],
        field_names: vec![NAME_COL3.to_string(), NAME_COL5.to_string()],
        entries: vec![entry],
    }
}
}

0 comments on commit a507541

Please sign in to comment.