Skip to content

Commit

Permalink
feat: add shard related methods to table engine (#897)
Browse files Browse the repository at this point in the history
## Which issue does this PR close?

Closes #
Part of #799 

## Rationale for this change
+ Add `open_shard` and `close_shard` methods into `TableEngine`.
+ Impl the methods above on demand.

## What changes are included in this PR?
See above.
## Are there any user-facing changes?
None.
## How does this change test
Test by ut.
  • Loading branch information
Rachelint authored May 22, 2023
1 parent 8893478 commit 12f8656
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 132 deletions.
101 changes: 98 additions & 3 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::sync::Arc;

use async_trait::async_trait;
use common_util::error::BoxError;
use log::info;
use log::{error, info};
use snafu::ResultExt;
use table_engine::{
engine::{
Close, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
TableEngine,
Close, CloseShardRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenShardRequest, OpenTableRequest, Result, TableEngine,
},
table::{SchemaId, TableRef},
ANALYTIC_ENGINE_TYPE,
Expand All @@ -37,6 +37,63 @@ impl TableEngineImpl {
pub fn new(instance: InstanceRef) -> Self {
Self { instance }
}

async fn open_tables_of_shard(
&self,
open_requests: Vec<table_engine::engine::OpenTableRequest>,
) -> Vec<table_engine::engine::Result<Option<TableRef>>> {
if open_requests.is_empty() {
return Vec::new();
}

let mut open_results = Vec::with_capacity(open_requests.len());
for request in open_requests {
let result = self
.open_table(request.clone())
.await
.map_err(|e| {
error!("Failed to open table, open_request:{request:?}, err:{e}");
e
})
.map(|table_opt| {
if table_opt.is_none() {
error!(
"Table engine returns none when opening table, open_request:{request:?}"
);
}
table_opt
});

open_results.push(result);
}

open_results
}

async fn close_tables_of_shard(
&self,
close_requests: Vec<table_engine::engine::CloseTableRequest>,
) -> Vec<table_engine::engine::Result<String>> {
if close_requests.is_empty() {
return Vec::new();
}

let mut close_results = Vec::with_capacity(close_requests.len());
for request in close_requests {
let result = self
.close_table(request.clone())
.await
.map_err(|e| {
error!("Failed to close table, close_request:{request:?}, err:{e}");
e
})
.map(|_| request.table_name);

close_results.push(result);
}

close_results
}
}

impl Drop for TableEngineImpl {
Expand Down Expand Up @@ -132,6 +189,44 @@ impl TableEngine for TableEngineImpl {

Ok(())
}

async fn open_shard(&self, request: OpenShardRequest) -> Vec<Result<Option<TableRef>>> {
let table_defs = request.table_defs;
let open_requests = table_defs
.into_iter()
.map(|def| OpenTableRequest {
catalog_name: def.catalog_name,
schema_name: def.schema_name,
schema_id: def.schema_id,
table_name: def.name,
table_id: def.id,
engine: request.engine.clone(),
shard_id: request.shard_id,
})
.collect();

self.open_tables_of_shard(open_requests).await
}

async fn close_shard(
&self,
request: CloseShardRequest,
) -> Vec<table_engine::engine::Result<String>> {
let table_defs = request.table_defs;
let close_requests = table_defs
.into_iter()
.map(|def| CloseTableRequest {
catalog_name: def.catalog_name,
schema_name: def.schema_name,
schema_id: def.schema_id,
table_name: def.name,
table_id: def.id,
engine: request.engine.clone(),
})
.collect();

self.close_tables_of_shard(close_requests).await
}
}

/// Generate the space id from the schema id with assumption schema id is unique
Expand Down
34 changes: 34 additions & 0 deletions catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,40 @@ pub struct AlterTableRequest {
pub operations: Vec<AlterTableOperation>,
}

#[derive(Debug, Clone)]
pub struct OpenShardRequest {
/// Shard id
pub shard_id: ShardId,

/// Table infos
pub table_defs: Vec<TableDef>,

/// Table engine type
pub engine: String,
}

#[derive(Clone, Debug)]
pub struct TableDef {
pub catalog_name: String,
pub schema_name: String,
pub id: TableId,
pub name: String,
}

impl TableDef {
pub fn into_engine_table_def(self, schema_id: SchemaId) -> engine::TableDef {
engine::TableDef {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
id: self.id,
name: self.name,
}
}
}

pub type CloseShardRequest = OpenShardRequest;

/// Schema manage tables.
#[async_trait]
pub trait Schema {
Expand Down
147 changes: 34 additions & 113 deletions catalog/src/table_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@
use std::time::Instant;

use common_util::{error::BoxError, time::InstantExt};
use log::{error, info, warn};
use log::{info, warn};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{CloseShardRequest, OpenShardRequest, TableEngineRef},
table::TableRef,
};
use table_engine::{engine, table::TableRef};

use crate::{
manager::ManagerRef,
schema::{
CloseOptions, CloseTableRequest, CreateOptions, CreateTableRequest, DropOptions,
DropTableRequest, OpenOptions, OpenTableRequest, SchemaRef,
CloseOptions, CloseShardRequest, CloseTableRequest, CreateOptions, CreateTableRequest,
DropOptions, DropTableRequest, OpenOptions, OpenShardRequest, OpenTableRequest, SchemaRef,
},
Result, TableOperatorNoCause, TableOperatorWithCause,
};
Expand All @@ -39,40 +36,30 @@ impl TableOperator {
let shard_id = request.shard_id;

// Generate open requests.
let table_infos = request.table_defs;
let schemas_and_requests = table_infos
.into_iter()
.map(|table| {
let schema_res = self.schema_by_name(&table.catalog_name, &table.schema_name);

schema_res.map(|schema| {
let request = table_engine::engine::OpenTableRequest {
catalog_name: table.catalog_name,
schema_name: table.schema_name,
schema_id: schema.id(),
table_name: table.name.clone(),
table_id: table.id,
engine: request.engine.clone(),
shard_id: request.shard_id,
};

(schema, request)
})
})
.collect::<Result<Vec<_>>>()?;
let (schemas, requests): (Vec<_>, Vec<_>) = schemas_and_requests.into_iter().unzip();
let mut schemas = Vec::with_capacity(request.table_defs.len());
let mut engine_table_defs = Vec::with_capacity(request.table_defs.len());
for open_ctx in request.table_defs {
let schema = self.schema_by_name(&open_ctx.catalog_name, &open_ctx.schema_name)?;
engine_table_defs.push(open_ctx.into_engine_table_def(schema.id()));
schemas.push(schema);
}

// Open tables by table engine.
// TODO: add the `open_shard` method into table engine.
let open_res = open_tables_of_shard(table_engine, requests).await;
let engine_open_shard_req = engine::OpenShardRequest {
shard_id: request.shard_id,
table_defs: engine_table_defs,
engine: request.engine,
};
let open_results = table_engine.open_shard(engine_open_shard_req).await;

// Check and register successful opened table into schema.
let mut success_count = 0_u32;
let mut no_table_count = 0_u32;
let mut open_err_count = 0_u32;

for (schema, open_res) in schemas.into_iter().zip(open_res.into_iter()) {
match open_res {
for (schema, open_result) in schemas.into_iter().zip(open_results.into_iter()) {
match open_result {
Ok(Some(table)) => {
schema.register_table(table);
success_count += 1;
Expand Down Expand Up @@ -112,38 +99,29 @@ impl TableOperator {
let shard_id = request.shard_id;

// Generate open requests.
let table_defs = request.table_defs;
let schemas_and_requests = table_defs
.into_iter()
.map(|def| {
let schema_res = self.schema_by_name(&def.catalog_name, &def.schema_name);

schema_res.map(|schema| {
let request = table_engine::engine::CloseTableRequest {
catalog_name: def.catalog_name,
schema_name: def.schema_name,
schema_id: schema.id(),
table_name: def.name.clone(),
table_id: def.id,
engine: request.engine.clone(),
};

(schema, request)
})
})
.collect::<Result<Vec<_>>>()?;
let (schemas, requests): (Vec<_>, Vec<_>) = schemas_and_requests.into_iter().unzip();
let mut schemas = Vec::with_capacity(request.table_defs.len());
let mut engine_table_defs = Vec::with_capacity(request.table_defs.len());
for table_def in request.table_defs {
let schema = self.schema_by_name(&table_def.catalog_name, &table_def.schema_name)?;
engine_table_defs.push(table_def.into_engine_table_def(schema.id()));
schemas.push(schema);
}

// Close tables by table engine.
// TODO: add the `close_shard` method into table engine.
let results = close_tables_of_shard(table_engine, requests).await;
let engine_close_shard_req = engine::CloseShardRequest {
shard_id: request.shard_id,
table_defs: engine_table_defs,
engine: request.engine,
};
let close_results = table_engine.close_shard(engine_close_shard_req).await;

// Check and unregister successful closed table from schema.
let mut success_count = 0_u32;
let mut close_err_count = 0_u32;

for (schema, result) in schemas.into_iter().zip(results.into_iter()) {
match result {
for (schema, close_result) in schemas.into_iter().zip(close_results.into_iter()) {
match close_result {
Ok(table_name) => {
schema.unregister_table(&table_name);
success_count += 1;
Expand Down Expand Up @@ -286,60 +264,3 @@ impl TableOperator {
})
}
}

async fn open_tables_of_shard(
table_engine: TableEngineRef,
open_requests: Vec<table_engine::engine::OpenTableRequest>,
) -> Vec<table_engine::engine::Result<Option<TableRef>>> {
if open_requests.is_empty() {
return Vec::new();
}

let mut open_results = Vec::with_capacity(open_requests.len());
for request in open_requests {
let result = table_engine
.open_table(request.clone())
.await
.map_err(|e| {
error!("Failed to open table, open_request:{request:?}, err:{e}");
e
})
.map(|table_opt| {
if table_opt.is_none() {
error!(
"Table engine returns none when opening table, open_request:{request:?}"
);
}
table_opt
});

open_results.push(result);
}

open_results
}

async fn close_tables_of_shard(
table_engine: TableEngineRef,
close_requests: Vec<table_engine::engine::CloseTableRequest>,
) -> Vec<table_engine::engine::Result<String>> {
if close_requests.is_empty() {
return Vec::new();
}

let mut close_results = Vec::with_capacity(close_requests.len());
for request in close_requests {
let result = table_engine
.close_table(request.clone())
.await
.map_err(|e| {
error!("Failed to close table, close_request:{request:?}, err:{e}");
e
})
.map(|_| request.table_name);

close_results.push(result);
}

close_results
}
12 changes: 10 additions & 2 deletions partition_table_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use common_util::error::BoxError;
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
TableEngine, Unexpected, UnexpectedNoCause,
CloseShardRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenShardRequest, OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause,
},
remote::RemoteEngineRef,
table::TableRef,
Expand Down Expand Up @@ -75,4 +75,12 @@ impl TableEngine for PartitionTableEngine {
async fn close_table(&self, _request: CloseTableRequest) -> Result<()> {
Ok(())
}

async fn open_shard(&self, _request: OpenShardRequest) -> Vec<Result<Option<TableRef>>> {
vec![Ok(None)]
}

async fn close_shard(&self, _request: CloseShardRequest) -> Vec<Result<String>> {
vec![Ok("".to_string())]
}
}
Loading

0 comments on commit 12f8656

Please sign in to comment.