diff --git a/Cargo.lock b/Cargo.lock
index 755017be79..34e4aa6983 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -992,6 +992,7 @@ dependencies = [
  "log",
  "snafu 0.6.10",
  "table_engine",
+ "tokio",
 ]
 
 [[package]]
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 025845afbe..8fa5014f40 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -44,9 +44,16 @@ pub struct Config {
     /// Batch size to read records from wal to replay
     pub replay_batch_size: usize,
 
+    /// Batch size to replay tables
     pub max_replay_tables_per_batch: usize,
 
+    /// Replay retry limit
+    pub replay_retry_limit: usize,
+
+    /// Replay retry interval
+    pub replay_retry_interval: ReadableDuration,
+
     /// Default options for table
     pub table_opts: TableOptions,
@@ -140,6 +147,8 @@ impl Default for Config {
             wal: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),
             recover_mode: RecoverMode::TableBased,
+            replay_retry_limit: 5,
+            replay_retry_interval: ReadableDuration::millis(500),
         }
     }
 }
diff --git a/catalog/Cargo.toml b/catalog/Cargo.toml
index 3316502a15..1e99ed4b6c 100644
--- a/catalog/Cargo.toml
+++ b/catalog/Cargo.toml
@@ -17,3 +17,4 @@ common_util = { workspace = true }
log = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
+tokio = { workspace = true }
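The two new `Config` knobs above are plain data; the only parts of `ReadableDuration` this patch relies on are the `millis` constructor (in the `Default` impl) and the `.0` field exposing the inner `std::time::Duration` (used in `src/setup.rs` below). A minimal sketch with a hypothetical stand-in type, just to make that surface explicit — this is not the real `common_util` implementation:

```rust
use std::time::Duration;

/// Hypothetical stand-in for common_util's ReadableDuration; only the two
/// pieces of its surface that this patch uses are reproduced here.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ReadableDuration(Duration);

impl ReadableDuration {
    fn millis(ms: u64) -> Self {
        ReadableDuration(Duration::from_millis(ms))
    }
}

fn main() {
    // Defaults introduced by this patch: up to 5 retries, 500ms apart.
    let replay_retry_limit: usize = 5;
    let replay_retry_interval = ReadableDuration::millis(500);

    // src/setup.rs unwraps the inner Duration with `.0` when wiring the
    // config into TableOperator::new.
    let interval: Duration = replay_retry_interval.0;
    assert_eq!(interval, Duration::from_millis(500));
    assert_eq!(replay_retry_limit, 5);
}
```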
diff --git a/catalog/src/table_operator.rs b/catalog/src/table_operator.rs
index c31e0b1f68..2883d474d5 100644
--- a/catalog/src/table_operator.rs
+++ b/catalog/src/table_operator.rs
@@ -1,17 +1,25 @@
 // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
-use std::time::Instant;
+use std::{
+    collections::HashMap,
+    time::{Duration, Instant},
+};
 
+use common_types::table::ShardId;
 use common_util::{error::BoxError, time::InstantExt};
 use log::{error, info, warn};
 use snafu::{OptionExt, ResultExt};
-use table_engine::{engine, table::TableRef};
+use table_engine::{
+    engine,
+    table::{TableId, TableRef},
+};
 
 use crate::{
     manager::ManagerRef,
     schema::{
         CloseOptions, CloseShardRequest, CloseTableRequest, CreateOptions, CreateTableRequest,
         DropOptions, DropTableRequest, OpenOptions, OpenShardRequest, OpenTableRequest, SchemaRef,
+        TableDef,
     },
     Result, TableOperatorNoCause, TableOperatorWithCause,
 };
@@ -23,25 +31,85 @@
 #[derive(Clone)]
 pub struct TableOperator {
     catalog_manager: ManagerRef,
+    retry_limit: usize,
+    retry_interval: Duration,
+}
+
+enum OpenShardResult {
+    Success,
+    Retry(HashMap<TableId, TableDef>),
+}
+
+struct OpenShardContext<'a> {
+    shard_id: ShardId,
+    table_defs: HashMap<TableId, TableDef>,
+    engine: String,
+    retry: &'a mut usize,
+    retry_limit: usize,
 }
 
 impl TableOperator {
-    pub fn new(catalog_manager: ManagerRef) -> Self {
-        Self { catalog_manager }
+    pub fn new(catalog_manager: ManagerRef, retry_limit: usize, retry_interval: Duration) -> Self {
+        Self {
+            catalog_manager,
+            retry_limit,
+            retry_interval,
+        }
     }
 
     pub async fn open_shard(&self, request: OpenShardRequest, opts: OpenOptions) -> Result<()> {
+        let mut retry = 0;
+        let mut table_defs = request
+            .table_defs
+            .into_iter()
+            .map(|def| (def.id, def))
+            .collect::<HashMap<_, _>>();
+
+        loop {
+            let context = OpenShardContext {
+                shard_id: request.shard_id,
+                table_defs,
+                engine: request.engine.clone(),
+                retry: &mut retry,
+                retry_limit: self.retry_limit,
+            };
+
+            let once_result = self.open_shard_with_retry(context, opts.clone()).await;
+            match once_result {
+                Ok(OpenShardResult::Success) => break Ok(()),
+                Ok(OpenShardResult::Retry(retry_table_defs)) => {
+                    table_defs = retry_table_defs;
+
+                    // Sleep for a while before the next attempt.
+                    tokio::time::sleep(self.retry_interval).await;
+                    continue;
+                }
+                Err(e) => break Err(e),
+            }
+        }
+    }
+
+    async fn open_shard_with_retry(
+        &self,
+        context: OpenShardContext<'_>,
+        opts: OpenOptions,
+    ) -> Result<OpenShardResult> {
+        info!(
+            "TableOperator tries to open shard, retry:{}, retry_limit:{}, shard_id:{}",
+            context.retry, context.retry_limit, context.shard_id
+        );
+
         let instant = Instant::now();
         let table_engine = opts.table_engine;
-        let shard_id = request.shard_id;
+        let shard_id = context.shard_id;
 
         // Generate open requests.
-        let mut related_schemas = Vec::with_capacity(request.table_defs.len());
-        let mut engine_table_defs = Vec::with_capacity(request.table_defs.len());
-        for open_ctx in request.table_defs {
+        let mut related_schemas = Vec::with_capacity(context.table_defs.len());
+        let mut engine_table_defs = Vec::with_capacity(context.table_defs.len());
+        for open_ctx in context.table_defs.values() {
             let schema = self.schema_by_name(&open_ctx.catalog_name, &open_ctx.schema_name)?;
             let table_id = open_ctx.id;
-            engine_table_defs.push(open_ctx.into_engine_table_def(schema.id()));
+            engine_table_defs.push(open_ctx.clone().into_engine_table_def(schema.id()));
             related_schemas.push((table_id, schema));
         }
@@ -49,7 +117,7 @@ impl TableOperator {
         let engine_open_shard_req = engine::OpenShardRequest {
             shard_id,
             table_defs: engine_table_defs,
-            engine: request.engine,
+            engine: context.engine,
         };
         let mut shard_result = table_engine
             .open_shard(engine_open_shard_req)
@@ -62,6 +130,8 @@
         let mut missing_table_count = 0_u32;
         let mut open_table_errs = Vec::new();
 
+        // Iterate over the table results, registering the successful ones and
+        // collecting the failed ones.
+        let mut table_defs = context.table_defs;
         for (table_id, schema) in related_schemas {
             let table_result = shard_result
                 .remove(&table_id)
@@ -75,10 +145,14 @@
                 Ok(Some(table)) => {
                     schema.register_table(table);
                     success_count += 1;
+                    // Successfully opened tables are removed, so only the
+                    // failed ones are returned for retrying.
+                    table_defs.remove(&table_id);
                 }
                 Ok(None) => {
                     error!("TableOperator failed to open a missing table, table_id:{table_id}, schema_id:{:?}, shard_id:{shard_id}", schema.id());
                     missing_table_count += 1;
+                    // A missing table is a special failure that retrying
+                    // cannot recover, so just drop it.
+                    table_defs.remove(&table_id);
                 }
                 Err(e) => {
                     error!("TableOperator failed to open table, table_id:{table_id}, schema_id:{:?}, shard_id:{shard_id}, err:{}", schema.id(), e);
@@ -92,8 +166,10 @@
             instant.saturating_elapsed().as_millis(),
         );
 
-        if missing_table_count == 0 && open_table_errs.is_empty() {
-            Ok(())
+        let result = if missing_table_count == 0 && open_table_errs.is_empty() {
+            Ok(OpenShardResult::Success)
+        } else if *context.retry < context.retry_limit && !table_defs.is_empty() {
+            Ok(OpenShardResult::Retry(table_defs))
         } else {
             let msg = format!(
                 "Failed to open shard, some tables open failed, shard id:{shard_id}, \
@@ -103,7 +179,10 @@
             );
 
             TableOperatorNoCause { msg }.fail()
-        }
+        };
+
+        *context.retry += 1;
+        result
     }
 
     pub async fn close_shard(&self, request: CloseShardRequest, opts: CloseOptions) -> Result<()> {
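The heart of the change is the loop in `open_shard`: each pass hands the still-unopened tables to `open_shard_with_retry`, which reports either overall success or the shrunken set of tables worth retrying. Below is a self-contained sketch of that shape, stripped of all engine details — `AttemptResult` and `open_once` are hypothetical stand-ins, and it assumes a tokio runtime with the timer feature enabled:

```rust
use std::{collections::HashMap, time::Duration};

/// One pass either succeeds completely or hands back the subset of
/// tables that may still succeed on a later pass.
enum AttemptResult {
    Success,
    Retry(HashMap<u64, String>),
}

/// Hypothetical stand-in for open_shard_with_retry: pretend the
/// even-numbered tables fail transiently on the first pass only.
fn open_once(pending: HashMap<u64, String>, attempt: usize) -> AttemptResult {
    let failed: HashMap<_, _> = pending
        .into_iter()
        .filter(|(id, _)| id % 2 == 0 && attempt == 0)
        .collect();
    if failed.is_empty() {
        AttemptResult::Success
    } else {
        AttemptResult::Retry(failed)
    }
}

#[tokio::main]
async fn main() {
    let retry_limit = 5;
    let retry_interval = Duration::from_millis(500);

    let mut pending: HashMap<u64, String> =
        (0..4u64).map(|id| (id, format!("table-{id}"))).collect();

    // Same shape as open_shard: each pass retries only the tables that
    // failed last time, sleeping between passes, up to retry_limit retries.
    let mut attempt = 0;
    loop {
        match open_once(pending, attempt) {
            AttemptResult::Success => break println!("all tables opened"),
            AttemptResult::Retry(failed) if attempt < retry_limit => {
                pending = failed;
                attempt += 1;
                tokio::time::sleep(retry_interval).await;
            }
            AttemptResult::Retry(failed) => {
                break println!("gave up with {} tables unopened", failed.len());
            }
        }
    }
}
```

Keying the pending set by `TableId` is what lets the real code drop successfully opened (and permanently missing) tables from the map, so each retry only touches what actually failed.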
diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs
index 2b2f93b184..95c32597f2 100644
--- a/interpreters/src/tests.rs
+++ b/interpreters/src/tests.rs
@@ -210,7 +210,11 @@ where
     let ctx = Context::builder(RequestId::next_id(), None)
         .default_catalog_and_schema(DEFAULT_CATALOG.to_string(), DEFAULT_SCHEMA.to_string())
         .build();
-    let table_operator = TableOperator::new(catalog_manager.clone());
+    let table_operator = TableOperator::new(
+        catalog_manager.clone(),
+        0,
+        std::time::Duration::from_millis(0),
+    );
     let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));
     let insert_factory = Factory::new(
         ExecutorImpl::new(QueryConfig::default()),
@@ -350,7 +354,11 @@ async fn test_interpreters<T: EngineBuildContext>(engine_context: T) {
     let mock = MockMetaProvider::default();
     let engine = test_ctx.clone_engine();
     let catalog_manager = Arc::new(build_catalog_manager(engine.clone()).await);
-    let table_operator = TableOperator::new(catalog_manager.clone());
+    let table_operator = TableOperator::new(
+        catalog_manager.clone(),
+        0,
+        std::time::Duration::from_millis(0),
+    );
     let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));
 
     let env = Env {
diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs
index fcabc3c8de..75448392df 100644
--- a/server/src/grpc/meta_event_service/mod.rs
+++ b/server/src/grpc/meta_event_service/mod.rs
@@ -2,7 +2,10 @@
 
 // Meta event rpc service implementation.
 
-use std::{sync::Arc, time::Instant};
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use analytic_engine::setup::OpenedWals;
 use async_trait::async_trait;
@@ -173,7 +176,12 @@ impl MetaServiceImpl {
                 .catalog_manager
                 .default_catalog_name()
                 .to_string(),
-            table_operator: TableOperator::new(self.instance.catalog_manager.clone()),
+            // FIXME: use the exposed configs.
+            table_operator: TableOperator::new(
+                self.instance.catalog_manager.clone(),
+                5,
+                Duration::from_secs(5),
+            ),
             table_engine: self.instance.table_engine.clone(),
             partition_table_engine: self.instance.partition_table_engine.clone(),
             wal_region_closer: self.wal_region_closer.clone(),
diff --git a/src/setup.rs b/src/setup.rs
index 286649cea5..6379a8078f 100644
--- a/src/setup.rs
+++ b/src/setup.rs
@@ -292,7 +292,11 @@ async fn build_without_meta(
         .expect("Failed to fetch table infos for opening");
 
     let catalog_manager = Arc::new(CatalogManagerImpl::new(Arc::new(table_based_manager)));
-    let table_operator = TableOperator::new(catalog_manager.clone());
+    let table_operator = TableOperator::new(
+        catalog_manager.clone(),
+        config.analytic.replay_retry_limit,
+        config.analytic.replay_retry_interval.0,
+    );
     let table_manipulator = Arc::new(catalog_based::TableManipulatorImpl::new(
         table_operator.clone(),
     ));
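One subtlety worth noting when setting these knobs: `retry_limit` counts retries, not total passes, because the first pass always runs before `*context.retry < context.retry_limit` can fail. The arithmetic, as a trivial sketch:

```rust
/// With the loop in table_operator.rs, a shard open makes at most
/// `retry_limit + 1` passes over its failed tables.
fn max_passes(retry_limit: usize) -> usize {
    retry_limit + 1
}

fn main() {
    // The updated tests pass 0: no retries, but still one initial pass.
    assert_eq!(max_passes(0), 1);
    // The meta event service hardcodes 5 retries: up to 6 passes in total.
    assert_eq!(max_passes(5), 6);
}
```

So the tests' `TableOperator::new(.., 0, Duration::from_millis(0))` preserves the old single-attempt behavior, while the meta event service's hardcoded `5` allows up to six passes per shard open until the FIXME is resolved.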