Skip to content

Commit

Permalink
fix: drop partition table (#871)
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 authored May 6, 2023
1 parent bc4d398 commit 7674470
Showing 1 changed file with 78 additions and 39 deletions.
117 changes: 78 additions & 39 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use std::{

use ::http::StatusCode;
use bytes::Bytes;
use catalog::schema::{CreateOptions, CreateTableRequest, DropOptions, DropTableRequest};
use catalog::schema::{
CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, SchemaRef,
};
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, value, PrometheusRemoteQueryRequest,
PrometheusRemoteQueryResponse, SqlQueryRequest, SqlQueryResponse, WriteSeriesEntry,
Expand Down Expand Up @@ -220,16 +222,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
schema_name: &str,
table_name: &str,
) -> Result<()> {
let partition_table_info = self
.router
.fetch_partition_table_info(schema_name, table_name)
.await?;
if partition_table_info.is_none() {
return Ok(());
}

let partition_table_info = partition_table_info.unwrap();

let catalog = self
.instance
.catalog_manager
Expand All @@ -256,44 +248,62 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
code: StatusCode::BAD_REQUEST,
msg: format!("Schema not found, schema_name:{schema_name}"),
})?;

let table = schema
.table_by_name(&partition_table_info.name)
.table_by_name(table_name)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!(
"Failed to find table, table_name:{}",
partition_table_info.name
),
msg: format!("Failed to find table, table_name:{table_name}"),
})?;

if let Some(table) = table {
if table.id().as_u64() == partition_table_info.id {
return Ok(());
}
let partition_table_info_in_meta = self
.router
.fetch_partition_table_info(schema_name, table_name)
.await?;

// Drop partition table if table id not match.
let opts = DropOptions {
table_engine: self.instance.partition_table_engine.clone(),
};
schema
.drop_table(
DropTableRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
engine: PARTITION_TABLE_ENGINE_TYPE.to_string(),
},
opts,
match (table, &partition_table_info_in_meta) {
(Some(table), Some(partition_table_info)) => {
// No need to create partition table when table_id match.
if table.id().as_u64() == partition_table_info.id {
return Ok(());
}
info!("Drop partition table because the id of the table in ceresdb is different from the one in ceresmeta, catalog_name:{catalog_name}, schema_name:{schema_name}, table_name:{table_name}, old_table_id:{}, new_table_id:{}",
table.id().as_u64(), partition_table_info.id);
// Drop partition table because the id of the table in ceresdb is different from
// the one in ceresmeta.
self.drop_partition_table(
schema.clone(),
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to drop partition table, table_name:{table_name}"),
})?;
.await?;
}
(Some(table), None) => {
// Drop partition table because it does not exist in ceresmeta but exists in
// ceresdb-server.
if table.partition_info().is_some() {
info!("Drop partition table because it does not exist in ceresmeta but exists in ceresdb-server, catalog_name:{catalog_name}, schema_name:{schema_name}, table_name:{table_name}, table_id:{}",table.id());
self.drop_partition_table(
schema.clone(),
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
.await?;
}
// No need to create non-partition table.
return Ok(());
}
// No need to create partition table when table not exist.
(None, None) => return Ok(()),
// Create partition table in following code.
(None, Some(_)) => (),
}

let partition_table_info = partition_table_info_in_meta.unwrap();

// If table not exists, open it.
// Get table_schema from first sub partition table.
let first_sub_partition_table_name = util::get_sub_partition_name(
Expand Down Expand Up @@ -433,6 +443,35 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {

Ok(plan_vec)
}

async fn drop_partition_table(
&self,
schema: SchemaRef,
catalog_name: String,
schema_name: String,
table_name: String,
) -> Result<()> {
let opts = DropOptions {
table_engine: self.instance.partition_table_engine.clone(),
};
schema
.drop_table(
DropTableRequest {
catalog_name,
schema_name,
table_name: table_name.clone(),
engine: PARTITION_TABLE_ENGINE_TYPE.to_string(),
},
opts,
)
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to drop partition table, table_name:{table_name}"),
})?;
Ok(())
}
}

#[derive(Clone)]
Expand Down

0 comments on commit 7674470

Please sign in to comment.