Skip to content

Commit

Permalink
feat: add open_table_on_shard and close_table_on_shard in meta_event_…
Browse files Browse the repository at this point in the history
…service (#542)

* feat: add open_table_on_shard and close_table_on_shard in meta_event_service

* refactor by CR
  • Loading branch information
chunshao90 authored Jan 9, 2023
1 parent 8e53951 commit 99ca8d0
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 77 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ message_queue = { path = "components/message_queue" }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "6e12241032e01d00e130b65118dfafc0794a3102"
rev = "6cc1754ffc31ddfe0710972c94f8fe5acd61af98"

[workspace.dependencies.datafusion]
git = "https://github.com/CeresDB/arrow-datafusion.git"
Expand Down
62 changes: 49 additions & 13 deletions cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use std::{
};

use async_trait::async_trait;
use ceresdbproto::meta_event::{
CloseShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardRequest,
use ceresdbproto::{
meta_event::{
CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest,
DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, UpdateShardInfo,
},
meta_service::TableInfo as TableInfoPb,
};
use common_util::runtime::{JoinHandle, Runtime};
use log::{error, info, warn};
Expand Down Expand Up @@ -247,14 +251,34 @@ impl Inner {
}

fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> {
let update_shard_info = req.update_shard_info.clone().context(ShardNotFound {
msg: "update shard info is missing in CreateTableOnShardRequest",
self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()> {
self.remove_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()> {
self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()> {
self.remove_table_to_shard(req.update_shard_info.clone(), req.table_info.clone())
}

fn insert_table_to_shard(
&self,
update_shard_info: Option<UpdateShardInfo>,
table_info: Option<TableInfoPb>,
) -> Result<()> {
let update_shard_info = update_shard_info.context(ShardNotFound {
msg: "update shard info is missing",
})?;
let curr_shard_info = update_shard_info.curr_shard_info.context(ShardNotFound {
msg: "current shard info is missing in UpdateShardInfo",
msg: "current shard info is missing",
})?;
let table_info = req.table_info.clone().context(TableNotFound {
msg: "table info is missing in CreateTableOnShardRequest",
let table_info = table_info.context(TableNotFound {
msg: "table info is missing",
})?;

self.shard_tables_cache.try_insert_table_to_shard(
Expand All @@ -264,15 +288,19 @@ impl Inner {
)
}

fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()> {
let update_shard_info = req.update_shard_info.clone().context(ShardNotFound {
msg: "update shard info is missing in DropTableOnShardRequest",
fn remove_table_to_shard(
&self,
update_shard_info: Option<UpdateShardInfo>,
table_info: Option<TableInfoPb>,
) -> Result<()> {
let update_shard_info = update_shard_info.context(ShardNotFound {
msg: "update shard info is missing",
})?;
let curr_shard_info = update_shard_info.curr_shard_info.context(ShardNotFound {
msg: "current shard info is missing in UpdateShardInfo",
msg: "current shard info is missing",
})?;
let table_info = req.table_info.clone().context(TableNotFound {
msg: "table info is missing in CreateTableOnShardRequest",
let table_info = table_info.context(TableNotFound {
msg: "table info is missing",
})?;

self.shard_tables_cache.try_remove_table_from_shard(
Expand Down Expand Up @@ -332,6 +360,14 @@ impl Cluster for ClusterImpl {
self.inner.drop_table_on_shard(req)
}

async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()> {
self.inner.open_table_on_shard(req)
}

async fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()> {
self.inner.close_table_on_shard(req)
}

async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse> {
self.inner.route_tables(req).await
}
Expand Down
5 changes: 4 additions & 1 deletion cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::sync::Arc;

use async_trait::async_trait;
use ceresdbproto::meta_event::{
CloseShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardRequest,
CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest,
DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest,
};
use common_types::schema::SchemaName;
use common_util::define_result;
Expand Down Expand Up @@ -118,6 +119,8 @@ pub trait Cluster {
async fn close_shard(&self, req: &CloseShardRequest) -> Result<TablesOfShard>;
async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>;
async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>;
async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()>;
async fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()>;
async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse>;
async fn fetch_nodes(&self) -> Result<ClusterNodesResp>;
}
Loading

0 comments on commit 99ca8d0

Please sign in to comment.