diff --git a/Cargo.lock b/Cargo.lock index 45aa776217..b1a04366f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1109,9 +1109,9 @@ dependencies = [ [[package]] name = "ceresdbproto" -version = "1.0.7" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff289995b93a0f20dd77342a2ac85447a68e62c03b56704b588630c4d98b08d3" +checksum = "25926e49d9d931b3089b26aba55cd5057631db452137f45d0d24f8b5dae8a28c" dependencies = [ "prost", "protoc-bin-vendored", diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index 9ff0295cea..f533a2b937 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -102,7 +102,7 @@ impl ClusterImpl { loop { let shard_infos = inner .shard_set - .all_opened_shards() + .all_shards() .iter() .map(|shard| shard.shard_info()) .collect(); diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs index bbbc3cc4aa..a13c5628b2 100644 --- a/cluster/src/shard_set.rs +++ b/cluster/src/shard_set.rs @@ -2,16 +2,17 @@ use std::{collections::HashMap, sync::Arc}; -use meta_client::types::{ShardId, ShardInfo, TableInfo, TablesOfShard}; -use snafu::{ensure, OptionExt}; +use generic_error::BoxError; +use meta_client::types::{ShardId, ShardInfo, ShardStatus, TableInfo, TablesOfShard}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::{ shard_operator::{ CloseContext, CloseTableContext, CreateTableContext, DropTableContext, OpenContext, OpenTableContext, ShardOperator, }, - OpenShardNoCause, Result, ShardVersionMismatch, TableAlreadyExists, TableNotFound, - UpdateFrozenShard, + OpenShardNoCause, OpenShardWithCause, Result, ShardVersionMismatch, TableAlreadyExists, + TableNotFound, UpdateFrozenShard, }; /// Shard set @@ -29,16 +30,6 @@ impl ShardSet { inner.values().cloned().collect() } - // Fetch all opened shards. - pub fn all_opened_shards(&self) -> Vec { - let inner = self.inner.read().unwrap(); - inner - .values() - .filter(|shard| shard.is_opened()) - .cloned() - .collect() - } - // Get the shard by its id. pub fn get(&self, shard_id: ShardId) -> Option { let inner = self.inner.read().unwrap(); @@ -77,7 +68,6 @@ impl Shard { let data = Arc::new(std::sync::RwLock::new(ShardData { shard_info: tables_of_shard.shard_info, tables: tables_of_shard.tables, - status: ShardStatus::default(), })); let operator = tokio::sync::Mutex::new(ShardOperator { data: data.clone() }); @@ -87,6 +77,7 @@ impl Shard { pub fn shard_info(&self) -> ShardInfo { let data = self.data.read().unwrap(); + data.shard_info.clone() } @@ -96,12 +87,19 @@ impl Shard { } pub async fn open(&self, ctx: OpenContext) -> Result<()> { - let operator = self.operator.lock().await; + let operator = self + .operator + .try_lock() + .box_err() + .context(OpenShardWithCause { + msg: "Failed to get shard operator lock", + })?; + { let mut data = self.data.write().unwrap(); if !data.need_open() { return OpenShardNoCause { - msg: format!("Shard is already in opening, id:{}", data.shard_info.id), + msg: "Shard is already in opening", } .fail(); } @@ -161,36 +159,6 @@ pub struct UpdatedTableInfo { pub table_info: TableInfo, } -/// The status changes of a shard as following: -/// -///```plaintext -/// ┌────┐ -/// │Init│ -/// └──┬─┘ -/// ___▽___ -/// ╱ ╲ ┌─────┐ -/// ╱ Opening ╲____│Ready│ -/// ╲ ╱yes └──┬──┘ -/// ╲_______╱ ┌───▽──┐ -/// │Frozen│ -/// └──────┘ -/// ``` -/// When a open request comes in, shard can only be opened when it's in -/// - `Init`, which means it has not been opened before. -/// - `Opening`, which means it has been opened before, but failed. -#[derive(Debug, Default, PartialEq)] -pub enum ShardStatus { - /// Not allowed report to ceresmeta - #[default] - Init, - /// Not allowed report to ceresmeta - Opening, - /// Healthy - Ready, - /// Further updates are prohibited - Frozen, -} - /// Shard data #[derive(Debug)] pub struct ShardData { @@ -199,10 +167,6 @@ pub struct ShardData { /// Tables in shard pub tables: Vec, - - /// Current status - /// The flow of shard status is: opening -> opened -> frozen - pub status: ShardStatus, } impl ShardData { @@ -215,19 +179,19 @@ impl ShardData { #[inline] pub fn freeze(&mut self) { - self.status = ShardStatus::Frozen; + self.shard_info.status = ShardStatus::Frozen; } #[inline] pub fn begin_open(&mut self) { - self.status = ShardStatus::Opening; + self.shard_info.status = ShardStatus::Opening; } #[inline] pub fn finish_open(&mut self) { - assert_eq!(self.status, ShardStatus::Opening); + assert_eq!(self.shard_info.status, ShardStatus::Opening); - self.status = ShardStatus::Ready; + self.shard_info.status = ShardStatus::Ready; } #[inline] @@ -237,12 +201,12 @@ impl ShardData { #[inline] pub fn is_opened(&self) -> bool { - matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen) + self.shard_info.is_opened() } #[inline] fn is_frozen(&self) -> bool { - matches!(self.status, ShardStatus::Frozen) + matches!(self.shard_info.status, ShardStatus::Frozen) } pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { @@ -290,7 +254,7 @@ impl ShardData { } = updated_info; ensure!( - !matches!(self.status, ShardStatus::Frozen), + !self.is_frozen(), UpdateFrozenShard { shard_id: curr_shard.id, } diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index b47a5325d4..fc0e1b3c5c 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -162,11 +162,46 @@ pub struct NodeInfo { pub shard_infos: Vec, } +/// The status changes of a shard as following: +/// +///```plaintext +/// ┌────┐ +/// │Init│ +/// └──┬─┘ +/// ___▽___ +/// ╱ ╲ ┌─────┐ +/// ╱ Opening ╲____│Ready│ +/// ╲ ╱yes └──┬──┘ +/// ╲_______╱ ┌───▽──┐ +/// │Frozen│ +/// └──────┘ +/// ``` +/// When an open request comes in, shard can only be opened when it's in +/// - `Init`, which means it has not been opened before. +/// - `Opening`, which means it has been opened before, but failed. +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] +pub enum ShardStatus { + /// Created, but not opened + #[default] + Init, + /// In opening + Opening, + /// Healthy + Ready, + /// Further updates are prohibited + Frozen, +} + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] pub struct ShardInfo { pub id: ShardId, pub role: ShardRole, pub version: ShardVersion, + // This status is only used for request ceresdb send to ceresmeta via heartbeat + // When ceresdb receive this via open shard request, this field is meanless. + // TODO: Use different request and response body between ceresdb and + // ceresmeta. + pub status: ShardStatus, } impl ShardInfo { @@ -174,6 +209,11 @@ impl ShardInfo { pub fn is_leader(&self) -> bool { self.role == ShardRole::Leader } + + #[inline] + pub fn is_opened(&self) -> bool { + matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen) + } } #[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize)] @@ -235,6 +275,11 @@ impl From for meta_service_pb::ShardInfo { id: shard_info.id, role: role as i32, version: shard_info.version, + status: Some(if shard_info.is_opened() { + meta_service_pb::shard_info::Status::Ready + } else { + meta_service_pb::shard_info::Status::PartialOpen + } as i32), } } } @@ -245,6 +290,7 @@ impl From<&meta_service_pb::ShardInfo> for ShardInfo { id: pb_shard_info.id, role: pb_shard_info.role().into(), version: pb_shard_info.version, + status: Default::default(), } } } diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index b9b8e73b1f..15b9a0e306 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -224,6 +224,7 @@ mod tests { id: 0, role: Leader, version: 100, + status: Default::default(), }, }], }, diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index 69f76c49a6..b7f3bfe891 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -299,7 +299,7 @@ async fn do_open_shard(ctx: HandlerContext, shard_info: ShardInfo) -> Result<()> shard.open(open_ctx).await.box_err().context(ErrWithCause { code: StatusCode::Internal, - msg: "fail to open shard", + msg: format!("fail to open shard, id:{}", shard_info.id), }) }