feat: support store with readonly cache (apache#490)
* feat: support store with readonly cache

* chore: refactor the method name of MemCacheStore

* chore: use peek instead of get from readonly cache
ShiKaiWi authored Dec 19, 2022
1 parent b2d2f1f commit 897e778
Showing 6 changed files with 136 additions and 48 deletions.
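The last bullet of the commit message is the heart of the change: the readonly cache serves hits without mutating the cache, by using `peek` instead of `get`. Below is a minimal sketch of that get-vs-peek distinction, assuming an LRU cache along the lines of the `lru` crate (an assumption; the cache actually backing `MemCache` is only shown indirectly in the diff).

```rust
use std::num::NonZeroUsize;

use lru::LruCache;

fn main() {
    let mut cache: LruCache<String, Vec<u8>> = LruCache::new(NonZeroUsize::new(2).unwrap());
    cache.put("a".to_string(), vec![1]);
    cache.put("b".to_string(), vec![2]);

    // `get` marks "a" as recently used, so "b" becomes the eviction candidate.
    assert!(cache.get("a").is_some());

    // `peek` answers the lookup without touching the LRU order; this is what the
    // readonly cache path relies on: hits are served, but the access does not
    // influence which entries survive.
    assert!(cache.peek("b").is_some());

    // Inserting "c" evicts the least recently used entry ("b", since the peek
    // above did not promote it).
    cache.put("c".to_string(), vec![3]);
    assert!(cache.peek("b").is_none());
    assert!(cache.peek("a").is_some());
}
```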
8 changes: 4 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -644,7 +644,7 @@ impl Instance {
.new_sst_builder(
&sst_builder_options_clone,
&sst_file_path,
store.store_ref(),
store.default_store(),
)
.context(InvalidSstType { sst_type })?;

@@ -754,7 +754,7 @@ impl Instance {
.new_sst_builder(
&sst_builder_options,
&sst_file_path,
self.space_store.store_ref(),
self.space_store.default_store(),
)
.context(InvalidSstType {
sst_type: table_data.sst_type,
@@ -930,7 +930,7 @@ impl SpaceStore {
predicate: Arc::new(Predicate::empty()),
sst_factory: &self.sst_factory,
sst_reader_options,
store: self.store_ref(),
store: self.store_with_readonly_cache(),
merge_iter_options: iter_options.clone(),
need_dedup: table_options.need_dedup(),
reverse: false,
@@ -966,7 +966,7 @@ impl SpaceStore {
};
let mut sst_builder = self
.sst_factory
.new_sst_builder(&sst_builder_options, &sst_file_path, self.store_ref())
.new_sst_builder(&sst_builder_options, &sst_file_path, self.default_store())
.context(InvalidSstType {
sst_type: table_data.sst_type,
})?;
8 changes: 7 additions & 1 deletion analytic_engine/src/instance/mod.rs
@@ -101,6 +101,8 @@ pub struct SpaceStore {
wal_manager: WalManagerRef,
/// Sst storage.
store: ObjectStoreRef,
/// Sst storage with read only storage cache.
store_with_readonly_cache: ObjectStoreRef,
/// Sst factory.
sst_factory: SstFactoryRef,

@@ -126,10 +128,14 @@ impl SpaceStore {
}

impl SpaceStore {
fn store_ref(&self) -> &ObjectStoreRef {
fn default_store(&self) -> &ObjectStoreRef {
&self.store
}

fn store_with_readonly_cache(&self) -> &ObjectStoreRef {
&self.store_with_readonly_cache
}

/// List all tables of all spaces
pub fn list_all_tables(&self, tables: &mut Vec<TableDataRef>) {
let spaces = self.spaces.read().unwrap();
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/open.rs
@@ -48,13 +48,15 @@ impl Instance {
manifest: ManifestRef,
wal_manager: WalManagerRef,
store: ObjectStoreRef,
store_with_readonly_cache: ObjectStoreRef,
sst_factory: SstFactoryRef,
) -> Result<Arc<Self>> {
let space_store = Arc::new(SpaceStore {
spaces: RwLock::new(Spaces::default()),
manifest,
wal_manager: wal_manager.clone(),
store: store.clone(),
store_with_readonly_cache,
sst_factory,
meta_cache: ctx.meta_cache.clone(),
});
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/read.rs
@@ -179,7 +179,7 @@ impl Instance {
predicate: request.predicate.clone(),
sst_factory: &self.space_store.sst_factory,
sst_reader_options: sst_reader_options.clone(),
store: self.space_store.store_ref(),
store: self.space_store.default_store(),
merge_iter_options: iter_options.clone(),
need_dedup: table_options.need_dedup(),
reverse: request.order.is_in_desc_order(),
@@ -241,7 +241,7 @@ impl Instance {
predicate: request.predicate.clone(),
sst_reader_options: sst_reader_options.clone(),
sst_factory: &self.space_store.sst_factory,
store: self.space_store.store_ref(),
store: self.space_store.default_store(),
};
let builder = chain::Builder::new(chain_config);
let chain_iter = builder
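Read together with the flush_compaction.rs hunks earlier, the switch pattern is: query reads in read.rs stay on `default_store()` (the read-write cache), while the compaction merge-read path moves to `store_with_readonly_cache()`, presumably so that large background scans can reuse cached blocks without inserting or promoting entries. A self-contained toy sketch of that split follows; the stub types stand in for the real `SpaceStore`/`ObjectStoreRef` and are not part of the diff.

```rust
use std::sync::Arc;

// Toy stand-in: the real ObjectStoreRef is Arc<dyn ObjectStore>; a String is
// enough here to show which view each path picks.
type ObjectStoreRef = Arc<String>;

struct SpaceStore {
    store: ObjectStoreRef,
    store_with_readonly_cache: ObjectStoreRef,
}

impl SpaceStore {
    fn default_store(&self) -> &ObjectStoreRef {
        &self.store
    }

    fn store_with_readonly_cache(&self) -> &ObjectStoreRef {
        &self.store_with_readonly_cache
    }
}

fn main() {
    let space_store = SpaceStore {
        store: Arc::new("MemCacheStore (read-write)".to_string()),
        store_with_readonly_cache: Arc::new("MemCacheStore (readonly cache)".to_string()),
    };

    // read.rs: foreground queries read through the read-write cache so hot
    // SST blocks get cached and promoted.
    println!("query read      -> {}", space_store.default_store());

    // flush_compaction.rs: the compaction merge iterator reads through the
    // readonly cache, while the SST builder it feeds still uses the default store.
    println!("compaction read -> {}", space_store.store_with_readonly_cache());
    println!("sst build       -> {}", space_store.default_store());
}
```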
55 changes: 42 additions & 13 deletions analytic_engine/src/setup.rs
@@ -9,8 +9,12 @@ use common_util::define_result;
use futures::Future;
use message_queue::kafka::kafka_impl::KafkaImpl;
use object_store::{
aliyun::AliyunOSS, disk_cache::DiskCacheStore, mem_cache::MemCacheStore,
metrics::StoreWithMetrics, prefix::StoreWithPrefix, LocalFileSystem, ObjectStoreRef,
aliyun::AliyunOSS,
disk_cache::DiskCacheStore,
mem_cache::{MemCache, MemCacheStore},
metrics::StoreWithMetrics,
prefix::StoreWithPrefix,
LocalFileSystem, ObjectStoreRef,
};
use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
@@ -108,8 +112,16 @@ pub trait EngineBuilder: Send + Sync + Default {
let (wal, manifest) = self
.open_wal_and_manifest(config.clone(), engine_runtimes.clone())
.await?;
let store = open_storage(config.storage.clone()).await?;
let instance = open_instance(config.clone(), engine_runtimes, wal, manifest, store).await?;
let opened_storages = open_storage(config.storage.clone()).await?;
let instance = open_instance(
config.clone(),
engine_runtimes,
wal,
manifest,
opened_storages.default_store,
opened_storages.store_with_readonly_cache,
)
.await?;
Ok(Arc::new(TableEngineImpl::new(instance)))
}

@@ -340,6 +352,7 @@ async fn open_instance(
wal_manager: WalManagerRef,
manifest: ManifestRef,
store: ObjectStoreRef,
store_with_readonly_cache: ObjectStoreRef,
) -> Result<InstanceRef> {
let meta_cache: Option<MetaCacheRef> = config
.sst_meta_cache_cap
@@ -356,16 +369,22 @@ async fn open_instance(
manifest,
wal_manager,
store,
store_with_readonly_cache,
Arc::new(FactoryImpl::default()),
)
.await
.context(OpenInstance)?;
Ok(instance)
}

struct OpenedStorages {
default_store: ObjectStoreRef,
store_with_readonly_cache: ObjectStoreRef,
}

// Build store in multiple layer, access speed decrease in turn.
// MemCacheStore -> DiskCacheStore -> real ObjectStore(OSS/S3...)
//
// [ASCII diagram: MemCacheStore / MemCacheStore(ReadOnly) -> DiskCacheStore -> real ObjectStore(OSS/S3...)]
@@ -378,7 +397,7 @@ async fn open_instance(
fn open_storage(
opts: StorageOptions,
) -> Pin<Box<dyn Future<Output = Result<ObjectStoreRef>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<OpenedStorages>> + Send>> {
Box::pin(async move {
let mut store = match opts.object_store {
ObjectStoreOptions::Local(local_opts) => {
@@ -428,16 +447,26 @@ fn open_storage(
}

if opts.mem_cache_capacity.as_bytes() > 0 {
store = Arc::new(
MemCacheStore::try_new(
let mem_cache = Arc::new(
MemCache::try_new(
opts.mem_cache_partition_bits,
NonZeroUsize::new(opts.mem_cache_capacity.as_bytes() as usize).unwrap(),
store,
)
.context(OpenMemCache)?,
) as _;
);
let default_store = Arc::new(MemCacheStore::new(mem_cache.clone(), store.clone())) as _;
let store_with_readonly_cache =
Arc::new(MemCacheStore::new_with_readonly_cache(mem_cache, store)) as _;
Ok(OpenedStorages {
default_store,
store_with_readonly_cache,
})
} else {
let store_with_readonly_cache = store.clone();
Ok(OpenedStorages {
default_store: store,
store_with_readonly_cache,
})
}

Ok(store)
})
}
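A condensed sketch of the wiring introduced in open_storage above: one shared MemCache backs both MemCacheStore views, so a range cached through the default store is also visible to the readonly view. The partition bits and capacity are illustrative values, and the DiskCacheStore/metrics/prefix layers of the full function are skipped here.

```rust
use std::{num::NonZeroUsize, sync::Arc};

use object_store::{
    mem_cache::{MemCache, MemCacheStore},
    ObjectStoreRef,
};

/// Build the two store views over one shared `MemCache`, following the calls
/// in the hunk above. Partition bits and capacity are illustrative.
fn build_stores(underlying: ObjectStoreRef) -> (ObjectStoreRef, ObjectStoreRef) {
    let mem_cache = Arc::new(
        MemCache::try_new(4, NonZeroUsize::new(256 * 1024 * 1024).unwrap())
            .expect("mem cache capacity must be positive"),
    );

    // Read-write view: misses are fetched from `underlying` and inserted.
    let default_store =
        Arc::new(MemCacheStore::new(mem_cache.clone(), underlying.clone())) as ObjectStoreRef;

    // Readonly view: hits are served via `peek`, misses fall through without
    // touching the cache.
    let store_with_readonly_cache =
        Arc::new(MemCacheStore::new_with_readonly_cache(mem_cache, underlying)) as ObjectStoreRef;

    (default_store, store_with_readonly_cache)
}
```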
107 changes: 79 additions & 28 deletions components/object_store/src/mem_cache.rs
@@ -21,6 +21,8 @@ use snafu::{OptionExt, Snafu};
use tokio::{io::AsyncWrite, sync::Mutex};
use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};

use crate::ObjectStoreRef;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("mem cache cap must large than 0",))]
@@ -55,6 +57,12 @@ impl Partition {
guard.get(key).cloned()
}

async fn peek(&self, key: &str) -> Option<Bytes> {
// FIXME: actually, here write lock is not necessary.
let guard = self.inner.lock().await;
guard.peek(key).cloned()
}

async fn insert(&self, key: String, value: Bytes) {
let mut guard = self.inner.lock().await;
// don't care error now.
@@ -72,15 +80,20 @@
}
}

struct MemCache {
pub struct MemCache {
/// Max memory this store can use
mem_cap: NonZeroUsize,
partitions: Vec<Arc<Partition>>,
partition_mask: usize,
}

pub type MemCacheRef = Arc<MemCache>;

impl MemCache {
fn try_new(partition_bits: usize, mem_cap: NonZeroUsize) -> std::result::Result<Self, Error> {
pub fn try_new(
partition_bits: usize,
mem_cap: NonZeroUsize,
) -> std::result::Result<Self, Error> {
let partition_num = 1 << partition_bits;
let cap_per_part = mem_cap
.checked_mul(NonZeroUsize::new(partition_num).unwrap())
@@ -107,6 +120,11 @@ impl MemCache {
partition.get(key).await
}

async fn peek(&self, key: &str) -> Option<Bytes> {
let partition = self.locate_partition(key);
partition.peek(key).await
}

async fn insert(&self, key: String, value: Bytes) {
let partition = self.locate_partition(&key);
partition.insert(key, value).await;
@@ -133,31 +151,72 @@ impl Display for MemCache {
f.debug_struct("MemCache")
.field("mem_cap", &self.mem_cap)
.field("mask", &self.partition_mask)
.field("partitons", &self.partitions.len())
.field("partitions", &self.partitions.len())
.finish()
}
}

/// Assembled with [`MemCache`], the [`MemCacheStore`] can cache the loaded data
/// from the `underlying_store` to avoid unnecessary data loading.
///
/// With the `read_only_cache` field, caller can control whether to do caching
/// for the loaded data. BTW, all the accesses are forced to the order:
/// `cache` -> `underlying_store`.
pub struct MemCacheStore {
cache: MemCache,
underlying_store: Arc<dyn ObjectStore>,
cache: MemCacheRef,
underlying_store: ObjectStoreRef,
readonly_cache: bool,
}

impl MemCacheStore {
pub fn try_new(
partition_bits: usize,
mem_cap: NonZeroUsize,
underlying_store: Arc<dyn ObjectStore>,
) -> std::result::Result<Self, Error> {
MemCache::try_new(partition_bits, mem_cap).map(|cache| Self {
/// Create a default [`MemCacheStore`].
pub fn new(cache: MemCacheRef, underlying_store: ObjectStoreRef) -> Self {
Self {
cache,
underlying_store,
})
readonly_cache: false,
}
}

/// Create a [`MemCacheStore`] with a readonly cache.
pub fn new_with_readonly_cache(cache: MemCacheRef, underlying_store: ObjectStoreRef) -> Self {
Self {
cache,
underlying_store,
readonly_cache: true,
}
}

fn cache_key(location: &Path, range: &Range<usize>) -> String {
format!("{}-{}-{}", location, range.start, range.end)
}

async fn get_range_with_rw_cache(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
// TODO(chenxiang): What if there are some overlapping range in cache?
// A request with range [5, 10) can also use [0, 20) cache
let cache_key = Self::cache_key(location, &range);
if let Some(bytes) = self.cache.get(&cache_key).await {
return Ok(bytes);
}

// TODO(chenxiang): What if two threads reach here? It's better to
// pend one thread, and only let one to fetch data from underlying store.
let bytes = self.underlying_store.get_range(location, range).await?;
self.cache.insert(cache_key, bytes.clone()).await;

Ok(bytes)
}

async fn get_range_with_ro_cache(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let cache_key = Self::cache_key(location, &range);
if let Some(bytes) = self.cache.peek(&cache_key).await {
return Ok(bytes);
}

// TODO(chenxiang): What if two threads reach here? It's better to
// pend one thread, and only let one to fetch data from underlying store.
self.underlying_store.get_range(location, range).await
}
}

impl Display for MemCacheStore {
@@ -199,21 +258,11 @@ impl ObjectStore for MemCacheStore {
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
// TODO(chenxiang): What if there are some overlapping range in cache?
// A request with range [5, 10) can also use [0, 20) cache
let cache_key = Self::cache_key(location, &range);
if let Some(bytes) = self.cache.get(&cache_key).await {
return Ok(bytes);
}

// TODO(chenxiang): What if two threads reach here? It's better to
// pend one thread, and only let one to fetch data from underlying store.
let bytes = self.underlying_store.get_range(location, range).await;
if let Ok(bytes) = &bytes {
self.cache.insert(cache_key, bytes.clone()).await;
if self.readonly_cache {
self.get_range_with_ro_cache(location, range).await
} else {
self.get_range_with_rw_cache(location, range).await
}

bytes
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
@@ -252,7 +301,9 @@ mod test {
let local_path = tempdir().unwrap();
let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());

MemCacheStore::try_new(bits, NonZeroUsize::new(mem_cap).unwrap(), local_store).unwrap()
let mem_cache =
Arc::new(MemCache::try_new(bits, NonZeroUsize::new(mem_cap).unwrap()).unwrap());
MemCacheStore::new(mem_cache, local_store)
}

#[tokio::test]
@@ -337,7 +388,7 @@
1: [partition.sst-100-105]
2: []
3: [partition.sst-0-5]"#,
store.cache.to_string().await
store.cache.as_ref().to_string().await
);

assert!(store
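Finally, a hypothetical end-to-end sketch in the spirit of the tests above, showing the observable difference between the two views over a shared cache. Import paths, the file name, and the sizes are assumptions for illustration; the crate re-exports (`Path`, `ObjectStore`) may differ from what is shown.

```rust
use std::{num::NonZeroUsize, sync::Arc};

use bytes::Bytes;
use object_store::{
    mem_cache::{MemCache, MemCacheStore},
    // Assumed re-exports; the tests above import Path and the trait from `upstream`.
    LocalFileSystem, ObjectStore, Path,
};
use tempfile::tempdir;

#[tokio::main]
async fn main() {
    let dir = tempdir().unwrap();
    let underlying = Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
    let mem_cache = Arc::new(MemCache::try_new(0, NonZeroUsize::new(1024).unwrap()).unwrap());

    let rw_store = MemCacheStore::new(mem_cache.clone(), underlying.clone());
    let ro_store = MemCacheStore::new_with_readonly_cache(mem_cache, underlying);

    let location = Path::from("partition.sst");
    rw_store
        .put(&location, Bytes::from_static(b"hello readonly cache"))
        .await
        .unwrap();

    // Readonly view on a cold cache: the range is fetched from the underlying
    // store and is NOT inserted into the cache.
    let _ = ro_store.get_range(&location, 0..5).await.unwrap();

    // Read-write view: the same range is fetched and inserted into the shared cache.
    let _ = rw_store.get_range(&location, 0..5).await.unwrap();

    // Readonly view again: this time the range is answered from the shared
    // cache via `peek`, without reordering or growing it.
    let _ = ro_store.get_range(&location, 0..5).await.unwrap();
}
```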
