refactor: remove version of region in WAL
ShiKaiWi committed Feb 27, 2023
1 parent 9e06dcc commit e0a8214
Showing 13 changed files with 109 additions and 278 deletions.
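
In short, this commit collapses `WalLocation`'s `VersionedRegionId` (a `(version, id)` pair) into a bare `RegionId`, so every call site drops its `cluster_version` argument. A minimal sketch of the signature change, assuming `RegionId` and `TableId` are `u64` aliases for illustration (in the repo they come from `wal::manager` and `common_types`):

```rust
// Sketch only: aliases assumed for illustration.
type RegionId = u64;
type TableId = u64;

#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct WalLocation {
    region_id: RegionId,
    table_id: TableId,
}

impl WalLocation {
    // After this commit there is no version parameter.
    fn new(region_id: RegionId, table_id: TableId) -> Self {
        WalLocation { region_id, table_id }
    }
}

fn main() {
    let shard_id: u32 = 0;
    let table_id: TableId = 42;
    // Before: WalLocation::new(shard_id as u64, cluster_version, table_id)
    let location = WalLocation::new(shard_id as u64, table_id);
    println!("{location:?}");
}
```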
1 change: 0 additions & 1 deletion analytic_engine/src/instance/close.rs
@@ -76,7 +76,6 @@ impl Instance {
let snapshot_request = SnapshotRequest {
space_id: space.id,
table_id: table_data.id,
cluster_version: table_data.shard_info.cluster_version,
shard_id: table_data.shard_info.shard_id,
};
self.space_store
6 changes: 1 addition & 5 deletions analytic_engine/src/instance/mod.rs
@@ -231,9 +231,5 @@ pub type InstanceRef = Arc<Instance>;

#[inline]
pub(crate) fn create_wal_location(table_id: TableId, shard_info: TableShardInfo) -> WalLocation {
WalLocation::new(
shard_info.shard_id as u64,
shard_info.cluster_version,
table_id,
)
WalLocation::new(shard_info.shard_id as u64, table_id)
}
1 change: 0 additions & 1 deletion analytic_engine/src/instance/open.rs
@@ -216,7 +216,6 @@ impl Instance {
let load_req = LoadRequest {
space_id,
table_id,
cluster_version: request.cluster_version,
shard_id: request.shard_id,
};
let manifest_data = self
39 changes: 6 additions & 33 deletions analytic_engine/src/manifest/details.rs
@@ -291,11 +291,7 @@ impl Manifest for ManifestImpl {

let table_id = request.meta_update.table_id();
let shard_id = request.shard_info.shard_id;
let location = WalLocation::new(
shard_id as u64,
request.shard_info.cluster_version,
table_id.as_u64(),
);
let location = WalLocation::new(shard_id as u64, table_id.as_u64());
let space_id = request.meta_update.space_id();
let table_id = request.meta_update.table_id();
self.store_update_to_wal(request.meta_update, location)
@@ -310,11 +306,7 @@ impl Manifest for ManifestImpl {
async fn load_data(&self, load_req: &LoadRequest) -> GenericResult<Option<TableManifestData>> {
info!("Manifest load data, request:{:?}", load_req);

let location = WalLocation::new(
load_req.shard_id as u64,
load_req.cluster_version,
load_req.table_id.as_u64(),
);
let location = WalLocation::new(load_req.shard_id as u64, load_req.table_id.as_u64());

let log_store = WalBasedLogStore {
opts: self.opts.clone(),
@@ -340,11 +332,7 @@ impl Manifest for ManifestImpl {
info!("Manifest do snapshot, request:{:?}", request);

let table_id = request.table_id;
let location = WalLocation::new(
request.shard_id as u64,
request.cluster_version,
table_id.as_u64(),
);
let location = WalLocation::new(request.shard_id as u64, table_id.as_u64());
let space_id = request.space_id;
let table_id = request.table_id;

@@ -1068,7 +1056,6 @@ mod tests {
let load_req = LoadRequest {
table_id,
shard_id: DEFAULT_SHARD_ID,
cluster_version: DEFAULT_CLUSTER_VERSION,
space_id: ctx.schema_id.as_u32(),
};
let expected_table_manifest_data = manifest_data_builder.build();
@@ -1151,11 +1138,7 @@ mod tests {
expr: Bytes::from("test"),
linear: false,
}));
let location = WalLocation::new(
DEFAULT_SHARD_ID as u64,
DEFAULT_CLUSTER_VERSION,
table_id.as_u64(),
);
let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64());
let mut manifest_data_builder = TableManifestDataBuilder::default();
let manifest = ctx.open_manifest().await;
ctx.add_table_with_manifest(
@@ -1181,7 +1164,6 @@ mod tests {
let load_req = LoadRequest {
space_id: ctx.schema_id.as_u32(),
table_id,
cluster_version: DEFAULT_CLUSTER_VERSION,
shard_id: DEFAULT_SHARD_ID,
};
ctx.check_table_manifest_data_with_manifest(
@@ -1202,7 +1184,6 @@ mod tests {
let load_req = LoadRequest {
space_id: ctx.schema_id.as_u32(),
table_id,
cluster_version: DEFAULT_CLUSTER_VERSION,
shard_id: table_id.as_u64() as u32,
};
let mut manifest_data_builder = TableManifestDataBuilder::default();
@@ -1226,11 +1207,7 @@ mod tests {
)
.await;

let location = WalLocation::new(
DEFAULT_SHARD_ID as u64,
DEFAULT_CLUSTER_VERSION,
table_id.as_u64(),
);
let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64());
manifest
.maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true)
.await
@@ -1359,11 +1336,7 @@ mod tests {
input_updates: Vec<MetaUpdate>,
updates_after_snapshot: Vec<MetaUpdate>,
) {
let location = WalLocation::new(
DEFAULT_SHARD_ID as u64,
DEFAULT_CLUSTER_VERSION,
table_id.as_u64(),
);
let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64());
let log_store = MemLogStore::from_updates(&input_updates);
let snapshot_store = MemSnapshotStore::new();

1 change: 0 additions & 1 deletion analytic_engine/src/manifest/mod.rs
@@ -22,7 +22,6 @@ use crate::{
pub struct LoadRequest {
pub space_id: SpaceId,
pub table_id: TableId,
pub cluster_version: u64,
pub shard_id: ShardId,
}

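After this change a manifest load request identifies its WAL region by shard id alone. A hedged sketch of building one, with `SpaceId`/`ShardId` assumed to be `u32` aliases and `TableId` a `u64` for illustration:

```rust
// Field types are assumptions for illustration; the real aliases live in
// common_types/table_engine.
type SpaceId = u32;
type ShardId = u32;
type TableId = u64;

#[derive(Debug)]
struct LoadRequest {
    space_id: SpaceId,
    table_id: TableId,
    shard_id: ShardId, // no `cluster_version` field any more
}

fn main() {
    let load_req = LoadRequest {
        space_id: 1,
        table_id: 42,
        shard_id: 0,
    };
    println!("Manifest load data, request:{load_req:?}");
}
```
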
2 changes: 1 addition & 1 deletion benchmarks/src/wal_write_bench.rs
@@ -75,7 +75,7 @@ impl WalWriteBench {
.expect("should succeed to open WalNamespaceImpl(Memory)");

let values = self.build_value_vec();
let wal_encoder = LogBatchEncoder::create(WalLocation::new(1, 1, 1));
let wal_encoder = LogBatchEncoder::create(WalLocation::new(1, 1));
let log_batch = wal_encoder
.encode_batch::<WritePayload, Vec<u8>>(values.as_slice())
.expect("should succeed to encode payload batch");
42 changes: 9 additions & 33 deletions wal/src/manager.rs
@@ -7,7 +7,7 @@ use std::{collections::VecDeque, fmt, sync::Arc, time::Duration};
use async_trait::async_trait;
pub use common_types::SequenceNumber;
use common_types::{
table::{TableId, DEFAULT_CLUSTER_VERSION, DEFAULT_SHARD_ID},
table::{TableId, DEFAULT_SHARD_ID},
MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER,
};
use common_util::{error::BoxError, runtime::Runtime};
@@ -126,44 +126,24 @@ pub mod error {
define_result!(Error);
}

pub type RegionId = u64;

/// Decide where to write logs
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct WalLocation {
pub versioned_region_id: VersionedRegionId,
pub region_id: RegionId,
pub table_id: TableId,
}

impl WalLocation {
pub fn new(region_id: u64, region_version: u64, table_id: TableId) -> Self {
let versioned_region_id = VersionedRegionId {
version: region_version,
id: region_id,
};

pub fn new(region_id: RegionId, table_id: TableId) -> Self {
WalLocation {
versioned_region_id,
region_id,
table_id,
}
}
}

/// Region id with version
///
/// Region is used to describe a set of table's log unit.
///
/// The `id` is used to identify the `Region`, can be mapped to table's related
/// information(e.g. `shard id`, `table id`).
///
/// The `version` is introduced for solving the following bug:
/// https://github.com/CeresDB/ceresdb/issues/441.
/// It may be mapped to cluster version(while shard moved from nodes,
/// it may changed to mark this moving) now.
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct VersionedRegionId {
pub version: u64,
pub id: u64,
}

#[derive(Debug, Clone)]
pub struct WriteContext {
/// Timeout to write wal and it only takes effect when writing to a Wal on a
@@ -258,11 +238,7 @@ pub struct ReadRequest {
impl Default for ReadRequest {
fn default() -> Self {
Self {
location: WalLocation::new(
DEFAULT_SHARD_ID as u64,
DEFAULT_CLUSTER_VERSION,
TableId::MIN,
),
location: WalLocation::new(DEFAULT_SHARD_ID as u64, TableId::MIN),
start: ReadBoundary::Min,
end: ReadBoundary::Min,
}
@@ -271,8 +247,8 @@ impl Default for ReadRequest {

#[derive(Debug, Clone, Default)]
pub struct ScanRequest {
/// Region id of the wals to be scanned
pub versioned_region_id: VersionedRegionId,
/// Id of the region to scan
pub region_id: RegionId,
}

pub type ScanContext = ReadContext;
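With `VersionedRegionId` gone, reads and scans are keyed by the bare `RegionId`. A small sketch of the new request shapes, mirroring the diff above (`DEFAULT_SHARD_ID = 0` and a minimum table id of 0 are assumptions for illustration):

```rust
type RegionId = u64;
type TableId = u64;

#[derive(Debug, Default, Clone, Copy)]
struct WalLocation {
    region_id: RegionId,
    table_id: TableId,
}

impl WalLocation {
    fn new(region_id: RegionId, table_id: TableId) -> Self {
        WalLocation { region_id, table_id }
    }
}

// A scan now targets a plain region id rather than a (version, id) pair.
#[derive(Debug, Clone, Default)]
struct ScanRequest {
    region_id: RegionId,
}

fn main() {
    const DEFAULT_SHARD_ID: u32 = 0; // assumed default shard id
    let location = WalLocation::new(DEFAULT_SHARD_ID as u64, 0);
    let scan = ScanRequest { region_id: location.region_id };
    println!("{location:?} {scan:?}");
}
```
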
45 changes: 19 additions & 26 deletions wal/src/message_queue_impl/namespace.rs
@@ -15,8 +15,7 @@ use crate::{
kv_encoder::LogEncoding,
log_batch::{LogEntry, LogWriteBatch},
manager::{
ReadContext, ReadRequest, ScanContext, ScanRequest, VersionedRegionId, WalLocation,
WriteContext,
ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, WalLocation, WriteContext,
},
message_queue_impl::{
config::Config,
@@ -239,7 +238,7 @@ impl<M: MessageQueue> fmt::Debug for Namespace<M> {
struct NamespaceInner<M: MessageQueue> {
namespace: String,
// TODO: should use some strategies(such as lru) to clean the invalid region.
regions: Arc<RwLock<HashMap<VersionedRegionId, RegionRef<M>>>>,
regions: Arc<RwLock<HashMap<RegionId, RegionRef<M>>>>,
message_queue: Arc<M>,
meta_encoding: MetaEncoding,
log_encoding: LogEncoding,
@@ -265,50 +264,44 @@ impl<M: MessageQueue> NamespaceInner<M> {
/// about above operations on an empty region.
async fn get_or_open_region(
&self,
versioned_region_id: VersionedRegionId,
region_id: RegionId,
) -> std::result::Result<RegionRef<M>, region::Error> {
{
let regions = self.regions.read().await;
if let Some(region) = regions.get(&versioned_region_id) {
if let Some(region) = regions.get(&region_id) {
debug!(
"Region exists and return it, namespace:{}, versioned region id:{:?}",
self.namespace, versioned_region_id
"Region exists and return it, namespace:{}, region id:{:?}",
self.namespace, region_id
);
return Ok(region.clone());
}
}

let mut regions = self.regions.write().await;
// Multiple tables share one region, so double check here is needed.
if let Some(region) = regions.get(&versioned_region_id) {
if let Some(region) = regions.get(&region_id) {
debug!(
"Region exists and return it, namespace:{}, versioned region id:{:?}",
self.namespace, versioned_region_id
"Region exists and return it, namespace:{}, region id:{:?}",
self.namespace, region_id
);
return Ok(region.clone());
}

let region = Arc::new(
Region::open(
&self.namespace,
versioned_region_id.id,
self.message_queue.clone(),
)
.await?,
);
regions.insert(versioned_region_id, region.clone());
let region =
Arc::new(Region::open(&self.namespace, region_id, self.message_queue.clone()).await?);
regions.insert(region_id, region.clone());

info!(
"Region open successfully, namespace:{}, versioned region id:{:?}",
self.namespace, versioned_region_id
"Region open successfully, namespace:{}, region id:{:?}",
self.namespace, region_id
);

Ok(region)
}

pub async fn sequence_num(&self, location: WalLocation) -> Result<SequenceNumber> {
let region = self
.get_or_open_region(location.versioned_region_id)
.get_or_open_region(location.region_id)
.await
.context(GetSequence {
namespace: self.namespace.clone(),
@@ -340,7 +333,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
);

let region = self
.get_or_open_region(request.location.versioned_region_id)
.get_or_open_region(request.location.region_id)
.await
.context(ReadWithCause {
namespace: self.namespace.clone(),
@@ -375,7 +368,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
);

let region = self
.get_or_open_region(request.versioned_region_id)
.get_or_open_region(request.region_id)
.await
.context(ScanWithCause {
namespace: self.namespace.clone(),
@@ -414,7 +407,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
);

let region = self
.get_or_open_region(log_batch.location.versioned_region_id)
.get_or_open_region(log_batch.location.region_id)
.await
.context(Write {
namespace: self.namespace.clone(),
@@ -437,7 +430,7 @@ impl<M: MessageQueue> NamespaceInner<M> {
debug!("Mark table logs delete in namespace, namespace:{}, location:{:?}, delete to sequence number:{}", self.namespace, location, sequence_num);

let region = self
.get_or_open_region(location.versioned_region_id)
.get_or_open_region(location.region_id)
.await
.context(MarkDeleteTo {
namespace: self.namespace.clone(),
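The `get_or_open_region` path above is a classic double-checked pattern over an async `RwLock`: probe under the read lock first, then re-check under the write lock, because multiple tables share one region and may race to open it. A self-contained sketch of that pattern (assumes the `tokio` crate; `Region` is a stand-in for the real message-queue-backed region):

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

type RegionId = u64;

#[derive(Debug)]
struct Region; // stand-in; the real code awaits Region::open here

async fn get_or_open_region(
    regions: &RwLock<HashMap<RegionId, Arc<Region>>>,
    region_id: RegionId,
) -> Arc<Region> {
    {
        // Fast path: the region is usually already open.
        let regions = regions.read().await;
        if let Some(region) = regions.get(&region_id) {
            return region.clone();
        }
    }

    let mut regions = regions.write().await;
    // Double check: another task may have opened the region while we
    // were waiting for the write lock.
    if let Some(region) = regions.get(&region_id) {
        return region.clone();
    }

    let region = Arc::new(Region);
    regions.insert(region_id, region.clone());
    region
}

#[tokio::main]
async fn main() {
    let regions = RwLock::new(HashMap::new());
    let region = get_or_open_region(&regions, 0).await;
    println!("opened region: {region:?}");
}
```
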
2 changes: 1 addition & 1 deletion wal/src/message_queue_impl/test_util.rs
@@ -59,7 +59,7 @@ impl<Mq: MessageQueue> TestContext<Mq> {
.into_iter()
.map(|(table_id, data)| {
let log_batch_encoder =
LogBatchEncoder::create(WalLocation::new(region_id, region_version, table_id));
LogBatchEncoder::create(WalLocation::new(region_id, table_id));
let log_write_batch = log_batch_encoder
.encode_batch::<TestPayload, u32>(&data)
.unwrap();
8 changes: 4 additions & 4 deletions wal/src/rocks_impl/manager.rs
@@ -162,7 +162,7 @@ impl TableUnit {
return Ok(RocksLogIterator::new_empty(self.log_encoding.clone(), iter));
};

let region_id = req.location.versioned_region_id.id;
let region_id = req.location.region_id;
let (min_log_key, max_log_key) = (
self.log_key(region_id, start_sequence),
self.log_key(region_id, end_sequence),
@@ -187,7 +187,7 @@ impl TableUnit {
let mut key_buf = BytesMut::new();

for entry in &batch.entries {
let region_id = batch.location.versioned_region_id.id;
let region_id = batch.location.region_id;
self.log_encoding
.encode_key(
&mut key_buf,
@@ -722,7 +722,7 @@ impl WalManager for RocksImpl {
sequence_num: SequenceNumber,
) -> Result<()> {
if let Some(table_unit) = self.table_unit(&location) {
let region_id = location.versioned_region_id.id;
let region_id = location.region_id;
return table_unit
.delete_entries_up_to(region_id, sequence_num)
.await;
@@ -768,7 +768,7 @@ impl WalManager for RocksImpl {
let read_opts = ReadOptions::default();
let iter = DBIterator::new(self.db.clone(), read_opts);

let region_id = req.versioned_region_id.id;
let region_id = req.region_id;
let (min_log_key, max_log_key) = (
CommonLogKey::new(region_id, TableId::MIN, SequenceNumber::MIN),
CommonLogKey::new(region_id, TableId::MAX, SequenceNumber::MAX),
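The RocksDB-backed implementation encodes log keys so that all entries of a region sort contiguously, which is why a region scan simply runs from `CommonLogKey::new(region_id, TableId::MIN, SequenceNumber::MIN)` to the `MAX` counterpart. A sketch of the same idea on a `BTreeMap` stand-in (the tuple key layout is an assumption mirroring those calls):

```rust
use std::collections::BTreeMap;

type RegionId = u64;
type TableId = u64;
type SequenceNumber = u64;

// Stand-in for CommonLogKey: tuple ordering sorts by region, then table,
// then sequence number.
type LogKey = (RegionId, TableId, SequenceNumber);

fn scan_region(db: &BTreeMap<LogKey, Vec<u8>>, region_id: RegionId) -> Vec<&Vec<u8>> {
    let min = (region_id, TableId::MIN, SequenceNumber::MIN);
    let max = (region_id, TableId::MAX, SequenceNumber::MAX);
    db.range(min..=max).map(|(_key, value)| value).collect()
}

fn main() {
    let mut db = BTreeMap::new();
    db.insert((0, 1, 1), b"a".to_vec());
    db.insert((0, 2, 1), b"b".to_vec());
    db.insert((1, 1, 1), b"c".to_vec()); // different region: excluded from the scan
    assert_eq!(scan_region(&db, 0).len(), 2);
}
```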