diff --git a/proxy/src/write.rs b/proxy/src/write.rs
index faf476e24c..fc5627e00e 100644
--- a/proxy/src/write.rs
+++ b/proxy/src/write.rs
@@ -667,7 +667,7 @@ fn find_new_columns(
         entries: write_entries,
     } = write_table_req;

-    let mut columns: BTreeMap<_, ColumnSchema> = BTreeMap::new();
+    let mut columns: HashMap<_, ColumnSchema> = HashMap::new();
     for write_entry in write_entries {
         // Parse tags.
         for tag in &write_entry.tags {
@@ -698,10 +698,8 @@
                         ),
                     }
                 );
-                if (field.name_index as usize) < field_names.len() {
-                    let field_name = &field_names[field.name_index as usize];
-                    build_column(&mut columns, schema, field_name, &field.value, false)?;
-                }
+                let field_name = &field_names[field.name_index as usize];
+                build_column(&mut columns, schema, field_name, &field.value, false)?;
             }
         }
     }
@@ -710,14 +708,14 @@ fn build_column<'a>(
-    columns: &mut BTreeMap<&'a str, ColumnSchema>,
+    columns: &mut HashMap<&'a str, ColumnSchema>,
     schema: &Schema,
     name: &'a str,
     value: &Option<Value>,
     is_tag: bool,
 ) -> Result<()> {
     // Skip adding columns, the following cases:
-    // 1. Field already exists.
+    // 1. Column already exists.
     // 2. The new column has been added.
     if schema.index_of(name).is_some() || columns.get(name).is_some() {
         return Ok(());
     }
@@ -951,7 +949,7 @@ fn convert_proto_value_to_datum(
 mod test {
     use ceresdbproto::storage::{value, Field, FieldGroup, Tag, Value, WriteSeriesEntry};
     use common_types::{
-        column_schema::{self, ColumnSchema},
+        column_schema::{self},
         datum::{Datum, DatumKind},
         row::Row,
         schema::Builder,
@@ -961,45 +959,136 @@ mod test {
     use super::*;

-    const TAG_K: &str = "tagk";
-    const TAG_V: &str = "tagv";
-    const TAG_K1: &str = "tagk1";
-    const TAG_V1: &str = "tagv1";
-    const FIELD_NAME: &str = "field";
-    const FIELD_NAME1: &str = "field1";
-    const FIELD_VALUE_STRING: &str = "stringValue";
+    const NAME_COL1: &str = "col1";
+    const NAME_NEW_COL1: &str = "new_col1";
+    const NAME_COL2: &str = "col2";
+    const NAME_COL3: &str = "col3";
+    const NAME_COL4: &str = "col4";
+    const NAME_COL5: &str = "col5";

-    // tag_names field_names write_entry
-    fn generate_write_entry() -> (Schema, Vec<String>, Vec<String>, WriteSeriesEntry) {
-        let tag_names = vec![TAG_K.to_string(), TAG_K1.to_string()];
-        let field_names = vec![FIELD_NAME.to_string(), FIELD_NAME1.to_string()];
+    #[test]
+    fn test_write_entry_to_row_group() {
+        let (schema, tag_names, field_names, write_entry) = generate_write_entry();
+        let rows =
+            write_entry_to_rows("test_table", &schema, &tag_names, &field_names, write_entry)
+                .unwrap();
+        let row0 = vec![
+            Datum::Timestamp(Timestamp::new(1000)),
+            Datum::String(NAME_COL1.into()),
+            Datum::String(NAME_COL2.into()),
+            Datum::Int64(100),
+            Datum::Null,
+        ];
+        let row1 = vec![
+            Datum::Timestamp(Timestamp::new(2000)),
+            Datum::String(NAME_COL1.into()),
+            Datum::String(NAME_COL2.into()),
+            Datum::Null,
+            Datum::Int64(10),
+        ];
+        let row2 = vec![
+            Datum::Timestamp(Timestamp::new(3000)),
+            Datum::String(NAME_COL1.into()),
+            Datum::String(NAME_COL2.into()),
+            Datum::Null,
+            Datum::Int64(10),
+        ];

-        let tag = Tag {
-            name_index: 0,
-            value: Some(Value {
-                value: Some(value::Value::StringValue(TAG_V.to_string())),
-            }),
-        };
-        let tag1 = Tag {
-            name_index: 1,
+        let expect_rows = vec![
+            Row::from_datums(row0),
+            Row::from_datums(row1),
+            Row::from_datums(row2),
+        ];
+        assert_eq!(rows, expect_rows);
+    }
+
+    #[test]
+    fn test_find_new_columns() {
+        let write_table_request = generate_write_table_request();
+        let schema = build_schema();
+        let new_columns = find_new_columns(&schema, &write_table_request)
+            .unwrap()
+            .into_iter()
+            .map(|v| (v.name.clone(), v))
+            .collect::<HashMap<_, _>>();
+
+        assert_eq!(new_columns.len(), 2);
+        assert!(new_columns.get(NAME_NEW_COL1).is_some());
+        assert!(new_columns.get(NAME_NEW_COL1).unwrap().is_tag);
+        assert!(new_columns.get(NAME_COL5).is_some());
+        assert!(!new_columns.get(NAME_COL5).unwrap().is_tag);
+    }
+
+    fn build_schema() -> Schema {
+        Builder::new()
+            .auto_increment_column_id(true)
+            .add_key_column(
+                column_schema::Builder::new(
+                    TIMESTAMP_COLUMN_NAME.to_string(),
+                    DatumKind::Timestamp,
+                )
+                .build()
+                .unwrap(),
+            )
+            .unwrap()
+            .add_key_column(
+                column_schema::Builder::new(NAME_COL1.to_string(), DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_key_column(
+                column_schema::Builder::new(NAME_COL2.to_string(), DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new(NAME_COL3.to_string(), DatumKind::Int64)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new(NAME_COL4.to_string(), DatumKind::Int64)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .build()
+            .unwrap()
+    }
+
+    fn make_tag(name_index: u32, val: &str) -> Tag {
+        Tag {
+            name_index,
             value: Some(Value {
-                value: Some(value::Value::StringValue(TAG_V1.to_string())),
+                value: Some(value::Value::StringValue(val.to_string())),
             }),
-        };
+        }
+    }
+
+    fn make_field(name_index: u32, val: value::Value) -> Field {
+        Field {
+            name_index,
+            value: Some(Value { value: Some(val) }),
+        }
+    }
+
+    // tag_names field_names write_entry
+    fn generate_write_entry() -> (Schema, Vec<String>, Vec<String>, WriteSeriesEntry) {
+        let tag_names = vec![NAME_COL1.to_string(), NAME_COL2.to_string()];
+        let field_names = vec![NAME_COL3.to_string(), NAME_COL4.to_string()];
+
+        let tag = make_tag(0, NAME_COL1);
+        let tag1 = make_tag(1, NAME_COL2);
         let tags = vec![tag, tag1];

-        let field = Field {
-            name_index: 0,
-            value: Some(Value {
-                value: Some(value::Value::Float64Value(100.0)),
-            }),
-        };
-        let field1 = Field {
-            name_index: 1,
-            value: Some(Value {
-                value: Some(value::Value::StringValue(FIELD_VALUE_STRING.to_string())),
-            }),
-        };
+        let field = make_field(0, value::Value::Int64Value(100));
+        let field1 = make_field(1, value::Value::Int64Value(10));
+
         let field_group = FieldGroup {
             timestamp: 1000,
             fields: vec![field],
@@ -1018,102 +1107,44 @@ mod test {
             field_groups: vec![field_group, field_group1, field_group2],
         };

-        let schema_builder = Builder::new();
-        let schema = schema_builder
-            .auto_increment_column_id(true)
-            .add_key_column(ColumnSchema {
-                id: column_schema::COLUMN_ID_UNINIT,
-                name: TIMESTAMP_COLUMN_NAME.to_string(),
-                data_type: DatumKind::Timestamp,
-                is_nullable: false,
-                is_tag: false,
-                comment: String::new(),
-                escaped_name: TIMESTAMP_COLUMN_NAME.escape_debug().to_string(),
-                default_value: None,
-            })
-            .unwrap()
-            .add_key_column(ColumnSchema {
-                id: column_schema::COLUMN_ID_UNINIT,
-                name: TAG_K.to_string(),
-                data_type: DatumKind::String,
-                is_nullable: false,
-                is_tag: true,
-                comment: String::new(),
-                escaped_name: TAG_K.escape_debug().to_string(),
-                default_value: None,
-            })
-            .unwrap()
-            .add_normal_column(ColumnSchema {
-                id: column_schema::COLUMN_ID_UNINIT,
-                name: TAG_K1.to_string(),
-                data_type: DatumKind::String,
-                is_nullable: false,
-                is_tag: true,
-                comment: String::new(),
-                escaped_name: TAG_K1.escape_debug().to_string(),
-                default_value: None,
-            })
-            .unwrap()
-            .add_normal_column(ColumnSchema {
-                id: column_schema::COLUMN_ID_UNINIT,
-                name: FIELD_NAME.to_string(),
-                data_type: DatumKind::Double,
-                is_nullable: true,
-                is_tag: false,
-                comment: String::new(),
-                escaped_name: FIELD_NAME.escape_debug().to_string(),
-                default_value: None,
-            })
-            .unwrap()
-            .add_normal_column(ColumnSchema {
-                id: column_schema::COLUMN_ID_UNINIT,
-                name: FIELD_NAME1.to_string(),
-                data_type: DatumKind::String,
-                is_nullable: true,
-                is_tag: false,
-                comment: String::new(),
-                escaped_name: FIELD_NAME1.escape_debug().to_string(),
-                default_value: None,
-            })
-            .unwrap()
-            .build()
-            .unwrap();
+        let schema = build_schema();

         (schema, tag_names, field_names, write_entry)
     }

-    #[test]
-    fn test_write_entry_to_row_group() {
-        let (schema, tag_names, field_names, write_entry) = generate_write_entry();
-        let rows =
-            write_entry_to_rows("test_table", &schema, &tag_names, &field_names, write_entry)
-                .unwrap();
-        let row0 = vec![
-            Datum::Timestamp(Timestamp::new(1000)),
-            Datum::String(TAG_V.into()),
-            Datum::String(TAG_V1.into()),
-            Datum::Double(100.0),
-            Datum::Null,
-        ];
-        let row1 = vec![
-            Datum::Timestamp(Timestamp::new(2000)),
-            Datum::String(TAG_V.into()),
-            Datum::String(TAG_V1.into()),
-            Datum::Null,
-            Datum::String(FIELD_VALUE_STRING.into()),
-        ];
-        let row2 = vec![
-            Datum::Timestamp(Timestamp::new(3000)),
-            Datum::String(TAG_V.into()),
-            Datum::String(TAG_V1.into()),
-            Datum::Null,
-            Datum::String(FIELD_VALUE_STRING.into()),
-        ];
+    fn generate_write_table_request() -> WriteTableRequest {
+        let tag1 = make_tag(0, NAME_NEW_COL1);
+        let tag2 = make_tag(1, NAME_COL1);
+        let tags = vec![tag1, tag2];

-        let expect_rows = vec![
-            Row::from_datums(row0),
-            Row::from_datums(row1),
-            Row::from_datums(row2),
-        ];
-        assert_eq!(rows, expect_rows);
+        let field1 = make_field(0, value::Value::Int64Value(100));
+        let field2 = make_field(1, value::Value::Int64Value(10));
+
+        let field_group1 = FieldGroup {
+            timestamp: 1000,
+            fields: vec![field1.clone(), field2.clone()],
+        };
+        let field_group2 = FieldGroup {
+            timestamp: 2000,
+            fields: vec![field1],
+        };
+        let field_group3 = FieldGroup {
+            timestamp: 3000,
+            fields: vec![field2],
+        };
+
+        let write_entry = WriteSeriesEntry {
+            tags,
+            field_groups: vec![field_group1, field_group2, field_group3],
+        };
+
+        let tag_names = vec![NAME_NEW_COL1.to_string(), NAME_COL1.to_string()];
+        let field_names = vec![NAME_COL3.to_string(), NAME_COL5.to_string()];
+
+        WriteTableRequest {
+            table: "test".to_string(),
+            tag_names,
+            field_names,
+            entries: vec![write_entry],
+        }
     }
 }