Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failure to open a single table does not interrupt the shard's opening process #722

Merged
merged 11 commits into from
Mar 15, 2023
6 changes: 6 additions & 0 deletions server/src/grpc/meta_event_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ pub enum Error {
msg: String,
source: GenericError,
},

#[snafu(display("Open shard error, code:{:?}, message:{}", code, msg))]
OpenShardErr { code: StatusCode, msg: String },
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
}

impl Error {
pub fn code(&self) -> StatusCode {
match *self {
Error::ErrNoCause { code, .. } => code,
Error::ErrWithCause { code, .. } => code,
Error::OpenShardErr { code, .. } => code,
}
}

Expand All @@ -42,6 +46,8 @@ impl Error {
let first_line = error_util::remove_backtrace_from_err(&err_string);
format!("{msg}. Caused by: {first_line}")
}

Error::OpenShardErr { msg, .. } => msg.clone(),
}
}
}
Expand Down
55 changes: 41 additions & 14 deletions server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use self::shard_operation::WalCloserAdapter;
use crate::{
grpc::{
meta_event_service::{
error::{ErrNoCause, ErrWithCause, Result, StatusCode},
error::{ErrNoCause, ErrWithCause, Error, Result, StatusCode},
shard_operation::WalRegionCloserRef,
},
metrics::META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
Expand Down Expand Up @@ -210,6 +210,7 @@ impl HandlerContext {
// implementation.

async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> {
let instant = Instant::now();
let tables_of_shard =
ctx.cluster
.open_shard(&request)
Expand All @@ -236,6 +237,9 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
table_engine: ctx.table_engine,
};

let mut success = 0;
let mut fail = 0;
let mut err_list = vec![];
for table in tables_of_shard.tables {
let schema = find_schema(default_catalog.clone(), &table.schema_name)?;

Expand All @@ -249,21 +253,44 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
shard_id: shard_info.id,
cluster_version: topology.cluster_topology_version,
};
schema
.open_table(open_request.clone(), opts.clone())
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to open table, open_request:{open_request:?}"),
})?
.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: format!("no table is opened, open_request:{open_request:?}"),
})?;
let result = schema.open_table(open_request.clone(), opts.clone()).await;

match result {
Ok(Some(_)) => {
success += 1;
}
Ok(None) => {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
fail += 1;
error!("no table is opened, open_request:{open_request:?}");
err_list.push(table.name);
}
Err(e) => {
MichaelLeeHZ marked this conversation as resolved.
Show resolved Hide resolved
fail += 1;
error!("fail to open table, open_request:{open_request:?}, err:{e}");
err_list.push(table.name);
}
};
}

Ok(())
info!(
"Open shard finish, shard id:{}, cost:{}ms, successful tables:{}, failed tables:{}",
shard_info.id,
instant.saturating_elapsed().as_millis(),
success,
fail
);

if err_list.is_empty() {
Ok(())
} else {
Err(Error::OpenShardErr {
code: StatusCode::Internal,
msg: format!(
"Failed to open shard:{}, because of failed tables:{err_list:?}",
shard_info.id
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
),
})
}
}

async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> {
Expand Down