diff --git a/Cargo.lock b/Cargo.lock index 85f4e7abb9..18d6ec605e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -881,7 +881,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049#b9c45bcdbf7d55d5889d42b4c8017282819e6049" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2#1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2" dependencies = [ "prost", "protoc-bin-vendored", @@ -1019,7 +1019,7 @@ name = "cluster" version = "1.0.0-alpha01" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2)", "common_types 1.0.0-alpha01", "common_util", "log", @@ -3190,7 +3190,7 @@ name = "meta_client" version = "1.0.0-alpha01" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2)", "common_types 1.0.0-alpha01", "common_util", "futures 0.3.21", @@ -4820,7 +4820,7 @@ name = "remote_engine_client" version = "1.0.0-alpha01" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2)", "clru", "common_types 1.0.0-alpha01", "common_util", @@ -4952,7 +4952,7 @@ name = "router" version = "1.0.0-alpha01" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2)", "cluster", "common_types 1.0.0-alpha01", "common_util", @@ -5280,7 +5280,7 @@ dependencies = [ "async-trait", "bytes 1.2.1", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2)", "cluster", "common_types 1.0.0-alpha01", "common_util", @@ -5619,7 +5619,7 @@ version = "1.0.0-alpha01" dependencies = [ "arrow", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2)", "common_types 1.0.0-alpha01", "common_util", "datafusion", diff --git a/Cargo.toml b/Cargo.toml index 7d70bcdcb2..1ebfd5d22d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,7 +111,7 @@ message_queue = { path = "components/message_queue" } [workspace.dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" -rev = "b9c45bcdbf7d55d5889d42b4c8017282819e6049" +rev = "1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2" [workspace.dependencies.datafusion] git = "https://github.com/CeresDB/arrow-datafusion.git" diff --git a/analytic_engine/src/meta/meta_update.rs b/analytic_engine/src/meta/meta_update.rs index ab11ec196f..172a5b84c4 100644 --- a/analytic_engine/src/meta/meta_update.rs +++ b/analytic_engine/src/meta/meta_update.rs @@ -36,6 +36,11 @@ pub enum Error { #[snafu(display("Failed to convert schema, err:{}", source))] ConvertSchema { source: common_types::schema::Error }, + #[snafu(display("Failed to convert partition info, err:{}", source))] + ConvertPartitionInfo { + source: table_engine::partition::Error, + }, + #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] EmptyTableSchema { backtrace: Backtrace }, @@ -213,13 +218,14 @@ pub struct AddTableMeta { impl From for meta_pb::AddTableMeta { fn from(v: AddTableMeta) -> Self { + let partition_info = v.partition_info.map(|v| v.into()); meta_pb::AddTableMeta { space_id: v.space_id, table_id: v.table_id.as_u64(), table_name: v.table_name, schema: Some(common_pb::TableSchema::from(&v.schema)), options: Some(analytic_common::TableOptions::from(v.opts)), - partition_info: v.partition_info.map(|info| info.into()), + partition_info, } } } @@ -230,13 +236,20 @@ impl TryFrom for AddTableMeta { fn try_from(src: meta_pb::AddTableMeta) -> Result { let table_schema = src.schema.context(EmptyTableSchema)?; let opts = src.options.context(EmptyTableOptions)?; + let partition_info = match src.partition_info { + Some(partition_info) => { + Some(PartitionInfo::try_from(partition_info).context(ConvertPartitionInfo)?) + } + None => None, + }; + Ok(Self { space_id: src.space_id, table_id: TableId::from(src.table_id), table_name: src.table_name, schema: Schema::try_from(table_schema).context(ConvertSchema)?, opts: TableOptions::from(opts), - partition_info: src.partition_info.map(|v| v.into()), + partition_info, }) } } diff --git a/interpreters/src/table_manipulator/meta_based.rs b/interpreters/src/table_manipulator/meta_based.rs index e751ef7116..c4dd3d6fff 100644 --- a/interpreters/src/table_manipulator/meta_based.rs +++ b/interpreters/src/table_manipulator/meta_based.rs @@ -4,12 +4,15 @@ use async_trait::async_trait; use common_types::schema::SchemaEncoder; use log::info; use meta_client::{ - types::{CreateTableRequest, DropTableRequest, PartitionInfo}, + types::{CreateTableRequest, DropTableRequest, PartitionTableInfo}, MetaClientRef, }; use snafu::ResultExt; use sql::plan::{CreateTablePlan, DropTablePlan}; -use table_engine::{engine::TableEngineRef, partition::format_sub_partition_table_name}; +use table_engine::{ + engine::TableEngineRef, + partition::{format_sub_partition_table_name, PartitionInfoEncoder}, +}; use crate::{ context::Context, @@ -45,13 +48,29 @@ impl TableManipulator for TableManipulatorImpl { ), })?; - let partition_info = plan.partition_info.map(|v| { + let sub_table_names = plan.partition_info.clone().map(|v| { v.get_definitions() .iter() .map(|def| format_sub_partition_table_name(&plan.table, &def.name)) .collect::>() }); + let partition_table_info = + sub_table_names.map(|v| PartitionTableInfo { sub_table_names: v }); + + let encoder = PartitionInfoEncoder::default(); + let encoded_partition_info = match plan.partition_info.clone() { + None => Vec::new(), + Some(v) => encoder + .encode(v) + .map_err(|e| Box::new(e) as _) + .with_context(|| CreateWithCause { + msg: format!( + "fail to encode partition info, ctx:{:?}, plan:{:?}", + ctx, plan + ), + })?, + }; let req = CreateTableRequest { schema_name: ctx.default_schema().to_string(), name: plan.table, @@ -59,7 +78,8 @@ impl TableManipulator for TableManipulatorImpl { engine: plan.engine, create_if_not_exist: plan.if_not_exists, options: plan.options, - partition_info: partition_info.map(|v| PartitionInfo { names: v }), + partition_table_info, + encoded_partition_info, }; let resp = self diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index 7edb84e386..d5542029f1 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -33,8 +33,8 @@ pub struct AllocSchemaIdResponse { } #[derive(Clone, Debug)] -pub struct PartitionInfo { - pub names: Vec, +pub struct PartitionTableInfo { + pub sub_table_names: Vec, } #[derive(Clone, Debug)] @@ -45,7 +45,8 @@ pub struct CreateTableRequest { pub engine: String, pub create_if_not_exist: bool, pub options: HashMap, - pub partition_info: Option, + pub partition_table_info: Option, + pub encoded_partition_info: Vec, } #[derive(Clone, Debug)] @@ -306,6 +307,11 @@ impl From for AllocSchemaIdResponse { impl From for meta_service_pb::CreateTableRequest { fn from(req: CreateTableRequest) -> Self { + let partition_table_info = + req.partition_table_info + .map(|v| meta_service_pb::PartitionTableInfo { + sub_table_names: v.sub_table_names, + }); Self { header: None, schema_name: req.schema_name, @@ -314,9 +320,8 @@ impl From for meta_service_pb::CreateTableRequest { engine: req.engine, create_if_not_exist: req.create_if_not_exist, options: req.options, - partition_info: req - .partition_info - .map(|v| meta_service_pb::PartitionInfo { names: v.names }), + encoded_partition_info: req.encoded_partition_info, + partition_table_info, } } } diff --git a/proto/protos/meta_update.proto b/proto/protos/meta_update.proto index 314cdee353..1ca743aaf7 100644 --- a/proto/protos/meta_update.proto +++ b/proto/protos/meta_update.proto @@ -41,9 +41,13 @@ message AddTableMeta { common.TableSchema schema = 4; // Options of the table analytic_common.TableOptions options = 5; - oneof partition_info { - HashPartitionInfo hash = 6; - KeyPartitionInfo key_partition = 7; + PartitionInfo partition_info = 6; +} + +message PartitionInfo { + oneof partition_info_enum { + HashPartitionInfo hash = 1; + KeyPartitionInfo key = 2; } } diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index b13bd87db0..5d9ec00802 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -28,6 +28,7 @@ use query_engine::executor::Executor as QueryExecutor; use snafu::{OptionExt, ResultExt}; use table_engine::{ engine::{CloseTableRequest, TableEngineRef, TableState}, + partition::PartitionInfoEncoder, table::{SchemaId, TableId}, ANALYTIC_ENGINE_TYPE, }; @@ -333,6 +334,20 @@ async fn handle_create_table_on_shard( ), })?; + let partition_info = match request.encoded_partition_info.is_empty() { + true => None, + false => PartitionInfoEncoder::default() + .decode(&request.encoded_partition_info) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::BadRequest, + msg: format!( + "fail to decode encoded partition info bytes, raw_bytes:{:?}", + request.encoded_partition_info + ), + })?, + }; + let create_table_request = CreateTableRequest { catalog_name: ctx.catalog_manager.default_catalog_name().to_string(), schema_name: table.schema_name, @@ -344,7 +359,7 @@ async fn handle_create_table_on_shard( state: TableState::Stable, shard_id: shard_info.id, cluster_version: topology.cluster_topology_version, - partition_info: None, + partition_info, }; let create_opts = CreateOptions { table_engine: ctx.table_engine, diff --git a/table_engine/src/partition/mod.rs b/table_engine/src/partition/mod.rs index bcde89a70d..c5138a6305 100644 --- a/table_engine/src/partition/mod.rs +++ b/table_engine/src/partition/mod.rs @@ -5,12 +5,13 @@ pub mod rule; use common_types::bytes::Bytes; -use proto::meta_update as meta_pb; -use snafu::{Backtrace, Snafu}; +use prost::Message; +use proto::{meta_update as meta_pb, meta_update::partition_info::PartitionInfoEnum}; +use snafu::{ensure, Backtrace, ResultExt, Snafu}; + +const DEFAULT_PARTITION_INFO_ENCODING_VERSION: u8 = 0; -// TODO: we should refactor for splitting the errors. #[derive(Debug, Snafu)] -#[snafu(visibility(pub))] pub enum Error { #[snafu(display( "Failed to build partition rule, msg:{}.\nBacktrace:{}\n", @@ -35,6 +36,32 @@ pub enum Error { #[snafu(display("Internal error occurred, msg:{}", msg,))] Internal { msg: String }, + + #[snafu(display("Failed to encode partition info by protobuf, err:{}", source))] + EncodePartitionInfoToPb { source: prost::EncodeError }, + + #[snafu(display( + "Failed to decode partition info from protobuf bytes, buf:{:?}, err:{}", + buf, + source, + ))] + DecodePartitionInfoToPb { + buf: Vec, + source: prost::DecodeError, + }, + + #[snafu(display("Encoded partition info content is empty.\nBacktrace:\n{}", backtrace))] + EmptyEncodedPartitionInfo { backtrace: Backtrace }, + + #[snafu(display( + "Invalid partition info encoding version, version:{}.\nBacktrace:\n{}", + version, + backtrace + ))] + InvalidPartitionInfoEncodingVersion { version: u8, backtrace: Backtrace }, + + #[snafu(display("Partition info could not be empty"))] + EmptyPartitionInfo {}, } define_result!(Error); @@ -101,38 +128,99 @@ impl From for Definition { } } -impl From for meta_pb::add_table_meta::PartitionInfo { +impl From for HashPartitionInfo { + fn from(partition_info_pb: meta_pb::HashPartitionInfo) -> Self { + HashPartitionInfo { + definitions: partition_info_pb + .definitions + .into_iter() + .map(|v| v.into()) + .collect(), + expr: Bytes::from(partition_info_pb.expr), + linear: partition_info_pb.linear, + } + } +} + +impl From for meta_pb::HashPartitionInfo { + fn from(partition_info: HashPartitionInfo) -> Self { + meta_pb::HashPartitionInfo { + definitions: partition_info + .definitions + .into_iter() + .map(|v| v.into()) + .collect(), + expr: Bytes::into(partition_info.expr), + linear: partition_info.linear, + } + } +} + +impl From for KeyPartitionInfo { + fn from(partition_info_pb: meta_pb::KeyPartitionInfo) -> Self { + KeyPartitionInfo { + definitions: partition_info_pb + .definitions + .into_iter() + .map(|v| v.into()) + .collect(), + partition_key: partition_info_pb.partition_key, + linear: partition_info_pb.linear, + } + } +} + +impl From for meta_pb::KeyPartitionInfo { + fn from(partition_info: KeyPartitionInfo) -> Self { + meta_pb::KeyPartitionInfo { + definitions: partition_info + .definitions + .into_iter() + .map(|v| v.into()) + .collect(), + partition_key: partition_info.partition_key, + linear: partition_info.linear, + } + } +} + +impl From for meta_pb::PartitionInfo { fn from(partition_info: PartitionInfo) -> Self { match partition_info { - PartitionInfo::Hash(v) => Self::Hash(meta_pb::HashPartitionInfo { - definitions: v.definitions.into_iter().map(|v| v.into()).collect(), - expr: v.expr.to_vec(), - linear: v.linear, - }), - PartitionInfo::Key(v) => Self::KeyPartition(meta_pb::KeyPartitionInfo { - definitions: v.definitions.into_iter().map(|v| v.into()).collect(), - partition_key: v.partition_key, - linear: v.linear, - }), + PartitionInfo::Hash(v) => { + let hash_partition_info = meta_pb::HashPartitionInfo::from(v); + meta_pb::PartitionInfo { + partition_info_enum: Some(PartitionInfoEnum::Hash(hash_partition_info)), + } + } + PartitionInfo::Key(v) => { + let key_partition_info = meta_pb::KeyPartitionInfo::from(v); + meta_pb::PartitionInfo { + partition_info_enum: Some(PartitionInfoEnum::Key(key_partition_info)), + } + } } } } -impl From for PartitionInfo { - fn from(pb: meta_pb::add_table_meta::PartitionInfo) -> Self { - match pb { - meta_pb::add_table_meta::PartitionInfo::Hash(v) => Self::Hash(HashPartitionInfo { - definitions: v.definitions.into_iter().map(|v| v.into()).collect(), - expr: Bytes::from(v.expr), - linear: v.linear, - }), - meta_pb::add_table_meta::PartitionInfo::KeyPartition(v) => { - Self::Key(KeyPartitionInfo { - definitions: v.definitions.into_iter().map(|v| v.into()).collect(), - partition_key: v.partition_key, - linear: v.linear, - }) - } +impl TryFrom for PartitionInfo { + type Error = Error; + + fn try_from( + partition_info_pb: meta_pb::PartitionInfo, + ) -> std::result::Result { + match partition_info_pb.partition_info_enum { + Some(partition_info_enum) => match partition_info_enum { + PartitionInfoEnum::Hash(v) => { + let hash_partition_info = HashPartitionInfo::from(v); + Ok(Self::Hash(hash_partition_info)) + } + PartitionInfoEnum::Key(v) => { + let key_partition_info = KeyPartitionInfo::from(v); + Ok(Self::Key(key_partition_info)) + } + }, + None => Err(Error::EmptyPartitionInfo {}), } } } @@ -140,3 +228,81 @@ impl From for PartitionInfo { pub fn format_sub_partition_table_name(table_name: &str, partition_name: &str) -> String { format!("____{}_{}", table_name, partition_name) } + +/// Encoder for partition info with version control. +pub struct PartitionInfoEncoder { + version: u8, +} + +impl Default for PartitionInfoEncoder { + fn default() -> Self { + Self::new(DEFAULT_PARTITION_INFO_ENCODING_VERSION) + } +} + +impl PartitionInfoEncoder { + fn new(version: u8) -> Self { + Self { version } + } + + pub fn encode(&self, partition_info: PartitionInfo) -> Result> { + let pb_partition_info = meta_pb::PartitionInfo::from(partition_info); + let mut buf = Vec::with_capacity(1 + pb_partition_info.encoded_len() as usize); + buf.push(self.version); + + pb_partition_info + .encode(&mut buf) + .context(EncodePartitionInfoToPb)?; + + Ok(buf) + } + + pub fn decode(&self, buf: &[u8]) -> Result> { + if buf.is_empty() { + return Ok(None); + } + + self.ensure_version(buf[0])?; + + let pb_partition_info = + meta_pb::PartitionInfo::decode(&buf[1..]).context(DecodePartitionInfoToPb { buf })?; + + Ok(Some(PartitionInfo::try_from(pb_partition_info)?)) + } + + fn ensure_version(&self, version: u8) -> Result<()> { + ensure!( + self.version == version, + InvalidPartitionInfoEncodingVersion { version } + ); + Ok(()) + } +} + +#[test] +fn test_partition_info_encoder() { + let partition_info = PartitionInfo::Key(KeyPartitionInfo { + definitions: vec![ + Definition { + name: "p0".to_string(), + origin_name: Some("partition_0".to_string()), + }, + Definition { + name: "p1".to_string(), + origin_name: None, + }, + ], + partition_key: vec!["col1".to_string(), "col2".to_string(), "col3".to_string()], + linear: false, + }); + let partition_info_encoder = PartitionInfoEncoder::default(); + let encode_partition_info = partition_info_encoder + .encode(partition_info.clone()) + .unwrap(); + let decode_partition_info = partition_info_encoder + .decode(&encode_partition_info) + .unwrap() + .unwrap(); + + assert_eq!(decode_partition_info, partition_info); +}