Skip to content

Commit

Permalink
refactor by CR
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 24, 2023
1 parent f73501c commit 9b57bfc
Showing 1 changed file with 116 additions and 7 deletions.
123 changes: 116 additions & 7 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ fn find_new_columns(
entries: write_entries,
} = write_table_req;

let mut columns: BTreeMap<_, ColumnSchema> = BTreeMap::new();
let mut columns: HashMap<_, ColumnSchema> = HashMap::new();
for write_entry in write_entries {
// Parse tags.
for tag in &write_entry.tags {
Expand Down Expand Up @@ -698,10 +698,8 @@ fn find_new_columns(
),
}
);
if (field.name_index as usize) < field_names.len() {
let field_name = &field_names[field.name_index as usize];
build_column(&mut columns, schema, field_name, &field.value, false)?;
}
let field_name = &field_names[field.name_index as usize];
build_column(&mut columns, schema, field_name, &field.value, false)?;
}
}
}
Expand All @@ -710,14 +708,14 @@ fn find_new_columns(
}

fn build_column<'a>(
columns: &mut BTreeMap<&'a str, ColumnSchema>,
columns: &mut HashMap<&'a str, ColumnSchema>,
schema: &Schema,
name: &'a str,
value: &Option<Value>,
is_tag: bool,
) -> Result<()> {
// Skip adding columns, the following cases:
// 1. Field already exists.
// 1. Column already exists.
// 2. The new column has been added.
if schema.index_of(name).is_some() || columns.get(name).is_some() {
return Ok(());
Expand Down Expand Up @@ -1116,4 +1114,115 @@ mod test {
];
assert_eq!(rows, expect_rows);
}

#[test]
fn test_find_new_columns() {
let write_table_request = generate_write_table_request();
let schema = build_schema();
let new_columns = find_new_columns(&schema, &write_table_request).unwrap();
assert_eq!(new_columns.len(), 2);
assert!(new_columns[0].is_tag);
assert!(!new_columns[1].is_tag)
}

fn build_schema() -> Schema {
Builder::new()
.auto_increment_column_id(true)
.add_key_column(
column_schema::Builder::new("tsid".to_string(), DatumKind::Int64)
.build()
.unwrap(),
)
.unwrap()
.add_key_column(
column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("col1".to_string(), DatumKind::String)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("col2".to_string(), DatumKind::String)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("col3".to_string(), DatumKind::Int64)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("col4".to_string(), DatumKind::Int64)
.build()
.unwrap(),
)
.unwrap()
.build()
.unwrap()
}

fn make_tag(name_index: u32, val: &str) -> Tag {
Tag {
name_index,
value: Some(Value {
value: Some(value::Value::StringValue(val.to_string())),
}),
}
}

fn make_field(name_index: u32, val: value::Value) -> Field {
Field {
name_index,
value: Some(Value { value: Some(val) }),
}
}

fn generate_write_table_request() -> WriteTableRequest {
let name_col1 = "col1";
let name_col3 = "col3";
let name_col5 = "col5";
let name_new_col1 = "new_col1";

let tag1 = make_tag(0, name_new_col1);
let tag2 = make_tag(1, name_col1);
let tags = vec![tag1, tag2];

let field1 = make_field(0, value::Value::Int64Value(100));
let field2 = make_field(1, value::Value::Int64Value(10));

let field_group1 = FieldGroup {
timestamp: 1000,
fields: vec![field1.clone(), field2.clone()],
};
let field_group2 = FieldGroup {
timestamp: 2000,
fields: vec![field1],
};
let field_group3 = FieldGroup {
timestamp: 3000,
fields: vec![field2],
};

let write_entry = WriteSeriesEntry {
tags,
field_groups: vec![field_group1, field_group2, field_group3],
};

let tag_names = vec![name_col1.to_string(), name_new_col1.to_string()];
let field_names = vec![name_col3.to_string(), name_col5.to_string()];

WriteTableRequest {
table: "test".to_string(),
tag_names,
field_names,
entries: vec![write_entry],
}
}
}

0 comments on commit 9b57bfc

Please sign in to comment.