From 99ca8d0aaa53ef8d1d8dc58ca9db215853ea791c Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Mon, 9 Jan 2023 20:38:30 +0800 Subject: [PATCH] feat: add open_table_on_shard and close_table_on_shard in meta_event_service (#542) * feat: add open_table_on_shard and close_table_on_shard in meta_event_service * refactor by CR --- Cargo.lock | 14 +- Cargo.toml | 2 +- cluster/src/cluster_impl.rs | 62 ++++-- cluster/src/lib.rs | 5 +- server/src/grpc/meta_event_service/mod.rs | 235 +++++++++++++++++----- 5 files changed, 241 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2e389cbd2..bf90eb750e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -883,7 +883,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=6e12241032e01d00e130b65118dfafc0794a3102#6e12241032e01d00e130b65118dfafc0794a3102" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98#6cc1754ffc31ddfe0710972c94f8fe5acd61af98" dependencies = [ "prost", "protoc-bin-vendored", @@ -1021,7 +1021,7 @@ name = "cluster" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6e12241032e01d00e130b65118dfafc0794a3102)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98)", "common_types 1.0.0-alpha02", "common_util", "log", @@ -3192,7 +3192,7 @@ name = "meta_client" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6e12241032e01d00e130b65118dfafc0794a3102)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98)", "common_types 1.0.0-alpha02", "common_util", "futures 0.3.21", @@ -4822,7 +4822,7 @@ name = "remote_engine_client" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6e12241032e01d00e130b65118dfafc0794a3102)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98)", "clru", "common_types 1.0.0-alpha02", "common_util", @@ -4956,7 +4956,7 @@ name = "router" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6e12241032e01d00e130b65118dfafc0794a3102)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98)", "cluster", "common_types 1.0.0-alpha02", "common_util", @@ -5284,7 +5284,7 @@ dependencies = [ "async-trait", "bytes 1.2.1", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6e12241032e01d00e130b65118dfafc0794a3102)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98)", "cluster", "common_types 1.0.0-alpha02", "common_util", @@ -5623,7 +5623,7 @@ version = "1.0.0-alpha02" dependencies = [ "arrow", "catalog", - "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6e12241032e01d00e130b65118dfafc0794a3102)", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=6cc1754ffc31ddfe0710972c94f8fe5acd61af98)", "common_types 1.0.0-alpha02", "common_util", "datafusion", diff --git a/Cargo.toml b/Cargo.toml index 1e47becbc8..4cbe191189 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,7 +111,7 @@ message_queue = { path = "components/message_queue" } [workspace.dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" -rev = "6e12241032e01d00e130b65118dfafc0794a3102" +rev = "6cc1754ffc31ddfe0710972c94f8fe5acd61af98" [workspace.dependencies.datafusion] git = "https://github.com/CeresDB/arrow-datafusion.git" diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index 82268e6462..9e9b883bb7 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -6,8 +6,12 @@ use std::{ }; use async_trait::async_trait; -use ceresdbproto::meta_event::{ - CloseShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardRequest, +use ceresdbproto::{ + meta_event::{ + CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest, + DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, UpdateShardInfo, + }, + meta_service::TableInfo as TableInfoPb, }; use common_util::runtime::{JoinHandle, Runtime}; use log::{error, info, warn}; @@ -247,14 +251,34 @@ impl Inner { } fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> { - let update_shard_info = req.update_shard_info.clone().context(ShardNotFound { - msg: "update shard info is missing in CreateTableOnShardRequest", + self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone()) + } + + fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()> { + self.remove_table_to_shard(req.update_shard_info.clone(), req.table_info.clone()) + } + + fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()> { + self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone()) + } + + fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()> { + self.remove_table_to_shard(req.update_shard_info.clone(), req.table_info.clone()) + } + + fn insert_table_to_shard( + &self, + update_shard_info: Option, + table_info: Option, + ) -> Result<()> { + let update_shard_info = update_shard_info.context(ShardNotFound { + msg: "update shard info is missing", })?; let curr_shard_info = update_shard_info.curr_shard_info.context(ShardNotFound { - msg: "current shard info is missing in UpdateShardInfo", + msg: "current shard info is missing", })?; - let table_info = req.table_info.clone().context(TableNotFound { - msg: "table info is missing in CreateTableOnShardRequest", + let table_info = table_info.context(TableNotFound { + msg: "table info is missing", })?; self.shard_tables_cache.try_insert_table_to_shard( @@ -264,15 +288,19 @@ impl Inner { ) } - fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()> { - let update_shard_info = req.update_shard_info.clone().context(ShardNotFound { - msg: "update shard info is missing in DropTableOnShardRequest", + fn remove_table_to_shard( + &self, + update_shard_info: Option, + table_info: Option, + ) -> Result<()> { + let update_shard_info = update_shard_info.context(ShardNotFound { + msg: "update shard info is missing", })?; let curr_shard_info = update_shard_info.curr_shard_info.context(ShardNotFound { - msg: "current shard info is missing in UpdateShardInfo", + msg: "current shard info is missing", })?; - let table_info = req.table_info.clone().context(TableNotFound { - msg: "table info is missing in CreateTableOnShardRequest", + let table_info = table_info.context(TableNotFound { + msg: "table info is missing", })?; self.shard_tables_cache.try_remove_table_from_shard( @@ -332,6 +360,14 @@ impl Cluster for ClusterImpl { self.inner.drop_table_on_shard(req) } + async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()> { + self.inner.open_table_on_shard(req) + } + + async fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()> { + self.inner.close_table_on_shard(req) + } + async fn route_tables(&self, req: &RouteTablesRequest) -> Result { self.inner.route_tables(req).await } diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 93a8147356..e018ecd0c7 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -12,7 +12,8 @@ use std::sync::Arc; use async_trait::async_trait; use ceresdbproto::meta_event::{ - CloseShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardRequest, + CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest, + DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, }; use common_types::schema::SchemaName; use common_util::define_result; @@ -118,6 +119,8 @@ pub trait Cluster { async fn close_shard(&self, req: &CloseShardRequest) -> Result; async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>; async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>; + async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()>; + async fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()>; async fn route_tables(&self, req: &RouteTablesRequest) -> Result; async fn fetch_nodes(&self) -> Result; } diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index 5d9ec00802..8fe7795a33 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -8,16 +8,18 @@ use async_trait::async_trait; use catalog::{ manager::ManagerRef, schema::{ - CloseOptions, CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, - OpenOptions, OpenTableRequest, + CloseOptions, CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, NameRef, + OpenOptions, OpenTableRequest, SchemaRef, }, CatalogRef, }; use ceresdbproto::meta_event::{ meta_event_service_server::MetaEventService, ChangeShardRoleRequest, ChangeShardRoleResponse, - CloseShardRequest, CloseShardResponse, CreateTableOnShardRequest, CreateTableOnShardResponse, - DropTableOnShardRequest, DropTableOnShardResponse, MergeShardsRequest, MergeShardsResponse, - OpenShardRequest, OpenShardResponse, SplitShardRequest, SplitShardResponse, + CloseShardRequest, CloseShardResponse, CloseTableOnShardRequest, CloseTableOnShardResponse, + CreateTableOnShardRequest, CreateTableOnShardResponse, DropTableOnShardRequest, + DropTableOnShardResponse, MergeShardsRequest, MergeShardsResponse, OpenShardRequest, + OpenShardResponse, OpenTableOnShardRequest, OpenTableOnShardResponse, SplitShardRequest, + SplitShardResponse, }; use cluster::ClusterRef; use common_types::schema::SchemaEncoder; @@ -107,6 +109,18 @@ impl MetaServiceImpl { DropTableOnShardResponse ); + handle_request!( + open_table_on_shard, + OpenTableOnShardRequest, + OpenTableOnShardResponse + ); + + handle_request!( + close_table_on_shard, + CloseTableOnShardRequest, + CloseTableOnShardResponse + ); + fn handler_ctx(&self) -> HandlerContext { HandlerContext { cluster: self.cluster.clone(), @@ -174,17 +188,7 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re }; for table in tables_of_shard.tables { - let schema = default_catalog - .schema_by_name(&table.schema_name) - .map_err(|e| Box::new(e) as _) - .with_context(|| ErrWithCause { - code: StatusCode::Internal, - msg: format!("fail to get schema of table, table_info:{:?}", table), - })? - .with_context(|| ErrNoCause { - code: StatusCode::NotFound, - msg: format!("schema of table is not found, table_info:{:?}", table), - })?; + let schema = find_schema(default_catalog.clone(), &table.schema_name)?; let open_request = OpenTableRequest { catalog_name: ctx.catalog_manager.default_catalog_name().to_string(), @@ -192,7 +196,7 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re schema_id: SchemaId::from(table.schema_id), table_name: table.name.clone(), table_id: TableId::new(table.id), - engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), + engine: ANALYTIC_ENGINE_TYPE.to_string(), shard_id: shard_info.id, cluster_version: topology.cluster_topology_version, }; @@ -230,17 +234,7 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> table_engine: ctx.table_engine, }; for table in tables_of_shard.tables { - let schema = default_catalog - .schema_by_name(&table.schema_name) - .map_err(|e| Box::new(e) as _) - .with_context(|| ErrWithCause { - code: StatusCode::Internal, - msg: format!("fail to get schema of table, table_info:{:?}", table), - })? - .with_context(|| ErrNoCause { - code: StatusCode::NotFound, - msg: format!("schema of table is not found, table_info:{:?}", table), - })?; + let schema = find_schema(default_catalog.clone(), &table.schema_name)?; let close_request = CloseTableRequest { catalog_name: ctx.catalog_manager.default_catalog_name().to_string(), @@ -248,7 +242,7 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> schema_id: SchemaId::from(table.schema_id), table_name: table.name.clone(), table_id: TableId::new(table.id), - engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), + engine: ANALYTIC_ENGINE_TYPE.to_string(), }; schema .close_table(close_request.clone(), opts.clone()) @@ -271,7 +265,7 @@ async fn handle_create_table_on_shard( .create_table_on_shard(&request) .await .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { + .with_context(|| ErrWithCause { code: StatusCode::Internal, msg: format!( "fail to create table on shard in cluster, req:{:?}", @@ -311,22 +305,12 @@ async fn handle_create_table_on_shard( // Create the table by catalog manager afterwards. let default_catalog = ctx.default_catalog()?; - let schema = default_catalog - .schema_by_name(&table.schema_name) - .map_err(|e| Box::new(e) as _) - .with_context(|| ErrWithCause { - code: StatusCode::Internal, - msg: format!("fail to get schema of table, table_info:{:?}", table), - })? - .with_context(|| ErrNoCause { - code: StatusCode::NotFound, - msg: format!("schema of table is not found, table_info:{:?}", table), - })?; + let schema = find_schema(default_catalog, &table.schema_name)?; let table_schema = SchemaEncoder::default() .decode(&request.encoded_schema) .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { + .with_context(|| ErrWithCause { code: StatusCode::BadRequest, msg: format!( "fail to decode encoded schema bytes, raw_bytes:{:?}", @@ -339,7 +323,7 @@ async fn handle_create_table_on_shard( false => PartitionInfoEncoder::default() .decode(&request.encoded_partition_info) .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { + .with_context(|| ErrWithCause { code: StatusCode::BadRequest, msg: format!( "fail to decode encoded partition info bytes, raw_bytes:{:?}", @@ -389,55 +373,182 @@ async fn handle_drop_table_on_shard( .drop_table_on_shard(&request) .await .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { + .with_context(|| ErrWithCause { code: StatusCode::Internal, msg: format!("fail to drop table on shard in cluster, req:{:?}", request), })?; let table = request.table_info.context(ErrNoCause { code: StatusCode::BadRequest, - msg: "table info is missing in the CreateTableOnShardRequest", + msg: "table info is missing in the DropTableOnShardRequest", })?; // Drop the table by catalog manager afterwards. let default_catalog = ctx.default_catalog()?; - let schema = default_catalog - .schema_by_name(&table.schema_name) + let schema = find_schema(default_catalog, &table.schema_name)?; + + let drop_table_request = DropTableRequest { + catalog_name: ctx.catalog_manager.default_catalog_name().to_string(), + schema_name: table.schema_name, + schema_id: SchemaId::from_u32(table.schema_id), + table_name: table.name, + // FIXME: the engine type should not use the default one. + engine: ANALYTIC_ENGINE_TYPE.to_string(), + }; + let drop_opts = DropOptions { + table_engine: ctx.table_engine, + }; + + schema + .drop_table(drop_table_request.clone(), drop_opts) + .await .map_err(|e| Box::new(e) as _) .with_context(|| ErrWithCause { code: StatusCode::Internal, - msg: format!("fail to get schema of table, table_info:{:?}", table), + msg: format!("fail to drop table with request:{:?}", drop_table_request), + })?; + + Ok(()) +} + +async fn handle_open_table_on_shard( + ctx: HandlerContext, + request: OpenTableOnShardRequest, +) -> Result<()> { + ctx.cluster + .open_table_on_shard(&request) + .await + .map_err(|e| Box::new(e) as _) + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to open table on shard in cluster, req:{:?}", request), + })?; + + let topology = ctx + .cluster + .fetch_nodes() + .await + .map_err(|e| Box::new(e) as _) + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!( + "fail to get topology while opening table, request:{:?}", + request + ), + })?; + + let shard_info = request + .update_shard_info + .context(ErrNoCause { + code: StatusCode::BadRequest, + msg: "update shard info is missing in the OpenTableOnShardRequest", })? - .with_context(|| ErrNoCause { - code: StatusCode::NotFound, - msg: format!("schema of table is not found, table_info:{:?}", table), + .curr_shard_info + .context(ErrNoCause { + code: StatusCode::BadRequest, + msg: "current shard info is missing ine OpenTableOnShardRequest", })?; + let table = request.table_info.context(ErrNoCause { + code: StatusCode::BadRequest, + msg: "table info is missing in the OpenTableOnShardRequest", + })?; - let drop_table_request = DropTableRequest { + // Open the table by catalog manager afterwards. + let default_catalog = ctx.default_catalog()?; + + let schema = find_schema(default_catalog, &table.schema_name)?; + + let open_table_request = OpenTableRequest { catalog_name: ctx.catalog_manager.default_catalog_name().to_string(), schema_name: table.schema_name, schema_id: SchemaId::from_u32(table.schema_id), table_name: table.name, // FIXME: the engine type should not use the default one. engine: ANALYTIC_ENGINE_TYPE.to_string(), + shard_id: shard_info.id, + cluster_version: topology.cluster_topology_version, + table_id: TableId::new(table.id), }; - let drop_opts = DropOptions { + let open_opts = OpenOptions { table_engine: ctx.table_engine, }; schema - .drop_table(drop_table_request.clone(), drop_opts) + .open_table(open_table_request.clone(), open_opts) .await .map_err(|e| Box::new(e) as _) .with_context(|| ErrWithCause { code: StatusCode::Internal, - msg: format!("fail to drop table with request:{:?}", drop_table_request), + msg: format!("fail to open table with request:{:?}", open_table_request), })?; Ok(()) } +async fn handle_close_table_on_shard( + ctx: HandlerContext, + request: CloseTableOnShardRequest, +) -> Result<()> { + ctx.cluster + .close_table_on_shard(&request) + .await + .map_err(|e| Box::new(e) as _) + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to close table on shard in cluster, req:{:?}", request), + })?; + + let table = request.table_info.context(ErrNoCause { + code: StatusCode::BadRequest, + msg: "table info is missing in the CloseTableOnShardRequest", + })?; + + // Close the table by catalog manager afterwards. + let default_catalog = ctx.default_catalog()?; + + let schema = find_schema(default_catalog, &table.schema_name)?; + + let close_table_request = CloseTableRequest { + catalog_name: ctx.catalog_manager.default_catalog_name().to_string(), + schema_name: table.schema_name, + schema_id: SchemaId::from_u32(table.schema_id), + table_name: table.name, + table_id: TableId::new(table.id), + // FIXME: the engine type should not use the default one. + engine: ANALYTIC_ENGINE_TYPE.to_string(), + }; + let close_opts = CloseOptions { + table_engine: ctx.table_engine, + }; + + schema + .close_table(close_table_request.clone(), close_opts) + .await + .map_err(|e| Box::new(e) as _) + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to close table with request:{:?}", close_table_request), + })?; + + Ok(()) +} + +#[inline] +fn find_schema(catalog: CatalogRef, schema_name: NameRef) -> Result { + catalog + .schema_by_name(schema_name) + .map_err(|e| Box::new(e) as _) + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to get schema, schema:{:?}", schema_name), + })? + .with_context(|| ErrNoCause { + code: StatusCode::NotFound, + msg: format!("schema is not found, schema:{:?}", schema_name), + }) +} + #[async_trait] impl MetaEventService for MetaServiceImpl { async fn open_shard( @@ -468,6 +579,20 @@ impl MetaEventService for MetaServiceImpl { self.drop_table_on_shard_internal(request).await } + async fn open_table_on_shard( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.open_table_on_shard_internal(request).await + } + + async fn close_table_on_shard( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.close_table_on_shard_internal(request).await + } + async fn split_shard( &self, request: tonic::Request,