Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove shard from cluster topology after shard closed #908

Merged
merged 6 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use common_util::error::BoxError;
use log::{error, info};
use log::{error, info, warn};
use snafu::ResultExt;
use table_engine::{
engine::{
Expand Down Expand Up @@ -64,6 +64,10 @@ impl TableEngineImpl {
table_opt
});

if let Ok(None) = result {
warn!("Try to open a missing table, open request:{request:?}");
}

open_results.push(result);
}

Expand Down
45 changes: 20 additions & 25 deletions catalog/src/table_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ impl TableOperator {

// Check and register successful opened table into schema.
let mut success_count = 0_u32;
let mut no_table_count = 0_u32;
let mut open_err_count = 0_u32;
let mut missing_table_count = 0_u32;
let mut open_table_errs = Vec::new();

for (schema, open_result) in schemas.into_iter().zip(open_results.into_iter()) {
match open_result {
Expand All @@ -65,31 +65,31 @@ impl TableOperator {
success_count += 1;
}
Ok(None) => {
no_table_count += 1;
missing_table_count += 1;
}
// Has printed error log for it.
Err(_) => {
open_err_count += 1;
Err(e) => {
open_table_errs.push(e);
}
}
}

info!(
"Open shard finish, shard id:{}, cost:{}ms, successful count:{}, no table is opened count:{}, open error count:{}",
shard_id,
"Open shard finish, shard id:{shard_id}, cost:{}ms, success_count:{success_count}, missing_table_count:{missing_table_count}, open_table_errs:{open_table_errs:?}",
instant.saturating_elapsed().as_millis(),
success_count,
no_table_count,
open_err_count
);

if no_table_count == 0 && open_err_count == 0 {
if missing_table_count == 0 && open_table_errs.is_empty() {
Ok(())
} else {
TableOperatorNoCause {
msg: format!(
"Failed to open shard, some tables open failed, shard id:{shard_id}, no table is opened count:{no_table_count}, open error count:{open_err_count}"),
}.fail()
let msg = format!(
"Failed to open shard, some tables open failed, shard id:{shard_id}, \
missing_table_count:{missing_table_count}, \
open_err_count:{}",
open_table_errs.len()
);

TableOperatorNoCause { msg }.fail()
}
}

Expand Down Expand Up @@ -118,34 +118,29 @@ impl TableOperator {

// Check and unregister successful closed table from schema.
let mut success_count = 0_u32;
let mut close_err_count = 0_u32;
let mut close_table_errs = Vec::new();

for (schema, close_result) in schemas.into_iter().zip(close_results.into_iter()) {
match close_result {
Ok(table_name) => {
schema.unregister_table(&table_name);
success_count += 1;
}
Err(_) => {
close_err_count += 1;
}
Err(e) => close_table_errs.push(e),
}
}

info!(
"Close shard finished, shard id:{}, cost:{}ms, success_count:{}, close_err_count:{}",
shard_id,
"Close shard finished, shard id:{shard_id}, cost:{}ms, success_count:{success_count}, close_table_errs:{close_table_errs:?}",
instant.saturating_elapsed().as_millis(),
success_count,
close_err_count
);

if close_err_count == 0 {
if close_table_errs.is_empty() {
Ok(())
} else {
TableOperatorNoCause {
msg: format!(
"Failed to close shard, shard id:{shard_id}, success_count:{success_count}, close_err_count:{close_err_count}",
"Failed to close shard, shard id:{shard_id}, success_count:{success_count}, close_err_count:{}", close_table_errs.len(),
),
}
.fail()
Expand Down
22 changes: 17 additions & 5 deletions cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,7 @@ impl Inner {
msg: "shard tables are missing from the response",
})?;

self.shard_tables_cache
.insert_or_update(tables_of_shard.clone());
self.shard_tables_cache.insert(tables_of_shard.clone());

Ok(tables_of_shard)
}
Expand All @@ -296,20 +295,29 @@ impl Inner {
})
}

#[inline]
fn freeze_shard(&self, shard_id: ShardId) -> Result<TablesOfShard> {
self.shard_tables_cache
.freeze(shard_id)
.with_context(|| ShardNotFound {
msg: format!("try to freeze a non-existent shard, shard_id:{shard_id}"),
})
}

fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> {
self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()> {
self.remove_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
self.remove_table_from_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()> {
self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()> {
self.remove_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
self.remove_table_from_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn insert_table_to_shard(
Expand Down Expand Up @@ -338,7 +346,7 @@ impl Inner {
)
}

fn remove_table_to_shard(
fn remove_table_from_shard(
&self,
update_shard_info: Option<UpdateShardInfo>,
table_info: Option<TableInfoPb>,
Expand Down Expand Up @@ -406,6 +414,10 @@ impl Cluster for ClusterImpl {
self.inner.close_shard(shard_id)
}

async fn freeze_shard(&self, shard_id: ShardId) -> Result<TablesOfShard> {
self.inner.freeze_shard(shard_id)
}

async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> {
self.inner.create_table_on_shard(req)
}
Expand Down
13 changes: 13 additions & 0 deletions cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Update on a frozen shard, shard_id:{shard_id}.\nBacktrace:\n{backtrace}",))]
UpdateFrozenShard {
shard_id: ShardId,
backtrace: Backtrace,
},

#[snafu(display(
"Cluster nodes are not found in the topology, version:{version}.\nBacktrace:\n{backtrace}",
))]
Expand All @@ -127,7 +133,14 @@ pub trait Cluster {
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn open_shard(&self, shard_info: &ShardInfo) -> Result<TablesOfShard>;
/// Close the shard.
///
/// Return error if the shard is not found.
async fn close_shard(&self, req: ShardId) -> Result<TablesOfShard>;
/// Freeze the shard to reject create/drop table on the shard.
///
/// Return error if the shard is not found.
async fn freeze_shard(&self, req: ShardId) -> Result<TablesOfShard>;
async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>;
async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>;
async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()>;
Expand Down
8 changes: 4 additions & 4 deletions cluster/src/shard_lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ impl ShardLockManager {
OnExpired: FnOnce(ShardId) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
info!("Try to grant lock for shard:{shard_id}");
info!("Try to grant lock for shard, shard_id:{shard_id}");

let mut shard_locks = self.shard_locks.write().await;
if let Some(shard_lock) = shard_locks.get_mut(&shard_id) {
Expand Down Expand Up @@ -698,7 +698,7 @@ impl ShardLockManager {
shard_locks.insert(shard_id, shard_lock);
}

info!("Finish granting lock for shard:{shard_id}");
info!("Finish granting lock for shard, shard_id:{shard_id}");
Ok(true)
}

Expand All @@ -707,7 +707,7 @@ impl ShardLockManager {
/// If the lock is not exist, return false. And the `on_lock_expired` won't
/// be triggered.
pub async fn revoke_lock(&self, shard_id: u32) -> Result<bool> {
info!("Try to revoke lock for shard:{shard_id}");
info!("Try to revoke lock for shard, shard_id:{shard_id}");

let mut shard_locks = self.shard_locks.write().await;
let shard_lock = shard_locks.remove(&shard_id);
Expand All @@ -716,7 +716,7 @@ impl ShardLockManager {
let mut etcd_client = self.etcd_client.clone();
v.revoke(&mut etcd_client).await?;

info!("Finish revoking lock for shard:{shard_id}");
info!("Finish revoking lock for shard, shard_id:{shard_id}");
Ok(true)
}
None => {
Expand Down
Loading