Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: recovery in standalone mode #414

Merged
merged 4 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

161 changes: 100 additions & 61 deletions catalog_impls/src/table_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use common_util::define_result;
use log::{debug, error, info};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use system_catalog::sys_catalog_table::{
self, CreateCatalogRequest, CreateSchemaRequest, SysCatalogTable, Visitor,
VisitorCatalogNotFound, VisitorOpenTable, VisitorSchemaNotFound,
self, CreateCatalogRequest, CreateSchemaRequest, SysCatalogTable, VisitOptions,
VisitOptionsBuilder, VisitorCatalogNotFound, VisitorInner, VisitorSchemaNotFound,
};
use table_engine::{
engine::{TableEngineRef, TableState},
Expand Down Expand Up @@ -92,8 +92,6 @@ pub struct TableBasedManager {
/// Sys catalog table
catalog_table: Arc<SysCatalogTable>,
catalogs: CatalogMap,
/// Table engine proxy
engine_proxy: TableEngineRef,
/// Global schema id generator, Each schema has a unique schema id.
schema_id_generator: Arc<SchemaIdGenerator>,
}
Expand All @@ -120,7 +118,7 @@ impl Manager for TableBasedManager {
impl TableBasedManager {
/// Create and init the TableBasedManager.
// TODO(yingwen): Define all constants in catalog crate.
pub async fn new(backend: TableEngineRef, engine_proxy: TableEngineRef) -> Result<Self> {
pub async fn new(backend: TableEngineRef) -> Result<Self> {
// Create or open sys_catalog table, will also create a space (catalog + schema)
// for system catalog.
let catalog_table = SysCatalogTable::new(backend)
Expand All @@ -130,7 +128,6 @@ impl TableBasedManager {
let mut manager = Self {
catalog_table: Arc::new(catalog_table),
catalogs: HashMap::new(),
engine_proxy,
schema_id_generator: Arc::new(SchemaIdGenerator::default()),
};

Expand All @@ -139,36 +136,65 @@ impl TableBasedManager {
Ok(manager)
}

#[cfg(test)]
pub fn get_engine_proxy(&self) -> TableEngineRef {
self.engine_proxy.clone()
pub async fn fetch_table_infos(&mut self) -> Result<Vec<TableInfo>> {
let catalog_table = self.catalog_table.clone();

let mut table_infos = Vec::default();
let visitor_inner = VisitorInnerImpl {
catalog_table: catalog_table.clone(),
catalogs: &mut self.catalogs,
schema_id_generator: self.schema_id_generator.clone(),
table_infos: &mut table_infos,
};

let visit_opts = VisitOptionsBuilder::default().visit_table().build();

Self::visit_catalog_table_with_options(catalog_table, visitor_inner, visit_opts).await?;

Ok(table_infos)
}

/// Load all data from sys catalog table.
async fn init(&mut self) -> Result<()> {
// The system catalog and schema in it is not persisted, so we add it manually.
self.load_system_catalog();

let mut visitor = VisitorImpl {
// Load all existent catalog/schema from catalog_table
let catalog_table = self.catalog_table.clone();

let visitor_inner = VisitorInnerImpl {
catalog_table: self.catalog_table.clone(),
catalogs: &mut self.catalogs,
engine_proxy: self.engine_proxy.clone(),
schema_id_generator: self.schema_id_generator.clone(),
table_infos: &mut Vec::default(),
};

// Load all existent catalog/schema/tables from catalog_table.
let opts = ReadOptions::default();
self.catalog_table
.visit(opts, &mut visitor)
.await
.context(VisitSysCatalog)?;
let visit_opts = VisitOptionsBuilder::default()
.visit_catalog()
.visit_schema()
.build();

Self::visit_catalog_table_with_options(catalog_table, visitor_inner, visit_opts).await?;

// Create default catalog if it is not exists.
self.maybe_create_default_catalog().await?;

Ok(())
}

async fn visit_catalog_table_with_options(
catalog_table: Arc<SysCatalogTable>,
mut visitor_inner: VisitorInnerImpl<'_>,
visit_opts: VisitOptions,
) -> Result<()> {
let opts = ReadOptions::default();

catalog_table
.visit(opts, &mut visitor_inner, visit_opts)
.await
.context(VisitSysCatalog)
}

fn load_system_catalog(&mut self) {
// Get the `sys_catalog` table and add it to tables.
let table = self.catalog_table.inner_table();
Expand Down Expand Up @@ -306,15 +332,15 @@ impl TableBasedManager {
type CatalogMap = HashMap<String, Arc<CatalogImpl>>;

/// Sys catalog visitor implementation, used to load catalog info
struct VisitorImpl<'a> {
struct VisitorInnerImpl<'a> {
catalog_table: Arc<SysCatalogTable>,
catalogs: &'a mut CatalogMap,
engine_proxy: TableEngineRef,
schema_id_generator: Arc<SchemaIdGenerator>,
table_infos: &'a mut Vec<TableInfo>,
}

#[async_trait]
impl<'a> Visitor for VisitorImpl<'a> {
impl<'a> VisitorInner for VisitorInnerImpl<'a> {
fn visit_catalog(&mut self, request: CreateCatalogRequest) -> sys_catalog_table::Result<()> {
debug!("Visitor visit catalog, request:{:?}", request);
let schema_id_generator = self.schema_id_generator.clone();
Expand Down Expand Up @@ -364,7 +390,7 @@ impl<'a> Visitor for VisitorImpl<'a> {
Ok(())
}

async fn visit_tables(&mut self, table_info: TableInfo) -> sys_catalog_table::Result<()> {
fn visit_tables(&mut self, table_info: TableInfo) -> sys_catalog_table::Result<()> {
debug!("Visitor visit tables, table_info:{:?}", table_info);

let catalog =
Expand Down Expand Up @@ -397,26 +423,8 @@ impl<'a> Visitor for VisitorImpl<'a> {
return Ok(());
}

let open_request = OpenTableRequest::from(table_info);
let table_name = open_request.table_name.clone();
let table_opt = self
.engine_proxy
.open_table(open_request)
.await
.context(VisitorOpenTable)?;

match table_opt {
Some(table) => {
schema.insert_table_into_memory(table_id, table);
}
None => {
// Now we ignore the error that table not in engine but in catalog.
error!(
"Visitor found table not in engine, table_name:{:?}",
table_name
);
}
}
// Collect table infos for later opening.
self.table_infos.push(table_info);

Ok(())
}
Expand Down Expand Up @@ -825,7 +833,7 @@ impl Schema for SchemaImpl {
async fn open_table(
&self,
request: OpenTableRequest,
_opts: OpenOptions,
opts: OpenOptions,
) -> schema::Result<Option<TableRef>> {
debug!(
"Table based catalog manager open table, request:{:?}",
Expand All @@ -834,9 +842,33 @@ impl Schema for SchemaImpl {

self.validate_schema_info(&request.catalog_name, &request.schema_name)?;

// All tables have been opened duration initialization, so just check
// whether the table exists.
self.check_create_table_read(&request.table_name, false)
// Do opening work.
let table_name = request.table_name.clone();
let table_id = request.table_id;
let table_opt = opts
.table_engine
.open_table(request.clone())
.await
.map_err(|e| Box::new(e) as _)
.context(schema::OpenTableWithCause)?;

match table_opt {
Some(table) => {
self.insert_table_into_memory(table_id, table.clone());

Ok(Some(table))
}

None => {
// Now we ignore the error that table not in engine but in catalog.
error!(
"Visitor found table not in engine, table_name:{:?}, table_id:{}",
table_name, table_id,
);

Ok(None)
}
}
}

async fn close_table(
Expand Down Expand Up @@ -889,16 +921,8 @@ mod tests {
use crate::table_based::TableBasedManager;

async fn build_catalog_manager(analytic: TableEngineRef) -> TableBasedManager {
// Create table engine proxy
let memory = MemoryTableEngine;

let engine_proxy = Arc::new(TableEngineProxy {
memory,
analytic: analytic.clone(),
});

// Create catalog manager, use analytic table as backend
TableBasedManager::new(analytic.clone(), engine_proxy.clone())
TableBasedManager::new(analytic.clone())
.await
.expect("Failed to create catalog manager")
}
Expand Down Expand Up @@ -949,12 +973,13 @@ mod tests {
let mut test_ctx = env.new_context(engine_context);
test_ctx.open().await;

let catalog_manager = build_catalog_manager(test_ctx.engine().clone()).await;
let catalog_manager = build_catalog_manager(test_ctx.clone_engine()).await;
let catalog_name = catalog_manager.default_catalog_name();
let schema_name = catalog_manager.default_schema_name();
let catalog = catalog_manager.catalog_by_name(catalog_name);
assert!(catalog.is_ok());
assert!(catalog.as_ref().unwrap().is_some());

let schema = catalog
.as_ref()
.unwrap()
Expand Down Expand Up @@ -1023,14 +1048,21 @@ mod tests {
let mut test_ctx = env.new_context(engine_context);
test_ctx.open().await;

let catalog_manager = build_catalog_manager(test_ctx.clone_engine()).await;
let engine = test_ctx.engine().clone();
let memory = MemoryTableEngine;
let engine_proxy = Arc::new(TableEngineProxy {
memory,
analytic: engine.clone(),
});

let catalog_manager = build_catalog_manager(engine.clone()).await;
let schema = build_default_schema_with_catalog(&catalog_manager).await;

let table_name = "test";
let request = build_create_table_req(table_name, schema.clone()).await;

let opts = CreateOptions {
table_engine: catalog_manager.get_engine_proxy(),
table_engine: engine_proxy.clone(),
create_if_not_exists: true,
};

Expand All @@ -1045,7 +1077,7 @@ mod tests {
assert!(schema.table_by_name(table_name).unwrap().is_some());

let opts2 = CreateOptions {
table_engine: catalog_manager.get_engine_proxy(),
table_engine: engine_proxy,
create_if_not_exists: false,
};
assert!(schema.create_table(request.clone(), opts2).await.is_err());
Expand All @@ -1062,7 +1094,14 @@ mod tests {
let mut test_ctx = env.new_context(engine_context);
test_ctx.open().await;

let catalog_manager = build_catalog_manager(test_ctx.clone_engine()).await;
let engine = test_ctx.engine().clone();
let memory = MemoryTableEngine;
let engine_proxy = Arc::new(TableEngineProxy {
memory,
analytic: engine.clone(),
});

let catalog_manager = build_catalog_manager(engine.clone()).await;
let schema = build_default_schema_with_catalog(&catalog_manager).await;

let table_name = "test";
Expand All @@ -1075,7 +1114,7 @@ mod tests {
engine: engine_name.to_string(),
};
let drop_table_opts = DropOptions {
table_engine: catalog_manager.get_engine_proxy(),
table_engine: engine_proxy.clone(),
};

assert!(!schema
Expand All @@ -1085,7 +1124,7 @@ mod tests {

let create_table_request = build_create_table_req(table_name, schema.clone()).await;
let create_table_opts = CreateOptions {
table_engine: catalog_manager.get_engine_proxy(),
table_engine: engine_proxy,
create_if_not_exists: true,
};

Expand Down
2 changes: 1 addition & 1 deletion interpreters/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{

async fn build_catalog_manager(analytic: TableEngineRef) -> TableBasedManager {
// Create catalog manager, use analytic table as backend
TableBasedManager::new(analytic.clone(), analytic)
TableBasedManager::new(analytic.clone())
.await
.expect("Failed to create catalog manager")
}
Expand Down
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod handlers;
mod http;
mod instance;
pub mod limiter;
pub mod local_tables;
pub mod logger;
mod metrics;
mod mysql;
Expand Down
Loading