refactor: use partition lock for memory cache (#936)
## Related Issues
Related #914

## Detailed Changes
Add `build_fixed_seed_ahasher` to build an `AHasher` with fixed seeds.
Use `PartitionedMutex` in `MemCache` (see the usage sketch below).
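
A minimal usage sketch of the reworked `PartitionedMutex` API: `new` takes an initializer closure instead of a `Clone` value, and `lock(&key)` returns the guard of the partition the key hashes to. The `lock` method itself is not among the changed lines below, so its exact signature is assumed from its call sites in `mem_cache.rs`.

```rust
use std::collections::HashMap;

use common_util::partitioned_lock::PartitionedMutex;

fn main() {
    // `new` now takes an initializer (here a fn item) instead of a `Clone`
    // value, so every partition gets an independently constructed value.
    let cache: PartitionedMutex<HashMap<String, String>> =
        PartitionedMutex::new(HashMap::new, 4); // 1 << 4 = 16 partitions

    let key = "some_key".to_string();

    // `lock(&key)` hashes the key with the fixed-seed ahasher, picks the
    // matching partition, and returns only that partition's guard.
    cache.lock(&key).insert(key.clone(), "some_value".to_string());

    let got = cache.lock(&key).get(&key).cloned();
    assert_eq!(got, Some("some_value".to_string()));
}
```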

## Test Plan 
UT.
tanruixiang authored May 29, 2023
1 parent 019add1 commit 005eb26
Showing 3 changed files with 66 additions and 103 deletions.
9 changes: 7 additions & 2 deletions common_types/src/hash.rs
@@ -2,6 +2,8 @@

// custom hash mod

use std::hash::BuildHasher;

/* We compared the speed difference between murmur3 and ahash for a string of
length 10, and the results show that ahash has a clear advantage.
Average time to DefaultHash a string of length 10: 33.6364 nanoseconds
@@ -11,17 +13,20 @@
One of the reasons is as follows:
https://github.com/tkaitchuck/aHash/blob/master/README.md#goals-and-non-goals
*/
pub use ahash::AHasher;
use ahash::AHasher;
use byteorder::{ByteOrder, LittleEndian};
use murmur3::murmur3_x64_128;

pub fn hash64(mut bytes: &[u8]) -> u64 {
let mut out = [0; 16];
murmur3_x64_128(&mut bytes, 0, &mut out);
// in most cases we run on little endian target
LittleEndian::read_u64(&out[0..8])
}

pub fn build_fixed_seed_ahasher() -> AHasher {
ahash::RandomState::with_seeds(0, 0, 0, 0).build_hasher()
}

#[cfg(test)]
mod test {
use super::*;
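
The seeds passed to `ahash::RandomState::with_seeds` are constant, so every hasher returned by `build_fixed_seed_ahasher` agrees on every key. That reproducibility is what the partitioned locks below rely on to map a key to the same partition on every call (ahash's default seeding can vary with crate features and runs, which would make the partition layout unstable). A small sketch of the property, assuming only the function added above:

```rust
use std::hash::{Hash, Hasher};

use common_types::hash::build_fixed_seed_ahasher;

fn hash_key<K: Hash>(key: &K) -> u64 {
    let mut hasher = build_fixed_seed_ahasher();
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Two independently built hashers agree on every key, so any code path
    // hashing the same key selects the same partition.
    let key = "test_key".to_string();
    assert_eq!(hash_key(&key), hash_key(&key));
}
```
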
57 changes: 33 additions & 24 deletions common_util/src/partitioned_lock.rs
@@ -7,22 +7,22 @@ use std::{
sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

use common_types::hash::AHasher;
use common_types::hash::build_fixed_seed_ahasher;
/// Simple partitioned `RwLock`
pub struct PartitionedRwLock<T> {
partitions: Vec<RwLock<T>>,
partition_mask: usize,
}

impl<T> PartitionedRwLock<T>
where
T: Clone,
{
pub fn new(t: T, partition_bit: usize) -> Self {
impl<T> PartitionedRwLock<T> {
pub fn new<F>(init_fn: F, partition_bit: usize) -> Self
where
F: Fn() -> T,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| RwLock::new(t.clone()))
.collect::<Vec<_>>();
let partitions = (0..partition_num)
.map(|_| RwLock::new(init_fn()))
.collect::<Vec<RwLock<T>>>();
Self {
partitions,
partition_mask: partition_num - 1,
@@ -42,7 +42,8 @@
}

fn get_partition<K: Eq + Hash>(&self, key: &K) -> &RwLock<T> {
let mut hasher = AHasher::default();
let mut hasher = build_fixed_seed_ahasher();

key.hash(&mut hasher);

&self.partitions[(hasher.finish() as usize) & self.partition_mask]
@@ -55,20 +56,21 @@
}

/// Simple partitioned `Mutex`
#[derive(Debug)]
pub struct PartitionedMutex<T> {
partitions: Vec<Mutex<T>>,
partition_mask: usize,
}

impl<T> PartitionedMutex<T>
where
T: Clone,
{
pub fn new(t: T, partition_bit: usize) -> Self {
impl<T> PartitionedMutex<T> {
pub fn new<F>(init_fn: F, partition_bit: usize) -> Self
where
F: Fn() -> T,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| Mutex::new(t.clone()))
.collect::<Vec<_>>();
.map(|_| Mutex::new(init_fn()))
.collect::<Vec<Mutex<T>>>();
Self {
partitions,
partition_mask: partition_num - 1,
@@ -82,7 +84,7 @@
}

fn get_partition<K: Eq + Hash>(&self, key: &K) -> &Mutex<T> {
let mut hasher = AHasher::default();
let mut hasher = build_fixed_seed_ahasher();
key.hash(&mut hasher);
&self.partitions[(hasher.finish() as usize) & self.partition_mask]
}
@@ -91,6 +93,11 @@
fn get_partition_by_index(&self, index: usize) -> &Mutex<T> {
&self.partitions[index]
}

/// This function should be marked with `#[cfg(test)]`, but because of [an issue](https://github.com/rust-lang/cargo/issues/8379) in cargo it is kept public for now.
pub fn get_all_partition(&self) -> &Vec<Mutex<T>> {
&self.partitions
}
}

#[cfg(test)]
@@ -101,7 +108,8 @@ mod tests {

#[test]
fn test_partitioned_rwlock() {
let test_locked_map = PartitionedRwLock::new(HashMap::new(), 4);
let init_hmap = HashMap::new;
let test_locked_map = PartitionedRwLock::new(init_hmap, 4);
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

@@ -118,7 +126,8 @@

#[test]
fn test_partitioned_mutex() {
let test_locked_map = PartitionedMutex::new(HashMap::new(), 4);
let init_hmap = HashMap::new;
let test_locked_map = PartitionedMutex::new(init_hmap, 4);
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

@@ -135,8 +144,8 @@

#[test]
fn test_partitioned_mutex_vis_different_partition() {
let tmp_vec: Vec<f32> = Vec::new();
let test_locked_map = PartitionedMutex::new(tmp_vec, 4);
let init_vec = Vec::<i32>::new;
let test_locked_map = PartitionedMutex::new(init_vec, 4);
let mutex_first = test_locked_map.get_partition_by_index(0);

let mut _tmp_data = mutex_first.lock().unwrap();
@@ -149,8 +158,8 @@

#[test]
fn test_partitioned_rwmutex_vis_different_partition() {
let tmp_vec: Vec<f32> = Vec::new();
let test_locked_map = PartitionedRwLock::new(tmp_vec, 4);
let init_vec = Vec::<i32>::new;
let test_locked_map = PartitionedRwLock::new(init_vec, 4);
let mutex_first = test_locked_map.get_partition_by_index(0);
let mut _tmp = mutex_first.write().unwrap();
assert!(mutex_first.try_write().is_err());
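
As a sanity check on the constructor arithmetic: `partition_bit` fixes a power-of-two partition count, and masking with `partition_num - 1` is the usual cheap substitute for a modulo, so the masked hash is always a valid index. A short worked example (the values are illustrative):

```rust
fn main() {
    let partition_bit = 4;
    let partition_num = 1usize << partition_bit; // 16 partitions
    let partition_mask = partition_num - 1;      // 0b1111

    // For a power-of-two partition count, masking equals taking the modulo.
    let hash: u64 = 0xDEAD_BEEF;
    assert_eq!(
        (hash as usize) & partition_mask,
        (hash as usize) % partition_num
    );
    assert!(((hash as usize) & partition_mask) < partition_num);
}
```
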
103 changes: 26 additions & 77 deletions components/object_store/src/mem_cache.rs
@@ -5,17 +5,17 @@
//! 2. Builtin Partition to reduce lock contention

use std::{
collections::hash_map::{DefaultHasher, RandomState},
collections::hash_map::RandomState,
fmt::{self, Display},
hash::{Hash, Hasher},
num::NonZeroUsize,
ops::Range,
sync::{Arc, Mutex},
sync::Arc,
};

use async_trait::async_trait;
use bytes::Bytes;
use clru::{CLruCache, CLruCacheConfig, WeightScale};
use common_util::partitioned_lock::PartitionedMutex;
use futures::stream::BoxStream;
use snafu::{OptionExt, Snafu};
use tokio::io::AsyncWrite;
@@ -37,54 +37,10 @@ impl WeightScale<String, Bytes> for CustomScale {
}
}

struct Partition {
inner: Mutex<CLruCache<String, Bytes, RandomState, CustomScale>>,
}

impl Partition {
fn new(mem_cap: NonZeroUsize) -> Self {
let cache = CLruCache::with_config(CLruCacheConfig::new(mem_cap).with_scale(CustomScale));

Self {
inner: Mutex::new(cache),
}
}
}

impl Partition {
fn get(&self, key: &str) -> Option<Bytes> {
let mut guard = self.inner.lock().unwrap();
guard.get(key).cloned()
}

fn peek(&self, key: &str) -> Option<Bytes> {
// FIXME: actually, here write lock is not necessary.
let guard = self.inner.lock().unwrap();
guard.peek(key).cloned()
}

fn insert(&self, key: String, value: Bytes) {
let mut guard = self.inner.lock().unwrap();
// don't care error now.
_ = guard.put_with_weight(key, value);
}

#[cfg(test)]
fn keys(&self) -> Vec<String> {
let guard = self.inner.lock().unwrap();
guard
.iter()
.map(|(key, _)| key)
.cloned()
.collect::<Vec<_>>()
}
}

pub struct MemCache {
/// Max memory this store can use
mem_cap: NonZeroUsize,
partitions: Vec<Arc<Partition>>,
partition_mask: usize,
inner: PartitionedMutex<CLruCache<String, Bytes, RandomState, CustomScale>>,
}

pub type MemCacheRef = Arc<MemCache>;
@@ -98,44 +54,38 @@ impl MemCache {
let cap_per_part = mem_cap
.checked_mul(NonZeroUsize::new(partition_num).unwrap())
.context(InvalidCapacity)?;
let partitions = (0..partition_num)
.map(|_| Arc::new(Partition::new(cap_per_part)))
.collect::<Vec<_>>();

Ok(Self {
mem_cap,
partitions,
partition_mask: partition_num - 1,
})
}

fn locate_partition(&self, key: &str) -> Arc<Partition> {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
self.partitions[hasher.finish() as usize & self.partition_mask].clone()
let init_lru =
|| CLruCache::with_config(CLruCacheConfig::new(cap_per_part).with_scale(CustomScale));
let inner = PartitionedMutex::new(init_lru, partition_bits);
Ok(Self { mem_cap, inner })
}

fn get(&self, key: &str) -> Option<Bytes> {
let partition = self.locate_partition(key);
partition.get(key)
self.inner.lock(&key).get(key).cloned()
}

fn peek(&self, key: &str) -> Option<Bytes> {
let partition = self.locate_partition(key);
partition.peek(key)
self.inner.lock(&key).peek(key).cloned()
}

fn insert(&self, key: String, value: Bytes) {
let partition = self.locate_partition(&key);
partition.insert(key, value);
// Ignore the insertion error for now.
_ = self.inner.lock(&key).put_with_weight(key, value);
}

#[cfg(test)]
fn keys(&self, part: &CLruCache<String, Bytes, RandomState, CustomScale>) -> Vec<String> {
part.iter().map(|(key, _)| key).cloned().collect::<Vec<_>>()
}

/// Give a description of the cache state.
#[cfg(test)]
fn state_desc(&self) -> String {
self.partitions
self.inner
.get_all_partition()
.iter()
.map(|part| part.keys().join(","))
.map(|part| self.keys(&part.lock().unwrap()).join(","))
.enumerate()
.map(|(part_no, keys)| format!("{part_no}: [{keys}]"))
.collect::<Vec<_>>()
@@ -147,8 +97,7 @@ impl Display for MemCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MemCache")
.field("mem_cap", &self.mem_cap)
.field("mask", &self.partition_mask)
.field("partitions", &self.partitions.len())
.field("partitions", &self.inner.get_all_partition().len())
.finish()
}
}
@@ -375,10 +324,10 @@ mod test {
.unwrap();

assert_eq!(
r#"0: []
1: [partition.sst-100-105]
2: []
3: [partition.sst-0-5]"#,
r#"0: [partition.sst-0-5]
1: []
2: [partition.sst-100-105]
3: []"#,
store.cache.as_ref().state_desc()
);

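
The expected `state_desc` string in this test changes because the partition hasher changed (std `DefaultHasher` before, the fixed-seed ahasher now), so the same keys land in different partitions. The exact mapping depends on ahash's output for these seeds, so the sketch below prints the new layout instead of asserting particular indices (two partition bits are assumed here, matching the four partitions listed in the test):

```rust
use std::hash::{Hash, Hasher};

use common_types::hash::build_fixed_seed_ahasher;

fn main() {
    let partition_mask = (1usize << 2) - 1; // 4 partitions, as in the test
    for key in ["partition.sst-0-5", "partition.sst-100-105"] {
        let mut hasher = build_fixed_seed_ahasher();
        key.hash(&mut hasher);
        let index = (hasher.finish() as usize) & partition_mask;
        println!("{key} -> partition {index}");
    }
}
```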
