Skip to content

Commit

Permalink
feat: add shard status when heartbeat to meta (#1082)
Browse files Browse the repository at this point in the history
## Rationale
After #1080, ceresdb does not send heartbeats to meta until every table has
finished opening. As a result, none of the tables inside a shard can serve
writes or reads until the whole shard is open, which is unreasonable.

## Detailed Changes
- Introduce a `PartialOpen` status that is sent to meta when only part of a
shard's tables have been opened; meta will still route the tables of a shard
that is in this status.

## Test Plan

Manually
  • Loading branch information
jiacai2050 authored Jul 19, 2023
1 parent ec47df5 commit 37d2dda
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 62 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ClusterImpl {
loop {
let shard_infos = inner
.shard_set
.all_opened_shards()
.all_shards()
.iter()
.map(|shard| shard.shard_info())
.collect();
Expand Down
80 changes: 22 additions & 58 deletions cluster/src/shard_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

use std::{collections::HashMap, sync::Arc};

use meta_client::types::{ShardId, ShardInfo, TableInfo, TablesOfShard};
use snafu::{ensure, OptionExt};
use generic_error::BoxError;
use meta_client::types::{ShardId, ShardInfo, ShardStatus, TableInfo, TablesOfShard};
use snafu::{ensure, OptionExt, ResultExt};

use crate::{
shard_operator::{
CloseContext, CloseTableContext, CreateTableContext, DropTableContext, OpenContext,
OpenTableContext, ShardOperator,
},
OpenShardNoCause, Result, ShardVersionMismatch, TableAlreadyExists, TableNotFound,
UpdateFrozenShard,
OpenShardNoCause, OpenShardWithCause, Result, ShardVersionMismatch, TableAlreadyExists,
TableNotFound, UpdateFrozenShard,
};

/// Shard set
Expand All @@ -29,16 +30,6 @@ impl ShardSet {
inner.values().cloned().collect()
}

// Collect every shard in the set that currently reports itself as opened.
pub fn all_opened_shards(&self) -> Vec<ShardRef> {
    let guard = self.inner.read().unwrap();

    let mut opened = Vec::new();
    for shard in guard.values() {
        if shard.is_opened() {
            opened.push(shard.clone());
        }
    }
    opened
}

// Get the shard by its id.
pub fn get(&self, shard_id: ShardId) -> Option<ShardRef> {
let inner = self.inner.read().unwrap();
Expand Down Expand Up @@ -77,7 +68,6 @@ impl Shard {
let data = Arc::new(std::sync::RwLock::new(ShardData {
shard_info: tables_of_shard.shard_info,
tables: tables_of_shard.tables,
status: ShardStatus::default(),
}));

let operator = tokio::sync::Mutex::new(ShardOperator { data: data.clone() });
Expand All @@ -87,6 +77,7 @@ impl Shard {

/// Returns a clone of the shard info held in this shard's data.
///
/// The data is read under the read lock; the `unwrap` panics if that lock
/// has been poisoned.
pub fn shard_info(&self) -> ShardInfo {
    self.data.read().unwrap().shard_info.clone()
}

Expand All @@ -96,12 +87,19 @@ impl Shard {
}

pub async fn open(&self, ctx: OpenContext) -> Result<()> {
let operator = self.operator.lock().await;
let operator = self
.operator
.try_lock()
.box_err()
.context(OpenShardWithCause {
msg: "Failed to get shard operator lock",
})?;

{
let mut data = self.data.write().unwrap();
if !data.need_open() {
return OpenShardNoCause {
msg: format!("Shard is already in opening, id:{}", data.shard_info.id),
msg: "Shard is already in opening",
}
.fail();
}
Expand Down Expand Up @@ -161,36 +159,6 @@ pub struct UpdatedTableInfo {
pub table_info: TableInfo,
}

/// The status of a shard changes as follows:
///
/// ```plaintext
///      ┌────┐
///      │Init│
///      └──┬─┘
///      ___▽___
///     ╱       ╲     ┌─────┐
///    ╱ Opening ╲____│Ready│
///    ╲         ╱yes └──┬──┘
///     ╲_______╱    ┌───▽──┐
///                  │Frozen│
///                  └──────┘
/// ```
/// When an open request comes in, a shard can only be opened when it is in
/// - `Init`, which means it has not been opened before.
/// - `Opening`, which means it has been opened before, but failed.
#[derive(Debug, Default, PartialEq)]
pub enum ShardStatus {
/// Must not be reported to ceresmeta
#[default]
Init,
/// Must not be reported to ceresmeta
Opening,
/// Healthy
Ready,
/// Further updates are prohibited
Frozen,
}

/// Shard data
#[derive(Debug)]
pub struct ShardData {
Expand All @@ -199,10 +167,6 @@ pub struct ShardData {

/// Tables in shard
pub tables: Vec<TableInfo>,

/// Current status
/// The flow of shard status is: opening -> opened -> frozen
pub status: ShardStatus,
}

impl ShardData {
Expand All @@ -215,19 +179,19 @@ impl ShardData {

#[inline]
pub fn freeze(&mut self) {
self.status = ShardStatus::Frozen;
self.shard_info.status = ShardStatus::Frozen;
}

#[inline]
pub fn begin_open(&mut self) {
self.status = ShardStatus::Opening;
self.shard_info.status = ShardStatus::Opening;
}

#[inline]
pub fn finish_open(&mut self) {
assert_eq!(self.status, ShardStatus::Opening);
assert_eq!(self.shard_info.status, ShardStatus::Opening);

self.status = ShardStatus::Ready;
self.shard_info.status = ShardStatus::Ready;
}

#[inline]
Expand All @@ -237,12 +201,12 @@ impl ShardData {

#[inline]
pub fn is_opened(&self) -> bool {
matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen)
self.shard_info.is_opened()
}

#[inline]
fn is_frozen(&self) -> bool {
matches!(self.status, ShardStatus::Frozen)
matches!(self.shard_info.status, ShardStatus::Frozen)
}

pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> {
Expand Down Expand Up @@ -290,7 +254,7 @@ impl ShardData {
} = updated_info;

ensure!(
!matches!(self.status, ShardStatus::Frozen),
!self.is_frozen(),
UpdateFrozenShard {
shard_id: curr_shard.id,
}
Expand Down
46 changes: 46 additions & 0 deletions meta_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,58 @@ pub struct NodeInfo {
pub shard_infos: Vec<ShardInfo>,
}

/// The status of a shard changes as follows:
///
/// ```plaintext
///      ┌────┐
///      │Init│
///      └──┬─┘
///      ___▽___
///     ╱       ╲     ┌─────┐
///    ╱ Opening ╲____│Ready│
///    ╲         ╱yes └──┬──┘
///     ╲_______╱    ┌───▽──┐
///                  │Frozen│
///                  └──────┘
/// ```
/// When an open request comes in, a shard can only be opened when it is in
/// - `Init`, which means it has not been opened before.
/// - `Opening`, which means it has been opened before, but failed.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)]
pub enum ShardStatus {
/// Created, but not opened
#[default]
Init,
/// In the process of opening
Opening,
/// Healthy
Ready,
/// Further updates are prohibited
Frozen,
}

/// Basic description of a shard: its id, role, version and status.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)]
pub struct ShardInfo {
pub id: ShardId,
pub role: ShardRole,
pub version: ShardVersion,
// This status is only used in the requests that ceresdb sends to ceresmeta
// via heartbeat. When ceresdb receives a `ShardInfo` through an open shard
// request, this field is meaningless.
// TODO: Use different request and response bodies between ceresdb and
// ceresmeta.
pub status: ShardStatus,
}

impl ShardInfo {
    /// Returns `true` when this shard's role is `ShardRole::Leader`.
    #[inline]
    pub fn is_leader(&self) -> bool {
        matches!(self.role, ShardRole::Leader)
    }

    /// Returns `true` when the shard's status is `Ready` or `Frozen`, and
    /// `false` while it is still `Init` or `Opening`.
    #[inline]
    pub fn is_opened(&self) -> bool {
        match self.status {
            ShardStatus::Ready | ShardStatus::Frozen => true,
            ShardStatus::Init | ShardStatus::Opening => false,
        }
    }
}

#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize)]
Expand Down Expand Up @@ -235,6 +275,11 @@ impl From<ShardInfo> for meta_service_pb::ShardInfo {
id: shard_info.id,
role: role as i32,
version: shard_info.version,
status: Some(if shard_info.is_opened() {
meta_service_pb::shard_info::Status::Ready
} else {
meta_service_pb::shard_info::Status::PartialOpen
} as i32),
}
}
}
Expand All @@ -245,6 +290,7 @@ impl From<&meta_service_pb::ShardInfo> for ShardInfo {
id: pb_shard_info.id,
role: pb_shard_info.role().into(),
version: pb_shard_info.version,
status: Default::default(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ mod tests {
id: 0,
role: Leader,
version: 100,
status: Default::default(),
},
}],
},
Expand Down
2 changes: 1 addition & 1 deletion server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ async fn do_open_shard(ctx: HandlerContext, shard_info: ShardInfo) -> Result<()>

shard.open(open_ctx).await.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "fail to open shard",
msg: format!("fail to open shard, id:{}", shard_info.id),
})
}

Expand Down

0 comments on commit 37d2dda

Please sign in to comment.