diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index 00b6ba8745..e32fa064d5 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -218,6 +218,18 @@ pub enum Error { #[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))] OpenTablesOfShard { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))] + ReplayWalWithCause { + msg: Option, + source: GenericError, + }, + + #[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + ReplayWalNoCause { + msg: Option, + backtrace: Backtrace, + }, } define_result!(Error); @@ -250,7 +262,9 @@ impl From for table_engine::engine::Error { | Error::DoManifestSnapshot { .. } | Error::OpenManifest { .. } | Error::TableNotExist { .. } - | Error::OpenTablesOfShard { .. } => Self::Unexpected { + | Error::OpenTablesOfShard { .. } + | Error::ReplayWalNoCause { .. } + | Error::ReplayWalWithCause { .. } => Self::Unexpected { source: Box::new(err), }, } diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 0dc4c6208c..4e860def74 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -130,6 +130,18 @@ pub enum Error { #[snafu(display("Other failure, msg:{}.\nBacktrace:\n{:?}", msg, backtrace))] Other { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to run flush job, msg:{:?}, err:{}", msg, source))] + FlushJobWithCause { + msg: Option, + source: GenericError, + }, + + #[snafu(display("Failed to run flush job, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + FlushJobNoCause { + msg: Option, + backtrace: Backtrace, + }, } define_result!(Error); @@ -163,6 +175,7 @@ pub struct TableFlushRequest { pub max_sequence: SequenceNumber, } +#[derive(Clone)] pub struct Flusher { pub space_store: SpaceStoreRef, @@ -311,7 +324,15 @@ impl FlushTask { // Start flush duration timer. let local_metrics = self.table_data.metrics.local_flush_metrics(); let _timer = local_metrics.start_flush_timer(); - self.dump_memtables(request_id, &mems_to_flush).await?; + self.dump_memtables(request_id, &mems_to_flush) + .await + .box_err() + .context(FlushJobWithCause { + msg: Some(format!( + "table:{}, table_id:{}, request_id:{request_id}", + self.table_data.name, self.table_data.id + )), + })?; self.table_data .set_last_flush_time(time::current_time_millis()); diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 89a71ba2d6..1faf254f08 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -15,6 +15,7 @@ pub(crate) mod mem_collector; pub mod open; mod read; pub(crate) mod serial_executor; +pub mod wal_replayer; pub(crate) mod write; use std::sync::Arc; @@ -44,7 +45,7 @@ use crate::{ meta_data::cache::MetaCacheRef, }, table::data::{TableDataRef, TableShardInfo}, - TableOptions, + RecoverMode, TableOptions, }; #[allow(clippy::enum_variant_names)] @@ -159,6 +160,7 @@ pub struct Instance { /// Options for scanning sst pub(crate) scan_options: ScanOptions, pub(crate) iter_options: Option, + pub(crate) recover_mode: RecoverMode, } impl Instance { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 6c4d178fe2..70e9d75fee 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -3,19 +3,16 @@ //! 
Open logic of instance use std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, sync::{Arc, RwLock}, }; -use common_types::{schema::IndexInWriterSchema, table::ShardId}; -use log::{debug, error, info, trace}; +use common_types::table::ShardId; +use log::info; use object_store::ObjectStoreRef; use snafu::ResultExt; use table_engine::{engine::TableDef, table::TableId}; -use wal::{ - log_batch::LogEntry, - manager::{ReadBoundary, ReadContext, ReadRequest, WalManager, WalManagerRef}, -}; +use wal::manager::WalManagerRef; use super::{engine::OpenTablesOfShard, flush_compaction::Flusher}; use crate::{ @@ -23,16 +20,12 @@ use crate::{ context::OpenContext, engine, instance::{ - self, - engine::{ApplyMemTable, FlushTable, OpenManifest, ReadMetaUpdate, ReadWal, Result}, - flush_compaction::TableFlushOptions, + engine::{OpenManifest, ReadMetaUpdate, Result}, mem_collector::MemUsageCollector, - serial_executor::TableOpSerialExecutor, - write::MemTableWriter, + wal_replayer::{ReplayMode, WalReplayer}, Instance, SpaceStore, }, manifest::{details::ManifestImpl, LoadRequest, Manifest, ManifestRef}, - payload::{ReadPayload, WalDecoder}, row_iter::IterOptions, space::{SpaceAndTable, SpaceRef, Spaces}, sst::{ @@ -41,6 +34,7 @@ use crate::{ }, table::data::TableDataRef, table_meta_set_impl::TableMetaSetImpl, + RecoverMode, }; const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64; @@ -133,6 +127,7 @@ impl Instance { .map(|v| v.as_byte() as usize), iter_options, scan_options, + recover_mode: ctx.config.recover_mode, }); Ok(instance) @@ -150,6 +145,7 @@ impl Instance { self.replay_batch_size, self.make_flusher(), self.max_retry_flush_limit, + self.recover_mode, )?; shard_opener.open().await @@ -197,10 +193,11 @@ struct ShardOpener { shard_id: ShardId, manifest: ManifestRef, wal_manager: WalManagerRef, - states: HashMap, + stages: HashMap, wal_replay_batch_size: usize, flusher: Flusher, max_retry_flush_limit: usize, + recover_mode: RecoverMode, } impl ShardOpener { @@ -211,8 +208,9 @@ impl ShardOpener { wal_replay_batch_size: usize, flusher: Flusher, max_retry_flush_limit: usize, + recover_mode: RecoverMode, ) -> Result { - let mut states = HashMap::with_capacity(shard_context.table_ctxs.len()); + let mut stages = HashMap::with_capacity(shard_context.table_ctxs.len()); for table_ctx in shard_context.table_ctxs { let space = &table_ctx.space; let table_id = table_ctx.table_def.id; @@ -226,17 +224,18 @@ impl ShardOpener { space: table_ctx.space, }) }; - states.insert(table_id, state); + stages.insert(table_id, state); } Ok(Self { shard_id: shard_context.shard_id, manifest, wal_manager, - states, + stages, wal_replay_batch_size, flusher, max_retry_flush_limit, + recover_mode, }) } @@ -248,9 +247,9 @@ impl ShardOpener { self.recover_table_datas().await?; // Retrieve the table results and return. - let states = std::mem::take(&mut self.states); - let mut table_results = HashMap::with_capacity(states.len()); - for (table_id, state) in states { + let stages = std::mem::take(&mut self.stages); + let mut table_results = HashMap::with_capacity(stages.len()); + for (table_id, state) in stages { match state { TableOpenStage::Failed(e) => { table_results.insert(table_id, Err(e)); @@ -274,7 +273,7 @@ impl ShardOpener { /// Recover table meta data from manifest based on shard. 
async fn recover_table_metas(&mut self) -> Result<()> { - for (table_id, state) in self.states.iter_mut() { + for (table_id, state) in self.stages.iter_mut() { match state { // Only do the meta recovery work in `RecoverTableMeta` state. TableOpenStage::RecoverTableMeta(ctx) => { @@ -319,46 +318,65 @@ impl ShardOpener { /// Recover table data based on shard. async fn recover_table_datas(&mut self) -> Result<()> { - for state in self.states.values_mut() { - match state { + // Replay wal logs of tables. + let mut replay_table_datas = Vec::with_capacity(self.stages.len()); + for (table_id, stage) in self.stages.iter_mut() { + match stage { // Only do the wal recovery work in `RecoverTableData` state. TableOpenStage::RecoverTableData(ctx) => { - let table_data = ctx.table_data.clone(); - let read_ctx = ReadContext { - batch_size: self.wal_replay_batch_size, - ..Default::default() - }; - - let result = match Self::recover_single_table_data( - &self.flusher, - self.max_retry_flush_limit, - self.wal_manager.as_ref(), - table_data.clone(), - self.wal_replay_batch_size, - &read_ctx, - ) - .await - { - Ok(()) => Ok((table_data, ctx.space.clone())), - Err(e) => Err(e), - }; - - match result { - Ok((table_data, space)) => { - *state = TableOpenStage::Success(Some(SpaceAndTable::new( - space, table_data, - ))); - } - Err(e) => *state = TableOpenStage::Failed(e), - } + replay_table_datas.push(ctx.table_data.clone()); } // Table was found opened, or failed in meta recovery stage. TableOpenStage::Failed(_) | TableOpenStage::Success(_) => {} TableOpenStage::RecoverTableMeta(_) => { return OpenTablesOfShard { - msg: format!("unexpected table state:{state:?}"), + msg: format!( + "unexpected stage, stage:{stage:?}, table_id:{table_id}, shard_id:{}", + self.shard_id + ), } - .fail() + .fail(); + } + } + } + + let replay_mode = match self.recover_mode { + RecoverMode::TableBased => ReplayMode::TableBased, + RecoverMode::ShardBased => ReplayMode::RegionBased, + }; + let mut wal_replayer = WalReplayer::new( + &replay_table_datas, + self.shard_id, + self.wal_manager.clone(), + self.wal_replay_batch_size, + self.flusher.clone(), + self.max_retry_flush_limit, + replay_mode, + ); + let mut table_results = wal_replayer.replay().await?; + + // Process the replay results. + for table_data in replay_table_datas { + let table_id = table_data.id; + // Each `table_data` has its related `stage` in `stages`, impossible to panic + // here. + let stage = self.stages.get_mut(&table_id).unwrap(); + let failed_table_opt = table_results.remove(&table_id); + + match (&stage, failed_table_opt) { + (TableOpenStage::RecoverTableData(ctx), None) => { + let space_table = SpaceAndTable::new(ctx.space.clone(), ctx.table_data.clone()); + *stage = TableOpenStage::Success(Some(space_table)); + } + + (TableOpenStage::RecoverTableData(_), Some(e)) => { + *stage = TableOpenStage::Failed(e); + } + + (other_stage, _) => { + return OpenTablesOfShard { + msg: format!("unexpected stage, stage:{other_stage:?}, table_id:{table_id}, shard_id:{}", self.shard_id), + }.fail(); } } } @@ -398,171 +416,4 @@ impl ShardOpener { Ok(()) } - - /// Recover table data from wal. 
- /// - /// Called by write worker - pub(crate) async fn recover_single_table_data( - flusher: &Flusher, - max_retry_flush_limit: usize, - wal_manager: &dyn WalManager, - table_data: TableDataRef, - replay_batch_size: usize, - read_ctx: &ReadContext, - ) -> Result<()> { - debug!( - "Instance recover table from wal, replay batch size:{}, table id:{}, shard info:{:?}", - replay_batch_size, table_data.id, table_data.shard_info - ); - - let table_location = table_data.table_location(); - let wal_location = - instance::create_wal_location(table_location.id, table_location.shard_info); - let read_req = ReadRequest { - location: wal_location, - start: ReadBoundary::Excluded(table_data.current_version().flushed_sequence()), - end: ReadBoundary::Max, - }; - - // Read all wal of current table. - let mut log_iter = wal_manager - .read_batch(read_ctx, &read_req) - .await - .context(ReadWal)?; - - let mut serial_exec = table_data.serial_exec.lock().await; - let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); - loop { - // fetch entries to log_entry_buf - let decoder = WalDecoder::default(); - log_entry_buf = log_iter - .next_log_entries(decoder, log_entry_buf) - .await - .context(ReadWal)?; - - // Replay all log entries of current table - Self::replay_table_log_entries( - flusher, - max_retry_flush_limit, - &mut serial_exec, - &table_data, - &log_entry_buf, - ) - .await?; - - // No more entries. - if log_entry_buf.is_empty() { - break; - } - } - - Ok(()) - } - - /// Replay all log entries into memtable and flush if necessary. - async fn replay_table_log_entries( - flusher: &Flusher, - max_retry_flush_limit: usize, - serial_exec: &mut TableOpSerialExecutor, - table_data: &TableDataRef, - log_entries: &VecDeque>, - ) -> Result<()> { - if log_entries.is_empty() { - info!( - "Instance replay an empty table log entries, table:{}, table_id:{:?}", - table_data.name, table_data.id - ); - - // No data in wal - return Ok(()); - } - - let last_sequence = log_entries.back().unwrap().sequence; - - debug!( - "Instance replay table log entries begin, table:{}, table_id:{:?}, sequence:{}", - table_data.name, table_data.id, last_sequence - ); - - for log_entry in log_entries { - let (sequence, payload) = (log_entry.sequence, &log_entry.payload); - - // Apply to memtable - match payload { - ReadPayload::Write { row_group } => { - trace!( - "Instance replay row_group, table:{}, row_group:{:?}", - table_data.name, - row_group - ); - - let table_schema_version = table_data.schema_version(); - if table_schema_version != row_group.schema().version() { - // Data with old schema should already been flushed, but we avoid panic - // here. - error!( - "Ignore data with mismatch schema version during replaying, \ - table:{}, \ - table_id:{:?}, \ - expect:{}, \ - actual:{}, \ - last_sequence:{}, \ - sequence:{}", - table_data.name, - table_data.id, - table_schema_version, - row_group.schema().version(), - last_sequence, - sequence, - ); - - continue; - } - - let index_in_writer = - IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); - let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); - memtable_writer - .write(sequence, &row_group.into(), index_in_writer) - .context(ApplyMemTable { - space_id: table_data.space_id, - table: &table_data.name, - table_id: table_data.id, - })?; - - // Flush the table if necessary. 
- if table_data.should_flush_table(serial_exec) { - let opts = TableFlushOptions { - res_sender: None, - max_retry_flush_limit, - }; - let flush_scheduler = serial_exec.flush_scheduler(); - flusher - .schedule_flush(flush_scheduler, table_data, opts) - .await - .context(FlushTable { - space_id: table_data.space_id, - table: &table_data.name, - table_id: table_data.id, - })?; - } - } - ReadPayload::AlterSchema { .. } | ReadPayload::AlterOptions { .. } => { - // Ignore records except Data. - // - // - DDL (AlterSchema and AlterOptions) should be recovered - // from Manifest on start. - } - } - } - - debug!( - "Instance replay table log entries end, table:{}, table_id:{:?}, last_sequence:{}", - table_data.name, table_data.id, last_sequence - ); - - table_data.set_last_sequence(last_sequence); - - Ok(()) - } } diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs index 0e48ce5f18..0608f78c2e 100644 --- a/analytic_engine/src/instance/serial_executor.rs +++ b/analytic_engine/src/instance/serial_executor.rs @@ -223,6 +223,8 @@ fn on_flush_finished(schedule_sync: ScheduleSyncRef, res: &Result<()>) { *flush_state = FlushState::Ready; } Err(e) => { + error!("Failed to run flush task, err:{e}"); + schedule_sync.inc_flush_failure_count(); let err_msg = e.to_string(); *flush_state = FlushState::Failed { err_msg }; diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs new file mode 100644 index 0000000000..5d494450e4 --- /dev/null +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -0,0 +1,596 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Wal replayer + +use std::{ + collections::{HashMap, VecDeque}, + fmt::Display, + ops::Range, +}; + +use async_trait::async_trait; +use common_types::{schema::IndexInWriterSchema, table::ShardId}; +use common_util::error::BoxError; +use log::{debug, error, info, trace}; +use snafu::ResultExt; +use table_engine::table::TableId; +use tokio::sync::MutexGuard; +use wal::{ + log_batch::LogEntry, + manager::{ + ReadBoundary, ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, WalManagerRef, + }, +}; + +use crate::{ + instance::{ + self, + engine::{Error, ReplayWalWithCause, Result}, + flush_compaction::{Flusher, TableFlushOptions}, + serial_executor::TableOpSerialExecutor, + write::MemTableWriter, + }, + payload::{ReadPayload, WalDecoder}, + table::data::TableDataRef, +}; + +/// Wal replayer supporting both table based and region based +// TODO: limit the memory usage in `RegionBased` mode. +pub struct WalReplayer<'a> { + context: ReplayContext, + replay: Box, + table_datas: &'a [TableDataRef], +} + +impl<'a> WalReplayer<'a> { + pub fn new( + table_datas: &'a [TableDataRef], + shard_id: ShardId, + wal_manager: WalManagerRef, + wal_replay_batch_size: usize, + flusher: Flusher, + max_retry_flush_limit: usize, + replay_mode: ReplayMode, + ) -> Self { + let context = ReplayContext { + shard_id, + wal_manager, + wal_replay_batch_size, + flusher, + max_retry_flush_limit, + }; + + let replay = Self::build_replay(replay_mode); + + Self { + replay, + context, + table_datas, + } + } + + fn build_replay(mode: ReplayMode) -> Box { + info!("Replay wal in mode:{mode:?}"); + + match mode { + ReplayMode::RegionBased => Box::new(RegionBasedReplay), + ReplayMode::TableBased => Box::new(TableBasedReplay), + } + } + + /// Replay tables and return the failed tables and the causes. 
+ pub async fn replay(&mut self) -> Result { + // Build replay action according to mode. + info!( + "Replay wal logs begin, context:{}, tables:{:?}", + self.context, self.table_datas + ); + let result = self.replay.run(&self.context, self.table_datas).await; + info!( + "Replay wal logs finish, context:{}, tables:{:?}", + self.context, self.table_datas, + ); + + result + } +} + +pub struct ReplayContext { + pub shard_id: ShardId, + pub wal_manager: WalManagerRef, + pub wal_replay_batch_size: usize, + pub flusher: Flusher, + pub max_retry_flush_limit: usize, +} + +impl Display for ReplayContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReplayContext") + .field("shard_id", &self.shard_id) + .field("replay_batch_size", &self.wal_replay_batch_size) + .field("max_retry_flush_limit", &self.max_retry_flush_limit) + .finish() + } +} + +#[derive(Debug, Clone, Copy)] +pub enum ReplayMode { + RegionBased, + TableBased, +} + +pub type FailedTables = HashMap; + +/// Replay action, the abstract of different replay strategies +#[async_trait] +trait Replay: Send + Sync + 'static { + async fn run( + &self, + context: &ReplayContext, + table_datas: &[TableDataRef], + ) -> Result; +} + +/// Table based wal replay +struct TableBasedReplay; + +#[async_trait] +impl Replay for TableBasedReplay { + async fn run( + &self, + context: &ReplayContext, + table_datas: &[TableDataRef], + ) -> Result { + debug!("Replay wal logs on table mode, context:{context}, tables:{table_datas:?}",); + + let mut faileds = HashMap::new(); + let read_ctx = ReadContext { + batch_size: context.wal_replay_batch_size, + ..Default::default() + }; + for table_data in table_datas { + let table_id = table_data.id; + if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await { + faileds.insert(table_id, e); + } + } + + Ok(faileds) + } +} + +impl TableBasedReplay { + async fn recover_table_logs( + context: &ReplayContext, + table_data: &TableDataRef, + read_ctx: &ReadContext, + ) -> Result<()> { + let table_location = table_data.table_location(); + let wal_location = + instance::create_wal_location(table_location.id, table_location.shard_info); + let read_req = ReadRequest { + location: wal_location, + start: ReadBoundary::Excluded(table_data.current_version().flushed_sequence()), + end: ReadBoundary::Max, + }; + + // Read all wal of current table. 
+ let mut log_iter = context + .wal_manager + .read_batch(read_ctx, &read_req) + .await + .box_err() + .context(ReplayWalWithCause { msg: None })?; + + let mut serial_exec = table_data.serial_exec.lock().await; + let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); + loop { + // fetch entries to log_entry_buf + let decoder = WalDecoder::default(); + log_entry_buf = log_iter + .next_log_entries(decoder, log_entry_buf) + .await + .box_err() + .context(ReplayWalWithCause { msg: None })?; + + if log_entry_buf.is_empty() { + break; + } + + // Replay all log entries of current table + replay_table_log_entries( + &context.flusher, + context.max_retry_flush_limit, + &mut serial_exec, + table_data, + log_entry_buf.iter(), + ) + .await?; + } + + Ok(()) + } +} + +/// Region based wal replay +struct RegionBasedReplay; + +#[async_trait] +impl Replay for RegionBasedReplay { + async fn run( + &self, + context: &ReplayContext, + table_datas: &[TableDataRef], + ) -> Result { + debug!("Replay wal logs on region mode, context:{context}, tables:{table_datas:?}",); + + // Init all table results to be oks, and modify to errs when failed to replay. + let mut faileds = FailedTables::new(); + let scan_ctx = ScanContext { + batch_size: context.wal_replay_batch_size, + ..Default::default() + }; + + Self::replay_region_logs(context, table_datas, &scan_ctx, &mut faileds).await?; + + Ok(faileds) + } +} + +impl RegionBasedReplay { + /// Replay logs in same region. + /// + /// Steps: + /// + Scan all logs of region. + /// + Split logs according to table ids. + /// + Replay logs to recover data of tables. + async fn replay_region_logs( + context: &ReplayContext, + table_datas: &[TableDataRef], + scan_ctx: &ScanContext, + faileds: &mut FailedTables, + ) -> Result<()> { + // Scan all wal logs of current shard. + let scan_req = ScanRequest { + region_id: context.shard_id as RegionId, + }; + + let mut log_iter = context + .wal_manager + .scan(scan_ctx, &scan_req) + .await + .box_err() + .context(ReplayWalWithCause { msg: None })?; + let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); + + // Lock all related tables. + let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + for table_data in table_datas { + let serial_exec = table_data.serial_exec.lock().await; + let serial_exec_ctx = SerialExecContext { + table_data: table_data.clone(), + serial_exec, + }; + serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + } + + // Split and replay logs. + loop { + let decoder = WalDecoder::default(); + log_entry_buf = log_iter + .next_log_entries(decoder, log_entry_buf) + .await + .box_err() + .context(ReplayWalWithCause { msg: None })?; + + if log_entry_buf.is_empty() { + break; + } + + Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) + .await?; + } + + Ok(()) + } + + async fn replay_single_batch( + context: &ReplayContext, + log_batch: &VecDeque>, + serial_exec_ctxs: &mut HashMap>, + faileds: &mut FailedTables, + ) -> Result<()> { + let mut table_batches = Vec::new(); + // TODO: No `group_by` method in `VecDeque`, so implement it manually here... + Self::split_log_batch_by_table(log_batch, &mut table_batches); + + // TODO: Replay logs of different tables in parallel. + for table_batch in table_batches { + // Some tables may have failed in previous replay, ignore them. + if faileds.contains_key(&table_batch.table_id) { + continue; + } + + // Replay all log entries of current table. 
+ // Some tables may have been moved to other shards or dropped, ignore such logs. + if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) { + let result = replay_table_log_entries( + &context.flusher, + context.max_retry_flush_limit, + &mut ctx.serial_exec, + &ctx.table_data, + log_batch.range(table_batch.range), + ) + .await; + + // If occur error, mark this table as failed and store the cause. + if let Err(e) = result { + faileds.insert(table_batch.table_id, e); + } + } + } + + Ok(()) + } + + fn split_log_batch_by_table
<P>
( + log_batch: &VecDeque>, + table_batches: &mut Vec, + ) { + table_batches.clear(); + + if log_batch.is_empty() { + return; + } + + // Split log batch by table id, for example: + // input batch: + // |1|1|2|2|2|3|3|3|3| + // + // output batches: + // |1|1|, |2|2|2|, |3|3|3|3| + let mut start_log_idx = 0usize; + let mut curr_log_idx = 0usize; + let mut start_table_id = log_batch.get(start_log_idx).unwrap().table_id; + loop { + let time_to_break = curr_log_idx == log_batch.len(); + let found_end_idx = if time_to_break { + true + } else { + let current_table_id = log_batch.get(curr_log_idx).unwrap().table_id; + current_table_id != start_table_id + }; + + if found_end_idx { + table_batches.push(TableBatch { + table_id: TableId::new(start_table_id), + range: start_log_idx..curr_log_idx, + }); + + // Step to next start idx. + start_log_idx = curr_log_idx; + start_table_id = if time_to_break { + // The final round, just set it to max as an invalid flag. + u64::MAX + } else { + log_batch.get(start_log_idx).unwrap().table_id + }; + } + + if time_to_break { + break; + } + curr_log_idx += 1; + } + } +} + +#[derive(Debug, Eq, PartialEq)] +struct TableBatch { + table_id: TableId, + range: Range, +} + +struct SerialExecContext<'a> { + table_data: TableDataRef, + serial_exec: MutexGuard<'a, TableOpSerialExecutor>, +} + +/// Replay all log entries into memtable and flush if necessary +async fn replay_table_log_entries( + flusher: &Flusher, + max_retry_flush_limit: usize, + serial_exec: &mut TableOpSerialExecutor, + table_data: &TableDataRef, + log_entries: impl Iterator>, +) -> Result<()> { + let flushed_sequence = table_data.current_version().flushed_sequence(); + debug!( + "Replay table log entries begin, table:{}, table_id:{:?}, last_sequence:{}, flushed_sequence:{flushed_sequence}", + table_data.name, table_data.id, table_data.last_sequence(), + ); + + for log_entry in log_entries { + let (sequence, payload) = (log_entry.sequence, &log_entry.payload); + + // Ignore too old logs(sequence <= `flushed_sequence`). + if sequence <= flushed_sequence { + continue; + } + + // Apply logs to memtable. + match payload { + ReadPayload::Write { row_group } => { + trace!( + "Instance replay row_group, table:{}, row_group:{:?}", + table_data.name, + row_group + ); + + // TODO: too strict check here, should be modified to like what in + // `ColumnSchema::compatible_for_write`.` + let table_schema_version = table_data.schema_version(); + if table_schema_version != row_group.schema().version() { + // Data with old schema should already been flushed, but we avoid panic + // here. + error!( + "Ignore data with mismatch schema version during replaying, \ + table:{}, \ + table_id:{:?}, \ + expect:{}, \ + actual:{}, \ + last_sequence:{}, \ + sequence:{}", + table_data.name, + table_data.id, + table_schema_version, + row_group.schema().version(), + table_data.last_sequence(), + sequence, + ); + + continue; + } + + let index_in_writer = + IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); + let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); + memtable_writer + .write(sequence, &row_group.into(), index_in_writer) + .box_err() + .context(ReplayWalWithCause { + msg: Some(format!( + "table_id:{}, table_name:{}, space_id:{}", + table_data.space_id, table_data.name, table_data.id + )), + })?; + + // Flush the table if necessary. 
+ if table_data.should_flush_table(serial_exec) { + let opts = TableFlushOptions { + res_sender: None, + max_retry_flush_limit, + }; + let flush_scheduler = serial_exec.flush_scheduler(); + flusher + .schedule_flush(flush_scheduler, table_data, opts) + .await + .box_err() + .context(ReplayWalWithCause { + msg: Some(format!( + "table_id:{}, table_name:{}, space_id:{}", + table_data.space_id, table_data.name, table_data.id + )), + })?; + } + } + ReadPayload::AlterSchema { .. } | ReadPayload::AlterOptions { .. } => { + // Ignore records except Data. + // + // - DDL (AlterSchema and AlterOptions) should be recovered from + // Manifest on start. + } + } + + table_data.set_last_sequence(sequence); + } + + debug!( + "Replay table log entries finish, table:{}, table_id:{:?}, last_sequence:{}, flushed_sequence:{}", + table_data.name, table_data.id, table_data.last_sequence(), table_data.current_version().flushed_sequence() + ); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + + use table_engine::table::TableId; + use wal::log_batch::LogEntry; + + use crate::instance::wal_replayer::{RegionBasedReplay, TableBatch}; + + #[test] + fn test_split_log_batch_by_table() { + let test_set = test_set(); + for (test_batch, expected) in test_set { + check_split_result(&test_batch, &expected); + } + } + + fn test_set() -> Vec<(VecDeque>, Vec)> { + let test_log_batch1: VecDeque> = VecDeque::from([ + LogEntry { + table_id: 0, + sequence: 1, + payload: 0, + }, + LogEntry { + table_id: 0, + sequence: 2, + payload: 0, + }, + LogEntry { + table_id: 0, + sequence: 3, + payload: 0, + }, + LogEntry { + table_id: 1, + sequence: 1, + payload: 0, + }, + LogEntry { + table_id: 1, + sequence: 2, + payload: 0, + }, + LogEntry { + table_id: 2, + sequence: 1, + payload: 0, + }, + ]); + let expected1 = vec![ + TableBatch { + table_id: TableId::new(0), + range: 0..3, + }, + TableBatch { + table_id: TableId::new(1), + range: 3..5, + }, + TableBatch { + table_id: TableId::new(2), + range: 5..6, + }, + ]; + + let test_log_batch2: VecDeque> = VecDeque::from([LogEntry { + table_id: 0, + sequence: 1, + payload: 0, + }]); + let expected2 = vec![TableBatch { + table_id: TableId::new(0), + range: 0..1, + }]; + + let test_log_batch3: VecDeque> = VecDeque::default(); + let expected3 = vec![]; + + vec![ + (test_log_batch1, expected1), + (test_log_batch2, expected2), + (test_log_batch3, expected3), + ] + } + + fn check_split_result(batch: &VecDeque>, expected: &[TableBatch]) { + let mut table_batches = Vec::new(); + RegionBasedReplay::split_log_batch_by_table(batch, &mut table_batches); + assert_eq!(&table_batches, expected); + } +} diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 9e95d8e97b..025845afbe 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -97,9 +97,21 @@ pub struct Config { /// + Kafka pub wal: WalStorageConfig, + /// Recover mode + /// + /// + TableBased, tables on same shard will be recovered table by table. + /// + ShardBased, tables on same shard will be recovered together. 
+ pub recover_mode: RecoverMode, + pub remote_engine_client: remote_engine_client::config::Config, } +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +pub enum RecoverMode { + TableBased, + ShardBased, +} + impl Default for Config { fn default() -> Self { Self { @@ -127,6 +139,7 @@ impl Default for Config { max_bytes_per_write_batch: None, wal: WalStorageConfig::RocksDB(Box::default()), remote_engine_client: remote_engine_client::config::Config::default(), + recover_mode: RecoverMode::TableBased, } } } diff --git a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs index 614cab7541..c6f4b08eec 100644 --- a/analytic_engine/src/tests/alter_test.rs +++ b/analytic_engine/src/tests/alter_test.rs @@ -20,24 +20,25 @@ use crate::{ tests::{ row_util, table::{self, FixedSchemaTable}, - util::{ - EngineBuildContext, MemoryEngineBuildContext, Null, RocksDBEngineBuildContext, - TestContext, TestEnv, - }, + util::{memory_ctxs, rocksdb_ctxs, EngineBuildContext, Null, TestContext, TestEnv}, }, }; #[test] fn test_alter_table_add_column_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_table_add_column(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_table_add_column(ctx); + } } #[ignore = "Enable this test when manifest use another snapshot implementation"] #[test] fn test_alter_table_add_column_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_table_add_column(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_table_add_column(ctx); + } } fn test_alter_table_add_column(engine_context: T) { @@ -370,15 +371,19 @@ async fn check_read_row_group( #[test] fn test_alter_table_options_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_table_options(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_table_options(ctx); + } } #[ignore = "Enable this test when manifest use another snapshot implementation"] #[test] fn test_alter_table_options_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_table_options(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_table_options(ctx); + } } fn test_alter_table_options(engine_context: T) { diff --git a/analytic_engine/src/tests/drop_test.rs b/analytic_engine/src/tests/drop_test.rs index 5dd0be033a..c915ae1482 100644 --- a/analytic_engine/src/tests/drop_test.rs +++ b/analytic_engine/src/tests/drop_test.rs @@ -10,7 +10,8 @@ use table_engine::table::AlterSchemaRequest; use crate::tests::{ table::FixedSchemaTable, util::{ - self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestEnv, + self, memory_ctxs, rocksdb_ctxs, EngineBuildContext, MemoryEngineBuildContext, + RocksDBEngineBuildContext, TestEnv, }, }; @@ -209,14 +210,18 @@ fn test_drop_create_same_table_case(flush: bool, engine_c #[test] fn test_drop_create_same_table_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_drop_create_same_table(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_drop_create_same_table(ctx); + } } #[test] fn test_drop_create_same_table_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_drop_create_same_table(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_drop_create_same_table(ctx); + } } fn test_drop_create_same_table(engine_context: T) { @@ 
-227,14 +232,18 @@ fn test_drop_create_same_table(engine_context: T) { #[test] fn test_alter_schema_drop_create_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_schema_drop_create(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_schema_drop_create(ctx); + } } #[test] fn test_alter_schema_drop_create_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_schema_drop_create(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_schema_drop_create(ctx); + } } fn test_alter_schema_drop_create(engine_context: T) { @@ -284,14 +293,18 @@ fn test_alter_schema_drop_create(engine_context: T) { #[test] fn test_alter_options_drop_create_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_options_drop_create(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_options_drop_create(ctx); + } } #[test] fn test_alter_options_drop_create_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_options_drop_create(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_options_drop_create(ctx); + } } fn test_alter_options_drop_create(engine_context: T) { diff --git a/analytic_engine/src/tests/read_write_test.rs b/analytic_engine/src/tests/read_write_test.rs index 783f46aa42..7092fd98c6 100644 --- a/analytic_engine/src/tests/read_write_test.rs +++ b/analytic_engine/src/tests/read_write_test.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Read write test. @@ -11,22 +11,23 @@ use table_engine::table::ReadOrder; use crate::{ setup::WalsOpener, table_options, - tests::util::{ - self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestContext, - TestEnv, - }, + tests::util::{self, memory_ctxs, rocksdb_ctxs, EngineBuildContext, TestContext, TestEnv}, }; #[test] fn test_multi_table_read_write_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_multi_table_read_write(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_multi_table_read_write(ctx); + } } #[test] fn test_multi_table_read_write_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_multi_table_read_write(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_multi_table_read_write(ctx); + } } fn test_multi_table_read_write(engine_context: T) { @@ -171,14 +172,18 @@ fn test_multi_table_read_write(engine_context: T) { #[test] fn test_table_write_read_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_read(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_read(ctx); + } } #[test] fn test_table_write_read_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_table_write_read(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_read(ctx); + } } fn test_table_write_read(engine_context: T) { @@ -192,7 +197,7 @@ fn test_table_write_read(engine_context: T) { let fixed_schema_table = test_ctx.create_fixed_schema_table(test_table1).await; let start_ms = test_ctx.start_ms(); - let rows = [ + let rows: [(&str, Timestamp, &str, f64, f64, &str); 3] = [ ( "key1", Timestamp::new(start_ms), @@ -250,14 +255,18 @@ fn 
test_table_write_read(engine_context: T) { #[test] fn test_table_write_get_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_get(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_get(ctx); + } } #[test] fn test_table_write_get_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_table_write_get(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_get(ctx); + } } fn test_table_write_get(engine_context: T) { @@ -327,22 +336,28 @@ fn test_table_write_get(engine_context: T) { #[test] fn test_table_write_get_override_rocks() { - test_table_write_get_override::(); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_get_override(ctx); + } } #[test] fn test_table_write_get_override_mem_wal() { - test_table_write_get_override::(); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_get_override(ctx); + } } -fn test_table_write_get_override() { - test_table_write_get_override_case::(FlushPoint::NoFlush, T::default()); +fn test_table_write_get_override(engine_context: T) { + test_table_write_get_override_case::(FlushPoint::NoFlush, engine_context.clone()); - test_table_write_get_override_case::(FlushPoint::AfterFirstWrite, T::default()); + test_table_write_get_override_case::(FlushPoint::AfterFirstWrite, engine_context.clone()); - test_table_write_get_override_case::(FlushPoint::AfterOverwrite, T::default()); + test_table_write_get_override_case::(FlushPoint::AfterOverwrite, engine_context.clone()); - test_table_write_get_override_case::(FlushPoint::FirstAndOverwrite, T::default()); + test_table_write_get_override_case::(FlushPoint::FirstAndOverwrite, engine_context); } #[derive(Debug)] @@ -506,16 +521,20 @@ fn test_table_write_get_override_case( #[test] fn test_db_write_buffer_size_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - // Use different table name to avoid metrics collision. - test_db_write_buffer_size("test_db_write_buffer_size_rocks", rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + // Use different table name to avoid metrics collision. + test_db_write_buffer_size("test_db_write_buffer_size_rocks", ctx); + } } #[test] fn test_db_write_buffer_size_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - // Use different table name to avoid metrics collision. - test_db_write_buffer_size("test_db_write_buffer_size_mem_wal", memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + // Use different table name to avoid metrics collision. + test_db_write_buffer_size("test_db_write_buffer_size_mem_wal", ctx); + } } fn test_db_write_buffer_size(table_name: &str, engine_context: T) { @@ -527,16 +546,20 @@ fn test_db_write_buffer_size(table_name: &str, engine_con #[test] fn test_space_write_buffer_size_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - // Use different table name to avoid metrics collision. - test_space_write_buffer_size("test_space_write_buffer_size_rocks", rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + // Use different table name to avoid metrics collision. + test_space_write_buffer_size("test_space_write_buffer_size_rocks", ctx); + } } #[test] fn test_space_write_buffer_size_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - // Use different table name to avoid metrics collision. 
- test_space_write_buffer_size("test_space_write_buffer_size_mem_wal", memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + // Use different table name to avoid metrics collision. + test_space_write_buffer_size("test_space_write_buffer_size_mem_wal", ctx); + } } fn test_space_write_buffer_size(table_name: &str, engine_context: T) { @@ -660,14 +683,18 @@ fn test_write_buffer_size_overflow( #[test] fn test_table_write_read_reverse_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_read_reverse(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_read_reverse(ctx); + } } #[test] fn test_table_write_read_reverse_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_table_write_read_reverse(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_read_reverse(ctx); + } } fn test_table_write_read_reverse(engine_context: T) { @@ -746,15 +773,19 @@ fn test_table_write_read_reverse(engine_context: T) { #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/313"] fn test_table_write_read_reverse_after_flush_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_read_reverse_after_flush(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_read_reverse_after_flush(ctx); + } } #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/313"] fn test_table_write_read_reverse_after_flush_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_table_write_read_reverse_after_flush(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_read_reverse_after_flush(ctx); + } } fn test_table_write_read_reverse_after_flush(engine_context: T) { diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index 0e2c897ecc..0cc8fb94e3 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -8,7 +8,7 @@ use common_types::{ datum::Datum, record_batch::RecordBatch, row::{Row, RowGroup}, - table::DEFAULT_SHARD_ID, + table::{ShardId, DEFAULT_SHARD_ID}, time::Timestamp, }; use common_util::{ @@ -20,8 +20,8 @@ use log::info; use object_store::config::{LocalOptions, ObjectStoreOptions, StorageOptions}; use table_engine::{ engine::{ - CreateTableRequest, DropTableRequest, EngineRuntimes, OpenTableRequest, - Result as EngineResult, TableEngineRef, + CreateTableRequest, DropTableRequest, EngineRuntimes, OpenShardRequest, OpenTableRequest, + Result as EngineResult, TableDef, TableEngineRef, }, table::{ AlterSchemaRequest, FlushRequest, GetRequest, ReadOrder, ReadRequest, Result, SchemaId, @@ -33,7 +33,7 @@ use tempfile::TempDir; use crate::{ setup::{EngineBuilder, MemWalsOpener, OpenedWals, RocksDBWalsOpener, WalsOpener}, tests::table::{self, FixedSchemaTable, RowTuple}, - Config, RocksDBConfig, WalStorageConfig, + Config, RecoverMode, RocksDBConfig, WalStorageConfig, }; const DAY_MS: i64 = 24 * 60 * 60 * 1000; @@ -113,6 +113,7 @@ pub struct TestContext { opened_wals: Option, schema_id: SchemaId, last_table_seq: u32, + open_method: OpenTablesMethod, name_to_tables: HashMap, } @@ -169,8 +170,69 @@ impl TestContext { self.open().await; - for (id, name) in table_infos { - self.open_table(id, name).await; + match self.open_method { + OpenTablesMethod::WithOpenTable => { + for (id, name) in table_infos { + self.open_table(id, name).await; + } + } + 
OpenTablesMethod::WithOpenShard => { + self.open_tables_of_shard(table_infos, DEFAULT_SHARD_ID) + .await; + } + } + } + + pub async fn reopen_with_tables_of_shard(&mut self, tables: &[&str], shard_id: ShardId) { + let table_infos: Vec<_> = tables + .iter() + .map(|name| { + let table_id = self.name_to_tables.get(*name).unwrap().id(); + (table_id, *name) + }) + .collect(); + { + // Close all tables. + self.name_to_tables.clear(); + + // Close engine. + let engine = self.engine.take().unwrap(); + engine.close().await.unwrap(); + } + + self.open().await; + + self.open_tables_of_shard(table_infos, shard_id).await + } + + async fn open_tables_of_shard(&mut self, table_infos: Vec<(TableId, &str)>, shard_id: ShardId) { + let table_defs = table_infos + .into_iter() + .map(|table| TableDef { + catalog_name: "ceresdb".to_string(), + schema_name: "public".to_string(), + schema_id: self.schema_id, + id: table.0, + name: table.1.to_string(), + }) + .collect(); + + let open_shard_request = OpenShardRequest { + shard_id, + table_defs, + engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), + }; + + let tables = self + .engine() + .open_shard(open_shard_request) + .await + .unwrap() + .into_values() + .map(|result| result.unwrap().unwrap()); + + for table in tables { + self.name_to_tables.insert(table.name().to_string(), table); } } @@ -368,6 +430,12 @@ impl TestContext { } } +#[derive(Clone, Copy)] +pub enum OpenTablesMethod { + WithOpenTable, + WithOpenShard, +} + impl TestContext { pub fn config_mut(&mut self) -> &mut Config { &mut self.config @@ -405,6 +473,7 @@ impl TestEnv { schema_id: SchemaId::from_u32(100), last_table_seq: 1, name_to_tables: HashMap::new(), + open_method: build_context.open_method(), } } @@ -474,10 +543,22 @@ pub trait EngineBuildContext: Clone + Default { fn wals_opener(&self) -> Self::WalsOpener; fn config(&self) -> Config; + fn open_method(&self) -> OpenTablesMethod; } pub struct RocksDBEngineBuildContext { config: Config, + open_method: OpenTablesMethod, +} + +impl RocksDBEngineBuildContext { + pub fn new(mode: RecoverMode, open_method: OpenTablesMethod) -> Self { + let mut context = Self::default(); + context.config.recover_mode = mode; + context.open_method = open_method; + + context + } } impl Default for RocksDBEngineBuildContext { @@ -504,7 +585,10 @@ impl Default for RocksDBEngineBuildContext { ..Default::default() }; - Self { config } + Self { + config, + open_method: OpenTablesMethod::WithOpenTable, + } } } @@ -531,7 +615,10 @@ impl Clone for RocksDBEngineBuildContext { ..Default::default() })); - Self { config } + Self { + config, + open_method: self.open_method, + } } } @@ -545,11 +632,26 @@ impl EngineBuildContext for RocksDBEngineBuildContext { fn config(&self) -> Config { self.config.clone() } + + fn open_method(&self) -> OpenTablesMethod { + self.open_method + } } #[derive(Clone)] pub struct MemoryEngineBuildContext { config: Config, + open_method: OpenTablesMethod, +} + +impl MemoryEngineBuildContext { + pub fn new(mode: RecoverMode, open_method: OpenTablesMethod) -> Self { + let mut context = Self::default(); + context.config.recover_mode = mode; + context.open_method = open_method; + + context + } } impl Default for MemoryEngineBuildContext { @@ -572,7 +674,10 @@ impl Default for MemoryEngineBuildContext { ..Default::default() }; - Self { config } + Self { + config, + open_method: OpenTablesMethod::WithOpenTable, + } } } @@ -586,4 +691,26 @@ impl EngineBuildContext for MemoryEngineBuildContext { fn config(&self) -> Config { self.config.clone() } + + fn 
open_method(&self) -> OpenTablesMethod { + self.open_method + } +} + +pub fn rocksdb_ctxs() -> Vec { + vec![ + RocksDBEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenTable), + RocksDBEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenTable), + RocksDBEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenShard), + RocksDBEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenShard), + ] +} + +pub fn memory_ctxs() -> Vec { + vec![ + MemoryEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenTable), + MemoryEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenTable), + MemoryEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenShard), + MemoryEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenShard), + ] } diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 8d7638b685..821bdb6195 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -249,6 +249,7 @@ impl From for TableSeq { pub struct TableId(u64); impl TableId { + pub const MAX: TableId = TableId(u64::MAX); /// Min table id. pub const MIN: TableId = TableId(0); diff --git a/wal/src/message_queue_impl/region.rs b/wal/src/message_queue_impl/region.rs index 49f331dfac..c38f87b4bb 100644 --- a/wal/src/message_queue_impl/region.rs +++ b/wal/src/message_queue_impl/region.rs @@ -586,7 +586,7 @@ impl Region { table_id ); - inner.mark_delete_to(table_id, sequence_num).await.unwrap(); + inner.mark_delete_to(table_id, sequence_num).await?; ( inner.make_meta_snapshot().await,
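As a quick illustration of the new config knob (not part of the patch itself): below is a minimal, self-contained sketch of how `recover_mode` in `analytic_engine::Config` maps onto the replay strategy, mirroring the mapping done in `ShardOpener::recover_table_datas` above. The enums here are simplified stand-ins for the real types, and `replay_mode_for`/`main` are hypothetical helpers added only for illustration.

```rust
// Standalone sketch only: these enums are simplified stand-ins for the
// `RecoverMode` added in analytic_engine/src/lib.rs and the `ReplayMode`
// in wal_replayer.rs; `replay_mode_for` and `main` are illustrative helpers,
// not part of the patch.

/// Config-level knob: how the tables of a shard are recovered on open.
#[derive(Debug, Clone, Copy)]
enum RecoverMode {
    /// Replay the WAL table by table (one read stream per table).
    TableBased,
    /// Recover all tables of the shard together from one WAL region scan.
    ShardBased,
}

/// Strategy chosen inside the WAL replayer.
#[derive(Debug, Clone, Copy)]
enum ReplayMode {
    RegionBased,
    TableBased,
}

/// Mirrors the mapping done in `ShardOpener::recover_table_datas`.
fn replay_mode_for(mode: RecoverMode) -> ReplayMode {
    match mode {
        RecoverMode::TableBased => ReplayMode::TableBased,
        // Shard-based recovery is implemented as a region-based WAL scan,
        // because all tables of a shard share one WAL region.
        RecoverMode::ShardBased => ReplayMode::RegionBased,
    }
}

fn main() {
    // The default in this patch stays `TableBased` (see `Config::default()`).
    println!("{:?}", replay_mode_for(RecoverMode::TableBased));
    println!("{:?}", replay_mode_for(RecoverMode::ShardBased));
}
```

Note that `ShardBased` deliberately maps to `ReplayMode::RegionBased`: the shard id is used as the WAL `RegionId` in `RegionBasedReplay::replay_region_logs`, so all tables of the shard can be recovered from a single scan and the logs are then split per table id.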