Skip to content

Commit

Permalink
fix: failure to open a single table does not interrupt the shard's op…
Browse files Browse the repository at this point in the history
…ening process (apache#722)

* continue to open shard when open table failed

* fmt

* remove unused lines

* Update server/src/grpc/meta_event_service/mod.rs

Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com>

* add finish log after open shard

* clippy

* return error when some tables failed to open

* add cost time

* log detail table error

* blank

* chore

---------

Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com>
  • Loading branch information
MichaelLeeHZ and Rachelint authored Mar 15, 2023
1 parent 9a7ebd1 commit 1d3d34c
Showing 1 changed file with 41 additions and 14 deletions.
55 changes: 41 additions & 14 deletions server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use self::shard_operation::WalCloserAdapter;
use crate::{
grpc::{
meta_event_service::{
error::{ErrNoCause, ErrWithCause, Result, StatusCode},
error::{ErrNoCause, ErrWithCause, Error, Result, StatusCode},
shard_operation::WalRegionCloserRef,
},
metrics::META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
Expand Down Expand Up @@ -210,6 +210,7 @@ impl HandlerContext {
// implementation.

async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> {
let instant = Instant::now();
let tables_of_shard =
ctx.cluster
.open_shard(&request)
Expand All @@ -236,6 +237,10 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
table_engine: ctx.table_engine,
};

let mut success = 0;
let mut no_table_count = 0;
let mut open_err_count = 0;

for table in tables_of_shard.tables {
let schema = find_schema(default_catalog.clone(), &table.schema_name)?;

Expand All @@ -249,21 +254,43 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
shard_id: shard_info.id,
cluster_version: topology.cluster_topology_version,
};
schema
.open_table(open_request.clone(), opts.clone())
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to open table, open_request:{open_request:?}"),
})?
.with_context(|| ErrNoCause {
code: StatusCode::Internal,
msg: format!("no table is opened, open_request:{open_request:?}"),
})?;
let result = schema.open_table(open_request.clone(), opts.clone()).await;

match result {
Ok(Some(_)) => {
success += 1;
}
Ok(None) => {
no_table_count += 1;
error!("no table is opened, open_request:{open_request:?}");
}
Err(e) => {
open_err_count += 1;
error!("fail to open table, open_request:{open_request:?}, err:{e}");
}
};
}

Ok(())
info!(
"Open shard finish, shard id:{}, cost:{}ms, successful count:{}, no table is opened count:{}, open error count:{}",
shard_info.id,
instant.saturating_elapsed().as_millis(),
success,
no_table_count,
open_err_count
);

if no_table_count == 0 && open_err_count == 0 {
Ok(())
} else {
Err(Error::ErrNoCause {
code: StatusCode::Internal,
msg: format!(
"Failed to open shard:{}, some tables open failed, no table is opened count:{}, open error count:{}",
shard_info.id, no_table_count, open_err_count
),
})
}
}

async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> {
Expand Down

0 comments on commit 1d3d34c

Please sign in to comment.