diff --git a/Cargo.lock b/Cargo.lock index d2164f6ffc..67f870ba04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5933,6 +5933,7 @@ dependencies = [ "async-trait", "catalog", "ceresdbproto", + "cluster", "common_types", "common_util", "datafusion", diff --git a/cluster/src/config.rs b/cluster/src/config.rs index 8d02e1e288..9f2627c2e8 100644 --- a/cluster/src/config.rs +++ b/cluster/src/config.rs @@ -7,6 +7,7 @@ use table_engine::ANALYTIC_ENGINE_TYPE; #[derive(Debug, Clone, Deserialize)] #[serde(default)] +// TODO: move this to table_engine crates pub struct SchemaConfig { pub default_engine_type: String, pub default_timestamp_column_name: String, diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 07ac80ecde..0b229a9415 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -3,7 +3,7 @@ // Storage rpc service implement. use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, stringify, sync::Arc, time::{Duration, Instant}, @@ -11,15 +11,8 @@ use std::{ use async_trait::async_trait; use ceresdbproto::storage::{ - storage_service_server::StorageService, value::Value, PrometheusQueryRequest, - PrometheusQueryResponse, RouteRequest, RouteResponse, SqlQueryRequest, SqlQueryResponse, - WriteRequest, WriteResponse, WriteTableRequest, -}; -use cluster::config::SchemaConfig; -use common_types::{ - column_schema::{self, ColumnSchema}, - datum::DatumKind, - schema::{Builder as SchemaBuilder, Schema, TSID_COLUMN}, + storage_service_server::StorageService, PrometheusQueryRequest, PrometheusQueryResponse, + RouteRequest, RouteResponse, SqlQueryRequest, SqlQueryResponse, WriteRequest, WriteResponse, }; use common_util::{error::BoxError, time::InstantExt}; use futures::stream::{self, BoxStream, StreamExt}; @@ -29,8 +22,7 @@ use log::{error, warn}; use paste::paste; use query_engine::executor::Executor as QueryExecutor; use router::{Router, RouterRef}; -use snafu::{ensure, OptionExt, ResultExt}; -use sql::plan::CreateTablePlan; +use snafu::ResultExt; use table_engine::engine::EngineRuntimes; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -449,371 +441,3 @@ impl StorageService for StorageServiceImpl { } } } - -/// Create CreateTablePlan from a write metric. -/// -/// The caller must ENSURE that the HandlerContext's schema_config is not None. -pub fn write_table_request_to_create_table_plan( - schema_config: &SchemaConfig, - write_table: &WriteTableRequest, -) -> Result { - Ok(CreateTablePlan { - engine: schema_config.default_engine_type.clone(), - if_not_exists: true, - table: write_table.table.clone(), - table_schema: build_schema_from_write_table_request(schema_config, write_table)?, - options: HashMap::default(), - partition_info: None, - }) -} - -fn build_column_schema( - column_name: &str, - data_type: DatumKind, - is_tag: bool, -) -> Result { - let builder = column_schema::Builder::new(column_name.to_string(), data_type) - .is_nullable(true) - .is_tag(is_tag); - - builder.build().box_err().context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid column schema", - }) -} - -fn build_schema_from_write_table_request( - schema_config: &SchemaConfig, - write_table_req: &WriteTableRequest, -) -> Result { - let WriteTableRequest { - table, - field_names, - tag_names, - entries: write_entries, - } = write_table_req; - - let mut schema_builder = - SchemaBuilder::with_capacity(field_names.len()).auto_increment_column_id(true); - - ensure!( - !write_entries.is_empty(), - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!("empty write entires to write table:{table}"), - } - ); - - let mut name_column_map: BTreeMap<_, ColumnSchema> = BTreeMap::new(); - for write_entry in write_entries { - // parse tags - for tag in &write_entry.tags { - let name_index = tag.name_index as usize; - ensure!( - name_index < tag_names.len(), - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "tag index {name_index} is not found in tag_names:{tag_names:?}, table:{table}", - ), - } - ); - - let tag_name = &tag_names[name_index]; - - let tag_value = tag - .value - .as_ref() - .with_context(|| ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Tag({tag_name}) value is needed, table_name:{table} "), - })? - .value - .as_ref() - .with_context(|| ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Tag({tag_name}) value type is not supported, table_name:{table}"), - })?; - - let data_type = try_get_data_type_from_value(tag_value)?; - - if let Some(column_schema) = name_column_map.get(tag_name) { - ensure_data_type_compatible(table, tag_name, true, data_type, column_schema)?; - } - let column_schema = build_column_schema(tag_name, data_type, true)?; - name_column_map.insert(tag_name, column_schema); - } - - // parse fields - for field_group in &write_entry.field_groups { - for field in &field_group.fields { - if (field.name_index as usize) < field_names.len() { - let field_name = &field_names[field.name_index as usize]; - let field_value = field - .value - .as_ref() - .with_context(|| ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Field({field_name}) value is needed, table:{table}"), - })? - .value - .as_ref() - .with_context(|| ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Field({field_name}) value type is not supported, table:{table}" - ), - })?; - - let data_type = try_get_data_type_from_value(field_value)?; - - if let Some(column_schema) = name_column_map.get(field_name) { - ensure_data_type_compatible( - table, - field_name, - false, - data_type, - column_schema, - )?; - } - - let column_schema = build_column_schema(field_name, data_type, false)?; - name_column_map.insert(field_name, column_schema); - } - } - } - } - - // Timestamp column will be the last column - let timestamp_column_schema = column_schema::Builder::new( - schema_config.default_timestamp_column_name.clone(), - DatumKind::Timestamp, - ) - .is_nullable(false) - .build() - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid timestamp column schema to build", - })?; - - // Use (tsid, timestamp) as primary key. - let tsid_column_schema = - column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) - .is_nullable(false) - .build() - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid tsid column schema to build", - })?; - - schema_builder = schema_builder - .add_key_column(tsid_column_schema) - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid timestamp column to add", - })? - .add_key_column(timestamp_column_schema) - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid tsid column to add", - })?; - - for col in name_column_map.into_values() { - schema_builder = schema_builder - .add_normal_column(col) - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid normal column to add", - })?; - } - - schema_builder.build().box_err().context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid schema to build", - }) -} - -fn ensure_data_type_compatible( - table_name: &str, - column_name: &str, - is_tag: bool, - data_type: DatumKind, - column_schema: &ColumnSchema, -) -> Result<()> { - ensure!( - column_schema.is_tag == is_tag, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Duplicated column: {column_name} in fields and tags for table: {table_name}", - ), - } - ); - - ensure!( - column_schema.data_type == data_type, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Column: {} in table: {} data type is not same, expected: {}, actual: {}", - column_name, table_name, column_schema.data_type, data_type, - ), - } - ); - - Ok(()) -} - -fn try_get_data_type_from_value(value: &Value) -> Result { - match value { - Value::Float64Value(_) => Ok(DatumKind::Double), - Value::StringValue(_) => Ok(DatumKind::String), - Value::Int64Value(_) => Ok(DatumKind::Int64), - Value::Float32Value(_) => Ok(DatumKind::Float), - Value::Int32Value(_) => Ok(DatumKind::Int32), - Value::Int16Value(_) => Ok(DatumKind::Int16), - Value::Int8Value(_) => Ok(DatumKind::Int8), - Value::BoolValue(_) => Ok(DatumKind::Boolean), - Value::Uint64Value(_) => Ok(DatumKind::UInt64), - Value::Uint32Value(_) => Ok(DatumKind::UInt32), - Value::Uint16Value(_) => Ok(DatumKind::UInt16), - Value::Uint8Value(_) => Ok(DatumKind::UInt8), - Value::TimestampValue(_) => Ok(DatumKind::Timestamp), - Value::VarbinaryValue(_) => Ok(DatumKind::Varbinary), - } -} - -#[cfg(test)] -mod tests { - use ceresdbproto::storage::{value, Field, FieldGroup, Tag, Value, WriteSeriesEntry}; - use cluster::config::SchemaConfig; - use common_types::datum::DatumKind; - - use super::*; - - const TAG1: &str = "host"; - const TAG2: &str = "idc"; - const FIELD1: &str = "cpu"; - const FIELD2: &str = "memory"; - const FIELD3: &str = "log"; - const FIELD4: &str = "ping_ok"; - const TABLE: &str = "pod_system_table"; - const TIMESTAMP_COLUMN: &str = "custom_timestamp"; - - fn make_tag(name_index: u32, val: &str) -> Tag { - Tag { - name_index, - value: Some(Value { - value: Some(value::Value::StringValue(val.to_string())), - }), - } - } - - fn make_field(name_index: u32, val: value::Value) -> Field { - Field { - name_index, - value: Some(Value { value: Some(val) }), - } - } - - fn generate_write_table_request() -> WriteTableRequest { - let tag1 = make_tag(0, "test.host"); - let tag2 = make_tag(1, "test.idc"); - let tags = vec![tag1, tag2]; - - let field1 = make_field(0, value::Value::Float64Value(100.0)); - let field2 = make_field(1, value::Value::Float64Value(1024.0)); - let field3 = make_field(2, value::Value::StringValue("test log".to_string())); - let field4 = make_field(3, value::Value::BoolValue(true)); - - let field_group1 = FieldGroup { - timestamp: 1000, - fields: vec![field1.clone(), field4], - }; - let field_group2 = FieldGroup { - timestamp: 2000, - fields: vec![field1, field2], - }; - let field_group3 = FieldGroup { - timestamp: 3000, - fields: vec![field3], - }; - - let write_entry = WriteSeriesEntry { - tags, - field_groups: vec![field_group1, field_group2, field_group3], - }; - - let tag_names = vec![TAG1.to_string(), TAG2.to_string()]; - let field_names = vec![ - FIELD1.to_string(), - FIELD2.to_string(), - FIELD3.to_string(), - FIELD4.to_string(), - ]; - - WriteTableRequest { - table: TABLE.to_string(), - tag_names, - field_names, - entries: vec![write_entry], - } - } - - #[test] - fn test_build_schema_from_write_table_request() { - let schema_config = SchemaConfig { - default_timestamp_column_name: TIMESTAMP_COLUMN.to_string(), - ..SchemaConfig::default() - }; - let write_table_request = generate_write_table_request(); - - let schema = build_schema_from_write_table_request(&schema_config, &write_table_request); - assert!(schema.is_ok()); - - let schema = schema.unwrap(); - - assert_eq!(8, schema.num_columns()); - assert_eq!(2, schema.num_primary_key_columns()); - assert_eq!(TIMESTAMP_COLUMN, schema.timestamp_name()); - let tsid = schema.tsid_column(); - assert!(tsid.is_some()); - - let key_columns = schema.key_columns(); - assert_eq!(2, key_columns.len()); - assert_eq!("tsid", key_columns[0].name); - assert_eq!(TIMESTAMP_COLUMN, key_columns[1].name); - - let columns = schema.normal_columns(); - assert_eq!(6, columns.len()); - - // sorted by column names because of btree - assert_eq!(FIELD1, columns[0].name); - assert!(!columns[0].is_tag); - assert_eq!(DatumKind::Double, columns[0].data_type); - assert_eq!(TAG1, columns[1].name); - assert!(columns[1].is_tag); - assert_eq!(DatumKind::String, columns[1].data_type); - assert_eq!(TAG2, columns[2].name); - assert!(columns[2].is_tag); - assert_eq!(DatumKind::String, columns[2].data_type); - assert_eq!(FIELD3, columns[3].name); - assert!(!columns[3].is_tag); - assert_eq!(DatumKind::String, columns[3].data_type); - assert_eq!(FIELD2, columns[4].name); - assert!(!columns[4].is_tag); - assert_eq!(DatumKind::Double, columns[4].data_type); - assert_eq!(FIELD4, columns[5].name); - assert!(!columns[5].is_tag); - assert_eq!(DatumKind::Boolean, columns[5].data_type); - - for column in columns { - assert!(column.is_nullable); - } - } -} diff --git a/server/src/grpc/storage_service/write.rs b/server/src/grpc/storage_service/write.rs index 85501d72dd..476d771743 100644 --- a/server/src/grpc/storage_service/write.rs +++ b/server/src/grpc/storage_service/write.rs @@ -25,12 +25,15 @@ use interpreters::{context::Context as InterpreterContext, factory::Factory, int use log::debug; use query_engine::executor::Executor as QueryExecutor; use snafu::{ensure, OptionExt, ResultExt}; -use sql::plan::{InsertPlan, Plan}; +use sql::{ + frontend::{Context, Frontend}, + plan::{InsertPlan, Plan}, + provider::CatalogMetaProvider, +}; use table_engine::table::TableRef; use crate::{ grpc::storage_service::{ - self, error::{self, ErrNoCause, ErrWithCause, Result}, HandlerContext, }, @@ -260,22 +263,26 @@ async fn create_table( schema_config: &SchemaConfig, deadline: Option, ) -> Result<()> { - let create_table_plan = - storage_service::write_table_request_to_create_table_plan(schema_config, write_table_req) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!( - "Failed to build creating table plan, table:{}", - write_table_req.table - ), - })?; + let provider = CatalogMetaProvider { + manager: instance.catalog_manager.clone(), + default_catalog: catalog, + default_schema: schema, + function_registry: &*instance.function_registry, + }; + let frontend = Frontend::new(provider); + let mut ctx = Context::new(request_id, deadline); + let plan = frontend + .write_req_to_plan(&mut ctx, schema_config, write_table_req) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: format!( + "Failed to build creating table plan, table:{}", + write_table_req.table + ), + })?; - debug!( - "Grpc handle create table begin, table:{}, schema:{:?}", - create_table_plan.table, create_table_plan.table_schema, - ); - let plan = Plan::Create(create_table_plan); + debug!("Grpc handle create table begin, plan:{:?}", plan); instance .limiter diff --git a/sql/Cargo.toml b/sql/Cargo.toml index 59ab5566aa..bffdc03f93 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -19,6 +19,7 @@ arrow = { workspace = true } async-trait = { workspace = true } catalog = { workspace = true } ceresdbproto = { workspace = true } +cluster = { workspace = true } common_types = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs index e46fc92da2..900a50ca28 100644 --- a/sql/src/frontend.rs +++ b/sql/src/frontend.rs @@ -4,7 +4,8 @@ use std::{sync::Arc, time::Instant}; -use ceresdbproto::prometheus::Expr as PromExpr; +use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest}; +use cluster::config::SchemaConfig; use common_types::request_id::RequestId; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table; @@ -108,4 +109,17 @@ impl Frontend

{ planner.promql_expr_to_plan(expr).context(CreatePlan) } + + pub fn write_req_to_plan( + &self, + ctx: &mut Context, + schema_config: &SchemaConfig, + write_table: &WriteTableRequest, + ) -> Result { + let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism); + + planner + .write_req_to_plan(schema_config, write_table) + .context(CreatePlan) + } } diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 3ce578a0cb..06ee750b03 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -16,12 +16,14 @@ use arrow::{ error::ArrowError, }; use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; +use ceresdbproto::storage::{value::Value as PbValue, WriteTableRequest}; +use cluster::config::SchemaConfig; use common_types::{ column_schema::{self, ColumnSchema}, datum::{Datum, DatumKind}, request_id::RequestId, row::{RowGroup, RowGroupBuilder}, - schema::{self, Schema, TSID_COLUMN}, + schema::{self, Builder as SchemaBuilder, Schema, TSID_COLUMN}, }; use common_util::error::GenericError; use datafusion::{ @@ -247,6 +249,9 @@ pub enum Error { #[snafu(display("Unsupported partition method, msg:{}", msg,))] UnsupportedPartition { msg: String }, + + #[snafu(display("Failed to build plan, msg:{}", msg))] + InvalidWriteEntry { msg: String }, } define_result!(Error); @@ -318,8 +323,215 @@ impl<'a, P: MetaProvider> Planner<'a, P> { expr.to_plan(planner.meta_provider, self.read_parallelism) .context(BuildPromPlanError) } + + pub fn write_req_to_plan( + &self, + schema_config: &SchemaConfig, + write_table: &WriteTableRequest, + ) -> Result { + Ok(Plan::Create(CreateTablePlan { + engine: schema_config.default_engine_type.clone(), + if_not_exists: true, + table: write_table.table.clone(), + table_schema: build_schema_from_write_table_request(schema_config, write_table)?, + options: HashMap::default(), + partition_info: None, + })) + } +} + +fn build_column_schema( + column_name: &str, + data_type: DatumKind, + is_tag: bool, +) -> Result { + let builder = column_schema::Builder::new(column_name.to_string(), data_type) + .is_nullable(true) + .is_tag(is_tag); + + builder.build().with_context(|| InvalidColumnSchema { + column_name: column_name.to_string(), + }) +} + +fn build_schema_from_write_table_request( + schema_config: &SchemaConfig, + write_table_req: &WriteTableRequest, +) -> Result { + let WriteTableRequest { + table, + field_names, + tag_names, + entries: write_entries, + } = write_table_req; + + let mut schema_builder = + SchemaBuilder::with_capacity(field_names.len()).auto_increment_column_id(true); + + ensure!( + !write_entries.is_empty(), + InvalidWriteEntry { + msg: "empty write entries".to_string() + } + ); + + let mut name_column_map: BTreeMap<_, ColumnSchema> = BTreeMap::new(); + for write_entry in write_entries { + // parse tags + for tag in &write_entry.tags { + let name_index = tag.name_index as usize; + ensure!( + name_index < tag_names.len(), + InvalidWriteEntry{ + msg: format!( + "tag index {name_index} is not found in tag_names:{tag_names:?}, table:{table}", + ), + } + ); + + let tag_name = &tag_names[name_index]; + + let tag_value = tag + .value + .as_ref() + .with_context(|| InvalidWriteEntry { + msg: format!("Tag({tag_name}) value is needed, table_name:{table} "), + })? + .value + .as_ref() + .with_context(|| InvalidWriteEntry { + msg: format!("Tag({tag_name}) value type is not supported, table_name:{table}"), + })?; + + let data_type = try_get_data_type_from_value(tag_value)?; + + if let Some(column_schema) = name_column_map.get(tag_name) { + ensure_data_type_compatible(table, tag_name, true, data_type, column_schema)?; + } + let column_schema = build_column_schema(tag_name, data_type, true)?; + name_column_map.insert(tag_name, column_schema); + } + + // parse fields + for field_group in &write_entry.field_groups { + for field in &field_group.fields { + if (field.name_index as usize) < field_names.len() { + let field_name = &field_names[field.name_index as usize]; + let field_value = field + .value + .as_ref() + .with_context(|| InvalidWriteEntry { + msg: format!("Field({field_name}) value is needed, table:{table}"), + })? + .value + .as_ref() + .with_context(|| InvalidWriteEntry { + msg: format!( + "Field({field_name}) value type is not supported, table:{table}" + ), + })?; + + let data_type = try_get_data_type_from_value(field_value)?; + + if let Some(column_schema) = name_column_map.get(field_name) { + ensure_data_type_compatible( + table, + field_name, + false, + data_type, + column_schema, + )?; + } + + let column_schema = build_column_schema(field_name, data_type, false)?; + name_column_map.insert(field_name, column_schema); + } + } + } + } + + // Timestamp column will be the last column + let timestamp_column_schema = column_schema::Builder::new( + schema_config.default_timestamp_column_name.clone(), + DatumKind::Timestamp, + ) + .is_nullable(false) + .build() + .with_context(|| InvalidColumnSchema { + column_name: schema_config.default_timestamp_column_name.clone(), + })?; + + // Use (tsid, timestamp) as primary key. + let tsid_column_schema = + column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) + .is_nullable(false) + .build() + .with_context(|| InvalidColumnSchema { + column_name: TSID_COLUMN.to_string(), + })?; + + schema_builder = schema_builder + .add_key_column(tsid_column_schema) + .context(BuildTableSchema {})? + .add_key_column(timestamp_column_schema) + .context(BuildTableSchema {})?; + + for col in name_column_map.into_values() { + schema_builder = schema_builder + .add_normal_column(col) + .context(BuildTableSchema {})?; + } + + schema_builder.build().context(BuildTableSchema {}) +} + +fn ensure_data_type_compatible( + table_name: &str, + column_name: &str, + is_tag: bool, + data_type: DatumKind, + column_schema: &ColumnSchema, +) -> Result<()> { + ensure!( + column_schema.is_tag == is_tag, + InvalidWriteEntry { + msg: format!( + "Duplicated column: {column_name} in fields and tags for table: {table_name}", + ), + } + ); + + ensure!( + column_schema.data_type == data_type, + InvalidWriteEntry { + msg: format!( + "Column: {} in table: {} data type is not same, expected: {}, actual: {}", + column_name, table_name, column_schema.data_type, data_type, + ), + } + ); + + Ok(()) } +fn try_get_data_type_from_value(value: &PbValue) -> Result { + match value { + PbValue::Float64Value(_) => Ok(DatumKind::Double), + PbValue::StringValue(_) => Ok(DatumKind::String), + PbValue::Int64Value(_) => Ok(DatumKind::Int64), + PbValue::Float32Value(_) => Ok(DatumKind::Float), + PbValue::Int32Value(_) => Ok(DatumKind::Int32), + PbValue::Int16Value(_) => Ok(DatumKind::Int16), + PbValue::Int8Value(_) => Ok(DatumKind::Int8), + PbValue::BoolValue(_) => Ok(DatumKind::Boolean), + PbValue::Uint64Value(_) => Ok(DatumKind::UInt64), + PbValue::Uint32Value(_) => Ok(DatumKind::UInt32), + PbValue::Uint16Value(_) => Ok(DatumKind::UInt16), + PbValue::Uint8Value(_) => Ok(DatumKind::UInt8), + PbValue::TimestampValue(_) => Ok(DatumKind::Timestamp), + PbValue::VarbinaryValue(_) => Ok(DatumKind::Varbinary), + } +} /// A planner wraps the datafusion's logical planner, and delegate sql like /// select/explain to datafusion's planner. struct PlannerDelegate<'a, P: MetaProvider> { @@ -1080,6 +1292,9 @@ pub fn get_table_ref(table_name: &str) -> TableReference { #[cfg(test)] mod tests { + use ceresdbproto::storage::{ + value, Field, FieldGroup, Tag, Value as PbValue, WriteSeriesEntry, + }; use sqlparser::ast::Value; use super::*; @@ -2026,4 +2241,125 @@ mod tests { assert_eq!(parsed, expect); } } + + const TAG1: &str = "host"; + const TAG2: &str = "idc"; + const FIELD1: &str = "cpu"; + const FIELD2: &str = "memory"; + const FIELD3: &str = "log"; + const FIELD4: &str = "ping_ok"; + const TABLE: &str = "pod_system_table"; + const TIMESTAMP_COLUMN: &str = "custom_timestamp"; + + fn make_tag(name_index: u32, val: &str) -> Tag { + Tag { + name_index, + value: Some(PbValue { + value: Some(value::Value::StringValue(val.to_string())), + }), + } + } + + fn make_field(name_index: u32, val: value::Value) -> Field { + Field { + name_index, + value: Some(PbValue { value: Some(val) }), + } + } + + fn generate_write_table_request() -> WriteTableRequest { + let tag1 = make_tag(0, "test.host"); + let tag2 = make_tag(1, "test.idc"); + let tags = vec![tag1, tag2]; + + let field1 = make_field(0, value::Value::Float64Value(100.0)); + let field2 = make_field(1, value::Value::Float64Value(1024.0)); + let field3 = make_field(2, value::Value::StringValue("test log".to_string())); + let field4 = make_field(3, value::Value::BoolValue(true)); + + let field_group1 = FieldGroup { + timestamp: 1000, + fields: vec![field1.clone(), field4], + }; + let field_group2 = FieldGroup { + timestamp: 2000, + fields: vec![field1, field2], + }; + let field_group3 = FieldGroup { + timestamp: 3000, + fields: vec![field3], + }; + + let write_entry = WriteSeriesEntry { + tags, + field_groups: vec![field_group1, field_group2, field_group3], + }; + + let tag_names = vec![TAG1.to_string(), TAG2.to_string()]; + let field_names = vec![ + FIELD1.to_string(), + FIELD2.to_string(), + FIELD3.to_string(), + FIELD4.to_string(), + ]; + + WriteTableRequest { + table: TABLE.to_string(), + tag_names, + field_names, + entries: vec![write_entry], + } + } + + #[test] + fn test_build_schema_from_write_table_request() { + let schema_config = SchemaConfig { + default_timestamp_column_name: TIMESTAMP_COLUMN.to_string(), + ..SchemaConfig::default() + }; + let write_table_request = generate_write_table_request(); + + let schema = build_schema_from_write_table_request(&schema_config, &write_table_request); + assert!(schema.is_ok()); + + let schema = schema.unwrap(); + + assert_eq!(8, schema.num_columns()); + assert_eq!(2, schema.num_primary_key_columns()); + assert_eq!(TIMESTAMP_COLUMN, schema.timestamp_name()); + let tsid = schema.tsid_column(); + assert!(tsid.is_some()); + + let key_columns = schema.key_columns(); + assert_eq!(2, key_columns.len()); + assert_eq!("tsid", key_columns[0].name); + assert_eq!(TIMESTAMP_COLUMN, key_columns[1].name); + + let columns = schema.normal_columns(); + assert_eq!(6, columns.len()); + + // sorted by column names because of btree + assert_eq!(FIELD1, columns[0].name); + assert!(!columns[0].is_tag); + assert_eq!(DatumKind::Double, columns[0].data_type); + assert_eq!(TAG1, columns[1].name); + assert!(columns[1].is_tag); + assert_eq!(DatumKind::String, columns[1].data_type); + assert_eq!(TAG2, columns[2].name); + assert!(columns[2].is_tag); + assert_eq!(DatumKind::String, columns[2].data_type); + assert_eq!(FIELD3, columns[3].name); + assert!(!columns[3].is_tag); + assert_eq!(DatumKind::String, columns[3].data_type); + assert_eq!(FIELD2, columns[4].name); + assert!(!columns[4].is_tag); + assert_eq!(DatumKind::Double, columns[4].data_type); + assert_eq!(FIELD4, columns[5].name); + assert!(!columns[5].is_tag); + assert_eq!(DatumKind::Boolean, columns[5].data_type); + + for column in columns { + assert!(column.is_nullable); + } + } }