Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: drop partition table #871

Merged
merged 2 commits into from
May 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 78 additions & 39 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use std::{

use ::http::StatusCode;
use bytes::Bytes;
use catalog::schema::{CreateOptions, CreateTableRequest, DropOptions, DropTableRequest};
use catalog::schema::{
CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, SchemaRef,
};
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, value, PrometheusRemoteQueryRequest,
PrometheusRemoteQueryResponse, SqlQueryRequest, SqlQueryResponse, WriteSeriesEntry,
Expand Down Expand Up @@ -220,16 +222,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
schema_name: &str,
table_name: &str,
) -> Result<()> {
let partition_table_info = self
.router
.fetch_partition_table_info(schema_name, table_name)
.await?;
if partition_table_info.is_none() {
return Ok(());
}

let partition_table_info = partition_table_info.unwrap();

let catalog = self
.instance
.catalog_manager
Expand All @@ -256,44 +248,62 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
code: StatusCode::BAD_REQUEST,
msg: format!("Schema not found, schema_name:{schema_name}"),
})?;

let table = schema
.table_by_name(&partition_table_info.name)
.table_by_name(table_name)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!(
"Failed to find table, table_name:{}",
partition_table_info.name
),
msg: format!("Failed to find table, table_name:{table_name}"),
})?;

if let Some(table) = table {
if table.id().as_u64() == partition_table_info.id {
return Ok(());
}
let partition_table_info_in_meta = self
.router
.fetch_partition_table_info(schema_name, table_name)
.await?;

// Drop partition table if table id not match.
let opts = DropOptions {
table_engine: self.instance.partition_table_engine.clone(),
};
schema
.drop_table(
DropTableRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
engine: PARTITION_TABLE_ENGINE_TYPE.to_string(),
},
opts,
match (table, &partition_table_info_in_meta) {
(Some(table), Some(partition_table_info)) => {
// No need to create partition table when table_id match.
if table.id().as_u64() == partition_table_info.id {
return Ok(());
}
info!("Drop partition table because the id of the table in ceresdb is different from the one in ceresmeta, catalog_name:{catalog_name}, schema_name:{schema_name}, table_name:{table_name}, old_table_id:{}, new_table_id:{}",
table.id().as_u64(), partition_table_info.id);
// Drop partition table because the id of the table in ceresdb is different from
// the one in ceresmeta.
self.drop_partition_table(
schema.clone(),
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to drop partition table, table_name:{table_name}"),
})?;
.await?;
}
(Some(table), None) => {
// Drop partition table because it does not exist in ceresmeta but exists in
// ceresdb-server.
if table.partition_info().is_some() {
info!("Drop partition table because it does not exist in ceresmeta but exists in ceresdb-server, catalog_name:{catalog_name}, schema_name:{schema_name}, table_name:{table_name}, table_id:{}",table.id());
self.drop_partition_table(
schema.clone(),
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
.await?;
}
// No need to create non-partition table.
return Ok(());
}
// No need to create partition table when table not exist.
(None, None) => return Ok(()),
// Create partition table in following code.
(None, Some(_)) => (),
}

let partition_table_info = partition_table_info_in_meta.unwrap();

// If table not exists, open it.
// Get table_schema from first sub partition table.
let first_sub_partition_table_name = util::get_sub_partition_name(
Expand Down Expand Up @@ -433,6 +443,35 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {

Ok(plan_vec)
}

async fn drop_partition_table(
&self,
schema: SchemaRef,
catalog_name: String,
schema_name: String,
table_name: String,
) -> Result<()> {
let opts = DropOptions {
table_engine: self.instance.partition_table_engine.clone(),
};
schema
.drop_table(
DropTableRequest {
catalog_name,
schema_name,
table_name: table_name.clone(),
engine: PARTITION_TABLE_ENGINE_TYPE.to_string(),
},
opts,
)
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to drop partition table, table_name:{table_name}"),
})?;
Ok(())
}
}

#[derive(Clone)]
Expand Down