Skip to content

Commit

Permalink
feat: update ceresdb proto & adapter to partition info (#515)
Browse files Browse the repository at this point in the history
* update ceresdb proto & adapter to partition info

* refactor by cr

* fix CI

* refactor by cr

* add encoder test

Co-authored-by: jiacai2050 <dev@liujiacai.net>
  • Loading branch information
ZuLiangWang and jiacai2050 authored Dec 29, 2022
1 parent bcb5f99 commit 56081b0
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 54 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ message_queue = { path = "components/message_queue" }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "b9c45bcdbf7d55d5889d42b4c8017282819e6049"
rev = "1ea76ec99bebee298dfb27b6a15feeb4f42c3aa2"

[workspace.dependencies.datafusion]
git = "https://github.com/CeresDB/arrow-datafusion.git"
Expand Down
17 changes: 15 additions & 2 deletions analytic_engine/src/meta/meta_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ pub enum Error {
#[snafu(display("Failed to convert schema, err:{}", source))]
ConvertSchema { source: common_types::schema::Error },

#[snafu(display("Failed to convert partition info, err:{}", source))]
ConvertPartitionInfo {
source: table_engine::partition::Error,
},

#[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
EmptyTableSchema { backtrace: Backtrace },

Expand Down Expand Up @@ -213,13 +218,14 @@ pub struct AddTableMeta {

impl From<AddTableMeta> for meta_pb::AddTableMeta {
fn from(v: AddTableMeta) -> Self {
let partition_info = v.partition_info.map(|v| v.into());
meta_pb::AddTableMeta {
space_id: v.space_id,
table_id: v.table_id.as_u64(),
table_name: v.table_name,
schema: Some(common_pb::TableSchema::from(&v.schema)),
options: Some(analytic_common::TableOptions::from(v.opts)),
partition_info: v.partition_info.map(|info| info.into()),
partition_info,
}
}
}
Expand All @@ -230,13 +236,20 @@ impl TryFrom<meta_pb::AddTableMeta> for AddTableMeta {
fn try_from(src: meta_pb::AddTableMeta) -> Result<Self> {
let table_schema = src.schema.context(EmptyTableSchema)?;
let opts = src.options.context(EmptyTableOptions)?;
let partition_info = match src.partition_info {
Some(partition_info) => {
Some(PartitionInfo::try_from(partition_info).context(ConvertPartitionInfo)?)
}
None => None,
};

Ok(Self {
space_id: src.space_id,
table_id: TableId::from(src.table_id),
table_name: src.table_name,
schema: Schema::try_from(table_schema).context(ConvertSchema)?,
opts: TableOptions::from(opts),
partition_info: src.partition_info.map(|v| v.into()),
partition_info,
})
}
}
Expand Down
28 changes: 24 additions & 4 deletions interpreters/src/table_manipulator/meta_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use async_trait::async_trait;
use common_types::schema::SchemaEncoder;
use log::info;
use meta_client::{
types::{CreateTableRequest, DropTableRequest, PartitionInfo},
types::{CreateTableRequest, DropTableRequest, PartitionTableInfo},
MetaClientRef,
};
use snafu::ResultExt;
use sql::plan::{CreateTablePlan, DropTablePlan};
use table_engine::{engine::TableEngineRef, partition::format_sub_partition_table_name};
use table_engine::{
engine::TableEngineRef,
partition::{format_sub_partition_table_name, PartitionInfoEncoder},
};

use crate::{
context::Context,
Expand Down Expand Up @@ -45,21 +48,38 @@ impl TableManipulator for TableManipulatorImpl {
),
})?;

let partition_info = plan.partition_info.map(|v| {
let sub_table_names = plan.partition_info.clone().map(|v| {
v.get_definitions()
.iter()
.map(|def| format_sub_partition_table_name(&plan.table, &def.name))
.collect::<Vec<String>>()
});
let partition_table_info =
sub_table_names.map(|v| PartitionTableInfo { sub_table_names: v });

let encoder = PartitionInfoEncoder::default();

let encoded_partition_info = match plan.partition_info.clone() {
None => Vec::new(),
Some(v) => encoder
.encode(v)
.map_err(|e| Box::new(e) as _)
.with_context(|| CreateWithCause {
msg: format!(
"fail to encode partition info, ctx:{:?}, plan:{:?}",
ctx, plan
),
})?,
};
let req = CreateTableRequest {
schema_name: ctx.default_schema().to_string(),
name: plan.table,
encoded_schema,
engine: plan.engine,
create_if_not_exist: plan.if_not_exists,
options: plan.options,
partition_info: partition_info.map(|v| PartitionInfo { names: v }),
partition_table_info,
encoded_partition_info,
};

let resp = self
Expand Down
17 changes: 11 additions & 6 deletions meta_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub struct AllocSchemaIdResponse {
}

#[derive(Clone, Debug)]
pub struct PartitionInfo {
pub names: Vec<String>,
pub struct PartitionTableInfo {
pub sub_table_names: Vec<String>,
}

#[derive(Clone, Debug)]
Expand All @@ -45,7 +45,8 @@ pub struct CreateTableRequest {
pub engine: String,
pub create_if_not_exist: bool,
pub options: HashMap<String, String>,
pub partition_info: Option<PartitionInfo>,
pub partition_table_info: Option<PartitionTableInfo>,
pub encoded_partition_info: Vec<u8>,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -306,6 +307,11 @@ impl From<meta_service_pb::AllocSchemaIdResponse> for AllocSchemaIdResponse {

impl From<CreateTableRequest> for meta_service_pb::CreateTableRequest {
fn from(req: CreateTableRequest) -> Self {
let partition_table_info =
req.partition_table_info
.map(|v| meta_service_pb::PartitionTableInfo {
sub_table_names: v.sub_table_names,
});
Self {
header: None,
schema_name: req.schema_name,
Expand All @@ -314,9 +320,8 @@ impl From<CreateTableRequest> for meta_service_pb::CreateTableRequest {
engine: req.engine,
create_if_not_exist: req.create_if_not_exist,
options: req.options,
partition_info: req
.partition_info
.map(|v| meta_service_pb::PartitionInfo { names: v.names }),
encoded_partition_info: req.encoded_partition_info,
partition_table_info,
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions proto/protos/meta_update.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ message AddTableMeta {
common.TableSchema schema = 4;
// Options of the table
analytic_common.TableOptions options = 5;
oneof partition_info {
HashPartitionInfo hash = 6;
KeyPartitionInfo key_partition = 7;
PartitionInfo partition_info = 6;
}

message PartitionInfo {
oneof partition_info_enum {
HashPartitionInfo hash = 1;
KeyPartitionInfo key = 2;
}
}

Expand Down
17 changes: 16 additions & 1 deletion server/src/grpc/meta_event_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use query_engine::executor::Executor as QueryExecutor;
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{CloseTableRequest, TableEngineRef, TableState},
partition::PartitionInfoEncoder,
table::{SchemaId, TableId},
ANALYTIC_ENGINE_TYPE,
};
Expand Down Expand Up @@ -333,6 +334,20 @@ async fn handle_create_table_on_shard(
),
})?;

let partition_info = match request.encoded_partition_info.is_empty() {
true => None,
false => PartitionInfoEncoder::default()
.decode(&request.encoded_partition_info)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::BadRequest,
msg: format!(
"fail to decode encoded partition info bytes, raw_bytes:{:?}",
request.encoded_partition_info
),
})?,
};

let create_table_request = CreateTableRequest {
catalog_name: ctx.catalog_manager.default_catalog_name().to_string(),
schema_name: table.schema_name,
Expand All @@ -344,7 +359,7 @@ async fn handle_create_table_on_shard(
state: TableState::Stable,
shard_id: shard_info.id,
cluster_version: topology.cluster_topology_version,
partition_info: None,
partition_info,
};
let create_opts = CreateOptions {
table_engine: ctx.table_engine,
Expand Down
Loading

0 comments on commit 56081b0

Please sign in to comment.