From f73501c95211d617204063e2b18ebe9b62cfdfa7 Mon Sep 17 00:00:00 2001
From: "chunshao.rcs"
Date: Tue, 23 May 2023 11:07:57 +0800
Subject: [PATCH] refactor: find new columns to improve write performance

---
 proxy/src/write.rs            | 125 ++++++++++++++++++++++++----------
 query_frontend/src/planner.rs |   5 +-
 2 files changed, 91 insertions(+), 39 deletions(-)

diff --git a/proxy/src/write.rs b/proxy/src/write.rs
index cf7497c68d..faf476e24c 100644
--- a/proxy/src/write.rs
+++ b/proxy/src/write.rs
@@ -10,7 +10,7 @@ use std::{
 use bytes::Bytes;
 use ceresdbproto::storage::{
-    storage_service_client::StorageServiceClient, value, RouteRequest, WriteRequest,
+    storage_service_client::StorageServiceClient, value, RouteRequest, Value, WriteRequest,
     WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest,
 };
 use cluster::config::SchemaConfig;
@@ -34,7 +34,7 @@ use query_engine::executor::Executor as QueryExecutor;
 use query_frontend::{
     frontend::{Context as FrontendContext, Frontend},
     plan::{AlterTableOperation, AlterTablePlan, InsertPlan, Plan},
-    planner::build_schema_from_write_table_request,
+    planner::{build_column_schema, try_get_data_type_from_value},
     provider::CatalogMetaProvider,
 };
 use router::endpoint::Endpoint;
@@ -43,7 +43,7 @@ use table_engine::table::TableRef;
 use tonic::transport::Channel;
 
 use crate::{
-    error::{ErrNoCause, ErrWithCause, InternalNoCause, Result},
+    error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result},
     forward::{ForwardResult, ForwarderRef},
     Context, Proxy,
 };
@@ -477,14 +477,6 @@ impl Proxy {
             code: StatusCode::BAD_REQUEST,
         })?;
         let schema = req_ctx.database;
-        let schema_config = self
-            .schema_config_provider
-            .schema_config(&schema)
-            .box_err()
-            .with_context(|| ErrWithCause {
-                code: StatusCode::INTERNAL_SERVER_ERROR,
-                msg: format!("Fail to fetch schema config, schema_name:{schema}"),
-            })?;
 
         debug!(
             "Local write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
@@ -503,7 +495,7 @@ impl Proxy {
         };
 
         let plan_vec = self
-            .write_request_to_insert_plan(req.table_requests, schema_config, write_context)
+            .write_request_to_insert_plan(req.table_requests, write_context)
             .await?;
 
         let mut success = 0;
@@ -522,7 +514,6 @@ impl Proxy {
     async fn write_request_to_insert_plan(
         &self,
         table_requests: Vec<WriteTableRequest>,
-        schema_config: Option<&SchemaConfig>,
         write_context: WriteContext,
     ) -> Result<Vec<InsertPlan>> {
         let mut plan_vec = Vec::with_capacity(table_requests.len());
@@ -534,7 +525,6 @@ impl Proxy {
             deadline,
             auto_create_table,
         } = write_context;
-        let schema_config = schema_config.cloned().unwrap_or_default();
         for write_table_req in table_requests {
             let table_name = &write_table_req.table;
             self.maybe_open_partition_table_if_not_exist(&catalog, &schema, table_name)
                 .await?;
@@ -555,7 +545,7 @@
             // * Currently, the decision to add columns is made at the request level, not at
             //   the row level, so the cost is relatively small.
             let table_schema = table.schema();
-            let columns = find_new_columns(&table_schema, &schema_config, &write_table_req)?;
+            let columns = find_new_columns(&table_schema, &write_table_req)?;
             if !columns.is_empty() {
                 self.execute_add_columns_plan(
                     request_id,
@@ -668,32 +658,93 @@ impl Proxy {
 
 fn find_new_columns(
     schema: &Schema,
-    schema_config: &SchemaConfig,
-    write_req: &WriteTableRequest,
+    write_table_req: &WriteTableRequest,
 ) -> Result<Vec<ColumnSchema>> {
-    let new_schema = build_schema_from_write_table_request(schema_config, write_req)
+    let WriteTableRequest {
+        table,
+        field_names,
+        tag_names,
+        entries: write_entries,
+    } = write_table_req;
+
+    let mut columns: BTreeMap<_, ColumnSchema> = BTreeMap::new();
+    for write_entry in write_entries {
+        // Parse tags.
+        for tag in &write_entry.tags {
+            let name_index = tag.name_index as usize;
+            ensure!(
+                name_index < tag_names.len(),
+                InternalNoCause {
+                    msg: format!(
+                        "Tag {tag:?} is not found in tag_names:{tag_names:?}, table:{table}",
+                    ),
+                }
+            );
+
+            let tag_name = &tag_names[name_index];
+
+            build_column(&mut columns, schema, tag_name, &tag.value, true)?;
+        }
+
+        // Parse fields.
+        for field_group in &write_entry.field_groups {
+            for field in &field_group.fields {
+                let field_index = field.name_index as usize;
+                ensure!(
+                    field_index < field_names.len(),
+                    InternalNoCause {
+                        msg: format!(
+                            "Field {field:?} is not found in field_names:{field_names:?}, table:{table}",
+                        ),
+                    }
+                );
+
+                let field_name = &field_names[field_index];
+                build_column(&mut columns, schema, field_name, &field.value, false)?;
+            }
+        }
+    }
+
+    Ok(columns.into_values().collect())
+}
+
+fn build_column<'a>(
+    columns: &mut BTreeMap<&'a str, ColumnSchema>,
+    schema: &Schema,
+    name: &'a str,
+    value: &Option<Value>,
+    is_tag: bool,
+) -> Result<()> {
+    // Skip adding the column in the following cases:
+    // 1. The column already exists in the table schema.
+    // 2. The column has already been collected as a new one.
+    if schema.index_of(name).is_some() || columns.contains_key(name) {
+        return Ok(());
+    }
+
+    let column_value = value
+        .as_ref()
+        .with_context(|| InternalNoCause {
+            msg: format!("Column value is needed, column:{name}"),
+        })?
+        .value
+        .as_ref()
+        .with_context(|| InternalNoCause {
+            msg: format!("Column value type is not supported, column:{name}"),
        })?;
+
+    let data_type = try_get_data_type_from_value(column_value)
         .box_err()
-        .context(ErrWithCause {
-            code: StatusCode::INTERNAL_SERVER_ERROR,
-            msg: "Build schema from write table request failed",
+        .context(Internal {
+            msg: "Failed to get data type",
         })?;
-    let columns = new_schema.columns();
-    let old_columns = schema.columns();
-
-    // find new columns:
-    // 1. timestamp column can't be a new column;
-    // 2. column not in old schema is a new column.
-    let new_columns = columns
-        .iter()
-        .enumerate()
-        .filter(|(idx, column)| {
-            *idx != new_schema.timestamp_index()
-                && !old_columns.iter().any(|c| c.name == column.name)
-        })
-        .map(|(_, column)| column.clone())
-        .collect();
-    Ok(new_columns)
+    let column_schema = build_column_schema(name, data_type, is_tag)
+        .box_err()
+        .context(Internal {
+            msg: "Failed to build column schema",
+        })?;
+    columns.insert(name, column_schema);
+    Ok(())
 }
 
 fn write_table_request_to_insert_plan(
diff --git a/query_frontend/src/planner.rs b/query_frontend/src/planner.rs
index 0708b80b14..694bc0ee0e 100644
--- a/query_frontend/src/planner.rs
+++ b/query_frontend/src/planner.rs
@@ -363,7 +363,7 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
     }
 }
 
-fn build_column_schema(
+pub fn build_column_schema(
     column_name: &str,
     data_type: DatumKind,
     is_tag: bool,
@@ -537,7 +537,7 @@ fn ensure_data_type_compatible(
     Ok(())
 }
 
-fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
+pub fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
     match value {
         PbValue::Float64Value(_) => Ok(DatumKind::Double),
         PbValue::StringValue(_) => Ok(DatumKind::String),
@@ -555,6 +555,7 @@ fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
         PbValue::VarbinaryValue(_) => Ok(DatumKind::Varbinary),
     }
 }
+
 /// A planner wraps the datafusion's logical planner, and delegate sql like
 /// select/explain to datafusion's planner.
 pub(crate) struct PlannerDelegate<'a, P: MetaProvider> {
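
Reviewer note: the new path walks the raw WriteTableRequest once and collects only the columns missing from the table schema, instead of building a whole Schema via build_schema_from_write_table_request and diffing it against the old one. Below is a minimal, self-contained Rust sketch of that collection loop; the Mini* types are hypothetical stand-ins for the real ceresdbproto/common_types definitions (type inference via try_get_data_type_from_value and the ensure! bounds checks are elided), so it compiles on its own and is not the actual implementation.

use std::collections::BTreeMap;

// Hypothetical stand-ins for the real ceresdbproto/common_types definitions;
// they exist only to make this sketch self-contained.
#[derive(Debug, Clone)]
struct MiniColumnSchema {
    name: String,
    is_tag: bool,
}

struct MiniSchema {
    columns: Vec<MiniColumnSchema>,
}

impl MiniSchema {
    // Mirrors Schema::index_of: Some(_) means the column already exists.
    fn index_of(&self, name: &str) -> Option<usize> {
        self.columns.iter().position(|c| c.name == name)
    }
}

struct MiniTag {
    name_index: usize,
}

// Mirrors the shape of find_new_columns: walk the request once (tags shown
// here; field groups work the same way) and collect only the columns that
// are absent from the existing table schema. Indices are assumed valid here;
// the real code guards them with ensure!.
fn find_new_columns(
    schema: &MiniSchema,
    tag_names: &[String],
    tags: &[MiniTag],
) -> Vec<MiniColumnSchema> {
    let mut columns: BTreeMap<&str, MiniColumnSchema> = BTreeMap::new();
    for tag in tags {
        let name = tag_names[tag.name_index].as_str();
        // Skip columns that already exist in the schema or were already collected.
        if schema.index_of(name).is_some() || columns.contains_key(name) {
            continue;
        }
        columns.insert(
            name,
            MiniColumnSchema {
                name: name.to_string(),
                is_tag: true,
            },
        );
    }
    // BTreeMap iteration is name-sorted, so the result is deterministic.
    columns.into_values().collect()
}

fn main() {
    let schema = MiniSchema {
        columns: vec![MiniColumnSchema {
            name: "host".to_string(),
            is_tag: true,
        }],
    };
    let tag_names = vec!["host".to_string(), "region".to_string()];
    // Three tag cells across two rows: "host" already exists in the schema,
    // and "region" appears twice but must be collected only once.
    let tags = vec![
        MiniTag { name_index: 0 },
        MiniTag { name_index: 1 },
        MiniTag { name_index: 1 },
    ];
    let new_columns = find_new_columns(&schema, &tag_names, &tags);
    assert_eq!(new_columns.len(), 1);
    assert_eq!(new_columns[0].name, "region");
    assert!(new_columns[0].is_tag);
    println!("new columns: {new_columns:?}");
}

The BTreeMap keyed by column name serves two purposes visible in the patch: it deduplicates a column that appears in many write entries, and its name-sorted iteration makes the resulting column list, and hence the add-columns plan built from it, deterministic for a given payload.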