Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support random partition rule #1193

Merged
merged 8 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.13"
ceresdbproto = "1.0.14"
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand Down
12 changes: 9 additions & 3 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use log::{error, info};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{
Close, CloseShardRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenShard, OpenShardRequest, OpenShardResult, OpenTableNoCause, OpenTableRequest,
OpenTableWithCause, Result, TableDef, TableEngine,
Close, CloseShardRequest, CloseTableRequest, CreateTableParams, CreateTableRequest,
DropTableRequest, OpenShard, OpenShardRequest, OpenShardResult, OpenTableNoCause,
OpenTableRequest, OpenTableWithCause, Result, TableDef, TableEngine,
},
table::{SchemaId, TableRef},
ANALYTIC_ENGINE_TYPE,
Expand Down Expand Up @@ -100,6 +100,12 @@ impl TableEngine for TableEngineImpl {
Ok(())
}

async fn validate_create_table(&self, params: &CreateTableParams) -> Result<()> {
self.instance.validate_create_table(params)?;

Ok(())
}

async fn create_table(&self, request: CreateTableRequest) -> Result<TableRef> {
let space_id = build_space_id(request.schema_id);

Expand Down
19 changes: 10 additions & 9 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,16 @@ impl<'a> Alterer<'a> {
"Instance alter options, space_id:{}, tables:{:?}, old_table_opts:{:?}, options:{:?}",
self.table_data.space_id, self.table_data.name, current_table_options, options
);
let mut table_opts =
table_options::merge_table_options_for_alter(&options, &current_table_options)
.box_err()
.context(InvalidOptions {
space_id: self.table_data.space_id,
table: &self.table_data.name,
table_id: self.table_data.id,
})?;
table_opts.sanitize();
let table_opts = {
let mut opts =
table_options::merge_table_options_for_alter(&options, &current_table_options)
.box_err()
.context(InvalidOptions {
table: &self.table_data.name,
})?;
opts.sanitize();
opts
};
let manifest_update = AlterOptionsMeta {
space_id: self.table_data.space_id,
table_id: self.table_data.id,
Expand Down
61 changes: 42 additions & 19 deletions analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,51 @@

use generic_error::BoxError;
use log::info;
use snafu::{OptionExt, ResultExt};
use table_engine::engine::CreateTableRequest;
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
engine::{CreateTableParams, CreateTableRequest},
partition::PartitionInfo,
};

use crate::{
instance::{
engine::{CreateOpenFailedTable, InvalidOptions, Result, TableNotExist, WriteManifest},
engine::{
CreateOpenFailedTable, InvalidOptions, Result, TableNotExist,
TryCreateRandomPartitionTableInOverwriteMode, WriteManifest,
},
Instance,
},
manifest::meta_edit::{AddTableMeta, MetaEdit, MetaEditRequest, MetaUpdate},
space::SpaceRef,
table::data::{TableDataRef, TableShardInfo},
table_options,
table_options, TableOptions,
};

impl Instance {
/// Validate the request of creating table.
pub fn validate_create_table(&self, params: &CreateTableParams) -> Result<TableOptions> {
let table_opts =
table_options::merge_table_options_for_create(&params.table_options, &self.table_opts)
.box_err()
.context(InvalidOptions {
table: &params.table_name,
})?;

if let Some(partition_info) = &params.partition_info {
let dedup_on_random_partition =
table_opts.need_dedup() && matches!(partition_info, PartitionInfo::Random(_));

ensure!(
!dedup_on_random_partition,
TryCreateRandomPartitionTableInOverwriteMode {
table: &params.table_name,
}
);
}

Ok(table_opts)
}

/// Create table need to be handled by write worker.
pub async fn do_create_table(
&self,
Expand All @@ -39,21 +69,14 @@ impl Instance {
) -> Result<TableDataRef> {
info!("Instance create table, request:{:?}", request);

if space.is_open_failed_table(&request.table_name) {
if space.is_open_failed_table(&request.params.table_name) {
return CreateOpenFailedTable {
table: request.table_name,
table: request.params.table_name,
}
.fail();
}

let mut table_opts =
table_options::merge_table_options_for_create(&request.options, &self.table_opts)
.box_err()
.context(InvalidOptions {
space_id: space.id,
table: &request.table_name,
table_id: request.table_id,
})?;
let mut table_opts = self.validate_create_table(&request.params)?;
// Sanitize options before creating table.
table_opts.sanitize();

Expand All @@ -66,8 +89,8 @@ impl Instance {
let meta_update = MetaUpdate::AddTable(AddTableMeta {
space_id: space.id,
table_id: request.table_id,
table_name: request.table_name.clone(),
schema: request.table_schema,
table_name: request.params.table_name.clone(),
schema: request.params.table_schema,
opts: table_opts,
});
MetaEditRequest {
Expand All @@ -79,9 +102,9 @@ impl Instance {
.manifest
.apply_edit(edit_req)
.await
.with_context(|| WriteManifest {
.context(WriteManifest {
space_id: space.id,
table: request.table_name.clone(),
table: &request.params.table_name,
table_id: request.table_id,
})?;

Expand All @@ -91,7 +114,7 @@ impl Instance {
.with_context(|| TableNotExist {
msg: format!(
"table not exist, space_id:{}, table_id:{}, table_name:{}",
space.id, request.table_id, request.table_name
space.id, request.table_id, request.params.table_name
),
})
}
Expand Down
28 changes: 12 additions & 16 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,8 @@ pub enum Error {
source: GenericError,
},

#[snafu(display(
"Invalid options, space_id:{}, table:{}, table_id:{}, err:{}",
space_id,
table,
table_id,
source
))]
InvalidOptions {
space_id: SpaceId,
table: String,
table_id: TableId,
source: GenericError,
},
#[snafu(display("Invalid options, table:{table}, err:{source}",))]
InvalidOptions { table: String, source: GenericError },

#[snafu(display(
"Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}",
Expand Down Expand Up @@ -243,14 +232,21 @@ pub enum Error {
msg: Option<String>,
backtrace: Backtrace,
},

#[snafu(display(
"Try to create a random partition table in overwrite mode, table:{table}.\nBacktrace:\n{backtrace}",
))]
TryCreateRandomPartitionTableInOverwriteMode { table: String, backtrace: Backtrace },
}

define_result!(Error);

impl From<Error> for table_engine::engine::Error {
fn from(err: Error) -> Self {
match &err {
Error::InvalidOptions { table, .. } | Error::SpaceNotExist { table, .. } => {
Error::InvalidOptions { table, .. }
| Error::SpaceNotExist { table, .. }
| Error::TryCreateRandomPartitionTableInOverwriteMode { table, .. } => {
Self::InvalidArguments {
table: table.clone(),
source: Box::new(err),
Expand Down Expand Up @@ -330,8 +326,8 @@ impl Instance {
request: CreateTableRequest,
) -> Result<SpaceAndTable> {
let context = SpaceContext {
catalog_name: request.catalog_name.clone(),
schema_name: request.schema_name.clone(),
catalog_name: request.params.catalog_name.clone(),
schema_name: request.params.schema_name.clone(),
};
let space = self.find_or_create_space(space_id, context).await?;
let table_data = self.do_create_table(space.clone(), request).await?;
Expand Down
19 changes: 11 additions & 8 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ pub mod tests {
use arena::NoopCollector;
use common_types::{datum::DatumKind, table::DEFAULT_SHARD_ID};
use table_engine::{
engine::{CreateTableRequest, TableState},
engine::{CreateTableParams, CreateTableRequest, TableState},
table::SchemaId,
};
use time_ext::ReadableDuration;
Expand Down Expand Up @@ -793,18 +793,21 @@ pub mod tests {
pub fn build(self) -> TableData {
let space_id = DEFAULT_SPACE_ID;
let table_schema = default_schema();
let create_request = CreateTableRequest {
let params = CreateTableParams {
catalog_name: "test_catalog".to_string(),
schema_name: "public".to_string(),
schema_id: SchemaId::from_u32(DEFAULT_SPACE_ID),
table_id: self.table_id,
table_name: self.table_name,
table_schema,
engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
options: HashMap::new(),
table_options: HashMap::new(),
partition_info: None,
};
let create_request = CreateTableRequest {
params,
schema_id: SchemaId::from_u32(DEFAULT_SPACE_ID),
table_id: self.table_id,
state: TableState::Stable,
shard_id: self.shard_id,
partition_info: None,
};

let table_opts = TableOptions::default();
Expand All @@ -814,8 +817,8 @@ pub mod tests {
TableData::new(
space_id,
create_request.table_id,
create_request.table_name,
create_request.table_schema,
create_request.params.table_name,
create_request.params.table_schema,
create_request.shard_id,
table_opts,
&purger,
Expand Down
33 changes: 19 additions & 14 deletions analytic_engine/src/tests/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use common_types::{
};
use table_engine::{
self,
engine::{CreateTableRequest, TableState},
engine::{CreateTableParams, CreateTableRequest, TableState},
predicate::Predicate,
table::{GetRequest, ReadOptions, ReadRequest, SchemaId, TableId, TableSeq},
};
Expand Down Expand Up @@ -122,7 +122,7 @@ impl FixedSchemaTable {
}

fn new_row_group(&self, rows: Vec<Row>) -> RowGroup {
RowGroupBuilder::with_rows(self.create_request.table_schema.clone(), rows)
RowGroupBuilder::with_rows(self.create_request.params.table_schema.clone(), rows)
.unwrap()
.build()
}
Expand All @@ -132,7 +132,7 @@ impl FixedSchemaTable {
}

pub fn new_read_all_request(&self, opts: ReadOptions) -> ReadRequest {
new_read_all_request_with_order(self.create_request.table_schema.clone(), opts)
new_read_all_request_with_order(self.create_request.params.table_schema.clone(), opts)
}

pub fn new_get_request(&self, key: KeyTuple) -> GetRequest {
Expand All @@ -141,7 +141,7 @@ impl FixedSchemaTable {
GetRequest {
request_id: RequestId::next_id(),
projected_schema: ProjectedSchema::no_projection(
self.create_request.table_schema.clone(),
self.create_request.params.table_schema.clone(),
),
primary_key,
}
Expand Down Expand Up @@ -272,7 +272,7 @@ impl Builder {
}

pub fn table_name(mut self, table_name: String) -> Self {
self.create_request.table_name = table_name;
self.create_request.params.table_name = table_name;
self
}

Expand All @@ -282,7 +282,7 @@ impl Builder {
}

pub fn enable_ttl(mut self, enable_ttl: bool) -> Self {
self.create_request.options.insert(
self.create_request.params.table_options.insert(
common_types::OPTION_KEY_ENABLE_TTL.to_string(),
enable_ttl.to_string(),
);
Expand All @@ -291,7 +291,8 @@ impl Builder {

pub fn ttl(mut self, duration: ReadableDuration) -> Self {
self.create_request
.options
.params
.table_options
.insert(common_types::TTL.to_string(), duration.to_string());
self
}
Expand All @@ -305,17 +306,21 @@ impl Builder {

impl Default for Builder {
fn default() -> Self {
let params = CreateTableParams {
catalog_name: "ceresdb".to_string(),
schema_name: "public".to_string(),
table_name: "test_table".to_string(),
table_schema: FixedSchemaTable::default_schema(),
partition_info: None,
engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
table_options: HashMap::new(),
};

Self {
create_request: CreateTableRequest {
catalog_name: "ceresdb".to_string(),
schema_name: "public".to_string(),
params,
schema_id: SchemaId::from_u32(2),
table_id: new_table_id(2, 1),
table_name: "test_table".to_string(),
table_schema: FixedSchemaTable::default_schema(),
partition_info: None,
engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
options: HashMap::new(),
state: TableState::Stable,
shard_id: DEFAULT_SHARD_ID,
},
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl<T: WalsOpener> TestContext<T> {
}

async fn create_table(&mut self, create_request: CreateTableRequest) {
let table_name = create_request.table_name.clone();
let table_name = create_request.params.table_name.clone();
let table = self.engine().create_table(create_request).await.unwrap();

self.name_to_tables.insert(table_name.to_string(), table);
Expand Down
Loading
Loading