Skip to content

Commit

Permalink
refactor: set up server
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Feb 27, 2023
1 parent e0a8214 commit 8af6247
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 107 deletions.
131 changes: 52 additions & 79 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,72 +107,54 @@ const MANIFEST_DIR_NAME: &str = "manifest";
const STORE_DIR_NAME: &str = "store";
const DISK_CACHE_DIR_NAME: &str = "sst_cache";

#[derive(Default)]
pub struct EngineBuildContextBuilder {
#[derive(Clone)]
struct EngineBuilder2 {
config: Config,
router: Option<RouterRef>,
engine_runtimes: Arc<EngineRuntimes>,
data_wal: WalManagerRef,
manifest_wal: WalManagerRef,
}

impl EngineBuildContextBuilder {
pub fn config(mut self, config: Config) -> Self {
self.config = config;
self
}

pub fn router(mut self, router: RouterRef) -> Self {
self.router = Some(router);
self
}
impl EngineBuilder2 {
async fn build(self) -> Result<TableEngineRef> {
let opened_storages = open_storage(self.config.storage.clone()).await?;
let manifest = ManifestImpl::open(
self.config.manifest.clone(),
self.manifest_wal.clone(),
opened_storages.default_store().clone(),
)
.await
.context(OpenManifest)?;

pub fn build(self) -> EngineBuildContext {
EngineBuildContext {
config: self.config,
router: self.router,
}
let instance = open_instance(
self.config,
self.engine_runtimes,
self.data_wal,
Arc::new(manifest),
Arc::new(opened_storages),
self.router,
)
.await?;
Ok(Arc::new(TableEngineImpl::new(instance)))
}
}

#[derive(Clone)]
pub struct EngineBuildContext {
pub config: Config,
pub router: Option<RouterRef>,
#[derive(Debug, Clone)]
struct OpenedWals {
data_wal: WalManagerRef,
manifest_wal: WalManagerRef,
}

/// Analytic engine builder.
#[async_trait]
pub trait EngineBuilder: Send + Sync + Default {
/// Build the analytic engine from `config` and `engine_runtimes`.
async fn build(
&self,
context: EngineBuildContext,
engine_runtimes: Arc<EngineRuntimes>,
) -> Result<TableEngineRef> {
let opened_storages = open_storage(context.config.storage.clone()).await?;
let (wal, manifest) = self
.open_wal_and_manifest(
context.config.clone(),
engine_runtimes.clone(),
opened_storages.default_store().clone(),
)
.await?;
let instance = open_instance(
context.config.clone(),
engine_runtimes,
wal,
manifest,
Arc::new(opened_storages),
context.router,
)
.await?;
Ok(Arc::new(TableEngineImpl::new(instance)))
}

async fn open_wal_and_manifest(
&self,
config: Config,
engine_runtimes: Arc<EngineRuntimes>,
object_store: ObjectStoreRef,
) -> Result<(WalManagerRef, ManifestRef)>;
) -> Result<OpenedWals>;
}

/// [RocksEngine] builder.
Expand All @@ -186,7 +168,7 @@ impl EngineBuilder for RocksDBWalEngineBuilder {
config: Config,
engine_runtimes: Arc<EngineRuntimes>,
object_store: ObjectStoreRef,
) -> Result<(WalManagerRef, ManifestRef)> {
) -> Result<OpenedWals> {
let rocksdb_wal_config = match config.wal {
WalStorageConfig::RocksDB(config) => *config,
_ => {
Expand All @@ -203,24 +185,19 @@ impl EngineBuilder for RocksDBWalEngineBuilder {
let write_runtime = engine_runtimes.write_runtime.clone();
let data_path = Path::new(&rocksdb_wal_config.data_dir);
let wal_path = data_path.join(WAL_DIR_NAME);
let wal_manager = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone())
let data_wal = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone())
.build()
.context(OpenWal)?;

let manifest_path = data_path.join(MANIFEST_DIR_NAME);
let manifest_wal = WalBuilder::with_default_rocksdb_config(manifest_path, write_runtime)
.build()
.context(OpenManifestWal)?;

let manifest = ManifestImpl::open(
config.manifest.clone(),
Arc::new(manifest_wal),
object_store,
)
.await
.context(OpenManifest)?;

Ok((Arc::new(wal_manager), Arc::new(manifest)))
let opened_wals = OpenedWals {
data_wal: Arc::new(data_wal),
manifest_wal: Arc::new(manifest_wal),
};
Ok(opened_wals)
}
}

Expand All @@ -235,7 +212,7 @@ impl EngineBuilder for ObkvWalEngineBuilder {
config: Config,
engine_runtimes: Arc<EngineRuntimes>,
object_store: ObjectStoreRef,
) -> Result<(WalManagerRef, ManifestRef)> {
) -> Result<OpenedWals> {
let obkv_wal_config = match &config.wal {
WalStorageConfig::Obkv(config) => config.clone(),
_ => {
Expand All @@ -262,7 +239,6 @@ impl EngineBuilder for ObkvWalEngineBuilder {
config.manifest.clone(),
engine_runtimes,
obkv,
object_store,
)
.await
}
Expand All @@ -284,7 +260,7 @@ impl EngineBuilder for MemWalEngineBuilder {
config: Config,
engine_runtimes: Arc<EngineRuntimes>,
object_store: ObjectStoreRef,
) -> Result<(WalManagerRef, ManifestRef)> {
) -> Result<OpenedWals> {
let obkv_wal_config = match &config.wal {
WalStorageConfig::Obkv(config) => config.clone(),
_ => {
Expand All @@ -302,8 +278,7 @@ impl EngineBuilder for MemWalEngineBuilder {
*obkv_wal_config,
config.manifest.clone(),
engine_runtimes,
self.table_kv.clone(),
object_store,
self.table_kv,
)
.await
}
Expand All @@ -319,7 +294,7 @@ impl EngineBuilder for KafkaWalEngineBuilder {
config: Config,
engine_runtimes: Arc<EngineRuntimes>,
object_store: ObjectStoreRef,
) -> Result<(WalManagerRef, ManifestRef)> {
) -> Result<OpenedWals> {
let kafka_wal_config = match &config.wal {
WalStorageConfig::Kafka(config) => config.clone(),
_ => {
Expand All @@ -337,7 +312,7 @@ impl EngineBuilder for KafkaWalEngineBuilder {
let kafka = KafkaImpl::new(kafka_wal_config.kafka.clone())
.await
.context(OpenKafka)?;
let wal_manager = MessageQueueImpl::new(
let data_wal = MessageQueueImpl::new(
WAL_DIR_NAME.to_string(),
kafka.clone(),
bg_runtime.clone(),
Expand All @@ -351,11 +326,10 @@ impl EngineBuilder for KafkaWalEngineBuilder {
kafka_wal_config.meta_namespace,
);

let manifest = ManifestImpl::open(config.manifest, Arc::new(manifest_wal), object_store)
.await
.context(OpenManifest)?;

Ok((Arc::new(wal_manager), Arc::new(manifest)))
Ok(OpenedWals {
data_wal: Arc::new(data_wal),
manifest_wal: Arc::new(manifest_wal),
})
}
}

Expand All @@ -364,15 +338,14 @@ async fn open_wal_and_manifest_with_table_kv<T: TableKv>(
manifest_opts: ManifestOptions,
engine_runtimes: Arc<EngineRuntimes>,
table_kv: T,
object_store: ObjectStoreRef,
) -> Result<(WalManagerRef, ManifestRef)> {
) -> Result<OpenedWals> {
let runtimes = WalRuntimes {
read_runtime: engine_runtimes.read_runtime.clone(),
write_runtime: engine_runtimes.write_runtime.clone(),
bg_runtime: engine_runtimes.bg_runtime.clone(),
};

let wal_manager = WalNamespaceImpl::open(
let data_wal = WalNamespaceImpl::open(
table_kv.clone(),
runtimes.clone(),
WAL_DIR_NAME,
Expand All @@ -389,11 +362,11 @@ async fn open_wal_and_manifest_with_table_kv<T: TableKv>(
)
.await
.context(OpenManifestWal)?;
let manifest = ManifestImpl::open(manifest_opts, Arc::new(manifest_wal), object_store)
.await
.context(OpenManifest)?;

Ok((Arc::new(wal_manager), Arc::new(manifest)))
Ok(OpenedWals {
data_wal: Arc::new(data_wal),
manifest_wal: Arc::new(manifest_wal),
})
}

async fn open_instance(
Expand Down
20 changes: 10 additions & 10 deletions common_util/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::time;
/// Meters are used to calculate rate of an event.
#[derive(Debug)]
pub struct Meter {
moving_avarages: ExponentiallyWeightedMovingAverages,
moving_averages: ExponentiallyWeightedMovingAverages,
count: AtomicU64,
start_time: SystemTime,
}
Expand All @@ -30,7 +30,7 @@ impl Default for Meter {
impl Meter {
pub fn new() -> Meter {
Meter {
moving_avarages: ExponentiallyWeightedMovingAverages::new(),
moving_averages: ExponentiallyWeightedMovingAverages::new(),
count: AtomicU64::from(0),
start_time: SystemTime::now(),
}
Expand All @@ -42,23 +42,23 @@ impl Meter {

pub fn mark_n(&self, n: u64) {
self.count.fetch_add(n, Ordering::Relaxed);
self.moving_avarages.tick_if_needed();
self.moving_avarages.update(n);
self.moving_averages.tick_if_needed();
self.moving_averages.update(n);
}

pub fn h1_rate(&self) -> f64 {
self.moving_avarages.tick_if_needed();
self.moving_avarages.h1_rate()
self.moving_averages.tick_if_needed();
self.moving_averages.h1_rate()
}

pub fn h2_rate(&self) -> f64 {
self.moving_avarages.tick_if_needed();
self.moving_avarages.h2_rate()
self.moving_averages.tick_if_needed();
self.moving_averages.h2_rate()
}

pub fn m15_rate(&self) -> f64 {
self.moving_avarages.tick_if_needed();
self.moving_avarages.m15_rate()
self.moving_averages.tick_if_needed();
self.moving_averages.m15_rate()
}

pub fn count(&self) -> u64 {
Expand Down
19 changes: 14 additions & 5 deletions server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<Q: QueryExecutor + 'static> MetaServiceImpl<Q> {
CloseTableOnShardResponse
);

fn handler_ctx(&self) -> HandlerContext {
fn handler_ctx(&self) -> HandlerContext<C> {
HandlerContext {
cluster: self.cluster.clone(),
catalog_manager: self.instance.catalog_manager.clone(),
Expand All @@ -140,14 +140,19 @@ impl<Q: QueryExecutor + 'static> MetaServiceImpl<Q> {
}
}

pub trait WalRegionCloser: Debug {
fn close_region(&self, region_id: RegionId) -> Result<()>;
}

/// Context for handling all kinds of meta event service.
struct HandlerContext {
struct HandlerContext<C> {
cluster: ClusterRef,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
wal_shard_closer: C,
}

impl HandlerContext {
impl<C> HandlerContext<C> {
fn default_catalog(&self) -> Result<CatalogRef> {
let default_catalog_name = self.catalog_manager.default_catalog_name();
let default_catalog = self
Expand Down Expand Up @@ -224,7 +229,10 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re
Ok(())
}

async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> {
async fn handle_close_shard<C: WalRegionCloser>(
ctx: HandlerContext<C>,
request: CloseShardRequest,
) -> Result<()> {
let tables_of_shard =
ctx.cluster
.close_shard(&request)
Expand Down Expand Up @@ -261,7 +269,8 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) ->
})?;
}

Ok(())
// try to close wal region
ctx.wal_shard_closer.close_region(request.shard_id as u64)
}

async fn handle_create_table_on_shard(
Expand Down
Loading

0 comments on commit 8af6247

Please sign in to comment.