From 15c9074a5d375def54e802b1442b21c73ae4c0cf Mon Sep 17 00:00:00 2001
From: Jiacai Liu
Date: Tue, 13 Dec 2022 15:08:48 +0800
Subject: [PATCH] fix: lru-weighted-cache mem leak (#480)

* fix: lru-weighted-cache mem leak

* return error when cap is 0
---
 Cargo.lock                               |  13 +-
 analytic_engine/src/setup.rs             |  20 +++-
 components/object_store/src/mem_cache.rs | 146 +++++++++++++----------
 3 files changed, 105 insertions(+), 74 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 28a1b56179..96e183dd11 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -994,6 +994,12 @@ dependencies = [
  "bitflags",
 ]
 
+[[package]]
+name = "clru"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8191fa7302e03607ff0e237d4246cc043ff5b3cb9409d995172ba3bea16b807"
+
 [[package]]
 name = "cluster"
 version = "1.0.0-alpha01"
@@ -2972,11 +2978,6 @@ dependencies = [
  "hashbrown",
 ]
 
-[[package]]
-name = "lru-weighted-cache"
-version = "0.1.2"
-source = "git+https://github.com/jiacai2050/lru-weighted-cache.git?rev=1cf61aaf88469387e610dc7154fa318843491428#1cf61aaf88469387e610dc7154fa318843491428"
-
 [[package]]
 name = "lz4"
 version = "1.23.3"
@@ -3583,13 +3584,13 @@ dependencies = [
  "async-trait",
  "bytes 1.2.1",
  "chrono",
+ "clru",
  "common_util",
  "crc",
  "futures 0.3.21",
  "lazy_static",
  "log",
  "lru",
- "lru-weighted-cache",
  "object_store 0.5.1",
  "oss-rust-sdk",
  "prometheus 0.12.0",
diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs
index 8ab3aac552..e0d5ad2074 100644
--- a/analytic_engine/src/setup.rs
+++ b/analytic_engine/src/setup.rs
@@ -2,7 +2,7 @@
 
 //! Setup the analytic engine
 
-use std::{path::Path, pin::Pin, sync::Arc};
+use std::{num::NonZeroUsize, path::Path, pin::Pin, sync::Arc};
 
 use async_trait::async_trait;
 use common_util::define_result;
@@ -82,6 +82,11 @@ pub enum Error {
     OpenKafka {
         source: message_queue::kafka::kafka_impl::Error,
     },
+
+    #[snafu(display("Failed to create mem cache, err:{}", source))]
+    OpenMemCache {
+        source: object_store::mem_cache::Error,
+    },
 }
 
 define_result!(Error);
@@ -423,11 +428,14 @@ fn open_storage(
     }
 
     if opts.mem_cache_capacity.as_bytes() > 0 {
-        store = Arc::new(MemCacheStore::new(
-            opts.mem_cache_partition_bits,
-            opts.mem_cache_capacity.as_bytes() as usize,
-            store,
-        )) as _;
+        store = Arc::new(
+            MemCacheStore::try_new(
+                opts.mem_cache_partition_bits,
+                NonZeroUsize::new(opts.mem_cache_capacity.as_bytes() as usize).unwrap(),
+                store,
+            )
+            .context(OpenMemCache)?,
+        ) as _;
     }
 
     Ok(store)
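Note: with this change a zero cache capacity surfaces as `object_store::mem_cache::Error::InvalidCapacity` (wrapped in `OpenMemCache` above) instead of tripping the old `assert!(mem_cap > 0)`. A minimal sketch of a caller of the new fallible constructor; the `build_cache_store` helper and its import paths are illustrative, not part of this patch:

    use std::{num::NonZeroUsize, sync::Arc};

    use object_store::mem_cache::{Error, MemCacheStore};
    use upstream::ObjectStore; // the object_store 0.5.1 crate, as aliased in mem_cache.rs

    // Reject a zero capacity up front, then build the cache store; both
    // failure paths surface as `Error` values instead of a panic.
    fn build_cache_store(
        partition_bits: usize,
        cap_bytes: usize,
        underlying: Arc<dyn ObjectStore>,
    ) -> Result<Arc<dyn ObjectStore>, Error> {
        let cap = NonZeroUsize::new(cap_bytes).ok_or(Error::InvalidCapacity)?;
        let store = MemCacheStore::try_new(partition_bits, cap, underlying)?;
        Ok(Arc::new(store) as _)
    }
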
diff --git a/components/object_store/src/mem_cache.rs b/components/object_store/src/mem_cache.rs
index b3aa057277..97a19de3d7 100644
--- a/components/object_store/src/mem_cache.rs
+++ b/components/object_store/src/mem_cache.rs
@@ -5,85 +5,105 @@
 //! 2. Builtin Partition to reduce lock contention
 
 use std::{
-    collections::hash_map::DefaultHasher,
-    fmt::Display,
+    collections::hash_map::{DefaultHasher, RandomState},
+    fmt::{self, Display},
     hash::{Hash, Hasher},
+    num::NonZeroUsize,
     ops::Range,
     sync::Arc,
 };
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use clru::{CLruCache, CLruCacheConfig, WeightScale};
 use futures::stream::BoxStream;
-use lru_weighted_cache::{LruWeightedCache, Weighted};
+use snafu::{OptionExt, Snafu};
 use tokio::{io::AsyncWrite, sync::Mutex};
 use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
 
-struct CachedBytes(Bytes);
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("mem cache capacity must be greater than 0"))]
+    InvalidCapacity,
+}
+
+// Weigh cache entries by the byte length of their values, not by entry count.
+struct CustomScale;
 
-impl Weighted for CachedBytes {
-    fn weight(&self) -> usize {
-        self.0.len()
+impl WeightScale<String, Bytes> for CustomScale {
+    fn weight(&self, _key: &String, value: &Bytes) -> usize {
+        value.len()
     }
 }
 
-#[derive(Debug)]
 struct Partition {
-    inner: Mutex<LruWeightedCache<String, CachedBytes>>,
+    inner: Mutex<CLruCache<String, Bytes, RandomState, CustomScale>>,
 }
 
 impl Partition {
-    fn new(mem_cap: usize) -> Self {
+    fn new(mem_cap: NonZeroUsize) -> Self {
+        let cache = CLruCache::with_config(CLruCacheConfig::new(mem_cap).with_scale(CustomScale));
+
         Self {
-            inner: Mutex::new(LruWeightedCache::new(1, mem_cap).expect("invalid params")),
+            inner: Mutex::new(cache),
         }
     }
 }
 
+
 impl Partition {
-    // TODO(chenxiang): also support `&str`, this need to changes to
-    // lru_weighted_cache
-    async fn get(&self, key: &String) -> Option<Bytes> {
+    async fn get(&self, key: &str) -> Option<Bytes> {
         let mut guard = self.inner.lock().await;
-        guard.get(key).map(|v| v.0.clone())
+        guard.get(key).cloned()
     }
 
     async fn insert(&self, key: String, value: Bytes) {
         let mut guard = self.inner.lock().await;
         // don't care error now.
-        _ = guard.insert(key, CachedBytes(value));
+        _ = guard.put_with_weight(key, value);
     }
+
+    #[cfg(test)]
+    async fn keys(&self) -> Vec<String> {
+        let guard = self.inner.lock().await;
+        guard
+            .iter()
+            .map(|(key, _)| key)
+            .cloned()
+            .collect::<Vec<_>>()
+    }
 }
 
-#[derive(Debug)]
 struct MemCache {
     /// Max memory this store can use
-    mem_cap: usize,
+    mem_cap: NonZeroUsize,
     partitions: Vec<Arc<Partition>>,
     partition_mask: usize,
 }
 
 impl MemCache {
-    fn new(partition_bits: usize, mem_cap: usize) -> Self {
+    fn try_new(partition_bits: usize, mem_cap: NonZeroUsize) -> std::result::Result<Self, Error> {
         let partition_num = 1 << partition_bits;
-        let cap_per_part = mem_cap / partition_num;
+        // Give each partition an equal share of the total capacity.
+        let cap_per_part =
+            NonZeroUsize::new(mem_cap.get() / partition_num).context(InvalidCapacity)?;
         let partitions = (0..partition_num)
             .map(|_| Arc::new(Partition::new(cap_per_part)))
            .collect::<Vec<_>>();
 
-        Self {
+        Ok(Self {
             mem_cap,
             partitions,
             partition_mask: partition_num - 1,
-        }
+        })
     }
 
-    fn locate_partition(&self, key: &String) -> Arc<Partition> {
+    fn locate_partition(&self, key: &str) -> Arc<Partition> {
         let mut hasher = DefaultHasher::new();
         key.hash(&mut hasher);
         self.partitions[hasher.finish() as usize & self.partition_mask].clone()
     }
 
-    async fn get(&self, key: &String) -> Option<Bytes> {
+    async fn get(&self, key: &str) -> Option<Bytes> {
         let partition = self.locate_partition(key);
         partition.get(key).await
     }
@@ -92,6 +111,21 @@ impl MemCache {
         let partition = self.locate_partition(&key);
         partition.insert(key, value).await;
     }
+
+    #[cfg(test)]
+    async fn to_string(&self) -> String {
+        futures::future::join_all(
+            self.partitions
+                .iter()
+                .map(|part| async { part.keys().await.join(",") }),
+        )
+        .await
+        .into_iter()
+        .enumerate()
+        .map(|(part_no, keys)| format!("{}: [{}]", part_no, keys))
+        .collect::<Vec<_>>()
+        .join("\n")
+    }
 }
 
 impl Display for MemCache {
@@ -99,30 +133,26 @@
         f.debug_struct("MemCache")
             .field("mem_cap", &self.mem_cap)
             .field("mask", &self.partition_mask)
-            .field("partitons", &self.partitions)
+            .field("partitions", &self.partitions.len())
             .finish()
     }
 }
 
-#[derive(Debug)]
 pub struct MemCacheStore {
     cache: MemCache,
     underlying_store: Arc<dyn ObjectStore>,
 }
 
 impl MemCacheStore {
-    // Note: mem_cap must be larger than 0
-    pub fn new(
+    pub fn try_new(
         partition_bits: usize,
-        mem_cap: usize,
+        mem_cap: NonZeroUsize,
         underlying_store: Arc<dyn ObjectStore>,
-    ) -> Self {
-        assert!(mem_cap > 0);
-
-        Self {
-            cache: MemCache::new(partition_bits, mem_cap),
+    ) -> std::result::Result<Self, Error> {
+        MemCache::try_new(partition_bits, mem_cap).map(|cache| Self {
+            cache,
             underlying_store,
-        }
+        })
     }
 
     fn cache_key(location: &Path, range: &Range<usize>) -> String {
@@ -136,6 +166,12 @@ impl Display for MemCacheStore {
     }
 }
 
+impl fmt::Debug for MemCacheStore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MemCacheStore").finish()
+    }
+}
+
 #[async_trait]
 impl ObjectStore for MemCacheStore {
     async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
@@ -216,7 +252,7 @@ mod test {
         let local_path = tempdir().unwrap();
         let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
 
-        MemCacheStore::new(bits, mem_cap, local_store)
+        MemCacheStore::try_new(bits, NonZeroUsize::new(mem_cap).unwrap(), local_store).unwrap()
     }
 
     #[tokio::test]
@@ -239,10 +275,6 @@
             .get(&MemCacheStore::cache_key(&location, &range0_5))
             .await
             .is_some());
-        assert_eq!(
-            r#"MemCache { mem_cap: 13, mask: 0, partitons: [Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 13, max_total_weight: 13, current_weight: 5 } } }] }"#,
-            format!("{}", store)
-        );
 
         // get bytes from [5, 10), insert to cache
         let range5_10 = 5..10;
         _ = store
             .get_range(&location, range5_10.clone())
             .await
             .unwrap();
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range5_10))
             .await
             .is_some());
-        assert_eq!(
-            r#"MemCache { mem_cap: 13, mask: 0, partitons: [Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 13, max_total_weight: 13, current_weight: 10 } } }] }"#,
-            format!("{}", store)
-        );
 
-        // get bytes from [5, 10), insert to cache
+        // get bytes from [10, 15), insert to cache
         // cache is full, evict [0, 5)
-        let range10_15 = 5..10;
+        let range10_15 = 10..15;
         _ = store
             .get_range(&location, range10_15.clone())
             .await
             .unwrap();
+        assert!(store
+            .cache
+            .get(&MemCacheStore::cache_key(&location, &range0_5))
+            .await
+            .is_none());
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range5_10))
             .await
             .is_some());
         assert!(store
             .cache
             .get(&MemCacheStore::cache_key(&location, &range10_15))
             .await
             .is_some());
-        assert_eq!(
-            r#"MemCache { mem_cap: 13, mask: 0, partitons: [Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 13, max_total_weight: 13, current_weight: 10 } } }] }"#,
-            format!("{}", store)
-        );
-
-        let range10_13 = 10..13;
-        _ = store
-            .get_range(&location, range10_13.clone())
-            .await
-            .unwrap();
-        assert_eq!(
-            r#"MemCache { mem_cap: 13, mask: 0, partitons: [Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 13, max_total_weight: 13, current_weight: 13 } } }] }"#,
-            format!("{}", store)
-        );
     }
 
     #[tokio::test]
@@ -314,8 +333,11 @@
             .unwrap();
 
         assert_eq!(
-            r#"MemCache { mem_cap: 100, mask: 3, partitons: [Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 25, max_total_weight: 25, current_weight: 0 } } }, Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 25, max_total_weight: 25, current_weight: 5 } } }, Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 25, max_total_weight: 25, current_weight: 0 } } }, Partition { inner: Mutex { data: LruWeightedCache { max_item_weight: 25, max_total_weight: 25, current_weight: 5 } } }] }"#,
-            format!("{}", store)
+            r#"0: []
+1: [partition.sst-100-105]
+2: []
+3: [partition.sst-0-5]"#,
+            store.cache.to_string().await
         );
 
         assert!(store
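
For reviewers unfamiliar with clru, a standalone sketch of the weighted-LRU behaviour the updated test exercises; `ByteScale` stands in for the patch's `CustomScale`, and the 13-byte capacity with 5-byte values mirrors the evict test above:

    use std::num::NonZeroUsize;

    use bytes::Bytes;
    use clru::{CLruCache, CLruCacheConfig, WeightScale};

    struct ByteScale;

    impl WeightScale<String, Bytes> for ByteScale {
        fn weight(&self, _key: &String, value: &Bytes) -> usize {
            value.len()
        }
    }

    fn main() {
        let cap = NonZeroUsize::new(13).unwrap();
        let mut cache =
            CLruCache::with_config(CLruCacheConfig::new(cap).with_scale(ByteScale));

        // Each insert charges the value's byte length against the capacity,
        // so the cache's memory use stays bounded.
        _ = cache.put_with_weight("0-5".to_string(), Bytes::from_static(b"aaaaa"));
        _ = cache.put_with_weight("5-10".to_string(), Bytes::from_static(b"bbbbb"));

        // A third 5-byte value no longer fits: the least recently used
        // entry ("0-5") is evicted to make room.
        _ = cache.put_with_weight("10-15".to_string(), Bytes::from_static(b"ccccc"));

        assert!(cache.get("0-5").is_none());
        assert!(cache.get("5-10").is_some());
        assert!(cache.get("10-15").is_some());
    }

Bounding each partition by the total byte weight of its values is the property the switch from lru-weighted-cache to clru is meant to guarantee.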