Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add version in PartitionInfo #537

Merged
merged 2 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions analytic_engine/src/meta/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ mod tests {
use common_util::{runtime, runtime::Runtime, tests::init_log_for_test};
use futures::future::BoxFuture;
use table_engine::{
partition::{Definition, HashPartitionInfo, PartitionInfo},
partition::{HashPartitionInfo, PartitionDefinition, PartitionInfo},
table::{SchemaId, TableId, TableSeqGenerator},
};
use wal::rocks_impl::manager::Builder as WalBuilder;
Expand Down Expand Up @@ -1188,8 +1188,10 @@ mod tests {

runtime.block_on(async move {
let table_id = ctx.alloc_table_id();
let default_version = 0;
let partition_info = Some(PartitionInfo::Hash(HashPartitionInfo {
definitions: vec![Definition {
version: default_version,
partition_definitions: vec![PartitionDefinition {
name: "p0".to_string(),
origin_name: Some("region0".to_string()),
}],
Expand Down
26 changes: 15 additions & 11 deletions interpreters/src/show_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ impl ShowCreateInterpreter {
res += format!(
" PARTITION BY LINEAR HASH({}) PARTITIONS {}",
expr,
v.definitions.len()
v.partition_definitions.len()
)
.as_str()
} else {
res += format!(
" PARTITION BY HASH({}) PARTITIONS {}",
expr,
v.definitions.len()
v.partition_definitions.len()
)
.as_str()
}
Expand All @@ -145,14 +145,14 @@ impl ShowCreateInterpreter {
res += format!(
" PARTITION BY LINEAR KEY({}) PARTITIONS {}",
rendered_partition_key,
v.definitions.len()
v.partition_definitions.len()
)
.as_str()
} else {
res += format!(
" PARTITION BY KEY({}) PARTITIONS {}",
rendered_partition_key,
v.definitions.len()
v.partition_definitions.len()
)
.as_str()
}
Expand Down Expand Up @@ -186,20 +186,23 @@ mod test {

use datafusion_expr::col;
use datafusion_proto::bytes::Serializeable;
use table_engine::partition::{Definition, HashPartitionInfo, KeyPartitionInfo, PartitionInfo};
use table_engine::partition::{
HashPartitionInfo, KeyPartitionInfo, PartitionDefinition, PartitionInfo,
};

use super::ShowCreateInterpreter;

#[test]
fn test_render_hash_partition_info() {
let expr = col("col1").add(col("col2"));
let partition_info = PartitionInfo::Hash(HashPartitionInfo {
definitions: vec![
Definition {
version: 0,
partition_definitions: vec![
PartitionDefinition {
name: "p0".to_string(),
origin_name: None,
},
Definition {
PartitionDefinition {
name: "p1".to_string(),
origin_name: None,
},
Expand All @@ -219,12 +222,13 @@ mod test {
fn test_render_key_partition_info() {
let partition_key_col_name = "col1";
let partition_info = PartitionInfo::Key(KeyPartitionInfo {
definitions: vec![
Definition {
version: 0,
partition_definitions: vec![
PartitionDefinition {
name: "p0".to_string(),
origin_name: None,
},
Definition {
PartitionDefinition {
name: "p1".to_string(),
origin_name: None,
},
Expand Down
18 changes: 10 additions & 8 deletions proto/protos/meta_update.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,25 @@ message AddSpaceMeta {
string space_name = 2;
}

message Definition {
message PartitionDefinition {
string name = 1;
oneof origin_name {
string origin = 2;
}
}

message HashPartitionInfo {
repeated Definition definitions = 1;
bytes expr = 2;
bool linear = 3;
int32 version = 1;
repeated PartitionDefinition partition_definitions = 2;
bytes expr = 3;
bool linear = 4;
}

message KeyPartitionInfo {
repeated Definition definitions = 1;
repeated string partition_key = 2;
bool linear = 3;
int32 version = 1;
repeated PartitionDefinition partition_definitions = 2;
repeated string partition_key = 3;
bool linear = 4;
}

// Meta update for a new table
Expand All @@ -46,7 +48,7 @@ message AddTableMeta {

message PartitionInfo {
oneof partition_info_enum {
HashPartitionInfo hash = 1;
HashPartitionInfo hash = 1;
KeyPartitionInfo key = 2;
}
}
Expand Down
16 changes: 11 additions & 5 deletions sql/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use datafusion_expr::Expr;
use datafusion_proto::bytes::Serializeable;
use snafu::ResultExt;
use sqlparser::ast::Expr as SqlExpr;
use table_engine::partition::{Definition, HashPartitionInfo, KeyPartitionInfo, PartitionInfo};
use table_engine::partition::{
HashPartitionInfo, KeyPartitionInfo, PartitionDefinition, PartitionInfo,
};

use crate::{
ast::{HashPartition, KeyPartition, Partition},
planner::{ParsePartitionWithCause, Result, UnsupportedPartition},
};

const DEFAULT_PARTITION_VERSION: i32 = 0;

pub struct PartitionParser;

impl PartitionParser {
Expand Down Expand Up @@ -43,7 +47,8 @@ impl PartitionParser {
})?;

Ok(HashPartitionInfo {
definitions,
version: DEFAULT_PARTITION_VERSION,
partition_definitions: definitions,
expr,
linear,
})
Expand All @@ -65,17 +70,18 @@ impl PartitionParser {
let definitions = make_partition_definitions(partition_num);

Ok(KeyPartitionInfo {
definitions,
version: DEFAULT_PARTITION_VERSION,
partition_definitions: definitions,
partition_key,
linear,
})
}
}

fn make_partition_definitions(partition_num: u64) -> Vec<Definition> {
fn make_partition_definitions(partition_num: u64) -> Vec<PartitionDefinition> {
(0..partition_num)
.into_iter()
.map(|p| Definition {
.map(|p| PartitionDefinition {
name: p.to_string(),
origin_name: None,
})
Expand Down
107 changes: 61 additions & 46 deletions table_engine/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,51 +74,53 @@ pub enum PartitionInfo {
}

impl PartitionInfo {
pub fn get_definitions(&self) -> Vec<Definition> {
pub fn get_definitions(&self) -> Vec<PartitionDefinition> {
match self {
Self::Hash(v) => v.definitions.clone(),
Self::Key(v) => v.definitions.clone(),
Self::Hash(v) => v.partition_definitions.clone(),
Self::Key(v) => v.partition_definitions.clone(),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct Definition {
pub struct PartitionDefinition {
pub name: String,
pub origin_name: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HashPartitionInfo {
pub definitions: Vec<Definition>,
pub version: i32,
pub partition_definitions: Vec<PartitionDefinition>,
pub expr: Bytes,
pub linear: bool,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct KeyPartitionInfo {
pub definitions: Vec<Definition>,
pub version: i32,
pub partition_definitions: Vec<PartitionDefinition>,
pub partition_key: Vec<String>,
pub linear: bool,
}

impl From<Definition> for meta_pb::Definition {
fn from(definition: Definition) -> Self {
impl From<PartitionDefinition> for meta_pb::PartitionDefinition {
fn from(definition: PartitionDefinition) -> Self {
Self {
name: definition.name,
origin_name: definition
.origin_name
.map(meta_pb::definition::OriginName::Origin),
.map(meta_pb::partition_definition::OriginName::Origin),
}
}
}

impl From<meta_pb::Definition> for Definition {
fn from(pb: meta_pb::Definition) -> Self {
impl From<meta_pb::PartitionDefinition> for PartitionDefinition {
fn from(pb: meta_pb::PartitionDefinition) -> Self {
let mut origin_name = None;
if let Some(v) = pb.origin_name {
match v {
meta_pb::definition::OriginName::Origin(name) => origin_name = Some(name),
meta_pb::partition_definition::OriginName::Origin(name) => origin_name = Some(name),
}
}
Self {
Expand All @@ -131,8 +133,9 @@ impl From<meta_pb::Definition> for Definition {
impl From<meta_pb::HashPartitionInfo> for HashPartitionInfo {
fn from(partition_info_pb: meta_pb::HashPartitionInfo) -> Self {
HashPartitionInfo {
definitions: partition_info_pb
.definitions
version: partition_info_pb.version,
partition_definitions: partition_info_pb
.partition_definitions
.into_iter()
.map(|v| v.into())
.collect(),
Expand All @@ -145,8 +148,9 @@ impl From<meta_pb::HashPartitionInfo> for HashPartitionInfo {
impl From<HashPartitionInfo> for meta_pb::HashPartitionInfo {
fn from(partition_info: HashPartitionInfo) -> Self {
meta_pb::HashPartitionInfo {
definitions: partition_info
.definitions
version: partition_info.version,
partition_definitions: partition_info
.partition_definitions
.into_iter()
.map(|v| v.into())
.collect(),
Expand All @@ -159,8 +163,9 @@ impl From<HashPartitionInfo> for meta_pb::HashPartitionInfo {
impl From<meta_pb::KeyPartitionInfo> for KeyPartitionInfo {
fn from(partition_info_pb: meta_pb::KeyPartitionInfo) -> Self {
KeyPartitionInfo {
definitions: partition_info_pb
.definitions
version: partition_info_pb.version,
partition_definitions: partition_info_pb
.partition_definitions
.into_iter()
.map(|v| v.into())
.collect(),
Expand All @@ -173,8 +178,9 @@ impl From<meta_pb::KeyPartitionInfo> for KeyPartitionInfo {
impl From<KeyPartitionInfo> for meta_pb::KeyPartitionInfo {
fn from(partition_info: KeyPartitionInfo) -> Self {
meta_pb::KeyPartitionInfo {
definitions: partition_info
.definitions
version: partition_info.version,
partition_definitions: partition_info
.partition_definitions
.into_iter()
.map(|v| v.into())
.collect(),
Expand Down Expand Up @@ -279,30 +285,39 @@ impl PartitionInfoEncoder {
}
}

#[test]
fn test_partition_info_encoder() {
let partition_info = PartitionInfo::Key(KeyPartitionInfo {
definitions: vec![
Definition {
name: "p0".to_string(),
origin_name: Some("partition_0".to_string()),
},
Definition {
name: "p1".to_string(),
origin_name: None,
},
],
partition_key: vec!["col1".to_string(), "col2".to_string(), "col3".to_string()],
linear: false,
});
let partition_info_encoder = PartitionInfoEncoder::default();
let encode_partition_info = partition_info_encoder
.encode(partition_info.clone())
.unwrap();
let decode_partition_info = partition_info_encoder
.decode(&encode_partition_info)
.unwrap()
.unwrap();

assert_eq!(decode_partition_info, partition_info);
#[cfg(test)]
mod test {
use crate::partition::{
rule::key::DEFAULT_PARTITION_VERSION, KeyPartitionInfo, PartitionDefinition, PartitionInfo,
PartitionInfoEncoder,
};

#[test]
fn test_partition_info_encoder() {
let partition_info = PartitionInfo::Key(KeyPartitionInfo {
version: DEFAULT_PARTITION_VERSION,
partition_definitions: vec![
PartitionDefinition {
name: "p0".to_string(),
origin_name: Some("partition_0".to_string()),
},
PartitionDefinition {
name: "p1".to_string(),
origin_name: None,
},
],
partition_key: vec!["col1".to_string(), "col2".to_string(), "col3".to_string()],
linear: false,
});
let partition_info_encoder = PartitionInfoEncoder::default();
let encode_partition_info = partition_info_encoder
.encode(partition_info.clone())
.unwrap();
let decode_partition_info = partition_info_encoder
.decode(&encode_partition_info)
.unwrap()
.unwrap();

assert_eq!(decode_partition_info, partition_info);
}
}
Loading