Skip to content

Commit

Permalink
feat: introduce ObjectStorePicker to replace the two object stores (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi authored Dec 21, 2022
1 parent 1e2382a commit b17be94
Show file tree
Hide file tree
Showing 17 changed files with 150 additions and 92 deletions.
8 changes: 4 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ impl Instance {
.new_sst_builder(
&sst_builder_options_clone,
&sst_file_path,
store.default_store(),
store.store_picker(),
)
.context(InvalidSstType { sst_type })?;

Expand Down Expand Up @@ -754,7 +754,7 @@ impl Instance {
.new_sst_builder(
&sst_builder_options,
&sst_file_path,
self.space_store.default_store(),
self.space_store.store_picker(),
)
.context(InvalidSstType {
sst_type: table_data.sst_type,
Expand Down Expand Up @@ -930,7 +930,7 @@ impl SpaceStore {
predicate: Arc::new(Predicate::empty()),
sst_factory: &self.sst_factory,
sst_reader_options,
store: self.store_with_readonly_cache(),
store_picker: self.store_picker(),
merge_iter_options: iter_options.clone(),
need_dedup: table_options.need_dedup(),
reverse: false,
Expand Down Expand Up @@ -966,7 +966,7 @@ impl SpaceStore {
};
let mut sst_builder = self
.sst_factory
.new_sst_builder(&sst_builder_options, &sst_file_path, self.default_store())
.new_sst_builder(&sst_builder_options, &sst_file_path, self.store_picker())
.context(InvalidSstType {
sst_type: table_data.sst_type,
})?;
Expand Down
21 changes: 9 additions & 12 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::{
use common_util::{define_result, runtime::Runtime};
use log::info;
use mem_collector::MemUsageCollector;
use object_store::ObjectStoreRef;
use snafu::{ResultExt, Snafu};
use table_engine::engine::EngineRuntimes;
use wal::manager::WalManagerRef;
Expand All @@ -35,7 +34,11 @@ use crate::{
meta::ManifestRef,
row_iter::IterOptions,
space::{SpaceId, SpaceRef},
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger, meta_cache::MetaCacheRef},
sst::{
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef},
file::FilePurger,
meta_cache::MetaCacheRef,
},
table::data::TableDataRef,
wal_synchronizer::WalSynchronizer,
TableOptions,
Expand Down Expand Up @@ -99,10 +102,8 @@ pub struct SpaceStore {
manifest: ManifestRef,
/// Wal of all tables
wal_manager: WalManagerRef,
/// Sst storage.
store: ObjectStoreRef,
/// Sst storage with read only storage cache.
store_with_readonly_cache: ObjectStoreRef,
/// Object store picker for persisting data.
store_picker: ObjectStorePickerRef,
/// Sst factory.
sst_factory: SstFactoryRef,

Expand All @@ -128,12 +129,8 @@ impl SpaceStore {
}

impl SpaceStore {
fn default_store(&self) -> &ObjectStoreRef {
&self.store
}

fn store_with_readonly_cache(&self) -> &ObjectStoreRef {
&self.store_with_readonly_cache
fn store_picker(&self) -> &ObjectStorePickerRef {
&self.store_picker
}

/// List all tables of all spaces
Expand Down
14 changes: 7 additions & 7 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{

use common_types::schema::IndexInWriterSchema;
use log::{debug, error, info, trace, warn};
use object_store::ObjectStoreRef;
use snafu::ResultExt;
use table_engine::engine::OpenTableRequest;
use tokio::sync::oneshot;
Expand All @@ -36,7 +35,10 @@ use crate::{
payload::{ReadPayload, WalDecoder},
row_iter::IterOptions,
space::{Space, SpaceId, SpaceRef},
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger},
sst::{
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef},
file::FilePurger,
},
table::data::{TableData, TableDataRef},
wal_synchronizer::{WalSynchronizer, WalSynchronizerConfig},
};
Expand All @@ -47,16 +49,14 @@ impl Instance {
ctx: OpenContext,
manifest: ManifestRef,
wal_manager: WalManagerRef,
store: ObjectStoreRef,
store_with_readonly_cache: ObjectStoreRef,
store_picker: ObjectStorePickerRef,
sst_factory: SstFactoryRef,
) -> Result<Arc<Self>> {
let space_store = Arc::new(SpaceStore {
spaces: RwLock::new(Spaces::default()),
manifest,
wal_manager: wal_manager.clone(),
store: store.clone(),
store_with_readonly_cache,
store_picker: store_picker.clone(),
sst_factory,
meta_cache: ctx.meta_cache.clone(),
});
Expand All @@ -69,7 +69,7 @@ impl Instance {
scheduler_config,
));

let file_purger = FilePurger::start(&bg_runtime, store);
let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone());

let mut wal_synchronizer =
WalSynchronizer::new(WalSynchronizerConfig::default(), wal_manager);
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Instance {
predicate: request.predicate.clone(),
sst_factory: &self.space_store.sst_factory,
sst_reader_options: sst_reader_options.clone(),
store: self.space_store.default_store(),
store_picker: self.space_store.store_picker(),
merge_iter_options: iter_options.clone(),
need_dedup: table_options.need_dedup(),
reverse: request.order.is_in_desc_order(),
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Instance {
predicate: request.predicate.clone(),
sst_reader_options: sst_reader_options.clone(),
sst_factory: &self.space_store.sst_factory,
store: self.space_store.default_store(),
store_picker: self.space_store.store_picker(),
};
let builder = chain::Builder::new(chain_config);
let chain_iter = builder
Expand Down
9 changes: 4 additions & 5 deletions analytic_engine/src/row_iter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use common_types::{
use common_util::define_result;
use futures::StreamExt;
use log::debug;
use object_store::ObjectStoreRef;
use snafu::{ResultExt, Snafu};
use table_engine::{predicate::PredicateRef, table::TableId};

Expand All @@ -21,7 +20,7 @@ use crate::{
},
space::SpaceId,
sst::{
factory::{FactoryRef as SstFactoryRef, SstReaderOptions},
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReaderOptions},
file::FileHandle,
},
table::version::{MemTableVec, SamplingMemTable},
Expand Down Expand Up @@ -61,8 +60,8 @@ pub struct ChainConfig<'a> {
pub sst_reader_options: SstReaderOptions,
/// Sst factory
pub sst_factory: &'a SstFactoryRef,
/// Sst storage
pub store: &'a ObjectStoreRef,
/// Store picker used to choose the object store that sst files are read from.
pub store_picker: &'a ObjectStorePickerRef,
}

/// Builder for [ChainIterator].
Expand Down Expand Up @@ -144,7 +143,7 @@ impl<'a> Builder<'a> {
sst,
self.config.sst_factory,
&self.config.sst_reader_options,
self.config.store,
self.config.store_picker,
)
.await
.context(BuildStreamFromSst)?;
Expand Down
9 changes: 4 additions & 5 deletions analytic_engine/src/row_iter/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_types::{
use common_util::define_result;
use futures::{future::try_join_all, StreamExt};
use log::{debug, info, trace};
use object_store::ObjectStoreRef;
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use table_engine::{predicate::PredicateRef, table::TableId};

Expand All @@ -33,7 +32,7 @@ use crate::{
},
space::SpaceId,
sst::{
factory::{FactoryRef as SstFactoryRef, SstReaderOptions},
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReaderOptions},
file::FileHandle,
manager::{FileId, MAX_LEVEL},
},
Expand Down Expand Up @@ -98,8 +97,8 @@ pub struct MergeConfig<'a> {
pub sst_reader_options: SstReaderOptions,
/// Sst factory
pub sst_factory: &'a SstFactoryRef,
/// Sst storage
pub store: &'a ObjectStoreRef,
/// Store picker used to choose the object store that sst files are read from.
pub store_picker: &'a ObjectStorePickerRef,

pub merge_iter_options: IterOptions,

Expand Down Expand Up @@ -208,7 +207,7 @@ impl<'a> MergeBuilder<'a> {
f,
self.config.sst_factory,
&self.config.sst_reader_options,
self.config.store,
self.config.store_picker,
)
.await
.context(BuildStreamFromSst)?;
Expand Down
11 changes: 5 additions & 6 deletions analytic_engine/src/row_iter/record_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use datafusion::{
physical_plan::PhysicalExpr,
};
use futures::stream::{self, Stream, StreamExt};
use object_store::ObjectStoreRef;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::TableId};

use crate::{
memtable::{MemTableRef, ScanContext, ScanRequest},
space::SpaceId,
sst::{
factory::{FactoryRef as SstFactoryRef, SstReaderOptions},
factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReaderOptions},
file::FileHandle,
},
table::sst_util,
Expand Down Expand Up @@ -258,15 +257,15 @@ pub async fn filtered_stream_from_sst_file(
sst_file: &FileHandle,
sst_factory: &SstFactoryRef,
sst_reader_options: &SstReaderOptions,
store: &ObjectStoreRef,
store_picker: &ObjectStorePickerRef,
) -> Result<SequencedRecordBatchStream> {
stream_from_sst_file(
space_id,
table_id,
sst_file,
sst_factory,
sst_reader_options,
store,
store_picker,
)
.await
.and_then(|origin_stream| {
Expand All @@ -288,12 +287,12 @@ pub async fn stream_from_sst_file(
sst_file: &FileHandle,
sst_factory: &SstFactoryRef,
sst_reader_options: &SstReaderOptions,
store: &ObjectStoreRef,
store_picker: &ObjectStorePickerRef,
) -> Result<SequencedRecordBatchStream> {
sst_file.read_meter().mark();
let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id());
let mut sst_reader = sst_factory
.new_sst_reader(sst_reader_options, &path, store)
.new_sst_reader(sst_reader_options, &path, store_picker)
.with_context(|| SstReaderNotFound {
options: sst_reader_options.clone(),
})?;
Expand Down
25 changes: 18 additions & 7 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
ManifestRef,
},
sst::{
factory::FactoryImpl,
factory::{FactoryImpl, ObjectStorePicker, ObjectStorePickerRef, ReadFrequency},
meta_cache::{MetaCache, MetaCacheRef},
},
storage_options::{ObjectStoreOptions, StorageOptions},
Expand Down Expand Up @@ -118,8 +118,7 @@ pub trait EngineBuilder: Send + Sync + Default {
engine_runtimes,
wal,
manifest,
opened_storages.default_store,
opened_storages.store_with_readonly_cache,
Arc::new(opened_storages),
)
.await?;
Ok(Arc::new(TableEngineImpl::new(instance)))
Expand Down Expand Up @@ -351,8 +350,7 @@ async fn open_instance(
engine_runtimes: Arc<EngineRuntimes>,
wal_manager: WalManagerRef,
manifest: ManifestRef,
store: ObjectStoreRef,
store_with_readonly_cache: ObjectStoreRef,
store_picker: ObjectStorePickerRef,
) -> Result<InstanceRef> {
let meta_cache: Option<MetaCacheRef> = config
.sst_meta_cache_cap
Expand All @@ -368,20 +366,33 @@ async fn open_instance(
open_ctx,
manifest,
wal_manager,
store,
store_with_readonly_cache,
store_picker,
Arc::new(FactoryImpl::default()),
)
.await
.context(OpenInstance)?;
Ok(instance)
}

/// The pair of object stores held together so they can be handed out through
/// the `ObjectStorePicker` implementation for this type.
#[derive(Debug)]
struct OpenedStorages {
/// Store returned by `default_store()` and chosen by `pick_by_freq` for
/// `ReadFrequency::Frequent`.
default_store: ObjectStoreRef,
/// Store chosen by `pick_by_freq` for `ReadFrequency::Once`.
/// NOTE(review): per its name this store sits behind a read-only cache —
/// confirm against the code that builds it.
store_with_readonly_cache: ObjectStoreRef,
}

impl ObjectStorePicker for OpenedStorages {
    /// Returns a reference to the `default_store` field.
    fn default_store(&self) -> &ObjectStoreRef {
        let Self { default_store, .. } = self;
        default_store
    }

    /// Selects one of the two held stores according to `freq`.
    ///
    /// * `ReadFrequency::Frequent` yields `default_store`.
    /// * `ReadFrequency::Once` yields `store_with_readonly_cache`.
    fn pick_by_freq(&self, freq: ReadFrequency) -> &ObjectStoreRef {
        let Self {
            default_store,
            store_with_readonly_cache,
        } = self;

        // Kept exhaustive (no wildcard arm) so that adding a `ReadFrequency`
        // variant forces an explicit routing decision here.
        // NOTE(review): presumably one-off reads go through the read-only
        // cache so they do not populate the main cache — confirm.
        match freq {
            ReadFrequency::Frequent => default_store,
            ReadFrequency::Once => store_with_readonly_cache,
        }
    }
}

// Build the store in multiple layers; access speed decreases from one layer to the next.
// MemCacheStore → DiskCacheStore → real ObjectStore(OSS/S3...)
// MemCacheStore(ReadOnly) ↑
Expand Down
Loading

0 comments on commit b17be94

Please sign in to comment.