From 106f8180ac9bfbb85d471e5b69cdf761801e49ed Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Thu, 23 Feb 2023 15:50:50 +0800 Subject: [PATCH 01/10] refactor: remove version of region in WAL --- analytic_engine/src/instance/close.rs | 1 - analytic_engine/src/instance/mod.rs | 6 +- analytic_engine/src/instance/open.rs | 1 - analytic_engine/src/manifest/details.rs | 39 +---- analytic_engine/src/manifest/mod.rs | 1 - benchmarks/src/wal_write_bench.rs | 2 +- wal/src/manager.rs | 42 +---- wal/src/message_queue_impl/namespace.rs | 45 ++--- wal/src/message_queue_impl/test_util.rs | 2 +- wal/src/rocks_impl/manager.rs | 8 +- wal/src/table_kv_impl/namespace.rs | 14 +- wal/src/table_kv_impl/table_unit.rs | 4 +- wal/src/tests/read_write.rs | 222 +++++++----------------- 13 files changed, 109 insertions(+), 278 deletions(-) diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index 78ca35b4a6..0f46b7c2b2 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -76,7 +76,6 @@ impl Instance { let snapshot_request = SnapshotRequest { space_id: space.id, table_id: table_data.id, - cluster_version: table_data.shard_info.cluster_version, shard_id: table_data.shard_info.shard_id, }; self.space_store diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index c93acf6b5e..7575ecd41f 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -231,9 +231,5 @@ pub type InstanceRef = Arc; #[inline] pub(crate) fn create_wal_location(table_id: TableId, shard_info: TableShardInfo) -> WalLocation { - WalLocation::new( - shard_info.shard_id as u64, - shard_info.cluster_version, - table_id, - ) + WalLocation::new(shard_info.shard_id as u64, table_id) } diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 7638a86e30..ed1ee8d0b8 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -216,7 +216,6 @@ impl Instance { let load_req = LoadRequest { space_id, table_id, - cluster_version: request.cluster_version, shard_id: request.shard_id, }; let manifest_data = self diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs index a5da7003bf..fbf5349511 100644 --- a/analytic_engine/src/manifest/details.rs +++ b/analytic_engine/src/manifest/details.rs @@ -291,11 +291,7 @@ impl Manifest for ManifestImpl { let table_id = request.meta_update.table_id(); let shard_id = request.shard_info.shard_id; - let location = WalLocation::new( - shard_id as u64, - request.shard_info.cluster_version, - table_id.as_u64(), - ); + let location = WalLocation::new(shard_id as u64, table_id.as_u64()); let space_id = request.meta_update.space_id(); let table_id = request.meta_update.table_id(); self.store_update_to_wal(request.meta_update, location) @@ -310,11 +306,7 @@ impl Manifest for ManifestImpl { async fn load_data(&self, load_req: &LoadRequest) -> GenericResult> { info!("Manifest load data, request:{:?}", load_req); - let location = WalLocation::new( - load_req.shard_id as u64, - load_req.cluster_version, - load_req.table_id.as_u64(), - ); + let location = WalLocation::new(load_req.shard_id as u64, load_req.table_id.as_u64()); let log_store = WalBasedLogStore { opts: self.opts.clone(), @@ -340,11 +332,7 @@ impl Manifest for ManifestImpl { info!("Manifest do snapshot, request:{:?}", request); let table_id = request.table_id; - let location = WalLocation::new( - request.shard_id as u64, 
- request.cluster_version, - table_id.as_u64(), - ); + let location = WalLocation::new(request.shard_id as u64, table_id.as_u64()); let space_id = request.space_id; let table_id = request.table_id; @@ -1078,7 +1066,6 @@ mod tests { let load_req = LoadRequest { table_id, shard_id: DEFAULT_SHARD_ID, - cluster_version: DEFAULT_CLUSTER_VERSION, space_id: ctx.schema_id.as_u32(), }; let expected_table_manifest_data = manifest_data_builder.build(); @@ -1161,11 +1148,7 @@ mod tests { expr: Bytes::from("test"), linear: false, })); - let location = WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id.as_u64(), - ); + let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64()); let mut manifest_data_builder = TableManifestDataBuilder::default(); let manifest = ctx.open_manifest().await; ctx.add_table_with_manifest( @@ -1191,7 +1174,6 @@ mod tests { let load_req = LoadRequest { space_id: ctx.schema_id.as_u32(), table_id, - cluster_version: DEFAULT_CLUSTER_VERSION, shard_id: DEFAULT_SHARD_ID, }; ctx.check_table_manifest_data_with_manifest( @@ -1212,7 +1194,6 @@ mod tests { let load_req = LoadRequest { space_id: ctx.schema_id.as_u32(), table_id, - cluster_version: DEFAULT_CLUSTER_VERSION, shard_id: table_id.as_u64() as u32, }; let mut manifest_data_builder = TableManifestDataBuilder::default(); @@ -1236,11 +1217,7 @@ mod tests { ) .await; - let location = WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id.as_u64(), - ); + let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64()); manifest .maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true) .await @@ -1369,11 +1346,7 @@ mod tests { input_updates: Vec, updates_after_snapshot: Vec, ) { - let location = WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id.as_u64(), - ); + let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64()); let log_store = MemLogStore::from_updates(&input_updates); let snapshot_store = MemSnapshotStore::new(); diff --git a/analytic_engine/src/manifest/mod.rs b/analytic_engine/src/manifest/mod.rs index 256fa30156..e454b828c7 100644 --- a/analytic_engine/src/manifest/mod.rs +++ b/analytic_engine/src/manifest/mod.rs @@ -22,7 +22,6 @@ use crate::{ pub struct LoadRequest { pub space_id: SpaceId, pub table_id: TableId, - pub cluster_version: u64, pub shard_id: ShardId, } diff --git a/benchmarks/src/wal_write_bench.rs b/benchmarks/src/wal_write_bench.rs index 5b27374e22..d31e1546cc 100644 --- a/benchmarks/src/wal_write_bench.rs +++ b/benchmarks/src/wal_write_bench.rs @@ -75,7 +75,7 @@ impl WalWriteBench { .expect("should succeed to open WalNamespaceImpl(Memory)"); let values = self.build_value_vec(); - let wal_encoder = LogBatchEncoder::create(WalLocation::new(1, 1, 1)); + let wal_encoder = LogBatchEncoder::create(WalLocation::new(1, 1)); let log_batch = wal_encoder .encode_batch::>(values.as_slice()) .expect("should succeed to encode payload batch"); diff --git a/wal/src/manager.rs b/wal/src/manager.rs index 981fa44098..76a0f30631 100644 --- a/wal/src/manager.rs +++ b/wal/src/manager.rs @@ -7,7 +7,7 @@ use std::{collections::VecDeque, fmt, sync::Arc, time::Duration}; use async_trait::async_trait; pub use common_types::SequenceNumber; use common_types::{ - table::{TableId, DEFAULT_CLUSTER_VERSION, DEFAULT_SHARD_ID}, + table::{TableId, DEFAULT_SHARD_ID}, MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, }; use common_util::{error::BoxError, runtime::Runtime}; @@ -126,44 +126,24 @@ pub mod 
error {
     define_result!(Error);
 }
 
+pub type RegionId = u64;
+
 /// Decide where to write logs
 #[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
 pub struct WalLocation {
-    pub versioned_region_id: VersionedRegionId,
+    pub region_id: RegionId,
     pub table_id: TableId,
 }
 
 impl WalLocation {
-    pub fn new(region_id: u64, region_version: u64, table_id: TableId) -> Self {
-        let versioned_region_id = VersionedRegionId {
-            version: region_version,
-            id: region_id,
-        };
-
+    pub fn new(region_id: RegionId, table_id: TableId) -> Self {
         WalLocation {
-            versioned_region_id,
+            region_id,
             table_id,
         }
     }
 }
 
-/// Region id with version
-///
-/// Region is used to describe a set of table's log unit.
-///
-/// The `id` is used to identify the `Region`, can be mapped to table's related
-/// information(e.g. `shard id`, `table id`).
-///
-/// The `version` is introduced for solving the following bug:
-/// https://github.com/CeresDB/ceresdb/issues/441.
-/// It may be mapped to cluster version(while shard moved from nodes,
-/// it may changed to mark this moving) now.
-#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
-pub struct VersionedRegionId {
-    pub version: u64,
-    pub id: u64,
-}
-
 #[derive(Debug, Clone)]
 pub struct WriteContext {
     /// Timeout to write wal and it only takes effect when writing to a Wal on a
@@ -258,11 +238,7 @@ pub struct ReadRequest {
 impl Default for ReadRequest {
     fn default() -> Self {
         Self {
-            location: WalLocation::new(
-                DEFAULT_SHARD_ID as u64,
-                DEFAULT_CLUSTER_VERSION,
-                TableId::MIN,
-            ),
+            location: WalLocation::new(DEFAULT_SHARD_ID as u64, TableId::MIN),
             start: ReadBoundary::Min,
             end: ReadBoundary::Min,
         }
@@ -271,8 +247,8 @@ impl Default for ReadRequest {
 
 #[derive(Debug, Clone, Default)]
 pub struct ScanRequest {
-    /// Region id of the wals to be scanned
-    pub versioned_region_id: VersionedRegionId,
+    /// Id of the region to scan
+    pub region_id: RegionId,
 }
 
 pub type ScanContext = ReadContext;
diff --git a/wal/src/message_queue_impl/namespace.rs b/wal/src/message_queue_impl/namespace.rs
index 224f6fcc45..7b2fd06c7a 100644
--- a/wal/src/message_queue_impl/namespace.rs
+++ b/wal/src/message_queue_impl/namespace.rs
@@ -15,8 +15,7 @@ use crate::{
     kv_encoder::LogEncoding,
     log_batch::{LogEntry, LogWriteBatch},
     manager::{
-        ReadContext, ReadRequest, ScanContext, ScanRequest, VersionedRegionId, WalLocation,
-        WriteContext,
+        ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, WalLocation, WriteContext,
     },
     message_queue_impl::{
         config::Config,
@@ -239,7 +238,7 @@ impl<M: MessageQueue> fmt::Debug for Namespace<M> {
 struct NamespaceInner<M: MessageQueue> {
     namespace: String,
     // TODO: should use some strategies(such as lru) to clean the invalid region.
-    regions: Arc<RwLock<HashMap<VersionedRegionId, RegionRef<M>>>>,
+    regions: Arc<RwLock<HashMap<RegionId, RegionRef<M>>>>,
     message_queue: Arc<M>,
     meta_encoding: MetaEncoding,
     log_encoding: LogEncoding,
@@ -265,14 +264,14 @@ impl<M: MessageQueue> NamespaceInner<M> {
     /// about above operations on an empty region.
     async fn get_or_open_region(
         &self,
-        versioned_region_id: VersionedRegionId,
+        region_id: RegionId,
     ) -> std::result::Result<RegionRef<M>, region::Error> {
         {
             let regions = self.regions.read().await;
-            if let Some(region) = regions.get(&versioned_region_id) {
+            if let Some(region) = regions.get(&region_id) {
                 debug!(
-                    "Region exists and return it, namespace:{}, versioned region id:{:?}",
-                    self.namespace, versioned_region_id
+                    "Region exists and return it, namespace:{}, region id:{:?}",
+                    self.namespace, region_id
                 );
                 return Ok(region.clone());
             }
         }
 
         let mut regions = self.regions.write().await;
         // Multiple tables share one region, so double check here is needed.
-        if let Some(region) = regions.get(&versioned_region_id) {
+        if let Some(region) = regions.get(&region_id) {
             debug!(
-                "Region exists and return it, namespace:{}, versioned region id:{:?}",
-                self.namespace, versioned_region_id
+                "Region exists and return it, namespace:{}, region id:{:?}",
+                self.namespace, region_id
             );
             return Ok(region.clone());
         }
 
-        let region = Arc::new(
-            Region::open(
-                &self.namespace,
-                versioned_region_id.id,
-                self.message_queue.clone(),
-            )
-            .await?,
-        );
-        regions.insert(versioned_region_id, region.clone());
+        let region =
+            Arc::new(Region::open(&self.namespace, region_id, self.message_queue.clone()).await?);
+        regions.insert(region_id, region.clone());
 
         info!(
-            "Region open successfully, namespace:{}, versioned region id:{:?}",
-            self.namespace, versioned_region_id
+            "Region open successfully, namespace:{}, region id:{:?}",
+            self.namespace, region_id
         );
 
         Ok(region)
     }
 
     pub async fn sequence_num(&self, location: WalLocation) -> Result<SequenceNumber> {
         let region = self
-            .get_or_open_region(location.versioned_region_id)
+            .get_or_open_region(location.region_id)
             .await
             .context(GetSequence {
                 namespace: self.namespace.clone(),
@@ -340,7 +333,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
         );
 
         let region = self
-            .get_or_open_region(request.location.versioned_region_id)
+            .get_or_open_region(request.location.region_id)
             .await
             .context(ReadWithCause {
                 namespace: self.namespace.clone(),
@@ -375,7 +368,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
         );
 
         let region = self
-            .get_or_open_region(request.versioned_region_id)
+            .get_or_open_region(request.region_id)
             .await
             .context(ScanWithCause {
                 namespace: self.namespace.clone(),
@@ -414,7 +407,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
         );
 
         let region = self
-            .get_or_open_region(log_batch.location.versioned_region_id)
+            .get_or_open_region(log_batch.location.region_id)
             .await
             .context(Write {
                 namespace: self.namespace.clone(),
@@ -437,7 +430,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
         debug!("Mark table logs delete in namespace, namespace:{}, location:{:?}, delete to sequence number:{}", self.namespace, location, sequence_num);
 
         let region = self
-            .get_or_open_region(location.versioned_region_id)
+            .get_or_open_region(location.region_id)
             .await
             .context(MarkDeleteTo {
                 namespace: self.namespace.clone(),
diff --git a/wal/src/message_queue_impl/test_util.rs b/wal/src/message_queue_impl/test_util.rs
index 16a3659336..6e3f5e63b1 100644
--- a/wal/src/message_queue_impl/test_util.rs
+++ b/wal/src/message_queue_impl/test_util.rs
@@ -59,7 +59,7 @@ impl TestContext {
             .into_iter()
             .map(|(table_id, data)| {
                 let log_batch_encoder =
-                    LogBatchEncoder::create(WalLocation::new(region_id, region_version, table_id));
+                    LogBatchEncoder::create(WalLocation::new(region_id, table_id));
                 let log_write_batch = log_batch_encoder
                     .encode_batch::<TestPayload, u32>(&data)
                     .unwrap();
diff --git a/wal/src/rocks_impl/manager.rs
b/wal/src/rocks_impl/manager.rs index 7283f7c286..d70b5a0d51 100644 --- a/wal/src/rocks_impl/manager.rs +++ b/wal/src/rocks_impl/manager.rs @@ -162,7 +162,7 @@ impl TableUnit { return Ok(RocksLogIterator::new_empty(self.log_encoding.clone(), iter)); }; - let region_id = req.location.versioned_region_id.id; + let region_id = req.location.region_id; let (min_log_key, max_log_key) = ( self.log_key(region_id, start_sequence), self.log_key(region_id, end_sequence), @@ -187,7 +187,7 @@ impl TableUnit { let mut key_buf = BytesMut::new(); for entry in &batch.entries { - let region_id = batch.location.versioned_region_id.id; + let region_id = batch.location.region_id; self.log_encoding .encode_key( &mut key_buf, @@ -722,7 +722,7 @@ impl WalManager for RocksImpl { sequence_num: SequenceNumber, ) -> Result<()> { if let Some(table_unit) = self.table_unit(&location) { - let region_id = location.versioned_region_id.id; + let region_id = location.region_id; return table_unit .delete_entries_up_to(region_id, sequence_num) .await; @@ -768,7 +768,7 @@ impl WalManager for RocksImpl { let read_opts = ReadOptions::default(); let iter = DBIterator::new(self.db.clone(), read_opts); - let region_id = req.versioned_region_id.id; + let region_id = req.region_id; let (min_log_key, max_log_key) = ( CommonLogKey::new(region_id, TableId::MIN, SequenceNumber::MIN), CommonLogKey::new(region_id, TableId::MAX, SequenceNumber::MAX), diff --git a/wal/src/table_kv_impl/namespace.rs b/wal/src/table_kv_impl/namespace.rs index 07a8fc5fb1..b517517352 100644 --- a/wal/src/table_kv_impl/namespace.rs +++ b/wal/src/table_kv_impl/namespace.rs @@ -519,7 +519,7 @@ impl NamespaceInner { // TODO(yingwen): Provide a close_table_unit() method. async fn open_table_unit(&self, location: WalLocation) -> Result> { - let region_id = location.versioned_region_id.id; + let region_id = location.region_id; let table_id = location.table_id; let table_unit_meta_table = self.table_unit_meta_table(table_id); @@ -576,7 +576,7 @@ impl NamespaceInner { &self.table_kv, self.config.new_init_scan_ctx(), table_unit_meta_table, - location.versioned_region_id.id, + location.region_id, location.table_id, buckets, ) @@ -688,7 +688,7 @@ impl NamespaceInner { // during reading start/end sequence. 
let buckets = self.list_buckets(); - let region_id = request.versioned_region_id.id; + let region_id = request.region_id; let min_log_key = CommonLogKey::new(region_id, TableId::MIN, SequenceNumber::MIN); let max_log_key = CommonLogKey::new(region_id, TableId::MAX, SequenceNumber::MAX); @@ -1453,10 +1453,7 @@ fn purge_buckets( mod tests { use std::sync::Arc; - use common_types::{ - bytes::BytesMut, - table::{DEFAULT_CLUSTER_VERSION, DEFAULT_SHARD_ID}, - }; + use common_types::{bytes::BytesMut, table::DEFAULT_SHARD_ID}; use common_util::runtime::{Builder, Runtime}; use table_kv::{memory::MemoryImpl, KeyBoundary, ScanContext, ScanRequest}; @@ -1759,8 +1756,7 @@ mod tests { runtime.block_on(async { let namespace = NamespaceMocker::new(table_kv.clone(), runtime.clone()).build(); let table_id = 123; - let location = - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id); + let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id); let seq1 = write_test_payloads(&namespace, location, 1000, 1004).await; write_test_payloads(&namespace, location, 1005, 1009).await; diff --git a/wal/src/table_kv_impl/table_unit.rs b/wal/src/table_kv_impl/table_unit.rs index 56357423bd..eef4673c55 100644 --- a/wal/src/table_kv_impl/table_unit.rs +++ b/wal/src/table_kv_impl/table_unit.rs @@ -354,7 +354,7 @@ impl TableUnit { None => return Ok(TableLogIterator::new_empty(table_kv.clone())), }; - let region_id = request.location.versioned_region_id.id; + let region_id = request.location.region_id; let table_id = request.location.table_id; let min_log_key = CommonLogKey::new(region_id, table_id, start_sequence); let max_log_key = CommonLogKey::new(region_id, table_id, end_sequence); @@ -914,7 +914,7 @@ impl TableUnitWriter { let log_encoding = CommonLogEncoding::newest(); let entries_num = log_batch.len() as u64; - let region_id = log_batch.location.versioned_region_id.id; + let region_id = log_batch.location.region_id; let table_id = log_batch.location.table_id; let (wb, max_sequence_num) = { let mut wb = T::WriteBatch::with_capacity(log_batch.len()); diff --git a/wal/src/tests/read_write.rs b/wal/src/tests/read_write.rs index ed9913deca..96b8a0da45 100644 --- a/wal/src/tests/read_write.rs +++ b/wal/src/tests/read_write.rs @@ -6,14 +6,12 @@ use std::{ }; use common_types::{ - table::{TableId, DEFAULT_CLUSTER_VERSION, DEFAULT_SHARD_ID}, + table::{TableId, DEFAULT_SHARD_ID}, SequenceNumber, }; use crate::{ - manager::{ - ReadBoundary, ReadRequest, ScanRequest, VersionedRegionId, WalLocation, WalManagerRef, - }, + manager::{ReadBoundary, ReadRequest, ScanRequest, WalLocation, WalManagerRef}, tests::util::{ KafkaWalBuilder, MemoryTableWalBuilder, RocksTestEnv, RocksWalBuilder, TableKvTestEnv, TestEnv, TestPayload, TestTableData, WalBuilder, @@ -88,7 +86,7 @@ fn test_simple_read_write_default_batch(builder: B) { let env = TestEnv::new(2, builder); env.runtime.block_on(simple_read_write( &env, - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), )); } @@ -101,7 +99,7 @@ fn test_simple_read_write_different_batch_size(builder: B) { env.read_ctx.batch_size = batch_size; env.runtime.block_on(simple_read_write( &env, - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), )); } } @@ -174,11 +172,10 @@ fn test_move_from_nodes(builder: B) { // Use two wal managers to represent datanode 1 and datanode 2. 
// At first, write some things in node 1. let wal_1 = env.build_wal().await; - let region_version_1 = 0; simple_read_write_with_range_and_wal( &env, wal_1.clone(), - WalLocation::new(region_id, region_version_1, table_id), + WalLocation::new(region_id, table_id), 0, 10, ) @@ -187,11 +184,10 @@ fn test_move_from_nodes(builder: B) { // The table are move to node 2 but in the same shard, so its region id is still // 0, but region version changed to 1 for distinguishing this moving. let wal_2 = env.build_wal().await; - let region_version_2 = 1; simple_read_write_with_range_and_wal( &env, wal_2, - WalLocation::new(region_id, region_version_2, table_id), + WalLocation::new(region_id, table_id), 10, 20, ) @@ -200,11 +196,10 @@ fn test_move_from_nodes(builder: B) { // Finally, the table with the same shard is moved to node 1 again. // If version changed, wal manager can distinguish that // the region info in it is outdated, it should reopen the region. - let region_version_3 = 2; simple_read_write_with_range_and_wal( &env, wal_1, - WalLocation::new(region_id, region_version_3, table_id), + WalLocation::new(region_id, table_id), 20, 30, ) @@ -303,12 +298,8 @@ async fn simple_read_write_with_range_and_wal_internal( /// Test the read with different kinds of boundaries. async fn read_with_boundary(env: &TestEnv) { let wal = env.build_wal().await; - let versioned_region_id = VersionedRegionId { - version: DEFAULT_CLUSTER_VERSION, - id: DEFAULT_SHARD_ID as u64, - }; let location = WalLocation { - versioned_region_id, + region_id: DEFAULT_SHARD_ID as u64, table_id: TableId::MIN, }; let (payload_batch, write_batch) = env.build_log_batch(location, 0, 10).await; @@ -403,12 +394,12 @@ async fn write_multiple_regions_parallelly(env: Arc(env: &TestEnv, result_len: usize) { let wal = env.build_wal().await; for result_idx in 0..result_len { let region_id = result_idx as u64; - let region_version = result_idx as u64; let table_id = result_idx as u64; let (payload_batch, write_batch) = env - .build_log_batch(WalLocation::new(region_id, region_version, table_id), 0, 10) + .build_log_batch(WalLocation::new(region_id, table_id), 0, 10) .await; let seq = wal .write(&env.write_ctx, &write_batch) @@ -442,19 +432,12 @@ async fn reopen(env: &TestEnv, result_len: usize) { .expect("should succeed to write"); let last_seq = wal - .sequence_num(WalLocation::new(region_id, region_version, table_id)) + .sequence_num(WalLocation::new(region_id, table_id)) .await .unwrap(); assert_eq!(seq, last_seq); - write_results.push(( - region_id, - region_version, - table_id, - payload_batch, - write_batch, - seq, - )); + write_results.push((region_id, table_id, payload_batch, write_batch, seq)); } wal.close_gracefully().await.unwrap(); } @@ -462,9 +445,9 @@ async fn reopen(env: &TestEnv, result_len: usize) { // Reopen the wal. 
let wal = env.build_wal().await; - for (region_id, region_version, table_id, payload_batch, write_batch, seq) in write_results { + for (region_id, table_id, payload_batch, write_batch, seq) in write_results { let read_req = ReadRequest { - location: WalLocation::new(region_id, region_version, table_id), + location: WalLocation::new(region_id, table_id), start: ReadBoundary::Included(seq + 1 - write_batch.entries.len() as u64), end: ReadBoundary::Included(seq), }; @@ -493,7 +476,7 @@ async fn complex_read_write(env: &TestEnv) { let (start_val, mid_val, end_val) = (0, 10, 50); let (payload_batch1, write_batch_1) = env .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), start_val, mid_val, ) @@ -504,7 +487,7 @@ async fn complex_read_write(env: &TestEnv) { .expect("should succeed to write"); let (payload_batch2, write_batch_2) = env .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), mid_val, end_val, ) @@ -518,7 +501,7 @@ async fn complex_read_write(env: &TestEnv) { check_write_batch( env, wal.clone(), - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq_1, &payload_batch1, ) @@ -527,7 +510,7 @@ async fn complex_read_write(env: &TestEnv) { check_write_batch( env, wal.clone(), - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq_2, &payload_batch2, ) @@ -538,7 +521,7 @@ async fn complex_read_write(env: &TestEnv) { check_write_batch( env, wal.clone(), - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq_3, &payload_batch3, ) @@ -554,7 +537,7 @@ async fn complex_read_write(env: &TestEnv) { check_write_batch( env, wal.clone(), - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq_4, &payload_batch4, ) @@ -568,11 +551,7 @@ async fn simple_write_delete(env: &TestEnv) { let table_id = 0; let wal = env.build_wal().await; let (payload_batch, write_batch) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; let seq = wal .write(&env.write_ctx, &write_batch) @@ -581,31 +560,24 @@ async fn simple_write_delete(env: &TestEnv) { check_write_batch( env, wal.clone(), - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq, &payload_batch, ) .await; let last_seq = wal - .sequence_num(WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id, - )) + .sequence_num(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id)) .await .unwrap(); assert_eq!(seq, last_seq); // delete all logs - wal.mark_delete_entries_up_to( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - seq, - ) - .await - .expect("should succeed to delete"); + wal.mark_delete_entries_up_to(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq) + .await + .expect("should succeed to delete"); let read_req = ReadRequest { - location: WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + location: 
WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -619,11 +591,7 @@ async fn simple_write_delete(env: &TestEnv) { // Sequence num remains unchanged. let last_seq = wal - .sequence_num(WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id, - )) + .sequence_num(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id)) .await .unwrap(); assert_eq!(seq, last_seq); @@ -636,11 +604,7 @@ async fn write_delete_half(env: &TestEnv) { let table_id = 0; let wal = env.build_wal().await; let (mut payload_batch, write_batch) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; let seq = wal .write(&env.write_ctx, &write_batch) @@ -649,21 +613,18 @@ async fn write_delete_half(env: &TestEnv) { check_write_batch( env, wal.clone(), - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq, &payload_batch, ) .await; // delete all logs - wal.mark_delete_entries_up_to( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - seq / 2, - ) - .await - .expect("should succeed to delete"); + wal.mark_delete_entries_up_to(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq / 2) + .await + .expect("should succeed to delete"); let read_req = ReadRequest { - location: WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), + location: WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -678,11 +639,7 @@ async fn write_delete_half(env: &TestEnv) { // Sequence num remains unchanged. let last_seq = wal - .sequence_num(WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id, - )) + .sequence_num(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id)) .await .unwrap(); assert_eq!(seq, last_seq); @@ -695,11 +652,7 @@ async fn write_delete_multiple_regions(env: &TestEnv) { let (table_id_1, table_id_2) = (1, 2); let wal = env.build_wal().await; let (_, write_batch_1) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id_1), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id_1), 0, 10) .await; let seq_1 = wal .write(&env.write_ctx, &write_batch_1) @@ -708,7 +661,7 @@ async fn write_delete_multiple_regions(env: &TestEnv) { let (payload_batch2, write_batch_2) = env .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id_2), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id_2), 10, 20, ) @@ -719,14 +672,11 @@ async fn write_delete_multiple_regions(env: &TestEnv) { .expect("should succeed to write"); // delete all logs of region 1. 
- wal.mark_delete_entries_up_to( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id_1), - seq_1, - ) - .await - .expect("should succeed to delete"); + wal.mark_delete_entries_up_to(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id_1), seq_1) + .await + .expect("should succeed to delete"); let read_req = ReadRequest { - location: WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id_1), + location: WalLocation::new(DEFAULT_SHARD_ID as u64, table_id_1), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -740,7 +690,7 @@ async fn write_delete_multiple_regions(env: &TestEnv) { check_write_batch( env, wal.clone(), - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id_2), + WalLocation::new(DEFAULT_SHARD_ID as u64, table_id_2), seq_2, &payload_batch2, ) @@ -754,33 +704,21 @@ async fn sequence_increase_monotonically_multiple_writes(env: &Te let table_id = 0; let wal = env.build_wal().await; let (_, write_batch1) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; let seq_1 = wal .write(&env.write_ctx, &write_batch1) .await .expect("should succeed to write"); let (_, write_batch2) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; let seq_2 = wal .write(&env.write_ctx, &write_batch2) .await .expect("should succeed to write"); let (_, write_batch3) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; let seq_3 = wal @@ -800,11 +738,7 @@ async fn sequence_increase_monotonically_delete_write(env: &TestE let table_id = 0; let wal = env.build_wal().await; let (_, write_batch1) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; // write let seq_1 = wal @@ -812,18 +746,11 @@ async fn sequence_increase_monotonically_delete_write(env: &TestE .await .expect("should succeed to write"); // delete - wal.mark_delete_entries_up_to( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - seq_1, - ) - .await - .expect("should succeed to delete"); + wal.mark_delete_entries_up_to(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq_1) + .await + .expect("should succeed to delete"); let (_, write_batch2) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; // write again let seq_2 = wal @@ -832,11 +759,7 @@ async fn sequence_increase_monotonically_delete_write(env: &TestE .expect("should succeed to write"); let last_seq = wal - .sequence_num(WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id, - )) + .sequence_num(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id)) .await .unwrap(); assert_eq!(seq_2, last_seq); @@ -852,11 +775,7 @@ async fn sequence_increase_monotonically_delete_reopen_write(env: let table_id = 0; let wal = env.build_wal().await; let (_, write_batch1) = env - .build_log_batch( - 
WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; // write let seq_1 = wal @@ -864,12 +783,9 @@ async fn sequence_increase_monotonically_delete_reopen_write(env: .await .expect("should succeed to write"); // delete - wal.mark_delete_entries_up_to( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - seq_1, - ) - .await - .expect("should succeed to delete"); + wal.mark_delete_entries_up_to(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), seq_1) + .await + .expect("should succeed to delete"); // restart wal.close_gracefully().await.unwrap(); @@ -878,11 +794,7 @@ async fn sequence_increase_monotonically_delete_reopen_write(env: let wal = env.build_wal().await; // write again let (_, write_batch2) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id), 0, 10) .await; let seq_2 = wal .write(&env.write_ctx, &write_batch2) @@ -890,11 +802,7 @@ async fn sequence_increase_monotonically_delete_reopen_write(env: .expect("should succeed to write"); let last_seq = wal - .sequence_num(WalLocation::new( - DEFAULT_SHARD_ID as u64, - DEFAULT_CLUSTER_VERSION, - table_id, - )) + .sequence_num(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id)) .await .unwrap(); assert_eq!(seq_2, last_seq); @@ -911,11 +819,7 @@ async fn write_scan(env: &TestEnv) { let wal = env.build_wal().await; // Write table 0. let (payload_batch1, write_batch1) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id_1), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id_1), 0, 10) .await; let seq_1 = wal @@ -925,11 +829,7 @@ async fn write_scan(env: &TestEnv) { // Write table 1. let (payload_batch2, write_batch2) = env - .build_log_batch( - WalLocation::new(DEFAULT_SHARD_ID as u64, DEFAULT_CLUSTER_VERSION, table_id_2), - 0, - 10, - ) + .build_log_batch(WalLocation::new(DEFAULT_SHARD_ID as u64, table_id_2), 0, 10) .await; let seq_2 = wal @@ -939,7 +839,7 @@ async fn write_scan(env: &TestEnv) { // Scan and compare. 
let scan_request = ScanRequest { - versioned_region_id: VersionedRegionId::default(), + region_id: DEFAULT_SHARD_ID as u64, }; let iter = wal .scan(&env.read_ctx, &scan_request) From daaf3cbdcd5a4341194144723b5e587829caf7c6 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 27 Feb 2023 15:01:15 +0800 Subject: [PATCH 02/10] refactor: set up server --- analytic_engine/src/setup.rs | 131 +++++++++------------- common_util/src/metric.rs | 20 ++-- server/src/grpc/meta_event_service/mod.rs | 19 +++- wal/src/manager.rs | 26 +++-- wal/src/message_queue_impl/namespace.rs | 6 + wal/src/message_queue_impl/wal.rs | 12 +- wal/src/rocks_impl/manager.rs | 13 ++- wal/src/table_kv_impl/wal.rs | 10 +- 8 files changed, 130 insertions(+), 107 deletions(-) diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index daaf622111..53ba94a5fa 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -107,72 +107,54 @@ const MANIFEST_DIR_NAME: &str = "manifest"; const STORE_DIR_NAME: &str = "store"; const DISK_CACHE_DIR_NAME: &str = "sst_cache"; -#[derive(Default)] -pub struct EngineBuildContextBuilder { +#[derive(Clone)] +struct EngineBuilder2 { config: Config, router: Option, + engine_runtimes: Arc, + data_wal: WalManagerRef, + manifest_wal: WalManagerRef, } -impl EngineBuildContextBuilder { - pub fn config(mut self, config: Config) -> Self { - self.config = config; - self - } - - pub fn router(mut self, router: RouterRef) -> Self { - self.router = Some(router); - self - } +impl EngineBuilder2 { + async fn build(self) -> Result { + let opened_storages = open_storage(self.config.storage.clone()).await?; + let manifest = ManifestImpl::open( + self.config.manifest.clone(), + self.manifest_wal.clone(), + opened_storages.default_store().clone(), + ) + .await + .context(OpenManifest)?; - pub fn build(self) -> EngineBuildContext { - EngineBuildContext { - config: self.config, - router: self.router, - } + let instance = open_instance( + self.config, + self.engine_runtimes, + self.data_wal, + Arc::new(manifest), + Arc::new(opened_storages), + self.router, + ) + .await?; + Ok(Arc::new(TableEngineImpl::new(instance))) } } -#[derive(Clone)] -pub struct EngineBuildContext { - pub config: Config, - pub router: Option, +#[derive(Debug, Clone)] +struct OpenedWals { + data_wal: WalManagerRef, + manifest_wal: WalManagerRef, } /// Analytic engine builder. #[async_trait] pub trait EngineBuilder: Send + Sync + Default { - /// Build the analytic engine from `config` and `engine_runtimes`. - async fn build( - &self, - context: EngineBuildContext, - engine_runtimes: Arc, - ) -> Result { - let opened_storages = open_storage(context.config.storage.clone()).await?; - let (wal, manifest) = self - .open_wal_and_manifest( - context.config.clone(), - engine_runtimes.clone(), - opened_storages.default_store().clone(), - ) - .await?; - let instance = open_instance( - context.config.clone(), - engine_runtimes, - wal, - manifest, - Arc::new(opened_storages), - context.router, - ) - .await?; - Ok(Arc::new(TableEngineImpl::new(instance))) - } - async fn open_wal_and_manifest( &self, config: Config, engine_runtimes: Arc, object_store: ObjectStoreRef, - ) -> Result<(WalManagerRef, ManifestRef)>; + ) -> Result; } /// [RocksEngine] builder. 
@@ -186,7 +168,7 @@ impl EngineBuilder for RocksDBWalEngineBuilder { config: Config, engine_runtimes: Arc, object_store: ObjectStoreRef, - ) -> Result<(WalManagerRef, ManifestRef)> { + ) -> Result { let rocksdb_wal_config = match config.wal { WalStorageConfig::RocksDB(config) => *config, _ => { @@ -203,7 +185,7 @@ impl EngineBuilder for RocksDBWalEngineBuilder { let write_runtime = engine_runtimes.write_runtime.clone(); let data_path = Path::new(&rocksdb_wal_config.data_dir); let wal_path = data_path.join(WAL_DIR_NAME); - let wal_manager = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone()) + let data_wal = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone()) .build() .context(OpenWal)?; @@ -211,16 +193,11 @@ impl EngineBuilder for RocksDBWalEngineBuilder { let manifest_wal = WalBuilder::with_default_rocksdb_config(manifest_path, write_runtime) .build() .context(OpenManifestWal)?; - - let manifest = ManifestImpl::open( - config.manifest.clone(), - Arc::new(manifest_wal), - object_store, - ) - .await - .context(OpenManifest)?; - - Ok((Arc::new(wal_manager), Arc::new(manifest))) + let opened_wals = OpenedWals { + data_wal: Arc::new(data_wal), + manifest_wal: Arc::new(manifest_wal), + }; + Ok(opened_wals) } } @@ -235,7 +212,7 @@ impl EngineBuilder for ObkvWalEngineBuilder { config: Config, engine_runtimes: Arc, object_store: ObjectStoreRef, - ) -> Result<(WalManagerRef, ManifestRef)> { + ) -> Result { let obkv_wal_config = match &config.wal { WalStorageConfig::Obkv(config) => config.clone(), _ => { @@ -262,7 +239,6 @@ impl EngineBuilder for ObkvWalEngineBuilder { config.manifest.clone(), engine_runtimes, obkv, - object_store, ) .await } @@ -284,7 +260,7 @@ impl EngineBuilder for MemWalEngineBuilder { config: Config, engine_runtimes: Arc, object_store: ObjectStoreRef, - ) -> Result<(WalManagerRef, ManifestRef)> { + ) -> Result { let obkv_wal_config = match &config.wal { WalStorageConfig::Obkv(config) => config.clone(), _ => { @@ -302,8 +278,7 @@ impl EngineBuilder for MemWalEngineBuilder { *obkv_wal_config, config.manifest.clone(), engine_runtimes, - self.table_kv.clone(), - object_store, + self.table_kv, ) .await } @@ -319,7 +294,7 @@ impl EngineBuilder for KafkaWalEngineBuilder { config: Config, engine_runtimes: Arc, object_store: ObjectStoreRef, - ) -> Result<(WalManagerRef, ManifestRef)> { + ) -> Result { let kafka_wal_config = match &config.wal { WalStorageConfig::Kafka(config) => config.clone(), _ => { @@ -337,7 +312,7 @@ impl EngineBuilder for KafkaWalEngineBuilder { let kafka = KafkaImpl::new(kafka_wal_config.kafka.clone()) .await .context(OpenKafka)?; - let wal_manager = MessageQueueImpl::new( + let data_wal = MessageQueueImpl::new( WAL_DIR_NAME.to_string(), kafka.clone(), bg_runtime.clone(), @@ -351,11 +326,10 @@ impl EngineBuilder for KafkaWalEngineBuilder { kafka_wal_config.meta_namespace, ); - let manifest = ManifestImpl::open(config.manifest, Arc::new(manifest_wal), object_store) - .await - .context(OpenManifest)?; - - Ok((Arc::new(wal_manager), Arc::new(manifest))) + Ok(OpenedWals { + data_wal: Arc::new(data_wal), + manifest_wal: Arc::new(manifest_wal), + }) } } @@ -364,15 +338,14 @@ async fn open_wal_and_manifest_with_table_kv( manifest_opts: ManifestOptions, engine_runtimes: Arc, table_kv: T, - object_store: ObjectStoreRef, -) -> Result<(WalManagerRef, ManifestRef)> { +) -> Result { let runtimes = WalRuntimes { read_runtime: engine_runtimes.read_runtime.clone(), write_runtime: engine_runtimes.write_runtime.clone(), bg_runtime: 
engine_runtimes.bg_runtime.clone(), }; - let wal_manager = WalNamespaceImpl::open( + let data_wal = WalNamespaceImpl::open( table_kv.clone(), runtimes.clone(), WAL_DIR_NAME, @@ -389,11 +362,11 @@ async fn open_wal_and_manifest_with_table_kv( ) .await .context(OpenManifestWal)?; - let manifest = ManifestImpl::open(manifest_opts, Arc::new(manifest_wal), object_store) - .await - .context(OpenManifest)?; - Ok((Arc::new(wal_manager), Arc::new(manifest))) + Ok(OpenedWals { + data_wal: Arc::new(data_wal), + manifest_wal: Arc::new(manifest_wal), + }) } async fn open_instance( diff --git a/common_util/src/metric.rs b/common_util/src/metric.rs index 3219a3c757..8eedfdb933 100644 --- a/common_util/src/metric.rs +++ b/common_util/src/metric.rs @@ -16,7 +16,7 @@ use crate::time; /// Meters are used to calculate rate of an event. #[derive(Debug)] pub struct Meter { - moving_avarages: ExponentiallyWeightedMovingAverages, + moving_averages: ExponentiallyWeightedMovingAverages, count: AtomicU64, start_time: SystemTime, } @@ -30,7 +30,7 @@ impl Default for Meter { impl Meter { pub fn new() -> Meter { Meter { - moving_avarages: ExponentiallyWeightedMovingAverages::new(), + moving_averages: ExponentiallyWeightedMovingAverages::new(), count: AtomicU64::from(0), start_time: SystemTime::now(), } @@ -42,23 +42,23 @@ impl Meter { pub fn mark_n(&self, n: u64) { self.count.fetch_add(n, Ordering::Relaxed); - self.moving_avarages.tick_if_needed(); - self.moving_avarages.update(n); + self.moving_averages.tick_if_needed(); + self.moving_averages.update(n); } pub fn h1_rate(&self) -> f64 { - self.moving_avarages.tick_if_needed(); - self.moving_avarages.h1_rate() + self.moving_averages.tick_if_needed(); + self.moving_averages.h1_rate() } pub fn h2_rate(&self) -> f64 { - self.moving_avarages.tick_if_needed(); - self.moving_avarages.h2_rate() + self.moving_averages.tick_if_needed(); + self.moving_averages.h2_rate() } pub fn m15_rate(&self) -> f64 { - self.moving_avarages.tick_if_needed(); - self.moving_avarages.m15_rate() + self.moving_averages.tick_if_needed(); + self.moving_averages.m15_rate() } pub fn count(&self) -> u64 { diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index 14372d6b5a..fb8bbd3d98 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -131,7 +131,7 @@ impl MetaServiceImpl { CloseTableOnShardResponse ); - fn handler_ctx(&self) -> HandlerContext { + fn handler_ctx(&self) -> HandlerContext { HandlerContext { cluster: self.cluster.clone(), catalog_manager: self.instance.catalog_manager.clone(), @@ -140,14 +140,19 @@ impl MetaServiceImpl { } } +pub trait WalRegionCloser: Debug { + fn close_region(&self, region_id: RegionId) -> Result<()>; +} + /// Context for handling all kinds of meta event service. 
-struct HandlerContext {
+struct HandlerContext<C: WalRegionCloser> {
     cluster: ClusterRef,
     catalog_manager: ManagerRef,
     table_engine: TableEngineRef,
+    wal_shard_closer: C,
 }
 
-impl HandlerContext {
+impl<C: WalRegionCloser> HandlerContext<C> {
     fn default_catalog(&self) -> Result<CatalogRef> {
         let default_catalog_name = self.catalog_manager.default_catalog_name();
         let default_catalog = self
@@ -224,7 +229,10 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
     Ok(())
 }
 
-async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> {
+async fn handle_close_shard<C: WalRegionCloser>(
+    ctx: HandlerContext<C>,
+    request: CloseShardRequest,
+) -> Result<()> {
     let tables_of_shard = ctx
         .cluster
         .close_shard(&request)
@@ -261,7 +269,8 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) ->
         })?;
     }
 
-    Ok(())
+    // try to close wal region
+    ctx.wal_shard_closer.close_region(request.shard_id as u64)
 }
 
 async fn handle_create_table_on_shard(
diff --git a/wal/src/manager.rs b/wal/src/manager.rs
index 76a0f30631..8987f44332 100644
--- a/wal/src/manager.rs
+++ b/wal/src/manager.rs
@@ -14,16 +14,13 @@ use common_util::{error::BoxError, runtime::Runtime};
 pub use error::*;
 use snafu::ResultExt;
 
-use crate::{
-    log_batch::{LogEntry, LogWriteBatch, PayloadDecoder},
-    manager,
-};
+use crate::log_batch::{LogEntry, LogWriteBatch, PayloadDecoder};
 
 pub mod error {
     use common_util::{define_result, error::GenericError};
     use snafu::{Backtrace, Snafu};
 
-    use crate::manager::WalLocation;
+    use crate::manager::{RegionId, WalLocation};
 
     // Now most error from manage implementation don't have backtrace, so we add
     // backtrace here.
@@ -110,6 +107,18 @@ pub mod error {
             backtrace: Backtrace,
         },
 
+        #[snafu(display(
+            "Failed to close wal region, region_id:{}, err:{}.\nBacktrace:\n{}",
+            region,
+            source,
+            backtrace
+        ))]
+        CloseRegion {
+            source: GenericError,
+            region: RegionId,
+            backtrace: Backtrace,
+        },
+
         #[snafu(display("Failed to close wal, err:{}.\nBacktrace:\n{}", source, backtrace))]
         Close {
             source: GenericError,
@@ -285,6 +294,9 @@ pub trait WalManager: Send + Sync + fmt::Debug + 'static {
         sequence_num: SequenceNumber,
     ) -> Result<()>;
 
+    /// Close a region.
+    async fn close_region(&self, region: RegionId) -> Result<()>;
+
     /// Close the wal gracefully.
     async fn close_gracefully(&self) -> Result<()>;
 
@@ -361,7 +373,7 @@ impl BatchLogIteratorAdapter {
             let payload = decoder
                 .decode(&mut raw_payload)
                 .box_err()
-                .context(manager::Decoding)?;
+                .context(error::Decoding)?;
             let log_entry = LogEntry {
                 table_id: raw_log_entry.table_id,
                 sequence: raw_log_entry.sequence,
@@ -399,7 +411,7 @@ impl BatchLogIteratorAdapter {
             let payload = decoder
                 .decode(&mut raw_payload)
                 .box_err()
-                .context(manager::Decoding)?;
+                .context(error::Decoding)?;
             let log_entry = LogEntry {
                 table_id: raw_log_entry.table_id,
                 sequence: raw_log_entry.sequence,
diff --git a/wal/src/message_queue_impl/namespace.rs b/wal/src/message_queue_impl/namespace.rs
index 7b2fd06c7a..be7d30e6c1 100644
--- a/wal/src/message_queue_impl/namespace.rs
+++ b/wal/src/message_queue_impl/namespace.rs
@@ -162,6 +162,12 @@ impl<M: MessageQueue> Namespace<M> {
         }
     }
 
+    pub async fn close_region(&self, region_id: RegionId) -> Result<()> {
+        let mut regions = self.inner.regions.write().await;
+        regions.remove(&region_id);
+        Ok(())
+    }
+
     /// Close namespace
     ///
     /// Mainly clear the regions and wait logs cleaning routine to stop.
diff --git a/wal/src/message_queue_impl/wal.rs b/wal/src/message_queue_impl/wal.rs index f8eff2fef4..b0c3208eaf 100644 --- a/wal/src/message_queue_impl/wal.rs +++ b/wal/src/message_queue_impl/wal.rs @@ -13,8 +13,8 @@ use snafu::ResultExt; use crate::{ log_batch::{LogEntry, LogWriteBatch}, manager::{ - error::*, AsyncLogIterator, BatchLogIteratorAdapter, ReadContext, ReadRequest, ScanContext, - ScanRequest, WalLocation, WalManager, WriteContext, + error::*, AsyncLogIterator, BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, + ScanContext, ScanRequest, WalLocation, WalManager, WriteContext, }, message_queue_impl::{ config::Config, @@ -59,6 +59,14 @@ impl WalManager for MessageQueueImpl { .context(Delete) } + async fn close_region(&self, region_id: RegionId) -> Result<()> { + self.0 + .close_region(region_id) + .await + .box_err() + .context(Close) + } + async fn close_gracefully(&self) -> Result<()> { self.0.close().await.box_err().context(Close) } diff --git a/wal/src/rocks_impl/manager.rs b/wal/src/rocks_impl/manager.rs index d70b5a0d51..64bbce5993 100644 --- a/wal/src/rocks_impl/manager.rs +++ b/wal/src/rocks_impl/manager.rs @@ -27,8 +27,8 @@ use crate::{ kv_encoder::{CommonLogEncoding, CommonLogKey, MaxSeqMetaEncoding, MaxSeqMetaValue, MetaKey}, log_batch::{LogEntry, LogWriteBatch}, manager::{ - error::*, BatchLogIteratorAdapter, ReadContext, ReadRequest, ScanContext, ScanRequest, - SyncLogIterator, WalLocation, WalManager, WriteContext, + error::*, BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, ScanContext, + ScanRequest, SyncLogIterator, WalLocation, WalManager, WriteContext, }, }; @@ -731,6 +731,15 @@ impl WalManager for RocksImpl { Ok(()) } + async fn close_region(&self, region_id: RegionId) -> Result<()> { + debug!( + "Close region for RocksDB based WAL is noop operation, region_id:{}", + region_id + ); + + Ok(()) + } + async fn close_gracefully(&self) -> Result<()> { info!("Close rocksdb wal gracefully"); diff --git a/wal/src/table_kv_impl/wal.rs b/wal/src/table_kv_impl/wal.rs index 68028c675d..660f804ab3 100644 --- a/wal/src/table_kv_impl/wal.rs +++ b/wal/src/table_kv_impl/wal.rs @@ -7,14 +7,14 @@ use std::{fmt, str, sync::Arc}; use async_trait::async_trait; use common_types::SequenceNumber; use common_util::error::BoxError; -use log::info; +use log::{info, warn}; use snafu::ResultExt; use table_kv::TableKv; use crate::{ log_batch::LogWriteBatch, manager::{ - self, error::*, BatchLogIteratorAdapter, ReadContext, ReadRequest, ScanContext, + self, error::*, BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, WalLocation, WalManager, }, table_kv_impl::{ @@ -118,6 +118,12 @@ impl WalManager for WalNamespaceImpl { .context(Delete) } + async fn close_region(&self, region: RegionId) -> Result<()> { + warn!("Close region is not supported yet, region:{}", region); + + Ok(()) + } + async fn close_gracefully(&self) -> Result<()> { info!( "Close table kv wal gracefully, namespace:{}", From cedfa6c0bd6930fbde173903b96ce4469b74c517 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 27 Feb 2023 20:46:25 +0800 Subject: [PATCH 03/10] refactor: setup procedure --- Cargo.lock | 1 + analytic_engine/src/setup.rs | 117 ++++++++---------- analytic_engine/src/tests/util.rs | 6 +- server/Cargo.toml | 1 + server/src/grpc/meta_event_service/mod.rs | 72 ++++++++--- .../meta_event_service/shard_operation.rs | 40 ++++++ server/src/grpc/mod.rs | 17 ++- server/src/server.rs | 13 ++ src/setup.rs | 60 +++++---- 9 files changed, 213 insertions(+), 114 
deletions(-) create mode 100644 server/src/grpc/meta_event_service/shard_operation.rs diff --git a/Cargo.lock b/Cargo.lock index 0b4c1555cf..41fde56d5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5637,6 +5637,7 @@ dependencies = [ "tokio 1.25.0", "tokio-stream", "tonic", + "wal", "warp", "zstd 0.12.1+zstd.1.5.2", ] diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index 53ba94a5fa..c5d62f48d2 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -27,7 +27,7 @@ use table_kv::{memory::MemoryImpl, obkv::ObkvImpl, TableKv}; use wal::{ manager::{self, WalManagerRef}, message_queue_impl::wal::MessageQueueImpl, - rocks_impl::manager::Builder as WalBuilder, + rocks_impl::manager::Builder as RocksWalBuilder, table_kv_impl::{wal::WalNamespaceImpl, WalRuntimes}, }; @@ -35,10 +35,7 @@ use crate::{ context::OpenContext, engine::TableEngineImpl, instance::{Instance, InstanceRef}, - manifest::{ - details::{ManifestImpl, Options as ManifestOptions}, - ManifestRef, - }, + manifest::{details::ManifestImpl, ManifestRef}, sst::{ factory::{FactoryImpl, ObjectStorePicker, ObjectStorePickerRef, ReadFrequency}, meta_data::cache::{MetaCache, MetaCacheRef}, @@ -107,30 +104,32 @@ const MANIFEST_DIR_NAME: &str = "manifest"; const STORE_DIR_NAME: &str = "store"; const DISK_CACHE_DIR_NAME: &str = "sst_cache"; +/// Builder for [TableEngine]. +/// +/// [TableEngine]: table_engine::engine::TableEngine #[derive(Clone)] -struct EngineBuilder2 { - config: Config, - router: Option, - engine_runtimes: Arc, - data_wal: WalManagerRef, - manifest_wal: WalManagerRef, +pub struct EngineBuilder<'a> { + pub config: &'a Config, + pub router: Option, + pub engine_runtimes: Arc, + pub opened_wals: OpenedWals, } -impl EngineBuilder2 { - async fn build(self) -> Result { +impl<'a> EngineBuilder<'a> { + pub async fn build(self) -> Result { let opened_storages = open_storage(self.config.storage.clone()).await?; let manifest = ManifestImpl::open( self.config.manifest.clone(), - self.manifest_wal.clone(), + self.opened_wals.manifest_wal.clone(), opened_storages.default_store().clone(), ) .await .context(OpenManifest)?; let instance = open_instance( - self.config, + self.config.clone(), self.engine_runtimes, - self.data_wal, + self.opened_wals.data_wal, Arc::new(manifest), Arc::new(opened_storages), self.router, @@ -141,19 +140,18 @@ impl EngineBuilder2 { } #[derive(Debug, Clone)] -struct OpenedWals { - data_wal: WalManagerRef, - manifest_wal: WalManagerRef, +pub struct OpenedWals { + pub data_wal: WalManagerRef, + pub manifest_wal: WalManagerRef, } /// Analytic engine builder. 
#[async_trait] -pub trait EngineBuilder: Send + Sync + Default { - async fn open_wal_and_manifest( +pub trait WalsOpener: Send + Sync + Default { + async fn open_wals( &self, - config: Config, + config: &WalStorageConfig, engine_runtimes: Arc, - object_store: ObjectStoreRef, ) -> Result; } @@ -162,20 +160,18 @@ pub trait EngineBuilder: Send + Sync + Default { pub struct RocksDBWalEngineBuilder; #[async_trait] -impl EngineBuilder for RocksDBWalEngineBuilder { - async fn open_wal_and_manifest( +impl WalsOpener for RocksDBWalEngineBuilder { + async fn open_wals( &self, - config: Config, + config: &WalStorageConfig, engine_runtimes: Arc, - object_store: ObjectStoreRef, ) -> Result { - let rocksdb_wal_config = match config.wal { - WalStorageConfig::RocksDB(config) => *config, + let rocksdb_wal_config = match &config { + WalStorageConfig::RocksDB(config) => config.clone(), _ => { return InvalidWalConfig { msg: format!( - "invalid wal storage config while opening rocksDB wal, config:{:?}", - config.wal + "invalid wal storage config while opening rocksDB wal, config:{config:?}" ), } .fail(); @@ -185,14 +181,16 @@ impl EngineBuilder for RocksDBWalEngineBuilder { let write_runtime = engine_runtimes.write_runtime.clone(); let data_path = Path::new(&rocksdb_wal_config.data_dir); let wal_path = data_path.join(WAL_DIR_NAME); - let data_wal = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone()) - .build() - .context(OpenWal)?; + let data_wal = + RocksWalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone()) + .build() + .context(OpenWal)?; let manifest_path = data_path.join(MANIFEST_DIR_NAME); - let manifest_wal = WalBuilder::with_default_rocksdb_config(manifest_path, write_runtime) - .build() - .context(OpenManifestWal)?; + let manifest_wal = + RocksWalBuilder::with_default_rocksdb_config(manifest_path, write_runtime) + .build() + .context(OpenManifestWal)?; let opened_wals = OpenedWals { data_wal: Arc::new(data_wal), manifest_wal: Arc::new(manifest_wal), @@ -206,20 +204,18 @@ impl EngineBuilder for RocksDBWalEngineBuilder { pub struct ObkvWalEngineBuilder; #[async_trait] -impl EngineBuilder for ObkvWalEngineBuilder { - async fn open_wal_and_manifest( +impl WalsOpener for ObkvWalEngineBuilder { + async fn open_wals( &self, - config: Config, + config: &WalStorageConfig, engine_runtimes: Arc, - object_store: ObjectStoreRef, ) -> Result { - let obkv_wal_config = match &config.wal { + let obkv_wal_config = match config { WalStorageConfig::Obkv(config) => config.clone(), _ => { return InvalidWalConfig { msg: format!( - "invalid wal storage config while opening obkv wal, config:{:?}", - config.wal + "invalid wal storage config while opening obkv wal, config:{config:?}" ), } .fail(); @@ -234,13 +230,7 @@ impl EngineBuilder for ObkvWalEngineBuilder { .await .context(RuntimeExec)??; - open_wal_and_manifest_with_table_kv( - *obkv_wal_config, - config.manifest.clone(), - engine_runtimes, - obkv, - ) - .await + open_wal_and_manifest_with_table_kv(*obkv_wal_config, engine_runtimes, obkv).await } } @@ -254,20 +244,18 @@ pub struct MemWalEngineBuilder { } #[async_trait] -impl EngineBuilder for MemWalEngineBuilder { - async fn open_wal_and_manifest( +impl WalsOpener for MemWalEngineBuilder { + async fn open_wals( &self, - config: Config, + config: &WalStorageConfig, engine_runtimes: Arc, - object_store: ObjectStoreRef, ) -> Result { - let obkv_wal_config = match &config.wal { + let obkv_wal_config = match config { WalStorageConfig::Obkv(config) => config.clone(), _ => { return 
InvalidWalConfig { msg: format!( - "invalid wal storage config while opening memory wal, config:{:?}", - config.wal + "invalid wal storage config while opening memory wal, config:{config:?}" ), } .fail(); @@ -276,9 +264,8 @@ impl EngineBuilder for MemWalEngineBuilder { open_wal_and_manifest_with_table_kv( *obkv_wal_config, - config.manifest.clone(), engine_runtimes, - self.table_kv, + self.table_kv.clone(), ) .await } @@ -288,14 +275,13 @@ impl EngineBuilder for MemWalEngineBuilder { pub struct KafkaWalEngineBuilder; #[async_trait] -impl EngineBuilder for KafkaWalEngineBuilder { - async fn open_wal_and_manifest( +impl WalsOpener for KafkaWalEngineBuilder { + async fn open_wals( &self, - config: Config, + config: &WalStorageConfig, engine_runtimes: Arc, - object_store: ObjectStoreRef, ) -> Result { - let kafka_wal_config = match &config.wal { + let kafka_wal_config = match config { WalStorageConfig::Kafka(config) => config.clone(), _ => { return InvalidWalConfig { @@ -335,7 +321,6 @@ impl EngineBuilder for KafkaWalEngineBuilder { async fn open_wal_and_manifest_with_table_kv( config: ObkvWalConfig, - manifest_opts: ManifestOptions, engine_runtimes: Arc, table_kv: T, ) -> Result { diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index 46b92ece26..eb657772cc 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -31,8 +31,8 @@ use tempfile::TempDir; use crate::{ setup::{ - EngineBuildContext, EngineBuildContextBuilder, EngineBuilder, MemWalEngineBuilder, - RocksDBWalEngineBuilder, + EngineBuildContext, EngineBuildContextBuilder, MemWalEngineBuilder, + RocksDBWalEngineBuilder, WalsOpener, }, storage_options::{LocalOptions, ObjectStoreOptions, StorageOptions}, tests::table::{self, FixedSchemaTable, RowTuple}, @@ -451,7 +451,7 @@ impl Default for Builder { } pub trait EngineContext: Clone + Default { - type EngineBuilder: EngineBuilder; + type EngineBuilder: WalsOpener; fn engine_builder(&self) -> Self::EngineBuilder; fn config(&self) -> Config; diff --git a/server/Cargo.toml b/server/Cargo.toml index 53b1d1a05d..8aa7275a35 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -49,6 +49,7 @@ table_engine = { workspace = true } tokio = { workspace = true } tokio-stream = { version = "0.1", features = ["net"] } tonic = { workspace = true } +wal = { workspace = true } warp = "0.3" zstd = { workspace = true } diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index fb8bbd3d98..b975044252 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -4,6 +4,7 @@ use std::{sync::Arc, time::Instant}; +use analytic_engine::setup::OpenedWals; use async_trait::async_trait; use catalog::{ manager::ManagerRef, @@ -36,21 +37,56 @@ use table_engine::{ }; use tonic::Response; +use self::shard_operation::WalCloserAdapter; use crate::{ grpc::{ - meta_event_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode}, + meta_event_service::{ + error::{ErrNoCause, ErrWithCause, Result, StatusCode}, + shard_operation::WalRegionCloserRef, + }, metrics::META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, }, instance::InstanceRef, }; pub(crate) mod error; +mod shard_operation; -#[derive(Clone)] -pub struct MetaServiceImpl { +/// Builder for [MetaServiceImpl]. 
+pub struct Builder { pub cluster: ClusterRef, pub instance: InstanceRef, pub runtime: Arc, + pub opened_wals: OpenedWals, +} + +impl Builder { + pub fn build(self) -> MetaServiceImpl { + let Self { + cluster, + instance, + runtime, + opened_wals, + } = self; + + MetaServiceImpl { + cluster, + instance, + runtime, + wal_region_closer: Arc::new(WalCloserAdapter { + data_wal: opened_wals.data_wal, + manifest_wal: opened_wals.manifest_wal, + }), + } + } +} + +#[derive(Clone)] +pub struct MetaServiceImpl { + cluster: ClusterRef, + instance: InstanceRef, + runtime: Arc, + wal_region_closer: WalRegionCloserRef, } macro_rules! handle_request { @@ -131,28 +167,25 @@ impl MetaServiceImpl { CloseTableOnShardResponse ); - fn handler_ctx(&self) -> HandlerContext { + fn handler_ctx(&self) -> HandlerContext { HandlerContext { cluster: self.cluster.clone(), catalog_manager: self.instance.catalog_manager.clone(), table_engine: self.instance.table_engine.clone(), + wal_region_closer: self.wal_region_closer.clone(), } } } -pub trait WalRegionCloser: Debug { - fn close_region(&self, region_id: RegionId) -> Result<()>; -} - /// Context for handling all kinds of meta event service. -struct HandlerContext { +struct HandlerContext { cluster: ClusterRef, catalog_manager: ManagerRef, table_engine: TableEngineRef, - wal_shard_closer: C, + wal_region_closer: WalRegionCloserRef, } -impl HandlerContext { +impl HandlerContext { fn default_catalog(&self) -> Result { let default_catalog_name = self.catalog_manager.default_catalog_name(); let default_catalog = self @@ -172,6 +205,10 @@ impl HandlerContext { } } +// TODO: maybe we should encapsulate the logic of handling meta event into a +// trait, so that we don't need to expose the logic to the meta event service +// implementation. + async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> { let tables_of_shard = ctx.cluster @@ -229,10 +266,7 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re Ok(()) } -async fn handle_close_shard( - ctx: HandlerContext, - request: CloseShardRequest, -) -> Result<()> { +async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> { let tables_of_shard = ctx.cluster .close_shard(&request) @@ -270,7 +304,13 @@ async fn handle_close_shard( } // try to close wal region - ctx.wal_shard_closer.close_region(request.shard_id as u64) + ctx.wal_region_closer + .close_region(request.shard_id) + .await + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to close wal region, shard_id:{}", request.shard_id), + }) } async fn handle_create_table_on_shard( diff --git a/server/src/grpc/meta_event_service/shard_operation.rs b/server/src/grpc/meta_event_service/shard_operation.rs new file mode 100644 index 0000000000..8e73165644 --- /dev/null +++ b/server/src/grpc/meta_event_service/shard_operation.rs @@ -0,0 +1,40 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Implementation of operations on shards. + +// TODO: Currently, only a specific operation (close wal region) is implemented, +// and it is expected to encapsulate more operations on **Shard** in the future. 
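Note the design change alongside the builder: the handler context now carries an `Arc<dyn WalRegionCloser>` (`WalRegionCloserRef`) instead of a generic closer parameter, which keeps `MetaServiceImpl` clonable and easy to stub. As a usage sketch of the trait the new module defines just below, here is a no-op closer of the kind a unit test might inject; this helper is hypothetical and not part of the patch:

    // Hypothetical test double; the real adapter is `WalCloserAdapter`
    // below, which closes the region in both the data and manifest WALs.
    #[derive(Debug)]
    struct NoopWalCloser;

    #[async_trait]
    impl WalRegionCloser for NoopWalCloser {
        async fn close_region(&self, _shard_id: ShardId) -> GenericResult<()> {
            // Report success without touching any WAL.
            Ok(())
        }
    }
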
+ +use std::sync::Arc; + +use async_trait::async_trait; +use common_types::table::ShardId; +use common_util::error::{BoxError, GenericResult}; +use wal::manager::WalManagerRef; + +#[async_trait] +pub trait WalRegionCloser: std::fmt::Debug + Send + Sync { + async fn close_region(&self, shard_id: ShardId) -> GenericResult<()>; +} + +pub type WalRegionCloserRef = Arc; + +#[derive(Debug)] +pub struct WalCloserAdapter { + pub data_wal: WalManagerRef, + pub manifest_wal: WalManagerRef, +} + +#[async_trait] +impl WalRegionCloser for WalCloserAdapter { + async fn close_region(&self, shard_id: ShardId) -> GenericResult<()> { + let region_id = shard_id as u64; + + self.data_wal.close_region(region_id).await.box_err()?; + self.manifest_wal.close_region(region_id).await.box_err()?; + + Ok(()) + } +} diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 4ccdb89688..83dc4cfe22 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -10,6 +10,7 @@ use std::{ time::Duration, }; +use analytic_engine::setup::OpenedWals; use ceresdbproto::{ meta_event::meta_event_service_server::MetaEventServiceServer, remote_engine::remote_engine_service_server::RemoteEngineServiceServer, @@ -92,6 +93,9 @@ pub enum Error { #[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))] MissingRouter { backtrace: Backtrace }, + #[snafu(display("Missing wals.\nBacktrace:\n{}", backtrace))] + MissingWals { backtrace: Backtrace }, + #[snafu(display("Missing schema config provider.\nBacktrace:\n{}", backtrace))] MissingSchemaConfigProvider { backtrace: Backtrace }, @@ -205,6 +209,7 @@ pub struct Builder { instance: Option>, router: Option, cluster: Option, + opened_wals: Option, schema_config_provider: Option, forward_config: Option, } @@ -220,6 +225,7 @@ impl Builder { instance: None, router: None, cluster: None, + opened_wals: None, schema_config_provider: None, forward_config: None, } @@ -262,6 +268,11 @@ impl Builder { self } + pub fn opened_wals(mut self, opened_wals: OpenedWals) -> Self { + self.opened_wals = Some(opened_wals); + self + } + pub fn schema_config_provider(mut self, provider: SchemaConfigProviderRef) -> Self { self.schema_config_provider = Some(provider); self @@ -283,17 +294,19 @@ impl Builder { let runtimes = self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; let router = self.router.context(MissingRouter)?; + let opened_wals = self.opened_wals.context(MissingWals)?; let schema_config_provider = self .schema_config_provider .context(MissingSchemaConfigProvider)?; let meta_rpc_server = self.cluster.map(|v| { - let meta_service = MetaServiceImpl { + let builder = meta_event_service::Builder { cluster: v, instance: instance.clone(), runtime: runtimes.meta_runtime.clone(), + opened_wals, }; - MetaEventServiceServer::new(meta_service) + MetaEventServiceServer::new(builder.build()) }); let remote_engine_server = { diff --git a/server/src/server.rs b/server/src/server.rs index ccf51cbd89..42e313b8a1 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -4,6 +4,7 @@ use std::sync::Arc; +use analytic_engine::setup::OpenedWals; use catalog::manager::ManagerRef; use cluster::ClusterRef; use df_operator::registry::FunctionRegistryRef; @@ -56,6 +57,9 @@ pub enum Error { #[snafu(display("Missing function registry.\nBacktrace:\n{}", backtrace))] MissingFunctionRegistry { backtrace: Backtrace }, + #[snafu(display("Missing wals.\nBacktrace:\n{}", backtrace))] + MissingWals { backtrace: Backtrace }, + #[snafu(display("Missing 
limiter.\nBacktrace:\n{}", backtrace))] MissingLimiter { backtrace: Backtrace }, @@ -177,6 +181,7 @@ pub struct Builder { router: Option, schema_config_provider: Option, local_tables_recoverer: Option, + opened_wals: Option, } impl Builder { @@ -196,6 +201,7 @@ impl Builder { router: None, schema_config_provider: None, local_tables_recoverer: None, + opened_wals: None, } } @@ -267,6 +273,11 @@ impl Builder { self } + pub fn opened_wals(mut self, opened_wals: OpenedWals) -> Self { + self.opened_wals = Some(opened_wals); + self + } + /// Build and run the server pub fn build(self) -> Result> { // Build instance @@ -275,6 +286,7 @@ impl Builder { let table_engine = self.table_engine.context(MissingTableEngine)?; let table_manipulator = self.table_manipulator.context(MissingTableManipulator)?; let function_registry = self.function_registry.context(MissingFunctionRegistry)?; + let opened_wals = self.opened_wals.context(MissingWals)?; let instance = { let instance = Instance { @@ -335,6 +347,7 @@ impl Builder { .instance(instance.clone()) .router(router) .cluster(self.cluster.clone()) + .opened_wals(opened_wals) .schema_config_provider(provider) .forward_config(self.config.forward) .timeout(self.config.timeout.map(|v| v.0)) diff --git a/src/setup.rs b/src/setup.rs index 17bb300a22..d7e607bb39 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -7,8 +7,8 @@ use std::sync::Arc; use analytic_engine::{ self, setup::{ - EngineBuildContext, EngineBuildContextBuilder, EngineBuilder, KafkaWalEngineBuilder, - ObkvWalEngineBuilder, RocksDBWalEngineBuilder, + EngineBuilder, KafkaWalEngineBuilder, ObkvWalEngineBuilder, RocksDBWalEngineBuilder, + WalsOpener, }, WalStorageConfig, }; @@ -119,7 +119,7 @@ async fn run_server_with_runtimes( runtimes: Arc, log_runtime: Arc, ) where - T: EngineBuilder, + T: WalsOpener, { // Init function registry. let mut function_registry = FunctionRegistryImpl::new(); @@ -180,17 +180,13 @@ async fn run_server_with_runtimes( server.stop().await; } -async fn build_table_engine( - context: EngineBuildContext, - runtimes: Arc, - engine_builder: T, -) -> Arc { - // Build all table engine +// Build proxy for all table engines. +async fn build_table_engine_proxy<'a>(engine_builder: EngineBuilder<'a>) -> Arc { // Create memory engine let memory = MemoryTableEngine; // Create analytic engine let analytic = engine_builder - .build(context, runtimes.clone()) + .build() .await .expect("Failed to setup analytic engine"); @@ -201,12 +197,12 @@ async fn build_table_engine( }) } -async fn build_with_meta( +async fn build_with_meta( config: &Config, cluster_config: &ClusterConfig, builder: Builder, runtimes: Arc, - engine_builder: T, + wal_opener: T, ) -> Builder { // Build meta related modules. let node_meta_info = NodeMetaInfo { @@ -234,13 +230,17 @@ async fn build_with_meta( }; let router = Arc::new(ClusterBasedRouter::new(cluster.clone())); - // Build table engine. 
- let build_context_builder = EngineBuildContextBuilder::default(); - let build_context = build_context_builder - .config(config.analytic.clone()) - .router(router.clone()) - .build(); - let engine_proxy = build_table_engine(build_context, runtimes.clone(), engine_builder).await; + let opened_wals = wal_opener + .open_wals(&config.analytic.wal, runtimes.clone()) + .await + .expect("Failed to setup analytic engine"); + let engine_builder = EngineBuilder { + config: &config.analytic, + router: Some(router.clone()), + engine_runtimes: runtimes.clone(), + opened_wals: opened_wals.clone(), + }; + let engine_proxy = build_table_engine_proxy(engine_builder).await; let meta_based_manager_ref = Arc::new(volatile::ManagerImpl::new( shard_tables_cache, @@ -258,23 +258,29 @@ async fn build_with_meta( .catalog_manager(catalog_manager) .table_manipulator(table_manipulator) .cluster(cluster) + .opened_wals(opened_wals) .router(router) .schema_config_provider(schema_config_provider) } -async fn build_without_meta( +async fn build_without_meta( config: &Config, static_route_config: &StaticRouteConfig, builder: Builder, runtimes: Arc, - engine_builder: T, + wal_builder: T, ) -> Builder { - // Build table engine. - let build_context_builder = EngineBuildContextBuilder::default(); - let build_context = build_context_builder - .config(config.analytic.clone()) - .build(); - let engine_proxy = build_table_engine(build_context, runtimes.clone(), engine_builder).await; + let opened_wals = wal_builder + .open_wals(&config.analytic.wal, runtimes.clone()) + .await + .expect("Failed to setup analytic engine"); + let engine_builder = EngineBuilder { + config: &config.analytic, + router: None, + engine_runtimes: runtimes.clone(), + opened_wals, + }; + let engine_proxy = build_table_engine_proxy(engine_builder).await; // Create catalog manager, use analytic engine as backend. let analytic = engine_proxy.analytic.clone(); From 0c9a27dbcd38fde137f2d218999b8dc0be64feb5 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 6 Mar 2023 14:56:22 +0800 Subject: [PATCH 04/10] refactor: fix unit test --- analytic_engine/src/setup.rs | 16 ++--- analytic_engine/src/tests/alter_test.rs | 23 ++++--- analytic_engine/src/tests/compaction_test.rs | 8 ++- analytic_engine/src/tests/drop_test.rs | 28 ++++---- analytic_engine/src/tests/open_test.rs | 8 ++- analytic_engine/src/tests/read_write_test.rs | 51 +++++++------- analytic_engine/src/tests/util.rs | 70 ++++++++++---------- catalog_impls/src/table_based.rs | 10 +-- interpreters/src/tests.rs | 4 +- src/setup.rs | 31 +++------ 10 files changed, 121 insertions(+), 128 deletions(-) diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index c5d62f48d2..3d8636a39e 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -157,10 +157,10 @@ pub trait WalsOpener: Send + Sync + Default { /// [RocksEngine] builder. #[derive(Default)] -pub struct RocksDBWalEngineBuilder; +pub struct RocksDBWalsOpener; #[async_trait] -impl WalsOpener for RocksDBWalEngineBuilder { +impl WalsOpener for RocksDBWalsOpener { async fn open_wals( &self, config: &WalStorageConfig, @@ -201,10 +201,10 @@ impl WalsOpener for RocksDBWalEngineBuilder { /// [ReplicatedEngine] builder. 
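The renames in this commit make the division of labor explicit: a `*WalsOpener` opens WALs and nothing else, while `EngineBuilder` owns engine assembly. For orientation, this is roughly the surface a new WAL backend would have to provide under the renamed trait; the struct and body are placeholders, only the trait signature is taken from setup.rs:

    // Placeholder sketch of a custom opener; `WalsOpener`, `OpenedWals`,
    // `WalStorageConfig` and `EngineRuntimes` are the real types.
    #[derive(Default)]
    pub struct MyWalsOpener;

    #[async_trait]
    impl WalsOpener for MyWalsOpener {
        async fn open_wals(
            &self,
            config: &WalStorageConfig,
            engine_runtimes: Arc<EngineRuntimes>,
        ) -> Result<OpenedWals> {
            // A real implementation matches on `config`, rejects variants it
            // does not support (see the InvalidWalConfig errors above), and
            // builds independent data and manifest WALs.
            let _ = (config, engine_runtimes);
            todo!("open the data and manifest WALs")
        }
    }
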
#[derive(Default)] -pub struct ObkvWalEngineBuilder; +pub struct ObkvWalsOpener; #[async_trait] -impl WalsOpener for ObkvWalEngineBuilder { +impl WalsOpener for ObkvWalsOpener { async fn open_wals( &self, config: &WalStorageConfig, @@ -239,12 +239,12 @@ impl WalsOpener for ObkvWalEngineBuilder { /// All engine built by this builder share same [MemoryImpl] instance, so the /// data wrote by the engine still remains after the engine dropped. #[derive(Default)] -pub struct MemWalEngineBuilder { +pub struct MemWalsOpener { table_kv: MemoryImpl, } #[async_trait] -impl WalsOpener for MemWalEngineBuilder { +impl WalsOpener for MemWalsOpener { async fn open_wals( &self, config: &WalStorageConfig, @@ -272,10 +272,10 @@ impl WalsOpener for MemWalEngineBuilder { } #[derive(Default)] -pub struct KafkaWalEngineBuilder; +pub struct KafkaWalsOpener; #[async_trait] -impl WalsOpener for KafkaWalEngineBuilder { +impl WalsOpener for KafkaWalsOpener { async fn open_wals( &self, config: &WalStorageConfig, diff --git a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs index b815e30618..1acdf137c5 100644 --- a/analytic_engine/src/tests/alter_test.rs +++ b/analytic_engine/src/tests/alter_test.rs @@ -20,7 +20,8 @@ use crate::{ row_util, table::{self, FixedSchemaTable}, util::{ - EngineContext, MemoryEngineContext, Null, RocksDBEngineContext, TestContext, TestEnv, + EngineBuildContext, MemoryEngineBuildContext, Null, RocksDBEngineContext, TestContext, + TestEnv, }, }, }; @@ -34,11 +35,11 @@ fn test_alter_table_add_column_rocks() { #[ignore = "Enable this test when manifest use another snapshot implementation"] #[test] fn test_alter_table_add_column_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_alter_table_add_column(memory_ctx); } -fn test_alter_table_add_column(engine_context: T) { +fn test_alter_table_add_column(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -109,7 +110,7 @@ fn add_columns(schema_builder: schema::Builder) -> schema::Builder { .unwrap() } -async fn alter_schema_same_schema_version_case( +async fn alter_schema_same_schema_version_case( test_ctx: &TestContext, table_name: &str, ) { @@ -131,7 +132,7 @@ async fn alter_schema_same_schema_version_case( assert!(res.is_err()); } -async fn alter_schema_old_pre_version_case( +async fn alter_schema_old_pre_version_case( test_ctx: &TestContext, table_name: &str, ) { @@ -157,7 +158,7 @@ async fn alter_schema_old_pre_version_case( assert!(res.is_err()); } -async fn alter_schema_add_column_case( +async fn alter_schema_add_column_case( test_ctx: &mut TestContext, table_name: &str, start_ms: i64, @@ -345,7 +346,7 @@ async fn alter_schema_add_column_case( .await; } -async fn check_read_row_group( +async fn check_read_row_group( test_ctx: &TestContext, msg: &str, table_name: &str, @@ -375,11 +376,11 @@ fn test_alter_table_options_rocks() { #[ignore = "Enable this test when manifest use another snapshot implementation"] #[test] fn test_alter_table_options_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_alter_table_options(memory_ctx); } -fn test_alter_table_options(engine_context: T) { +fn test_alter_table_options(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -418,7 +419,7 @@ fn test_alter_table_options(engine_context: T) { }); } -async fn 
alter_immutable_option_case( +async fn alter_immutable_option_case( test_ctx: &TestContext, table_name: &str, opt_key: &str, @@ -439,7 +440,7 @@ async fn alter_immutable_option_case( assert_options_eq(&old_opts, &opts_after_alter); } -async fn alter_mutable_option_case( +async fn alter_mutable_option_case( test_ctx: &mut TestContext, table_name: &str, opt_key: &str, diff --git a/analytic_engine/src/tests/compaction_test.rs b/analytic_engine/src/tests/compaction_test.rs index 9e96a75498..1146cbe93e 100644 --- a/analytic_engine/src/tests/compaction_test.rs +++ b/analytic_engine/src/tests/compaction_test.rs @@ -7,7 +7,9 @@ use table_engine::table::FlushRequest; use crate::{ compaction::SizeTieredCompactionOptions, - tests::util::{self, EngineContext, MemoryEngineContext, RocksDBEngineContext, TestEnv}, + tests::util::{ + self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestEnv, + }, }; #[test] @@ -20,11 +22,11 @@ fn test_table_compact_current_segment_rocks() { #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/427"] fn test_table_compact_current_segment_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_table_compact_current_segment(memory_ctx); } -fn test_table_compact_current_segment(engine_context: T) { +fn test_table_compact_current_segment(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); diff --git a/analytic_engine/src/tests/drop_test.rs b/analytic_engine/src/tests/drop_test.rs index 767ffa2bc9..76f92cdc2f 100644 --- a/analytic_engine/src/tests/drop_test.rs +++ b/analytic_engine/src/tests/drop_test.rs @@ -9,7 +9,7 @@ use table_engine::table::AlterSchemaRequest; use crate::tests::{ table::FixedSchemaTable, - util::{self, EngineContext, MemoryEngineContext, RocksDBEngineContext, TestEnv}, + util::{self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestEnv}, }; #[test] @@ -20,11 +20,11 @@ fn test_drop_table_once_rocks() { #[test] fn test_drop_table_once_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_drop_table_once(memory_ctx); } -fn test_drop_table_once(engine_context: T) { +fn test_drop_table_once(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -63,11 +63,11 @@ fn test_drop_table_again_rocks() { #[test] fn test_drop_table_again_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_drop_table_again(memory_ctx); } -fn test_drop_table_again(engine_context: T) { +fn test_drop_table_again(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -100,11 +100,11 @@ fn test_drop_create_table_mixed_rocks() { #[test] fn test_drop_create_table_mixed_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_drop_create_table_mixed(memory_ctx); } -fn test_drop_create_table_mixed(engine_context: T) { +fn test_drop_create_table_mixed(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -148,7 +148,7 @@ fn test_drop_create_table_mixed(engine_context: T) { }); } -fn test_drop_create_same_table_case(flush: bool, engine_context: T) { +fn test_drop_create_same_table_case(flush: bool, engine_context: T) { let env = 
TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -213,11 +213,11 @@ fn test_drop_create_same_table_rocks() { #[test] fn test_drop_create_same_table_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_drop_create_same_table(memory_ctx); } -fn test_drop_create_same_table(engine_context: T) { +fn test_drop_create_same_table(engine_context: T) { test_drop_create_same_table_case::(false, engine_context.clone()); test_drop_create_same_table_case::(true, engine_context); @@ -231,11 +231,11 @@ fn test_alter_schema_drop_create_rocks() { #[test] fn test_alter_schema_drop_create_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_alter_schema_drop_create(memory_ctx); } -fn test_alter_schema_drop_create(engine_context: T) { +fn test_alter_schema_drop_create(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -288,11 +288,11 @@ fn test_alter_options_drop_create_rocks() { #[test] fn test_alter_options_drop_create_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_alter_options_drop_create(memory_ctx); } -fn test_alter_options_drop_create(engine_context: T) { +fn test_alter_options_drop_create(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); diff --git a/analytic_engine/src/tests/open_test.rs b/analytic_engine/src/tests/open_test.rs index c9b98d36a9..60c0869e08 100644 --- a/analytic_engine/src/tests/open_test.rs +++ b/analytic_engine/src/tests/open_test.rs @@ -2,7 +2,9 @@ //! Engine open test. -use crate::tests::util::{EngineContext, MemoryEngineContext, RocksDBEngineContext, TestEnv}; +use crate::tests::util::{ + EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestEnv, +}; #[test] fn test_open_engine_rocks() { @@ -12,11 +14,11 @@ fn test_open_engine_rocks() { #[test] fn test_open_engine_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_open_engine(memory_ctx); } -fn test_open_engine(engine_context: T) { +fn test_open_engine(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); diff --git a/analytic_engine/src/tests/read_write_test.rs b/analytic_engine/src/tests/read_write_test.rs index da4344104c..40752153f4 100644 --- a/analytic_engine/src/tests/read_write_test.rs +++ b/analytic_engine/src/tests/read_write_test.rs @@ -11,7 +11,8 @@ use table_engine::table::ReadOrder; use crate::{ table_options, tests::util::{ - self, EngineContext, MemoryEngineContext, RocksDBEngineContext, TestContext, TestEnv, + self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestContext, + TestEnv, }, }; @@ -23,11 +24,11 @@ fn test_multi_table_read_write_rocks() { #[test] fn test_multi_table_read_write_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_multi_table_read_write(memory_ctx); } -fn test_multi_table_read_write(engine_context: T) { +fn test_multi_table_read_write(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -175,11 +176,11 @@ fn test_table_write_read_rocks() { #[test] fn test_table_write_read_mem_wal() { - let memory_ctx = 
MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_table_write_read(memory_ctx); } -fn test_table_write_read(engine_context: T) { +fn test_table_write_read(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -254,11 +255,11 @@ fn test_table_write_get_rocks() { #[test] fn test_table_write_get_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_table_write_get(memory_ctx); } -fn test_table_write_get(engine_context: T) { +fn test_table_write_get(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -330,10 +331,10 @@ fn test_table_write_get_override_rocks() { #[test] fn test_table_write_get_override_mem_wal() { - test_table_write_get_override::(); + test_table_write_get_override::(); } -fn test_table_write_get_override() { +fn test_table_write_get_override() { test_table_write_get_override_case::(FlushPoint::NoFlush, T::default()); test_table_write_get_override_case::(FlushPoint::AfterFirstWrite, T::default()); @@ -351,7 +352,7 @@ enum FlushPoint { FirstAndOverwrite, } -fn test_table_write_get_override_case( +fn test_table_write_get_override_case( flush_point: FlushPoint, engine_context: T, ) { @@ -511,15 +512,15 @@ fn test_db_write_buffer_size_rocks() { #[test] fn test_db_write_buffer_size_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); // Use different table name to avoid metrics collision. test_db_write_buffer_size("test_db_write_buffer_size_mem_wal", memory_ctx); } -fn test_db_write_buffer_size(table_name: &str, engine_context: T) { - let env = TestEnv::builder().build(); - let mut test_ctx = env.new_context(engine_context); - test_ctx.context.config.db_write_buffer_size = 1; +fn test_db_write_buffer_size(table_name: &str, engine_context: T) { + let mut env = TestEnv::builder().build(); + env.config.db_write_buffer_size = 1; + let test_ctx = env.new_context(engine_context); test_write_buffer_size_overflow(table_name, env, test_ctx); } @@ -532,19 +533,19 @@ fn test_space_write_buffer_size_rocks() { #[test] fn test_space_write_buffer_size_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); // Use different table name to avoid metrics collision. 
test_space_write_buffer_size("test_space_write_buffer_size_mem_wal", memory_ctx); } -fn test_space_write_buffer_size(table_name: &str, engine_context: T) { - let env = TestEnv::builder().build(); - let mut test_ctx = env.new_context(engine_context); - test_ctx.context.config.space_write_buffer_size = 1; +fn test_space_write_buffer_size(table_name: &str, engine_context: T) { + let mut env = TestEnv::builder().build(); + env.config.db_write_buffer_size = 1; + let test_ctx = env.new_context(engine_context); test_write_buffer_size_overflow(table_name, env, test_ctx); } -fn test_write_buffer_size_overflow( +fn test_write_buffer_size_overflow( test_table_name: &str, env: TestEnv, test_ctx: TestContext, @@ -665,11 +666,11 @@ fn test_table_write_read_reverse_rocks() { #[test] fn test_table_write_read_reverse_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_table_write_read_reverse(memory_ctx); } -fn test_table_write_read_reverse(engine_context: T) { +fn test_table_write_read_reverse(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -752,11 +753,11 @@ fn test_table_write_read_reverse_after_flush_rocks() { #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/313"] fn test_table_write_read_reverse_after_flush_mem_wal() { - let memory_ctx = MemoryEngineContext::default(); + let memory_ctx = MemoryEngineBuildContext::default(); test_table_write_read_reverse_after_flush(memory_ctx); } -fn test_table_write_read_reverse_after_flush(engine_context: T) { +fn test_table_write_read_reverse_after_flush(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index eb657772cc..3ff8500902 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -30,10 +30,7 @@ use table_engine::{ use tempfile::TempDir; use crate::{ - setup::{ - EngineBuildContext, EngineBuildContextBuilder, MemWalEngineBuilder, - RocksDBWalEngineBuilder, WalsOpener, - }, + setup::{EngineBuilder, MemWalsOpener, RocksDBWalsOpener, WalsOpener}, storage_options::{LocalOptions, ObjectStoreOptions, StorageOptions}, tests::table::{self, FixedSchemaTable, RowTuple}, Config, RocksDBConfig, WalStorageConfig, @@ -50,7 +47,7 @@ impl From for Datum { } } -pub async fn check_read_with_order( +pub async fn check_read_with_order( test_ctx: &TestContext, fixed_schema_table: &FixedSchemaTable, msg: &str, @@ -72,7 +69,7 @@ pub async fn check_read_with_order( } } -pub async fn check_read( +pub async fn check_read( test_ctx: &TestContext, fixed_schema_table: &FixedSchemaTable, msg: &str, @@ -90,7 +87,7 @@ pub async fn check_read( .await } -pub async fn check_get( +pub async fn check_get( test_ctx: &TestContext, fixed_schema_table: &FixedSchemaTable, msg: &str, @@ -108,10 +105,9 @@ pub async fn check_get( } } -pub struct TestContext { - pub context: EngineBuildContext, +pub struct TestContext { runtimes: Arc, - builder: T::EngineBuilder, + context: T, pub engine: Option, pub schema_id: SchemaId, last_table_seq: u32, @@ -119,14 +115,23 @@ pub struct TestContext { name_to_tables: HashMap, } -impl TestContext { +impl TestContext { pub async fn open(&mut self) { - let engine = self - .builder - .build(self.context.clone(), self.runtimes.clone()) + let config = self.context.config(); + let opened_wals = self + .context + .wal_opener() + .open_wals(&config.wal, 
self.runtimes.clone()) .await .unwrap(); - self.engine = Some(engine); + + let engine_builder = EngineBuilder { + config: &config, + router: None, + engine_runtimes: self.runtimes.clone(), + opened_wals, + }; + self.engine = Some(engine_builder.build().await.unwrap()); } pub async fn reopen(&mut self) { @@ -362,7 +367,7 @@ impl TestContext { } } -impl TestContext { +impl TestContext { pub fn clone_engine(&self) -> TableEngineRef { self.engine.clone().unwrap() } @@ -379,13 +384,10 @@ impl TestEnv { Builder::default() } - pub fn new_context(&self, engine_context: T) -> TestContext { + pub fn new_context(&self, engine_context: T) -> TestContext { TestContext { - context: EngineBuildContextBuilder::default() - .config(engine_context.config()) - .build(), + context: engine_context, runtimes: self.runtimes.clone(), - builder: engine_context.engine_builder(), engine: None, schema_id: SchemaId::from_u32(100), last_table_seq: 1, @@ -450,10 +452,10 @@ impl Default for Builder { } } -pub trait EngineContext: Clone + Default { - type EngineBuilder: WalsOpener; +pub trait EngineBuildContext: Clone + Default { + type WalOpener: WalsOpener; - fn engine_builder(&self) -> Self::EngineBuilder; + fn wal_opener(&self) -> Self::WalOpener; fn config(&self) -> Config; } @@ -512,11 +514,11 @@ impl Clone for RocksDBEngineContext { } } -impl EngineContext for RocksDBEngineContext { - type EngineBuilder = RocksDBWalEngineBuilder; +impl EngineBuildContext for RocksDBEngineContext { + type WalOpener = RocksDBWalsOpener; - fn engine_builder(&self) -> Self::EngineBuilder { - RocksDBWalEngineBuilder::default() + fn wal_opener(&self) -> Self::WalOpener { + RocksDBWalsOpener::default() } fn config(&self) -> Config { @@ -525,11 +527,11 @@ impl EngineContext for RocksDBEngineContext { } #[derive(Clone)] -pub struct MemoryEngineContext { +pub struct MemoryEngineBuildContext { config: Config, } -impl Default for MemoryEngineContext { +impl Default for MemoryEngineBuildContext { fn default() -> Self { let dir = tempfile::tempdir().unwrap(); @@ -552,11 +554,11 @@ impl Default for MemoryEngineContext { } } -impl EngineContext for MemoryEngineContext { - type EngineBuilder = MemWalEngineBuilder; +impl EngineBuildContext for MemoryEngineBuildContext { + type WalOpener = MemWalsOpener; - fn engine_builder(&self) -> Self::EngineBuilder { - MemWalEngineBuilder::default() + fn wal_opener(&self) -> Self::WalOpener { + MemWalsOpener::default() } fn config(&self) -> Config { diff --git a/catalog_impls/src/table_based.rs b/catalog_impls/src/table_based.rs index 71f9874e6b..4f0797c545 100644 --- a/catalog_impls/src/table_based.rs +++ b/catalog_impls/src/table_based.rs @@ -905,7 +905,7 @@ impl Schema for SchemaImpl { mod tests { use std::{collections::HashMap, sync::Arc}; - use analytic_engine::tests::util::{EngineContext, RocksDBEngineContext, TestEnv}; + use analytic_engine::tests::util::{EngineBuildContext, RocksDBEngineContext, TestEnv}; use catalog::{ consts::DEFAULT_CATALOG, manager::Manager, @@ -967,7 +967,7 @@ mod tests { async fn test_catalog_by_name_schema_by_name(engine_context: T) where - T: EngineContext, + T: EngineBuildContext, { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -1013,7 +1013,7 @@ mod tests { async fn test_maybe_create_schema_by_name(engine_context: T) where - T: EngineContext, + T: EngineBuildContext, { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); @@ -1043,7 +1043,7 @@ mod tests { test_create_table(rocksdb_ctx).await; } - 
async fn test_create_table(engine_context: T) { + async fn test_create_table(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); test_ctx.open().await; @@ -1089,7 +1089,7 @@ mod tests { test_drop_table(rocksdb_ctx).await; } - async fn test_drop_table(engine_context: T) { + async fn test_drop_table(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); test_ctx.open().await; diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index 3d3918e87d..9004bc672e 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -2,7 +2,7 @@ use std::sync::Arc; -use analytic_engine::tests::util::{EngineContext, RocksDBEngineContext, TestEnv}; +use analytic_engine::tests::util::{EngineBuildContext, RocksDBEngineContext, TestEnv}; use catalog::{ consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA}, manager::ManagerRef, @@ -341,7 +341,7 @@ async fn test_interpreters_rocks() { test_interpreters(rocksdb_ctx).await; } -async fn test_interpreters(engine_context: T) { +async fn test_interpreters(engine_context: T) { let env = TestEnv::builder().build(); let mut test_ctx = env.new_context(engine_context); test_ctx.open().await; diff --git a/src/setup.rs b/src/setup.rs index d7e607bb39..2b05656b59 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -6,10 +6,7 @@ use std::sync::Arc; use analytic_engine::{ self, - setup::{ - EngineBuilder, KafkaWalEngineBuilder, ObkvWalEngineBuilder, RocksDBWalEngineBuilder, - WalsOpener, - }, + setup::{EngineBuilder, KafkaWalsOpener, ObkvWalsOpener, RocksDBWalsOpener, WalsOpener}, WalStorageConfig, }; use catalog::{manager::ManagerRef, schema::OpenOptions, CatalogRef}; @@ -85,30 +82,18 @@ pub fn run_server(config: Config, log_runtime: RuntimeLevel) { runtimes.bg_runtime.block_on(async { match config.analytic.wal { WalStorageConfig::RocksDB(_) => { - run_server_with_runtimes::( - config, - engine_runtimes, - log_runtime, - ) - .await + run_server_with_runtimes::(config, engine_runtimes, log_runtime) + .await } WalStorageConfig::Obkv(_) => { - run_server_with_runtimes::( - config, - engine_runtimes, - log_runtime, - ) - .await; + run_server_with_runtimes::(config, engine_runtimes, log_runtime) + .await; } WalStorageConfig::Kafka(_) => { - run_server_with_runtimes::( - config, - engine_runtimes, - log_runtime, - ) - .await; + run_server_with_runtimes::(config, engine_runtimes, log_runtime) + .await; } } }); @@ -181,7 +166,7 @@ async fn run_server_with_runtimes( } // Build proxy for all table engines. 
-async fn build_table_engine_proxy<'a>(engine_builder: EngineBuilder<'a>) -> Arc { +async fn build_table_engine_proxy(engine_builder: EngineBuilder<'_>) -> Arc { // Create memory engine let memory = MemoryTableEngine; // Create analytic engine From 2f1c4a1f15fc95c2b74e717bfaf18a5841f480d3 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 6 Mar 2023 15:31:13 +0800 Subject: [PATCH 05/10] chore: rename context to build context --- analytic_engine/src/tests/alter_test.rs | 8 ++++---- analytic_engine/src/tests/compaction_test.rs | 4 ++-- analytic_engine/src/tests/drop_test.rs | 16 +++++++++------- analytic_engine/src/tests/open_test.rs | 4 ++-- analytic_engine/src/tests/read_write_test.rs | 18 +++++++++--------- analytic_engine/src/tests/util.rs | 8 ++++---- catalog_impls/src/table_based.rs | 10 +++++----- interpreters/src/tests.rs | 4 ++-- 8 files changed, 37 insertions(+), 35 deletions(-) diff --git a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs index 1acdf137c5..3582cf95c6 100644 --- a/analytic_engine/src/tests/alter_test.rs +++ b/analytic_engine/src/tests/alter_test.rs @@ -20,15 +20,15 @@ use crate::{ row_util, table::{self, FixedSchemaTable}, util::{ - EngineBuildContext, MemoryEngineBuildContext, Null, RocksDBEngineContext, TestContext, - TestEnv, + EngineBuildContext, MemoryEngineBuildContext, Null, RocksDBEngineBuildContext, + TestContext, TestEnv, }, }, }; #[test] fn test_alter_table_add_column_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_alter_table_add_column(rocksdb_ctx); } @@ -369,7 +369,7 @@ async fn check_read_row_group( #[test] fn test_alter_table_options_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_alter_table_options(rocksdb_ctx); } diff --git a/analytic_engine/src/tests/compaction_test.rs b/analytic_engine/src/tests/compaction_test.rs index 1146cbe93e..85a2f57fbc 100644 --- a/analytic_engine/src/tests/compaction_test.rs +++ b/analytic_engine/src/tests/compaction_test.rs @@ -8,14 +8,14 @@ use table_engine::table::FlushRequest; use crate::{ compaction::SizeTieredCompactionOptions, tests::util::{ - self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestEnv, + self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestEnv, }, }; #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/427"] fn test_table_compact_current_segment_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_table_compact_current_segment(rocksdb_ctx); } diff --git a/analytic_engine/src/tests/drop_test.rs b/analytic_engine/src/tests/drop_test.rs index 76f92cdc2f..ab621c9866 100644 --- a/analytic_engine/src/tests/drop_test.rs +++ b/analytic_engine/src/tests/drop_test.rs @@ -9,12 +9,14 @@ use table_engine::table::AlterSchemaRequest; use crate::tests::{ table::FixedSchemaTable, - util::{self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestEnv}, + util::{ + self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestEnv, + }, }; #[test] fn test_drop_table_once_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_drop_table_once(rocksdb_ctx); } @@ -57,7 +59,7 @@ fn test_drop_table_once(engine_context: T) { #[test] fn test_drop_table_again_rocks() { - let rocksdb_ctx = 
RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_drop_table_again(rocksdb_ctx); } @@ -94,7 +96,7 @@ fn test_drop_table_again(engine_context: T) { #[test] fn test_drop_create_table_mixed_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_drop_create_table_mixed(rocksdb_ctx); } @@ -207,7 +209,7 @@ fn test_drop_create_same_table_case(flush: bool, engine_c #[test] fn test_drop_create_same_table_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_drop_create_same_table(rocksdb_ctx); } @@ -225,7 +227,7 @@ fn test_drop_create_same_table(engine_context: T) { #[test] fn test_alter_schema_drop_create_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_alter_schema_drop_create(rocksdb_ctx); } @@ -282,7 +284,7 @@ fn test_alter_schema_drop_create(engine_context: T) { #[test] fn test_alter_options_drop_create_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_alter_options_drop_create(rocksdb_ctx); } diff --git a/analytic_engine/src/tests/open_test.rs b/analytic_engine/src/tests/open_test.rs index 60c0869e08..e3294d731c 100644 --- a/analytic_engine/src/tests/open_test.rs +++ b/analytic_engine/src/tests/open_test.rs @@ -3,12 +3,12 @@ //! Engine open test. use crate::tests::util::{ - EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestEnv, + EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestEnv, }; #[test] fn test_open_engine_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_open_engine(rocksdb_ctx); } diff --git a/analytic_engine/src/tests/read_write_test.rs b/analytic_engine/src/tests/read_write_test.rs index 40752153f4..4c38028d77 100644 --- a/analytic_engine/src/tests/read_write_test.rs +++ b/analytic_engine/src/tests/read_write_test.rs @@ -11,14 +11,14 @@ use table_engine::table::ReadOrder; use crate::{ table_options, tests::util::{ - self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineContext, TestContext, + self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestContext, TestEnv, }, }; #[test] fn test_multi_table_read_write_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_multi_table_read_write(rocksdb_ctx); } @@ -170,7 +170,7 @@ fn test_multi_table_read_write(engine_context: T) { #[test] fn test_table_write_read_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_table_write_read(rocksdb_ctx); } @@ -249,7 +249,7 @@ fn test_table_write_read(engine_context: T) { #[test] fn test_table_write_get_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_table_write_get(rocksdb_ctx); } @@ -326,7 +326,7 @@ fn test_table_write_get(engine_context: T) { #[test] fn test_table_write_get_override_rocks() { - test_table_write_get_override::(); + test_table_write_get_override::(); } #[test] @@ -505,7 +505,7 @@ fn test_table_write_get_override_case( #[test] fn test_db_write_buffer_size_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = 
RocksDBEngineBuildContext::default(); // Use different table name to avoid metrics collision. test_db_write_buffer_size("test_db_write_buffer_size_rocks", rocksdb_ctx); } @@ -526,7 +526,7 @@ fn test_db_write_buffer_size(table_name: &str, engine_con #[test] fn test_space_write_buffer_size_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); // Use different table name to avoid metrics collision. test_space_write_buffer_size("test_space_write_buffer_size_rocks", rocksdb_ctx); } @@ -660,7 +660,7 @@ fn test_write_buffer_size_overflow( #[test] fn test_table_write_read_reverse_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_table_write_read_reverse(rocksdb_ctx); } @@ -746,7 +746,7 @@ fn test_table_write_read_reverse(engine_context: T) { #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/313"] fn test_table_write_read_reverse_after_flush_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_table_write_read_reverse_after_flush(rocksdb_ctx); } diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index 3ff8500902..8c06e621ca 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -459,11 +459,11 @@ pub trait EngineBuildContext: Clone + Default { fn config(&self) -> Config; } -pub struct RocksDBEngineContext { +pub struct RocksDBEngineBuildContext { config: Config, } -impl Default for RocksDBEngineContext { +impl Default for RocksDBEngineBuildContext { fn default() -> Self { let dir = tempfile::tempdir().unwrap(); @@ -489,7 +489,7 @@ impl Default for RocksDBEngineContext { } } -impl Clone for RocksDBEngineContext { +impl Clone for RocksDBEngineBuildContext { fn clone(&self) -> Self { let mut config = self.config.clone(); @@ -514,7 +514,7 @@ impl Clone for RocksDBEngineContext { } } -impl EngineBuildContext for RocksDBEngineContext { +impl EngineBuildContext for RocksDBEngineBuildContext { type WalOpener = RocksDBWalsOpener; fn wal_opener(&self) -> Self::WalOpener { diff --git a/catalog_impls/src/table_based.rs b/catalog_impls/src/table_based.rs index 4f0797c545..03cc1dd7bc 100644 --- a/catalog_impls/src/table_based.rs +++ b/catalog_impls/src/table_based.rs @@ -905,7 +905,7 @@ impl Schema for SchemaImpl { mod tests { use std::{collections::HashMap, sync::Arc}; - use analytic_engine::tests::util::{EngineBuildContext, RocksDBEngineContext, TestEnv}; + use analytic_engine::tests::util::{EngineBuildContext, RocksDBEngineBuildContext, TestEnv}; use catalog::{ consts::DEFAULT_CATALOG, manager::Manager, @@ -961,7 +961,7 @@ mod tests { #[tokio::test] async fn test_catalog_by_name_schema_by_name_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_catalog_by_name_schema_by_name(rocksdb_ctx).await; } @@ -1007,7 +1007,7 @@ mod tests { #[tokio::test] async fn test_maybe_create_schema_by_name_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_maybe_create_schema_by_name(rocksdb_ctx).await; } @@ -1039,7 +1039,7 @@ mod tests { #[tokio::test] async fn test_create_table_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_create_table(rocksdb_ctx).await; } @@ -1085,7 +1085,7 @@ mod tests { #[tokio::test] async fn 
test_drop_table_rocks() { - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_drop_table(rocksdb_ctx).await; } diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index 9004bc672e..9e532d0dbe 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -2,7 +2,7 @@ use std::sync::Arc; -use analytic_engine::tests::util::{EngineBuildContext, RocksDBEngineContext, TestEnv}; +use analytic_engine::tests::util::{EngineBuildContext, RocksDBEngineBuildContext, TestEnv}; use catalog::{ consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA}, manager::ManagerRef, @@ -337,7 +337,7 @@ where #[tokio::test] async fn test_interpreters_rocks() { common_util::tests::init_log_for_test(); - let rocksdb_ctx = RocksDBEngineContext::default(); + let rocksdb_ctx = RocksDBEngineBuildContext::default(); test_interpreters(rocksdb_ctx).await; } From c9294f610d6b4bbb90d39dd441f60c579f5bd833 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 6 Mar 2023 16:06:19 +0800 Subject: [PATCH 06/10] fix: avoid wal dropped when reopen --- analytic_engine/src/tests/util.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index 8c06e621ca..5f0ed72bc5 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -30,7 +30,7 @@ use table_engine::{ use tempfile::TempDir; use crate::{ - setup::{EngineBuilder, MemWalsOpener, RocksDBWalsOpener, WalsOpener}, + setup::{EngineBuilder, MemWalsOpener, OpenedWals, RocksDBWalsOpener, WalsOpener}, storage_options::{LocalOptions, ObjectStoreOptions, StorageOptions}, tests::table::{self, FixedSchemaTable, RowTuple}, Config, RocksDBConfig, WalStorageConfig, @@ -109,6 +109,7 @@ pub struct TestContext { runtimes: Arc, context: T, pub engine: Option, + pub opened_wals: Option, pub schema_id: SchemaId, last_table_seq: u32, @@ -118,19 +119,23 @@ pub struct TestContext { impl TestContext { pub async fn open(&mut self) { let config = self.context.config(); - let opened_wals = self - .context - .wal_opener() - .open_wals(&config.wal, self.runtimes.clone()) - .await - .unwrap(); + let opened_wals = if let Some(opened_wals) = self.opened_wals.take() { + opened_wals + } else { + self.context + .wal_opener() + .open_wals(&config.wal, self.runtimes.clone()) + .await + .unwrap() + }; let engine_builder = EngineBuilder { config: &config, router: None, engine_runtimes: self.runtimes.clone(), - opened_wals, + opened_wals: opened_wals.clone(), }; + self.opened_wals = Some(opened_wals); self.engine = Some(engine_builder.build().await.unwrap()); } @@ -384,11 +389,12 @@ impl TestEnv { Builder::default() } - pub fn new_context(&self, engine_context: T) -> TestContext { + pub fn new_context(&self, build_context: T) -> TestContext { TestContext { - context: engine_context, + context: build_context, runtimes: self.runtimes.clone(), engine: None, + opened_wals: None, schema_id: SchemaId::from_u32(100), last_table_seq: 1, name_to_tables: HashMap::new(), From 7dda429cd50997c64adef5d7dfe1230b52409634 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 6 Mar 2023 16:19:42 +0800 Subject: [PATCH 07/10] chore: avoid useless config updating in ut --- analytic_engine/src/tests/alter_test.rs | 13 ++++---- analytic_engine/src/tests/read_write_test.rs | 3 +- analytic_engine/src/tests/util.rs | 32 +++++++++++--------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git 
a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs index 3582cf95c6..ef982960b1 100644 --- a/analytic_engine/src/tests/alter_test.rs +++ b/analytic_engine/src/tests/alter_test.rs @@ -15,6 +15,7 @@ use log::info; use table_engine::table::AlterSchemaRequest; use crate::{ + setup::WalsOpener, table_options::TableOptions, tests::{ row_util, @@ -110,7 +111,7 @@ fn add_columns(schema_builder: schema::Builder) -> schema::Builder { .unwrap() } -async fn alter_schema_same_schema_version_case( +async fn alter_schema_same_schema_version_case( test_ctx: &TestContext, table_name: &str, ) { @@ -132,7 +133,7 @@ async fn alter_schema_same_schema_version_case( assert!(res.is_err()); } -async fn alter_schema_old_pre_version_case( +async fn alter_schema_old_pre_version_case( test_ctx: &TestContext, table_name: &str, ) { @@ -158,7 +159,7 @@ async fn alter_schema_old_pre_version_case( assert!(res.is_err()); } -async fn alter_schema_add_column_case( +async fn alter_schema_add_column_case( test_ctx: &mut TestContext, table_name: &str, start_ms: i64, @@ -346,7 +347,7 @@ async fn alter_schema_add_column_case( .await; } -async fn check_read_row_group( +async fn check_read_row_group( test_ctx: &TestContext, msg: &str, table_name: &str, @@ -419,7 +420,7 @@ fn test_alter_table_options(engine_context: T) { }); } -async fn alter_immutable_option_case( +async fn alter_immutable_option_case( test_ctx: &TestContext, table_name: &str, opt_key: &str, @@ -440,7 +441,7 @@ async fn alter_immutable_option_case( assert_options_eq(&old_opts, &opts_after_alter); } -async fn alter_mutable_option_case( +async fn alter_mutable_option_case( test_ctx: &mut TestContext, table_name: &str, opt_key: &str, diff --git a/analytic_engine/src/tests/read_write_test.rs b/analytic_engine/src/tests/read_write_test.rs index 4c38028d77..0b912218b5 100644 --- a/analytic_engine/src/tests/read_write_test.rs +++ b/analytic_engine/src/tests/read_write_test.rs @@ -9,6 +9,7 @@ use log::info; use table_engine::table::ReadOrder; use crate::{ + setup::WalsOpener, table_options, tests::util::{ self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestContext, @@ -545,7 +546,7 @@ fn test_space_write_buffer_size(table_name: &str, engine_ test_write_buffer_size_overflow(table_name, env, test_ctx); } -fn test_write_buffer_size_overflow( +fn test_write_buffer_size_overflow( test_table_name: &str, env: TestEnv, test_ctx: TestContext, diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index 5f0ed72bc5..cd7a6f9727 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -47,7 +47,7 @@ impl From for Datum { } } -pub async fn check_read_with_order( +pub async fn check_read_with_order( test_ctx: &TestContext, fixed_schema_table: &FixedSchemaTable, msg: &str, @@ -69,7 +69,7 @@ pub async fn check_read_with_order( } } -pub async fn check_read( +pub async fn check_read( test_ctx: &TestContext, fixed_schema_table: &FixedSchemaTable, msg: &str, @@ -87,7 +87,7 @@ pub async fn check_read( .await } -pub async fn check_get( +pub async fn check_get( test_ctx: &TestContext, fixed_schema_table: &FixedSchemaTable, msg: &str, @@ -105,9 +105,10 @@ pub async fn check_get( } } -pub struct TestContext { +pub struct TestContext { + config: Config, + wals_opener: T, runtimes: Arc, - context: T, pub engine: Option, pub opened_wals: Option, pub schema_id: SchemaId, @@ -116,21 +117,19 @@ pub struct TestContext { name_to_tables: HashMap, } -impl TestContext { +impl TestContext 
{ pub async fn open(&mut self) { - let config = self.context.config(); let opened_wals = if let Some(opened_wals) = self.opened_wals.take() { opened_wals } else { - self.context - .wal_opener() - .open_wals(&config.wal, self.runtimes.clone()) + self.wals_opener + .open_wals(&self.config.wal, self.runtimes.clone()) .await .unwrap() }; let engine_builder = EngineBuilder { - config: &config, + config: &self.config, router: None, engine_runtimes: self.runtimes.clone(), opened_wals: opened_wals.clone(), @@ -372,7 +371,7 @@ impl TestContext { } } -impl TestContext { +impl TestContext { pub fn clone_engine(&self) -> TableEngineRef { self.engine.clone().unwrap() } @@ -390,8 +389,11 @@ impl TestEnv { } pub fn new_context(&self, build_context: T) -> TestContext { + let config = build_context.config(); + let wals_opener = build_context.wals_opener(); TestContext { - context: build_context, + config, + wals_opener, runtimes: self.runtimes.clone(), engine: None, opened_wals: None, @@ -459,9 +461,9 @@ impl Default for Builder { } pub trait EngineBuildContext: Clone + Default { - type WalOpener: WalsOpener; + type WalsOpener: WalsOpener; - fn wal_opener(&self) -> Self::WalOpener; + fn wal_opener(&self) -> Self::WalsOpener; fn config(&self) -> Config; } From bedb2646755b1f92642d0da74e17535a6f5cb493 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 6 Mar 2023 16:30:04 +0800 Subject: [PATCH 08/10] fix: wrong config for test write buffer --- analytic_engine/src/tests/read_write_test.rs | 15 ++++++----- analytic_engine/src/tests/util.rs | 26 +++++++++++++------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/analytic_engine/src/tests/read_write_test.rs b/analytic_engine/src/tests/read_write_test.rs index 0b912218b5..783f46aa42 100644 --- a/analytic_engine/src/tests/read_write_test.rs +++ b/analytic_engine/src/tests/read_write_test.rs @@ -519,9 +519,9 @@ fn test_db_write_buffer_size_mem_wal() { } fn test_db_write_buffer_size(table_name: &str, engine_context: T) { - let mut env = TestEnv::builder().build(); - env.config.db_write_buffer_size = 1; - let test_ctx = env.new_context(engine_context); + let env = TestEnv::builder().build(); + let mut test_ctx = env.new_context(engine_context); + test_ctx.config_mut().db_write_buffer_size = 1; test_write_buffer_size_overflow(table_name, env, test_ctx); } @@ -540,18 +540,17 @@ fn test_space_write_buffer_size_mem_wal() { } fn test_space_write_buffer_size(table_name: &str, engine_context: T) { - let mut env = TestEnv::builder().build(); - env.config.db_write_buffer_size = 1; - let test_ctx = env.new_context(engine_context); + let env = TestEnv::builder().build(); + let mut test_ctx = env.new_context(engine_context); + test_ctx.config_mut().space_write_buffer_size = 1; test_write_buffer_size_overflow(table_name, env, test_ctx); } fn test_write_buffer_size_overflow( test_table_name: &str, env: TestEnv, - test_ctx: TestContext, + mut test_ctx: TestContext, ) { - let mut test_ctx = test_ctx; env.block_on(async { test_ctx.open().await; diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index cd7a6f9727..f5912a7ca0 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -109,9 +109,9 @@ pub struct TestContext { config: Config, wals_opener: T, runtimes: Arc, - pub engine: Option, - pub opened_wals: Option, - pub schema_id: SchemaId, + engine: Option, + opened_wals: Option, + schema_id: SchemaId, last_table_seq: u32, name_to_tables: HashMap, @@ -372,6 +372,10 @@ impl TestContext { } impl 
+    pub fn config_mut(&mut self) -> &mut Config {
+        &mut self.config
+    }
+
     pub fn clone_engine(&self) -> TableEngineRef {
         self.engine.clone().unwrap()
     }
@@ -388,9 +392,13 @@ impl TestEnv {
         Builder::default()
     }
 
-    pub fn new_context<T: EngineBuildContext>(&self, build_context: T) -> TestContext<T::WalsOpener> {
+    pub fn new_context<T: EngineBuildContext>(
+        &self,
+        build_context: T,
+    ) -> TestContext<T::WalsOpener> {
         let config = build_context.config();
         let wals_opener = build_context.wals_opener();
+
         TestContext {
             config,
             wals_opener,
@@ -463,7 +471,7 @@ impl Default for Builder {
 pub trait EngineBuildContext: Clone + Default {
     type WalsOpener: WalsOpener;
 
-    fn wal_opener(&self) -> Self::WalsOpener;
+    fn wals_opener(&self) -> Self::WalsOpener;
     fn config(&self) -> Config;
 }
@@ -523,9 +531,9 @@ impl Clone for RocksDBEngineBuildContext {
 }
 
 impl EngineBuildContext for RocksDBEngineBuildContext {
-    type WalOpener = RocksDBWalsOpener;
+    type WalsOpener = RocksDBWalsOpener;
 
-    fn wal_opener(&self) -> Self::WalOpener {
+    fn wals_opener(&self) -> Self::WalsOpener {
         RocksDBWalsOpener::default()
     }
@@ -563,9 +571,9 @@ impl Default for MemoryEngineBuildContext {
 }
 
 impl EngineBuildContext for MemoryEngineBuildContext {
-    type WalOpener = MemWalsOpener;
+    type WalsOpener = MemWalsOpener;
 
-    fn wal_opener(&self) -> Self::WalOpener {
+    fn wals_opener(&self) -> Self::WalsOpener {
         MemWalsOpener::default()
     }

From 30b697d753666cf3021a97e7d1fe7d4e38557f27 Mon Sep 17 00:00:00 2001
From: "xikai.wxk" 
Date: Mon, 6 Mar 2023 20:41:13 +0800
Subject: [PATCH 09/10] fix: implement close region for table-kv based wal

---
 wal/src/table_kv_impl/namespace.rs | 16 +++++++++++++++-
 wal/src/table_kv_impl/wal.rs       | 12 +++++++-----
 wal/src/tests/read_write.rs        |  8 ++++----
 3 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/wal/src/table_kv_impl/namespace.rs b/wal/src/table_kv_impl/namespace.rs
index b517517352..7069825061 100644
--- a/wal/src/table_kv_impl/namespace.rs
+++ b/wal/src/table_kv_impl/namespace.rs
@@ -26,7 +26,8 @@ use crate::{
     kv_encoder::CommonLogKey,
     log_batch::LogWriteBatch,
     manager::{
-        self, ReadContext, ReadRequest, ScanContext, ScanRequest, SequenceNumber, WalLocation,
+        self, ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, SequenceNumber,
+        WalLocation,
     },
     table_kv_impl::{
         consts, encoding,
@@ -636,6 +637,15 @@ impl<T: TableKv> NamespaceInner<T> {
         Ok(common_types::MIN_SEQUENCE_NUMBER)
     }
 
+    /// Close the region.
+    async fn close_region(&self, region_id: RegionId) -> Result<()> {
+        let mut table_units = self.table_units.write().unwrap();
+        // Remove the table units that belong to this region.
+        table_units.retain(|_, v| v.region_id() != region_id);
+
+        Ok(())
+    }
+
     /// Read log from this namespace. Note that iterating the iterator may
     /// still block the caller thread for now.
     async fn read_log(&self, ctx: &ReadContext, req: &ReadRequest) -> Result<TableLogIterator<T>> {
@@ -1114,6 +1124,10 @@ impl<T: TableKv> Namespace<T> {
         self.inner.last_sequence(location).await
     }
 
+    pub async fn close_region(&self, region_id: RegionId) -> Result<()> {
+        self.inner.close_region(region_id).await
+    }
+
     /// Read log from this namespace. Note that iterating the iterator may
     /// still block the caller thread for now.
     pub async fn read_log(
diff --git a/wal/src/table_kv_impl/wal.rs b/wal/src/table_kv_impl/wal.rs
index 660f804ab3..b71b01a685 100644
--- a/wal/src/table_kv_impl/wal.rs
+++ b/wal/src/table_kv_impl/wal.rs
@@ -7,7 +7,7 @@ use std::{fmt, str, sync::Arc};
 use async_trait::async_trait;
 use common_types::SequenceNumber;
 use common_util::error::BoxError;
-use log::{info, warn};
+use log::info;
 use snafu::ResultExt;
 use table_kv::TableKv;
 
@@ -118,10 +118,12 @@ impl<T: TableKv> WalManager for WalNamespaceImpl<T> {
             .context(Delete)
     }
 
-    async fn close_region(&self, region: RegionId) -> Result<()> {
-        warn!("Close region is not supported yet, region:{}", region);
-
-        Ok(())
+    async fn close_region(&self, region_id: RegionId) -> Result<()> {
+        self.namespace
+            .close_region(region_id)
+            .await
+            .box_err()
+            .context(CloseRegion { region: region_id })
     }
 
     async fn close_gracefully(&self) -> Result<()> {
diff --git a/wal/src/tests/read_write.rs b/wal/src/tests/read_write.rs
index 96b8a0da45..697219b649 100644
--- a/wal/src/tests/read_write.rs
+++ b/wal/src/tests/read_write.rs
@@ -182,11 +182,12 @@ fn test_move_from_nodes<B: WalBuilder>(builder: B) {
         .await;
 
         // The table is moved to node 2 but stays in the same shard, so its region id is still
-        // 0, but region version changed to 1 for distinguishing this moving.
+        // 0.
+        wal_1.close_region(region_id).await.unwrap();
         let wal_2 = env.build_wal().await;
         simple_read_write_with_range_and_wal(
             &env,
-            wal_2,
+            wal_2.clone(),
             WalLocation::new(region_id, table_id),
             10,
             20,
         )
         .await;
 
         // Finally, the table with the same shard is moved to node 1 again.
-        // If version changed, wal manager can distinguish that
-        // the region info in it is outdated, it should reopen the region.
+        wal_2.close_region(region_id).await.unwrap();
         simple_read_write_with_range_and_wal(
             &env,
             wal_1,

From 39e571a646179c0ce7c03140f6a553e876c2f9c7 Mon Sep 17 00:00:00 2001
From: "xikai.wxk" 
Date: Tue, 7 Mar 2023 14:16:25 +0800
Subject: [PATCH 10/10] chore: fix missing wals for no meta mode

---
 src/setup.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/setup.rs b/src/setup.rs
index 2b05656b59..2e36c84717 100644
--- a/src/setup.rs
+++ b/src/setup.rs
@@ -263,7 +263,7 @@ async fn build_without_meta(
         config: &config.analytic,
         router: None,
         engine_runtimes: runtimes.clone(),
-        opened_wals,
+        opened_wals: opened_wals.clone(),
     };
 
     let engine_proxy = build_table_engine_proxy(engine_builder).await;
@@ -314,6 +314,7 @@ async fn build_without_meta(
         .catalog_manager(catalog_manager)
         .table_manipulator(table_manipulator)
         .router(router)
+        .opened_wals(opened_wals)
         .schema_config_provider(schema_config_provider)
         .local_tables_recoverer(local_tables_recoverer)
 }
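The close_region change in [PATCH 09/10] is what makes shard movement safe now that WalLocation no longer carries a region version: instead of comparing versions, a node simply evicts every cached table unit of a region when it gives the region up, forcing a full reopen on the next access. A minimal, self-contained sketch of that eviction pattern follows; TableUnit, NamespaceInner, and the field layout here are hypothetical stand-ins simplified from the patch, not the wal crate's real types.

use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

type RegionId = u64;
type TableId = u64;

// Hypothetical stand-in for the per-table state a WAL namespace caches.
struct TableUnit {
    region_id: RegionId,
}

impl TableUnit {
    fn region_id(&self) -> RegionId {
        self.region_id
    }
}

struct NamespaceInner {
    // Table id -> table unit, shared behind a RwLock as in the patch.
    table_units: RwLock<HashMap<TableId, Arc<TableUnit>>>,
}

impl NamespaceInner {
    // Closing a region drops every cached table unit that belongs to it,
    // so the next read or write against the region must reopen its table
    // units from storage instead of trusting stale cached state.
    fn close_region(&self, region_id: RegionId) {
        let mut table_units = self.table_units.write().unwrap();
        table_units.retain(|_, unit| unit.region_id() != region_id);
    }
}

fn main() {
    let inner = NamespaceInner {
        table_units: RwLock::new(HashMap::from([
            (1, Arc::new(TableUnit { region_id: 0 })),
            (2, Arc::new(TableUnit { region_id: 0 })),
            (3, Arc::new(TableUnit { region_id: 7 })),
        ])),
    };

    // Simulate the shard of region 0 moving to another node.
    inner.close_region(0);

    // Only the table unit of region 7 survives the eviction.
    assert_eq!(inner.table_units.read().unwrap().len(), 1);
}

This is also why the updated test in wal/src/tests/read_write.rs calls wal_1.close_region(region_id) before reopening the region through wal_2: after the eviction there is no stale region state left to detect, so no version counter is needed.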