add retry when opening shard.
Rachelint committed Jul 4, 2023
1 parent f2965b0 commit 865f9e6
Showing 7 changed files with 128 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not shown.)

9 changes: 9 additions & 0 deletions analytic_engine/src/lib.rs
@@ -44,9 +44,16 @@ pub struct Config {

/// Batch size to read records from wal to replay
pub replay_batch_size: usize,

/// Batch size to replay tables
pub max_replay_tables_per_batch: usize,

/// Replay retry limit
pub replay_retry_limit: usize,

/// Replay retry interval
pub replay_retry_interval: ReadableDuration,

/// Default options for table
pub table_opts: TableOptions,

@@ -140,6 +147,8 @@ impl Default for Config {
wal: WalStorageConfig::RocksDB(Box::default()),
remote_engine_client: remote_engine_client::config::Config::default(),
recover_mode: RecoverMode::TableBased,
replay_retry_limit: 5,
replay_retry_interval: ReadableDuration::millis(500),
}
}
}
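For reference, a minimal sketch (not part of this diff) of setting the two new options in code and unwrapping the interval for TableOperator::new, the way src/setup.rs does further down. The ReadableDuration import path is an assumption; field names and defaults come from the hunk above.

use common_util::config::ReadableDuration; // import path assumed

fn replay_retry_settings() -> (usize, std::time::Duration) {
    let config = analytic_engine::Config {
        replay_retry_limit: 5,
        replay_retry_interval: ReadableDuration::millis(500),
        ..Default::default()
    };

    // ReadableDuration wraps a std::time::Duration; `.0` unwraps it, which is
    // how src/setup.rs passes it into TableOperator::new.
    (config.replay_retry_limit, config.replay_retry_interval.0)
}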
1 change: 1 addition & 0 deletions catalog/Cargo.toml
@@ -17,3 +17,4 @@ common_util = { workspace = true }
log = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
tokio = { workspace = true }
105 changes: 92 additions & 13 deletions catalog/src/table_operator.rs
@@ -1,17 +1,25 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::time::Instant;
use std::{
collections::HashMap,
time::{Duration, Instant},
};

use common_types::table::ShardId;
use common_util::{error::BoxError, time::InstantExt};
use log::{error, info, warn};
use snafu::{OptionExt, ResultExt};
use table_engine::{engine, table::TableRef};
use table_engine::{
engine,
table::{TableId, TableRef},
};

use crate::{
manager::ManagerRef,
schema::{
CloseOptions, CloseShardRequest, CloseTableRequest, CreateOptions, CreateTableRequest,
DropOptions, DropTableRequest, OpenOptions, OpenShardRequest, OpenTableRequest, SchemaRef,
TableDef,
},
Result, TableOperatorNoCause, TableOperatorWithCause,
};
@@ -23,33 +31,93 @@ use crate::{
#[derive(Clone)]
pub struct TableOperator {
catalog_manager: ManagerRef,
retry_limit: usize,
retry_interval: Duration,
}

enum OpenShardResult {
Success,
Retry(HashMap<TableId, TableDef>),
}

struct OpenShardContext<'a> {
shard_id: ShardId,
table_defs: HashMap<TableId, TableDef>,
engine: String,
retry: &'a mut usize,
retry_limit: usize,
}

impl TableOperator {
pub fn new(catalog_manager: ManagerRef) -> Self {
Self { catalog_manager }
pub fn new(catalog_manager: ManagerRef, retry_limit: usize, retry_interval: Duration) -> Self {
Self {
catalog_manager,
retry_limit,
retry_interval,
}
}

pub async fn open_shard(&self, request: OpenShardRequest, opts: OpenOptions) -> Result<()> {
let mut retry = 0;
let mut table_defs = request
.table_defs
.into_iter()
.map(|def| (def.id, def))
.collect::<HashMap<_, _>>();

loop {
let context = OpenShardContext {
shard_id: request.shard_id,
table_defs,
engine: request.engine.clone(),
retry: &mut retry,
retry_limit: self.retry_limit,
};

let once_result = self.open_shard_with_retry(context, opts.clone()).await;
match once_result {
Ok(OpenShardResult::Success) => break Ok(()),
Ok(OpenShardResult::Retry(retry_table_defs)) => {
table_defs = retry_table_defs;

// Sleep for a while before the next attempt.
tokio::time::sleep(self.retry_interval).await;
continue;
}
Err(e) => break Err(e),
}
}
}

async fn open_shard_with_retry(
&self,
context: OpenShardContext<'_>,
opts: OpenOptions,
) -> Result<OpenShardResult> {
info!(
"TableOperator retry to open shard, retry:{}, retry_limit:{}, shard_id:{}",
context.retry, context.retry_limit, context.shard_id
);

let instant = Instant::now();
let table_engine = opts.table_engine;
let shard_id = request.shard_id;
let shard_id = context.shard_id;

// Generate open requests.
let mut related_schemas = Vec::with_capacity(request.table_defs.len());
let mut engine_table_defs = Vec::with_capacity(request.table_defs.len());
for open_ctx in request.table_defs {
let mut related_schemas = Vec::with_capacity(context.table_defs.len());
let mut engine_table_defs = Vec::with_capacity(context.table_defs.len());
for open_ctx in context.table_defs.values() {
let schema = self.schema_by_name(&open_ctx.catalog_name, &open_ctx.schema_name)?;
let table_id = open_ctx.id;
engine_table_defs.push(open_ctx.into_engine_table_def(schema.id()));
engine_table_defs.push(open_ctx.clone().into_engine_table_def(schema.id()));
related_schemas.push((table_id, schema));
}

// Open tables by table engine.
let engine_open_shard_req = engine::OpenShardRequest {
shard_id,
table_defs: engine_table_defs,
engine: request.engine,
engine: context.engine,
};
let mut shard_result = table_engine
.open_shard(engine_open_shard_req)
@@ -62,6 +130,8 @@ impl TableOperator {
let mut missing_table_count = 0_u32;
let mut open_table_errs = Vec::new();

// Iterate over the table results, register the successful ones and collect the failed ones.
let mut table_defs = context.table_defs;
for (table_id, schema) in related_schemas {
let table_result = shard_result
.remove(&table_id)
@@ -75,10 +145,14 @@
Ok(Some(table)) => {
schema.register_table(table);
success_count += 1;
// Successful ones should be removed; only the failed ones are returned for retrying.
table_defs.remove(&table_id);
}
Ok(None) => {
error!("TableOperator failed to open a missing table, table_id:{table_id}, schema_id:{:?}, shard_id:{shard_id}", schema.id());
missing_table_count += 1;
// A special failure case that retrying cannot recover from, so we just ignore it.
table_defs.remove(&table_id);
}
Err(e) => {
error!("TableOperator failed to open table, table_id:{table_id}, schema_id:{:?}, shard_id:{shard_id}, err:{}", schema.id(), e);
@@ -92,8 +166,10 @@
instant.saturating_elapsed().as_millis(),
);

if missing_table_count == 0 && open_table_errs.is_empty() {
Ok(())
let result = if missing_table_count == 0 && open_table_errs.is_empty() {
Ok(OpenShardResult::Success)
} else if *context.retry < context.retry_limit && !table_defs.is_empty() {
Ok(OpenShardResult::Retry(table_defs))
} else {
let msg = format!(
"Failed to open shard, some tables open failed, shard id:{shard_id}, \
@@ -103,7 +179,10 @@
);

TableOperatorNoCause { msg }.fail()
}
};

*context.retry += 1;
result
}

pub async fn close_shard(&self, request: CloseShardRequest, opts: CloseOptions) -> Result<()> {
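Taken together, open_shard and open_shard_with_retry form a bounded retry loop: each attempt re-submits only the tables that failed last time, and the loop sleeps between attempts. A self-contained sketch of that shape using plain std types (the names here are illustrative; the real code is async, keyed by TableId, and sleeps via tokio::time::sleep):

use std::{collections::HashMap, thread, time::Duration};

// Run `attempt` until nothing is left pending or the retry budget is spent.
// `attempt` consumes the pending set and returns whatever still failed.
fn open_with_retry<K, V>(
    mut pending: HashMap<K, V>,
    retry_limit: usize,
    retry_interval: Duration,
    mut attempt: impl FnMut(HashMap<K, V>) -> HashMap<K, V>,
) -> Result<(), String> {
    let mut retry = 0;
    loop {
        pending = attempt(pending);
        if pending.is_empty() {
            return Ok(());
        }
        if retry >= retry_limit {
            return Err(format!(
                "{} entries still failing after {retry} retries",
                pending.len()
            ));
        }
        retry += 1;
        // The real code awaits tokio::time::sleep here instead of blocking.
        thread::sleep(retry_interval);
    }
}

fn main() {
    // Toy attempt that fails completely once, then succeeds.
    let mut calls = 0;
    let result = open_with_retry(
        HashMap::from([(1u64, "table_a"), (2, "table_b")]),
        3,
        Duration::from_millis(10),
        |pending| {
            calls += 1;
            if calls == 1 {
                pending // first try: everything still failing
            } else {
                HashMap::new() // second try: all opened
            }
        },
    );
    assert!(result.is_ok());
}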
12 changes: 10 additions & 2 deletions interpreters/src/tests.rs
@@ -210,7 +210,11 @@ where
let ctx = Context::builder(RequestId::next_id(), None)
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
.build();
let table_operator = TableOperator::new(catalog_manager.clone());
let table_operator = TableOperator::new(
catalog_manager.clone(),
0,
std::time::Duration::from_millis(0),
);
let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));
let insert_factory = Factory::new(
ExecutorImpl::new(QueryConfig::default()),
@@ -350,7 +354,11 @@ async fn test_interpreters<T: EngineBuildContext>(engine_context: T) {
let mock = MockMetaProvider::default();
let engine = test_ctx.clone_engine();
let catalog_manager = Arc::new(build_catalog_manager(engine.clone()).await);
let table_operator = TableOperator::new(catalog_manager.clone());
let table_operator = TableOperator::new(
catalog_manager.clone(),
0,
std::time::Duration::from_millis(0),
);
let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));

let env = Env {
12 changes: 10 additions & 2 deletions server/src/grpc/meta_event_service/mod.rs
@@ -2,7 +2,10 @@

// Meta event rpc service implementation.

use std::{sync::Arc, time::Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use analytic_engine::setup::OpenedWals;
use async_trait::async_trait;
@@ -173,7 +176,12 @@ impl<Q: QueryExecutor + 'static> MetaServiceImpl<Q> {
.catalog_manager
.default_catalog_name()
.to_string(),
table_operator: TableOperator::new(self.instance.catalog_manager.clone()),
// FIXME: use the exposed configs.
table_operator: TableOperator::new(
self.instance.catalog_manager.clone(),
5,
Duration::from_secs(5),
),
table_engine: self.instance.table_engine.clone(),
partition_table_engine: self.instance.partition_table_engine.clone(),
wal_region_closer: self.wal_region_closer.clone(),
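The FIXME above refers to the hardcoded 5 retries and Duration::from_secs(5). Once the analytic engine config is reachable from this service, the call could mirror what src/setup.rs does in the next file; a hypothetical sketch of that wiring (import paths and the way the config is exposed are assumptions, not part of this commit):

// Hypothetical: `analytic_config` stands for however the analytic engine
// Config ends up being exposed to the meta event service.
fn build_table_operator(
    catalog_manager: catalog::manager::ManagerRef,
    analytic_config: &analytic_engine::Config,
) -> catalog::table_operator::TableOperator {
    catalog::table_operator::TableOperator::new(
        catalog_manager,
        analytic_config.replay_retry_limit,
        analytic_config.replay_retry_interval.0,
    )
}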
6 changes: 5 additions & 1 deletion src/setup.rs
@@ -292,7 +292,11 @@ async fn build_without_meta<Q: Executor + 'static, T: WalsOpener>(
.expect("Failed to fetch table infos for opening");

let catalog_manager = Arc::new(CatalogManagerImpl::new(Arc::new(table_based_manager)));
let table_operator = TableOperator::new(catalog_manager.clone());
let table_operator = TableOperator::new(
catalog_manager.clone(),
config.analytic.replay_retry_limit,
config.analytic.replay_retry_interval.0,
);
let table_manipulator = Arc::new(catalog_based::TableManipulatorImpl::new(
table_operator.clone(),
));
