Skip to content

Commit

Permalink
feat: add table status check (apache#1418)
Browse files Browse the repository at this point in the history
## Rationale
Refer to this issue
apache#1386, currently, if
the status of the shard is abnormal, we cannot get any valid exception
information from the error message `table not found`.

## Detailed Changes
* Add `TableStatus` in `cluster`, you can use it to get the status of
the table in the current cluster..
* Add `SchemaWithCluster`, It wraps the schema inside the cluster,
through which the state of the cluster and schema can be combined.

## Test Plan
Pass CI.
  • Loading branch information
ZuLiangWang authored Jan 8, 2024
1 parent 5377dfd commit dc10253
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 9 deletions.
3 changes: 3 additions & 0 deletions catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ pub enum Error {
table: String,
backtrace: Backtrace,
},

#[snafu(display("Table is not ready, err:{}", source))]
TableNotReady { source: GenericError },
}

define_result!(Error);
Expand Down
115 changes: 115 additions & 0 deletions catalog_impls/src/cluster_based.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2023 The HoraeDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use catalog::{
schema,
schema::{
CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, NameRef, Schema,
SchemaRef, TableNotReady,
},
};
use cluster::{ClusterRef, TableStatus};
use generic_error::BoxError;
use snafu::{ResultExt, Snafu};
use table_engine::table::{SchemaId, TableRef};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Invalid table status, status:{:?}", status))]
InvalidTableStatus { status: TableStatus },
}

/// A cluster-based implementation for [`schema`].

/// Schema with cluster.
/// It binds cluster and schema and will detect the health status of the cluster
/// when calling the schema interface.
pub(crate) struct SchemaWithCluster {
internal: SchemaRef,

cluster: ClusterRef,
}

impl SchemaWithCluster {
pub(crate) fn new(internal: SchemaRef, cluster: ClusterRef) -> SchemaWithCluster {
SchemaWithCluster { internal, cluster }
}

// Get table status, return None when table not found in shard.
fn table_status(&self, table_name: NameRef) -> Option<TableStatus> {
self.cluster.get_table_status(self.name(), table_name)
}
}

#[async_trait]
impl Schema for SchemaWithCluster {
fn name(&self) -> NameRef {
self.internal.name()
}

fn id(&self) -> SchemaId {
self.internal.id()
}

fn table_by_name(&self, name: NameRef) -> schema::Result<Option<TableRef>> {
let find_table_result = self.internal.table_by_name(name)?;

if find_table_result.is_none() {
return match self.table_status(name) {
// Table not found in schema and shard not contains this table.
None => Ok(None),
// Table not found in schema but shard contains this table.
// Check the status of the shard.
Some(table_status) => InvalidTableStatus {
status: table_status,
}
.fail()
.box_err()
.with_context(|| TableNotReady {})?,
};
}

Ok(find_table_result)
}

async fn create_table(
&self,
request: CreateTableRequest,
opts: CreateOptions,
) -> schema::Result<TableRef> {
self.internal.create_table(request, opts).await
}

async fn drop_table(
&self,
request: DropTableRequest,
opts: DropOptions,
) -> schema::Result<bool> {
self.internal.drop_table(request, opts).await
}

fn all_tables(&self) -> schema::Result<Vec<TableRef>> {
self.internal.all_tables()
}

fn register_table(&self, table: TableRef) {
self.internal.register_table(table)
}

fn unregister_table(&self, table_name: &str) {
self.internal.unregister_table(table_name)
}
}
1 change: 1 addition & 0 deletions catalog_impls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use system_catalog::{tables::Tables, SystemTableAdapter};

use crate::system_tables::{SystemTables, SystemTablesBuilder};

mod cluster_based;
mod system_tables;
pub mod table_based;
pub mod volatile;
Expand Down
20 changes: 16 additions & 4 deletions catalog_impls/src/volatile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use catalog::{
},
Catalog, CatalogRef, CreateSchemaWithCause,
};
use cluster::shard_set::ShardSet;
use cluster::{shard_set::ShardSet, ClusterRef};
use common_types::schema::SchemaName;
use generic_error::BoxError;
use logger::{debug, info};
Expand All @@ -41,19 +41,23 @@ use snafu::{ensure, OptionExt, ResultExt};
use table_engine::table::{SchemaId, TableRef};
use tokio::sync::Mutex;

use crate::cluster_based::SchemaWithCluster;

/// ManagerImpl manages multiple volatile catalogs.
pub struct ManagerImpl {
catalogs: HashMap<String, Arc<CatalogImpl>>,
shard_set: ShardSet,
meta_client: MetaClientRef,
cluster: ClusterRef,
}

impl ManagerImpl {
pub fn new(shard_set: ShardSet, meta_client: MetaClientRef) -> Self {
pub fn new(shard_set: ShardSet, meta_client: MetaClientRef, cluster: ClusterRef) -> Self {
let mut manager = ManagerImpl {
catalogs: HashMap::new(),
shard_set,
meta_client,
cluster,
};

manager.maybe_create_default_catalog();
Expand Down Expand Up @@ -101,6 +105,7 @@ impl ManagerImpl {
schemas: RwLock::new(HashMap::new()),
shard_set: self.shard_set.clone(),
meta_client: self.meta_client.clone(),
cluster: self.cluster.clone(),
});

self.catalogs.insert(catalog_name, catalog.clone());
Expand All @@ -121,6 +126,7 @@ struct CatalogImpl {
schemas: RwLock<HashMap<SchemaName, SchemaRef>>,
shard_set: ShardSet,
meta_client: MetaClientRef,
cluster: ClusterRef,
}

#[async_trait]
Expand Down Expand Up @@ -171,7 +177,10 @@ impl Catalog for CatalogImpl {
self.shard_set.clone(),
));

schemas.insert(name.to_string(), schema);
let cluster_based: SchemaRef =
Arc::new(SchemaWithCluster::new(schema, self.cluster.clone()));

schemas.insert(name.to_string(), cluster_based);

info!(
"create schema success, catalog:{}, schema:{}",
Expand Down Expand Up @@ -282,7 +291,10 @@ impl Schema for SchemaImpl {
}

fn table_by_name(&self, name: NameRef) -> schema::Result<Option<TableRef>> {
let table = self.tables.read().unwrap().get(name).cloned();
let table = self
.get_table(self.catalog_name.as_str(), self.schema_name.as_str(), name)
.unwrap()
.clone();
Ok(table)
}

Expand Down
21 changes: 20 additions & 1 deletion cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{
topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause,
InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard, OpenShardWithCause,
Result, ShardNotFound,
Result, ShardNotFound, TableStatus,
};

/// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
Expand Down Expand Up @@ -311,6 +311,19 @@ impl Inner {
self.shard_set.get(shard_id)
}

/// Get shard by table name.
///
/// This method is similar to `route_tables`, but it will not send request
/// to meta server, it only load data from local cache.
/// If target table is not found in any shards in this cluster, return None.
/// Otherwise, return the shard where this table is exists.
fn get_shard_by_table_name(&self, schema_name: &str, table_name: &str) -> Option<ShardRef> {
let shards = self.shard_set.all_shards();
shards
.into_iter()
.find(|shard| shard.find_table(schema_name, table_name).is_some())
}

fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef> {
info!("Remove shard from shard_set, id:{shard_id}");
self.shard_set
Expand Down Expand Up @@ -368,6 +381,12 @@ impl Cluster for ClusterImpl {
self.inner.shard(shard_id)
}

fn get_table_status(&self, schema_name: &str, table_name: &str) -> Option<TableStatus> {
self.inner
.get_shard_by_table_name(schema_name, table_name)
.map(|shard| TableStatus::from(shard.get_status()))
}

async fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef> {
self.inner.close_shard(shard_id)
}
Expand Down
24 changes: 22 additions & 2 deletions cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use common_types::schema::SchemaName;
use generic_error::GenericError;
use macros::define_result;
use meta_client::types::{
ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId, ShardInfo, ShardVersion,
ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId, ShardInfo, ShardStatus,
ShardVersion,
};
use shard_lock_manager::ShardLockManagerRef;
use snafu::{Backtrace, Snafu};
Expand Down Expand Up @@ -161,6 +162,23 @@ pub enum Error {

define_result!(Error);

#[derive(Debug)]
pub enum TableStatus {
Ready,
Recovering,
Frozen,
}

impl From<ShardStatus> for TableStatus {
fn from(value: ShardStatus) -> Self {
match value {
ShardStatus::Init | ShardStatus::Opening => TableStatus::Recovering,
ShardStatus::Ready => TableStatus::Ready,
ShardStatus::Frozen => TableStatus::Frozen,
}
}
}

pub type ClusterRef = Arc<dyn Cluster + Send + Sync>;

#[derive(Clone, Debug)]
Expand All @@ -184,12 +202,14 @@ pub trait Cluster {
/// None.
fn shard(&self, shard_id: ShardId) -> Option<ShardRef>;

fn get_table_status(&self, schema_name: &str, table_name: &str) -> Option<TableStatus>;

/// Close shard.
///
/// Return error if the shard is not found.
async fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef>;

/// list shards
/// list loaded shards in current node.
fn list_shards(&self) -> Vec<ShardInfo>;

async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse>;
Expand Down
10 changes: 10 additions & 0 deletions cluster/src/shard_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,21 @@ impl Shard {
ret
}

pub fn get_status(&self) -> ShardStatus {
let data = self.data.read().unwrap();
data.shard_info.status.clone()
}

pub fn is_opened(&self) -> bool {
let data = self.data.read().unwrap();
data.is_opened()
}

pub fn is_frozen(&self) -> bool {
let data = self.data.read().unwrap();
data.is_frozen()
}

pub async fn close(&self, ctx: CloseContext) -> Result<()> {
let operator = self.operator.lock().await;
operator.close(ctx).await
Expand Down
5 changes: 5 additions & 0 deletions meta_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ impl ShardInfo {
pub fn is_opened(&self) -> bool {
matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen)
}

#[inline]
pub fn is_ready(&self) -> bool {
matches!(self.status, ShardStatus::Ready)
}
}

#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize)]
Expand Down
5 changes: 5 additions & 0 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ mod tests {
use ceresdbproto::storage::{RequestContext, RouteRequest as RouteRequestPb};
use cluster::{
shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, ClusterNodesResp,
TableStatus,
};
use common_types::table::ShardId;
use meta_client::types::{
Expand Down Expand Up @@ -230,6 +231,10 @@ mod tests {
unimplemented!();
}

fn get_table_status(&self, _: &str, _: &str) -> Option<TableStatus> {
unimplemented!()
}

async fn close_shard(&self, _: ShardId) -> cluster::Result<ShardRef> {
unimplemented!();
}
Expand Down
7 changes: 5 additions & 2 deletions src/ceresdb/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,11 @@ async fn build_with_meta<T: WalsOpener>(
};
let engine_proxy = build_table_engine_proxy(engine_builder).await;

let meta_based_manager_ref =
Arc::new(volatile::ManagerImpl::new(shard_set, meta_client.clone()));
let meta_based_manager_ref = Arc::new(volatile::ManagerImpl::new(
shard_set,
meta_client.clone(),
cluster.clone(),
));

// Build catalog manager.
let catalog_manager = Arc::new(CatalogManagerImpl::new(meta_based_manager_ref));
Expand Down

0 comments on commit dc10253

Please sign in to comment.