diff --git a/Cargo.lock b/Cargo.lock index f1fef69724..8ed42f1052 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2741,9 +2741,9 @@ checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" [[package]] name = "itertools" -version = "0.10.3" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" dependencies = [ "either", ] @@ -4377,6 +4377,7 @@ version = "1.0.0-alpha01" dependencies = [ "prost", "protoc-bin-vendored", + "tonic", "tonic-build", ] @@ -5840,7 +5841,10 @@ dependencies = [ "common_util", "datafusion", "datafusion-expr", + "df_operator", + "env_logger", "futures 0.3.21", + "itertools", "log", "parquet", "parquet_ext", @@ -6603,8 +6607,8 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", - "rand 0.3.23", + "cfg-if 1.0.0", + "rand 0.8.5", "static_assertions 1.1.0", ] diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 1c18f767ea..6c8fbe9193 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -2,10 +2,19 @@ //! Factory for different kinds sst builder and reader. -use std::{fmt::Debug, sync::Arc}; +use std::{ + fmt::Debug, + ops::Range, + sync::{Arc, RwLock}, +}; +use async_trait::async_trait; +use bytes::Bytes; use common_types::projected_schema::ProjectedSchema; -use common_util::runtime::Runtime; +use common_util::{ + error::{GenericError, GenericResult}, + runtime::Runtime, +}; use object_store::{ObjectStoreRef, Path}; use table_engine::predicate::PredicateRef; @@ -13,7 +22,10 @@ use crate::{ sst::{ builder::SstBuilder, meta_cache::MetaCacheRef, - parquet::{builder::ParquetSstBuilder, AsyncParquetReader, ThreadedReader}, + parquet::{ + async_reader::AsyncFileReader, builder::ParquetSstBuilder, AsyncParquetReader, + ThreadedReader, + }, reader::SstReader, }, table_options::Compression, @@ -96,6 +108,58 @@ pub struct SstBuilderOptions { pub compression: Compression, } +pub struct FileReaderOnObjectStore { + path: Path, + store: ObjectStoreRef, + cached_file_size: RwLock>, +} + +impl FileReaderOnObjectStore { + pub fn new(path: Path, store: ObjectStoreRef) -> Self { + Self { + path, + store, + cached_file_size: RwLock::new(None), + } + } +} + +#[async_trait] +impl AsyncFileReader for FileReaderOnObjectStore { + async fn file_size(&self) -> GenericResult { + // check cached filed_size first + { + let file_size = self.cached_file_size.read().unwrap(); + if let Some(s) = file_size.as_ref() { + return Ok(*s); + } + } + + // fetch the size from the underlying store + let head = self + .store + .head(&self.path) + .await + .map_err(|e| Box::new(e) as GenericError)?; + *self.cached_file_size.write().unwrap() = Some(head.size); + Ok(head.size) + } + + async fn get_byte_range(&self, range: Range) -> GenericResult { + self.store + .get_range(&self.path, range) + .await + .map_err(|e| Box::new(e) as _) + } + + async fn get_byte_ranges(&self, ranges: &[Range]) -> GenericResult> { + self.store + .get_ranges(&self.path, ranges) + .await + .map_err(|e| Box::new(e) as _) + } +} + #[derive(Debug, Default)] pub struct FactoryImpl; @@ -106,11 +170,13 @@ impl Factory for FactoryImpl { path: &'a Path, store_picker: &'a ObjectStorePickerRef, ) -> Option> { + let store = 
store_picker.pick_by_freq(options.frequency).clone(); + let file_reader = FileReaderOnObjectStore::new(path.clone(), store); + let parquet_reader = AsyncParquetReader::new(path, Arc::new(file_reader), options); // TODO: Currently, we only have one sst format, and we have to choose right // reader for sst according to its real format in the future. - let reader = AsyncParquetReader::new(path, store_picker, options); let reader = ThreadedReader::new( - reader, + parquet_reader, options.runtime.clone(), options.background_read_parallelism, ); diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index caec9dd0fb..c552c65060 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -17,16 +17,21 @@ use common_types::{ projected_schema::{ProjectedSchema, RowProjector}, record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey}, }; -use common_util::{runtime::Runtime, time::InstantExt}; -use datafusion::datasource::file_format; +use common_util::{error::GenericResult, runtime::Runtime, time::InstantExt}; +use datafusion::error::DataFusionError as DfError; use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt}; use log::{debug, error, info, warn}; -use object_store::{ObjectMeta, ObjectStoreRef, Path}; +use object_store::Path; use parquet::{ - arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask}, - file::metadata::RowGroupMetaData, + arrow::{ + async_reader::AsyncFileReader as AsyncParquetFileReader, ParquetRecordBatchStreamBuilder, + ProjectionMask, + }, + file::{ + footer, + metadata::{ParquetMetaData, RowGroupMetaData}, + }, }; -use parquet_ext::ParquetMetaDataRef; use prometheus::local::LocalHistogram; use snafu::ResultExt; use table_engine::predicate::PredicateRef; @@ -34,7 +39,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use crate::{ sst::{ - factory::{ObjectStorePickerRef, ReadFrequency, SstReaderOptions}, + factory::{ReadFrequency, SstReaderOptions}, file::{BloomFilter, SstMetaData}, meta_cache::{MetaCacheRef, MetaData}, metrics, @@ -46,11 +51,67 @@ use crate::{ type SendableRecordBatchStream = Pin> + Send>>; +pub type AsyncFileReaderRef = Arc; + +#[async_trait] +pub trait AsyncFileReader: Send + Sync { + async fn file_size(&self) -> GenericResult; + + async fn get_byte_range(&self, range: Range) -> GenericResult; + + async fn get_byte_ranges(&self, ranges: &[Range]) -> GenericResult>; +} + +/// Fetch and parse [`ParquetMetadata`] from the file reader. 
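+/// +/// A parquet file ends with an 8-byte footer: a 4-byte little-endian metadata length followed by the 4-byte magic `PAR1`, with the metadata itself placed immediately before it; this helper reads those trailing bytes first to locate and decode the metadata.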
+/// +/// Referring to: https://github.com/apache/arrow-datafusion/blob/ac2e5d15e5452e83c835d793a95335e87bf35569/datafusion/core/src/datasource/file_format/parquet.rs#L390-L449 +async fn fetch_parquet_metadata_from_file_reader( + file_reader: &dyn AsyncFileReader, +) -> std::result::Result { + const FOOTER_LEN: usize = 8; + + let file_size = file_reader.file_size().await?; + + if file_size < FOOTER_LEN { + let err_msg = format!("file size of {} is less than footer", file_size); + return Err(DfError::Execution(err_msg)); + } + + let footer_start = file_size - FOOTER_LEN; + + let footer_bytes = file_reader + .get_byte_range(footer_start..file_size) + .await + .map_err(|e| DfError::External(e))?; + + assert_eq!(footer_bytes.len(), FOOTER_LEN); + let mut footer = [0; FOOTER_LEN]; + footer.copy_from_slice(&footer_bytes); + + let metadata_len = footer::decode_footer(&footer)?; + + if file_size < metadata_len + FOOTER_LEN { + let err_msg = format!( + "file size of {} is smaller than footer + metadata {}", + file_size, + metadata_len + FOOTER_LEN + ); + return Err(DfError::Execution(err_msg)); + } + + let metadata_start = file_size - metadata_len - FOOTER_LEN; + let metadata_bytes = file_reader + .get_byte_range(metadata_start..footer_start) + .await?; + + Ok(footer::decode_metadata(&metadata_bytes)?) +} + pub struct Reader<'a> { /// The path where the data is persisted. path: &'a Path, /// The storage where the data is persist. - store: &'a ObjectStoreRef, + file_reader: AsyncFileReaderRef, projected_schema: ProjectedSchema, meta_cache: Option, predicate: PredicateRef, @@ -69,17 +130,16 @@ pub struct Reader<'a> { impl<'a> Reader<'a> { pub fn new( path: &'a Path, - store_picker: &'a ObjectStorePickerRef, + file_reader: AsyncFileReaderRef, options: &SstReaderOptions, ) -> Self { let batch_size = options.read_batch_row_num; let parallelism_options = ParallelismOptions::new(options.read_batch_row_num, options.num_rows_per_row_group); - let store = store_picker.pick_by_freq(options.frequency); Self { path, - store, + file_reader, projected_schema: options.projected_schema.clone(), meta_cache: options.meta_cache.clone(), predicate: options.predicate.clone(), @@ -160,7 +220,7 @@ impl<'a> Reader<'a> { let meta_data = self.meta_data.as_ref().unwrap(); let row_projector = self.row_projector.as_ref().unwrap(); let object_store_reader = - ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone()); + ParquetFileReaderAdapter::new(self.file_reader.clone(), meta_data.clone()); // Get target row groups. 
let filtered_row_groups = self.filter_row_groups( @@ -245,17 +305,11 @@ impl<'a> Reader<'a> { Ok(()) } - async fn load_meta_data_from_storage( - &self, - object_meta: &ObjectMeta, - ) -> Result { - let meta_data = - file_format::parquet::fetch_parquet_metadata(self.store.as_ref(), object_meta, None) - .await - .map_err(|e| Box::new(e) as _) - .context(DecodeSstMeta)?; - - Ok(Arc::new(meta_data)) + async fn load_meta_data_from_storage(&self) -> Result { + fetch_parquet_metadata_from_file_reader(self.file_reader.as_ref()) + .await + .map_err(|e| Box::new(e) as _) + .context(DecodeSstMeta) } fn need_update_cache(&self) -> bool { @@ -278,15 +332,11 @@ impl<'a> Reader<'a> { let empty_predicate = self.predicate.exprs().is_empty(); let meta_data = { - let object_meta = self - .store - .head(self.path) - .await - .context(ObjectStoreError {})?; - let parquet_meta_data = self.load_meta_data_from_storage(&object_meta).await?; + let parquet_meta_data = self.load_meta_data_from_storage().await?; let ignore_bloom_filter = avoid_update_cache && empty_predicate; - MetaData::try_new(&parquet_meta_data, object_meta.size, ignore_bloom_filter) + let file_size = self.file_reader.file_size().await.context(DecodeSstMeta)?; + MetaData::try_new(&parquet_meta_data, file_size, ignore_bloom_filter) .map_err(|e| Box::new(e) as _) .context(DecodeSstMeta)? }; @@ -352,18 +402,16 @@ struct ReaderMetrics { } #[derive(Clone)] -struct ObjectStoreReader { - storage: ObjectStoreRef, - path: Path, +struct ParquetFileReaderAdapter { + file_reader: AsyncFileReaderRef, meta_data: MetaData, metrics: ReaderMetrics, } -impl ObjectStoreReader { - fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self { +impl ParquetFileReaderAdapter { + fn new(file_reader: AsyncFileReaderRef, meta_data: MetaData) -> Self { Self { - storage, - path, + file_reader, meta_data, metrics: ReaderMetrics { bytes_scanned: 0, @@ -373,20 +421,24 @@ impl ObjectStoreReader { } } -impl Drop for ObjectStoreReader { +impl Drop for ParquetFileReaderAdapter { fn drop(&mut self) { - info!("ObjectStoreReader dropped, metrics:{:?}", self.metrics); + info!( + "ParquetFileReaderAdapter is dropped, metrics:{:?}", + self.metrics + ); } } -impl AsyncFileReader for ObjectStoreReader { +impl AsyncParquetFileReader for ParquetFileReaderAdapter { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { self.metrics.bytes_scanned += range.end - range.start; self.metrics .sst_get_range_length_histogram .observe((range.end - range.start) as f64); - self.storage - .get_range(&self.path, range) + + self.file_reader + .get_byte_range(range) .map_err(|e| { parquet::errors::ParquetError::General(format!( "Failed to fetch range from object store, err:{}", @@ -406,11 +458,11 @@ impl AsyncFileReader for ObjectStoreReader { .observe((range.end - range.start) as f64); } async move { - self.storage - .get_ranges(&self.path, &ranges) + self.file_reader + .get_byte_ranges(&ranges) .map_err(|e| { parquet::errors::ParquetError::General(format!( - "Failed to fetch ranges from object store, err:{}", + "Failed to fetch ranges from underlying reader, err:{}", e )) }) diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index ad40415b45..eda9008d9d 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -263,7 +263,8 @@ mod tests { row_iter::tests::build_record_batch_with_key, sst::{ factory::{ - Factory, FactoryImpl, ReadFrequency, SstBuilderOptions, 
SstReaderOptions, SstType, + Factory, FactoryImpl, FileReaderOnObjectStore, ReadFrequency, SstBuilderOptions, + SstReaderOptions, SstType, }, parquet::AsyncParquetReader, reader::{tests::check_stream, SstReader}, @@ -358,8 +359,15 @@ mod tests { }; let mut reader: Box = { - let mut reader = - AsyncParquetReader::new(&sst_file_path, &store_picker, &sst_reader_options); + let file_reader = FileReaderOnObjectStore::new( + sst_file_path.clone(), + store_picker.default_store().clone(), + ); + let mut reader = AsyncParquetReader::new( + &sst_file_path, + Arc::new(file_reader), + &sst_reader_options, + ); let mut sst_meta_readback = { // FIXME: size of SstMetaData is not what this file's size, so overwrite it // https://github.com/CeresDB/ceresdb/issues/321 diff --git a/catalog_impls/src/table_based.rs b/catalog_impls/src/table_based.rs index 3d818fcb84..1b53d165c5 100644 --- a/catalog_impls/src/table_based.rs +++ b/catalog_impls/src/table_based.rs @@ -23,8 +23,8 @@ use common_util::define_result; use log::{debug, error, info}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use system_catalog::sys_catalog_table::{ - self, CreateCatalogRequest, CreateSchemaRequest, SysCatalogTable, Visitor, - VisitorCatalogNotFound, VisitorOpenTable, VisitorSchemaNotFound, + self, CreateCatalogRequest, CreateSchemaRequest, SysCatalogTable, VisitOptions, + VisitOptionsBuilder, VisitorCatalogNotFound, VisitorInner, VisitorSchemaNotFound, }; use table_engine::{ engine::{TableEngineRef, TableState}, @@ -92,8 +92,6 @@ pub struct TableBasedManager { /// Sys catalog table catalog_table: Arc, catalogs: CatalogMap, - /// Table engine proxy - engine_proxy: TableEngineRef, /// Global schema id generator, Each schema has a unique schema id. schema_id_generator: Arc, } @@ -120,7 +118,7 @@ impl Manager for TableBasedManager { impl TableBasedManager { /// Create and init the TableBasedManager. // TODO(yingwen): Define all constants in catalog crate. - pub async fn new(backend: TableEngineRef, engine_proxy: TableEngineRef) -> Result { + pub async fn new(backend: TableEngineRef) -> Result { // Create or open sys_catalog table, will also create a space (catalog + schema) // for system catalog. let catalog_table = SysCatalogTable::new(backend) @@ -130,7 +128,6 @@ impl TableBasedManager { let mut manager = Self { catalog_table: Arc::new(catalog_table), catalogs: HashMap::new(), - engine_proxy, schema_id_generator: Arc::new(SchemaIdGenerator::default()), }; @@ -139,9 +136,22 @@ impl TableBasedManager { Ok(manager) } - #[cfg(test)] - pub fn get_engine_proxy(&self) -> TableEngineRef { - self.engine_proxy.clone() + pub async fn fetch_table_infos(&mut self) -> Result> { + let catalog_table = self.catalog_table.clone(); + + let mut table_infos = Vec::default(); + let visitor_inner = VisitorInnerImpl { + catalog_table: catalog_table.clone(), + catalogs: &mut self.catalogs, + schema_id_generator: self.schema_id_generator.clone(), + table_infos: &mut table_infos, + }; + + let visit_opts = VisitOptionsBuilder::default().visit_table().build(); + + Self::visit_catalog_table_with_options(catalog_table, visitor_inner, visit_opts).await?; + + Ok(table_infos) } /// Load all data from sys catalog table. @@ -149,19 +159,22 @@ impl TableBasedManager { // The system catalog and schema in it is not persisted, so we add it manually. 
self.load_system_catalog(); - let mut visitor = VisitorImpl { + // Load all existent catalog/schema from catalog_table + let catalog_table = self.catalog_table.clone(); + + let visitor_inner = VisitorInnerImpl { catalog_table: self.catalog_table.clone(), catalogs: &mut self.catalogs, - engine_proxy: self.engine_proxy.clone(), schema_id_generator: self.schema_id_generator.clone(), + table_infos: &mut Vec::default(), }; - // Load all existent catalog/schema/tables from catalog_table. - let opts = ReadOptions::default(); - self.catalog_table - .visit(opts, &mut visitor) - .await - .context(VisitSysCatalog)?; + let visit_opts = VisitOptionsBuilder::default() + .visit_catalog() + .visit_schema() + .build(); + + Self::visit_catalog_table_with_options(catalog_table, visitor_inner, visit_opts).await?; // Create default catalog if it is not exists. self.maybe_create_default_catalog().await?; @@ -169,6 +182,19 @@ impl TableBasedManager { Ok(()) } + async fn visit_catalog_table_with_options( + catalog_table: Arc, + mut visitor_inner: VisitorInnerImpl<'_>, + visit_opts: VisitOptions, + ) -> Result<()> { + let opts = ReadOptions::default(); + + catalog_table + .visit(opts, &mut visitor_inner, visit_opts) + .await + .context(VisitSysCatalog) + } + fn load_system_catalog(&mut self) { // Get the `sys_catalog` table and add it to tables. let table = self.catalog_table.inner_table(); @@ -306,15 +332,15 @@ impl TableBasedManager { type CatalogMap = HashMap>; /// Sys catalog visitor implementation, used to load catalog info -struct VisitorImpl<'a> { +struct VisitorInnerImpl<'a> { catalog_table: Arc, catalogs: &'a mut CatalogMap, - engine_proxy: TableEngineRef, schema_id_generator: Arc, + table_infos: &'a mut Vec, } #[async_trait] -impl<'a> Visitor for VisitorImpl<'a> { +impl<'a> VisitorInner for VisitorInnerImpl<'a> { fn visit_catalog(&mut self, request: CreateCatalogRequest) -> sys_catalog_table::Result<()> { debug!("Visitor visit catalog, request:{:?}", request); let schema_id_generator = self.schema_id_generator.clone(); @@ -364,7 +390,7 @@ impl<'a> Visitor for VisitorImpl<'a> { Ok(()) } - async fn visit_tables(&mut self, table_info: TableInfo) -> sys_catalog_table::Result<()> { + fn visit_tables(&mut self, table_info: TableInfo) -> sys_catalog_table::Result<()> { debug!("Visitor visit tables, table_info:{:?}", table_info); let catalog = @@ -397,26 +423,8 @@ impl<'a> Visitor for VisitorImpl<'a> { return Ok(()); } - let open_request = OpenTableRequest::from(table_info); - let table_name = open_request.table_name.clone(); - let table_opt = self - .engine_proxy - .open_table(open_request) - .await - .context(VisitorOpenTable)?; - - match table_opt { - Some(table) => { - schema.insert_table_into_memory(table_id, table); - } - None => { - // Now we ignore the error that table not in engine but in catalog. - error!( - "Visitor found table not in engine, table_name:{:?}", - table_name - ); - } - } + // Collect table infos for later opening. + self.table_infos.push(table_info); Ok(()) } @@ -825,7 +833,7 @@ impl Schema for SchemaImpl { async fn open_table( &self, request: OpenTableRequest, - _opts: OpenOptions, + opts: OpenOptions, ) -> schema::Result> { debug!( "Table based catalog manager open table, request:{:?}", @@ -834,9 +842,33 @@ impl Schema for SchemaImpl { self.validate_schema_info(&request.catalog_name, &request.schema_name)?; - // All tables have been opened duration initialization, so just check - // whether the table exists. 
- self.check_create_table_read(&request.table_name, false) + // Do opening work. + let table_name = request.table_name.clone(); + let table_id = request.table_id; + let table_opt = opts + .table_engine + .open_table(request.clone()) + .await + .map_err(|e| Box::new(e) as _) + .context(schema::OpenTableWithCause)?; + + match table_opt { + Some(table) => { + self.insert_table_into_memory(table_id, table.clone()); + + Ok(Some(table)) + } + + None => { + // Now we ignore the error that table not in engine but in catalog. + error!( + "Visitor found table not in engine, table_name:{:?}, table_id:{}", + table_name, table_id, + ); + + Ok(None) + } + } } async fn close_table( @@ -889,16 +921,8 @@ mod tests { use crate::table_based::TableBasedManager; async fn build_catalog_manager(analytic: TableEngineRef) -> TableBasedManager { - // Create table engine proxy - let memory = MemoryTableEngine; - - let engine_proxy = Arc::new(TableEngineProxy { - memory, - analytic: analytic.clone(), - }); - // Create catalog manager, use analytic table as backend - TableBasedManager::new(analytic.clone(), engine_proxy.clone()) + TableBasedManager::new(analytic.clone()) .await .expect("Failed to create catalog manager") } @@ -949,12 +973,13 @@ mod tests { let mut test_ctx = env.new_context(engine_context); test_ctx.open().await; - let catalog_manager = build_catalog_manager(test_ctx.engine().clone()).await; + let catalog_manager = build_catalog_manager(test_ctx.clone_engine()).await; let catalog_name = catalog_manager.default_catalog_name(); let schema_name = catalog_manager.default_schema_name(); let catalog = catalog_manager.catalog_by_name(catalog_name); assert!(catalog.is_ok()); assert!(catalog.as_ref().unwrap().is_some()); + let schema = catalog .as_ref() .unwrap() @@ -1023,14 +1048,21 @@ mod tests { let mut test_ctx = env.new_context(engine_context); test_ctx.open().await; - let catalog_manager = build_catalog_manager(test_ctx.clone_engine()).await; + let engine = test_ctx.engine().clone(); + let memory = MemoryTableEngine; + let engine_proxy = Arc::new(TableEngineProxy { + memory, + analytic: engine.clone(), + }); + + let catalog_manager = build_catalog_manager(engine.clone()).await; let schema = build_default_schema_with_catalog(&catalog_manager).await; let table_name = "test"; let request = build_create_table_req(table_name, schema.clone()).await; let opts = CreateOptions { - table_engine: catalog_manager.get_engine_proxy(), + table_engine: engine_proxy.clone(), create_if_not_exists: true, }; @@ -1045,7 +1077,7 @@ mod tests { assert!(schema.table_by_name(table_name).unwrap().is_some()); let opts2 = CreateOptions { - table_engine: catalog_manager.get_engine_proxy(), + table_engine: engine_proxy, create_if_not_exists: false, }; assert!(schema.create_table(request.clone(), opts2).await.is_err()); @@ -1062,7 +1094,14 @@ mod tests { let mut test_ctx = env.new_context(engine_context); test_ctx.open().await; - let catalog_manager = build_catalog_manager(test_ctx.clone_engine()).await; + let engine = test_ctx.engine().clone(); + let memory = MemoryTableEngine; + let engine_proxy = Arc::new(TableEngineProxy { + memory, + analytic: engine.clone(), + }); + + let catalog_manager = build_catalog_manager(engine.clone()).await; let schema = build_default_schema_with_catalog(&catalog_manager).await; let table_name = "test"; @@ -1075,7 +1114,7 @@ mod tests { engine: engine_name.to_string(), }; let drop_table_opts = DropOptions { - table_engine: catalog_manager.get_engine_proxy(), + table_engine: engine_proxy.clone(), }; 
assert!(!schema @@ -1085,7 +1124,7 @@ mod tests { let create_table_request = build_create_table_req(table_name, schema.clone()).await; let create_table_opts = CreateOptions { - table_engine: catalog_manager.get_engine_proxy(), + table_engine: engine_proxy, create_if_not_exists: true, }; diff --git a/common_util/src/error.rs b/common_util/src/error.rs new file mode 100644 index 0000000000..cfad97934c --- /dev/null +++ b/common_util/src/error.rs @@ -0,0 +1,4 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +pub type GenericError = Box; +pub type GenericResult = std::result::Result; diff --git a/common_util/src/lib.rs b/common_util/src/lib.rs index 26ef18cae0..00c9e31139 100644 --- a/common_util/src/lib.rs +++ b/common_util/src/lib.rs @@ -11,6 +11,7 @@ pub mod macros; pub mod alloc_tracker; pub mod codec; pub mod config; +pub mod error; pub mod metric; pub mod panic; pub mod record_batch; diff --git a/interpreters/src/show_create.rs b/interpreters/src/show_create.rs index a916c8c632..ecad9a762c 100644 --- a/interpreters/src/show_create.rs +++ b/interpreters/src/show_create.rs @@ -139,6 +139,24 @@ impl ShowCreateInterpreter { .as_str() } } + PartitionInfo::Key(v) => { + let rendered_partition_key = v.partition_key.join(","); + if v.linear { + res += format!( + " PARTITION BY LINEAR KEY({}) PARTITIONS {}", + rendered_partition_key, + v.definitions.len() + ) + .as_str() + } else { + res += format!( + " PARTITION BY KEY({}) PARTITIONS {}", + rendered_partition_key, + v.definitions.len() + ) + .as_str() + } + } } // TODO: update datafusion to remove `#`. @@ -168,12 +186,12 @@ mod test { use datafusion_expr::col; use datafusion_proto::bytes::Serializeable; - use table_engine::partition::{Definition, HashPartitionInfo, PartitionInfo}; + use table_engine::partition::{Definition, HashPartitionInfo, KeyPartitionInfo, PartitionInfo}; use super::ShowCreateInterpreter; #[test] - fn test_render_partition_info() { + fn test_render_hash_partition_info() { let expr = col("col1").add(col("col2")); let partition_info = PartitionInfo::Hash(HashPartitionInfo { definitions: vec![ @@ -196,4 +214,29 @@ mod test { ShowCreateInterpreter::render_partition_info(Some(partition_info)) ); } + + #[test] + fn test_render_key_partition_info() { + let partition_key_col_name = "col1"; + let partition_info = PartitionInfo::Key(KeyPartitionInfo { + definitions: vec![ + Definition { + name: "p0".to_string(), + origin_name: None, + }, + Definition { + name: "p1".to_string(), + origin_name: None, + }, + ], + partition_key: vec![partition_key_col_name.to_string()], + linear: false, + }); + + let expected = " PARTITION BY KEY(col1) PARTITIONS 2".to_string(); + assert_eq!( + expected, + ShowCreateInterpreter::render_partition_info(Some(partition_info)) + ); + } } diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index 7b1273f7fe..e92afdcbd1 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -21,7 +21,7 @@ use crate::{ async fn build_catalog_manager(analytic: TableEngineRef) -> TableBasedManager { // Create catalog manager, use analytic table as backend - TableBasedManager::new(analytic.clone(), analytic) + TableBasedManager::new(analytic.clone()) .await .expect("Failed to create catalog manager") } diff --git a/proto/Cargo.toml b/proto/Cargo.toml index d962537d14..3ac7201faf 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -12,6 +12,7 @@ workspace = true [dependencies] prost = { workspace = true } +tonic = { workspace = true } [build-dependencies] 
protoc-bin-vendored = "3.0.0" diff --git a/proto/build.rs b/proto/build.rs index c04ea0a516..19d1050f24 100644 --- a/proto/build.rs +++ b/proto/build.rs @@ -15,6 +15,7 @@ fn main() -> Result<(), Box> { "protos/table_requests.proto", "protos/wal_on_mq.proto", "protos/oss_cache.proto", + "protos/remote_engine.proto", ], &["protos"], )?; diff --git a/proto/protos/meta_update.proto b/proto/protos/meta_update.proto index 4148b13942..314cdee353 100644 --- a/proto/protos/meta_update.proto +++ b/proto/protos/meta_update.proto @@ -15,7 +15,7 @@ message AddSpaceMeta { message Definition { string name = 1; - oneof origin_name{ + oneof origin_name { string origin = 2; } } @@ -26,6 +26,12 @@ message HashPartitionInfo { bool linear = 3; } +message KeyPartitionInfo { + repeated Definition definitions = 1; + repeated string partition_key = 2; + bool linear = 3; +} + // Meta update for a new table message AddTableMeta { uint32 space_id = 1; @@ -37,6 +43,7 @@ message AddTableMeta { analytic_common.TableOptions options = 5; oneof partition_info { HashPartitionInfo hash = 6; + KeyPartitionInfo key_partition = 7; } } diff --git a/proto/protos/remote_engine.proto b/proto/protos/remote_engine.proto new file mode 100644 index 0000000000..b175e9d2cc --- /dev/null +++ b/proto/protos/remote_engine.proto @@ -0,0 +1,82 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +syntax = "proto3"; +package remote_engine; + +import "common.proto"; + +message ResponseHeader { + uint32 code = 1; + string error = 2; +} + +service RemoteEngineService { + rpc Read(ReadRequest) returns (stream ReadResponse) {} + rpc Write(WriteRequest) returns (WriteResponse) {} +} + +message TableIdentifier { + string catalog = 1; + string schema = 2; + string table = 3; +} + +message ReadOptions { + uint64 batch_size = 1; + uint64 read_parallelism = 2; +} + +message Projection { + repeated uint64 idx = 1; +} + +message ProjectedSchema { + common.TableSchema table_schema = 1; + Projection projection = 2; +} + +message Predicate { + repeated bytes exprs = 1; + common.TimeRange time_range = 2; +} + +enum ReadOrder { + None = 0; + Asc = 1; + Desc = 2; +} + +message TableReadRequest { + uint64 request_id = 1; + ReadOptions opts = 2; + ProjectedSchema projected_schema = 3; + Predicate predicate = 4; + ReadOrder order = 5; +} + +message ReadRequest { + TableIdentifier table = 1; + TableReadRequest read_request = 2; +} + +message ReadResponse { + ResponseHeader header = 1; + repeated bytes rows = 2; +} + +message RowGroup { + common.TableSchema table_schema = 1; + repeated bytes rows = 2; + int64 min_timestamp = 3; + int64 max_timestamp = 4; +} + +message WriteRequest { + TableIdentifier table = 1; + RowGroup row_group = 2; +} + +message WriteResponse { + ResponseHeader header = 1; + uint64 affected_rows = 2; +} diff --git a/proto/src/lib.rs b/proto/src/lib.rs index a149e61fc2..c962707ddf 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -12,3 +12,4 @@ pub mod sst; pub mod sys_catalog; pub mod table_requests; pub mod wal_on_mq; +pub mod remote_engine; diff --git a/server/src/lib.rs b/server/src/lib.rs index 54887e26b5..dc5ef663dc 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -18,6 +18,7 @@ mod handlers; mod http; mod instance; pub mod limiter; +pub mod local_tables; pub mod logger; mod metrics; mod mysql; diff --git a/server/src/local_tables.rs b/server/src/local_tables.rs new file mode 100644 index 0000000000..183c7931aa --- /dev/null +++ b/server/src/local_tables.rs @@ -0,0 +1,80 @@ +// Copyright 2022 CeresDB 
Project Authors. Licensed under Apache-2.0. + +//! Recover tables in standalone mode + +use catalog::{ + schema::{OpenOptions, OpenTableRequest}, + CatalogRef, +}; +use common_util::define_result; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; +use table_engine::table::TableInfo; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display( + "Failed to recover local tables with cause, msg:{}, err:{}", + msg, + source + ))] + RecoverWithCause { + msg: String, + source: Box, + }, + + #[snafu(display( + "Failed to recover local tables with cause, msg:{}.\nBacktrace:\n{}", + msg, + backtrace + ))] + RecoverNoCause { msg: String, backtrace: Backtrace }, +} + +define_result!(Error); + +/// Local tables recoverer +pub struct LocalTablesRecoverer { + table_infos: Vec, + catalog: CatalogRef, + open_opts: OpenOptions, +} + +impl LocalTablesRecoverer { + pub fn new(table_infos: Vec, catalog: CatalogRef, open_opts: OpenOptions) -> Self { + Self { + table_infos, + catalog, + open_opts, + } + } + + pub async fn recover(&self) -> Result<()> { + let opts = self.open_opts.clone(); + for table_info in &self.table_infos { + let schema = self + .catalog + .schema_by_name(&table_info.schema_name) + .map_err(|e| Box::new(e) as _) + .context(RecoverWithCause { + msg: format!("failed to get schema of table, table_info:{:?}", table_info), + })? + .with_context(|| RecoverNoCause { + msg: format!("schema of table not found, table_info:{:?}", table_info), + })?; + + let open_request = OpenTableRequest::from(table_info.clone()); + schema + .open_table(open_request.clone(), opts.clone()) + .await + .map_err(|e| Box::new(e) as _) + .context(RecoverWithCause { + msg: format!("failed to open table, open_request:{:?}", open_request), + })? + .with_context(|| RecoverNoCause { + msg: format!("no table is opened, open_request:{:?}", open_request), + })?; + } + + Ok(()) + } +} diff --git a/server/src/server.rs b/server/src/server.rs index d9c4c70eec..473ca53c43 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -8,7 +8,7 @@ use catalog::manager::ManagerRef; use cluster::ClusterRef; use df_operator::registry::FunctionRegistryRef; use interpreters::table_manipulator::TableManipulatorRef; -use log::warn; +use log::{info, warn}; use logger::RuntimeLevel; use query_engine::executor::Executor as QueryExecutor; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; @@ -20,6 +20,7 @@ use crate::{ http::{self, HttpConfig, Service}, instance::{Instance, InstanceRef}, limiter::Limiter, + local_tables::{self, LocalTablesRecoverer}, mysql, mysql::error::Error as MysqlError, route::RouterRef, @@ -78,6 +79,9 @@ pub enum Error { #[snafu(display("Failed to start cluster, err:{}", source))] StartCluster { source: cluster::Error }, + + #[snafu(display("Failed to open tables in standalone mode, err:{}", source))] + OpenLocalTables { source: local_tables::Error }, } define_result!(Error); @@ -90,6 +94,7 @@ pub struct Server { mysql_service: mysql::MysqlService, instance: InstanceRef, cluster: Option, + local_tables_recoverer: Option, } impl Server { @@ -104,19 +109,34 @@ impl Server { } pub async fn start(&mut self) -> Result<()> { + // Run in standalone mode + if let Some(local_tables_recoverer) = &self.local_tables_recoverer { + info!("Server start, open local tables"); + local_tables_recoverer + .recover() + .await + .context(OpenLocalTables)?; + } + + // Run in cluster mode if let Some(cluster) = &self.cluster { + info!("Server start, start cluster"); cluster.start().await.context(StartCluster)?; } // TODO: Is it necessary 
to create default schema in cluster mode? + info!("Server start, create default schema if not exist"); self.create_default_schema_if_not_exists().await; + info!("Server start, start services"); self.mysql_service .start() .await .context(StartMysqlService)?; self.rpc_services.start().await.context(StartGrpcService)?; + info!("Server start finished"); + Ok(()) } @@ -155,6 +175,7 @@ pub struct Builder { cluster: Option, router: Option, schema_config_provider: Option, + local_tables_recoverer: Option, } impl Builder { @@ -172,6 +193,7 @@ impl Builder { cluster: None, router: None, schema_config_provider: None, + local_tables_recoverer: None, } } @@ -233,6 +255,11 @@ impl Builder { self } + pub fn local_tables_recoverer(mut self, local_tables_recoverer: LocalTablesRecoverer) -> Self { + self.local_tables_recoverer = Some(local_tables_recoverer); + self + } + /// Build and run the server pub fn build(self) -> Result> { // Build instance @@ -306,6 +333,7 @@ impl Builder { mysql_service, instance, cluster: self.cluster, + local_tables_recoverer: self.local_tables_recoverer, }; Ok(server) } diff --git a/sql/src/ast.rs b/sql/src/ast.rs index 23d9401af8..33c66c6fdd 100644 --- a/sql/src/ast.rs +++ b/sql/src/ast.rs @@ -75,6 +75,7 @@ pub struct CreateTable { #[derive(Debug, PartialEq, Eq)] pub enum Partition { Hash(HashPartition), + Key(KeyPartition), } #[derive(Debug, PartialEq, Eq)] @@ -87,6 +88,14 @@ pub struct HashPartition { pub expr: sqlparser::ast::Expr, } +#[derive(Debug, PartialEq, Eq)] +pub struct KeyPartition { + /// Key partition description: https://dev.mysql.com/doc/refman/5.7/en/partitioning-key.html + pub linear: bool, + pub partition_num: u64, + pub partition_key: Vec, +} + #[derive(Debug, PartialEq, Eq)] pub struct DropTable { /// Table name diff --git a/sql/src/parser.rs b/sql/src/parser.rs index 4e47a641df..98ff0e79ef 100644 --- a/sql/src/parser.rs +++ b/sql/src/parser.rs @@ -19,7 +19,7 @@ use table_engine::ANALYTIC_ENGINE_TYPE; use crate::ast::{ AlterAddColumn, AlterModifySetting, CreateTable, DescribeTable, DropTable, ExistsTable, - HashPartition, Partition, ShowCreate, ShowCreateObject, ShowTables, Statement, + HashPartition, KeyPartition, Partition, ShowCreate, ShowCreateObject, ShowTables, Statement, }; define_result!(ParserError); @@ -27,7 +27,7 @@ define_result!(ParserError); // Use `Parser::expected` instead, if possible macro_rules! parser_err { ($MSG:expr) => { - Err(ParserError::ParserError($MSG.to_string())) + Err(ParserError::ParserError($MSG)) }; } @@ -537,7 +537,9 @@ impl<'a> Parser<'a> { &mut self, columns: &[ColumnDef], ) -> Result> { - // TODO: only hash type is supported now, we should support other types. + if let Some(key) = self.maybe_parse_and_check_key_partition(columns)? { + return Ok(Some(Partition::Key(key))); + } if let Some(hash) = self.maybe_parse_and_check_hash_partition(columns)? { return Ok(Some(Partition::Hash(hash))); } @@ -550,49 +552,102 @@ impl<'a> Parser<'a> { columns: &[ColumnDef], ) -> Result> { // Parse first part: "PARTITION BY HASH(expr)". - let (is_hash_partition, is_linear) = { - if self.consume_token("HASH") { - (true, false) - } else if self.consume_tokens(&["LINEAR", "HASH"]) { - (true, true) - } else { - (false, false) - } - }; - - if !is_hash_partition { + let linear = if self.consume_token("HASH") { + false + } else if self.consume_tokens(&["LINEAR", "HASH"]) { + true + } else { return Ok(None); - } + }; // TODO: support all valid exprs not only column expr. 
let expr = self.parse_and_check_expr_in_hash(columns)?; - // Parse second part: "PARTITIONS num" (if not set, num will use 1 as default). - let partition_num = if self.parser.parse_keyword(Keyword::PARTITIONS) { - match self.parser.parse_number_value()? { - sqlparser::ast::Value::Number(v, _) => match v.parse::() { - Ok(v) => v, - Err(e) => { - return parser_err!(format!( - "valid partition num after PARTITIONS, err:{}", - e - )) - } - }, - _ => return parser_err!("expect partition num after PARTITIONS"), - } - } else { - 1 - }; + let partition_num = self.parse_partition_num()?; // Parse successfully. Ok(Some(HashPartition { - linear: is_linear, + linear, partition_num, expr, })) } + fn maybe_parse_and_check_key_partition( + &mut self, + columns: &[ColumnDef], + ) -> Result> { + let linear = if self.consume_token("KEY") { + false + } else if self.consume_tokens(&["LINEAR", "KEY"]) { + true + } else { + return Ok(None); + }; + + let key_columns = self + .parser + .parse_parenthesized_column_list(Mandatory) + .map_err(|e| { + ParserError::ParserError(format!("Fail to parse partition key, err:{}", e)) + })?; + + // Ensure at least one column for partition key. + if key_columns.is_empty() { + return parser_err!( + "except at least one partition key, default partition key is unsupported now" + .to_string() + ); + } + + // Validate all columns composing partition key: + // - The column must exist; + // - The column must be a tag; + for key_col in &key_columns { + let col_def = match columns.iter().find(|c| c.name.value == key_col.value) { + Some(v) => v, + None => { + return parser_err!(format!( + "partition key contains non-existent column:{}", + key_col.value, + )) + } + }; + let tag_column = col_def.options.iter().any(|opt| is_tag_column(&opt.option)); + if !tag_column { + return parser_err!(format!( + "partition key must be tag, key name:{:?}", + key_col.value + )); + } + } + + let partition_num = self.parse_partition_num()?; + let partition_key = key_columns.into_iter().map(|v| v.value).collect(); + + // Parse successfully. + Ok(Some(KeyPartition { + linear, + partition_num, + partition_key, + })) + } + + // Parse second part: "PARTITIONS num" (if not set, num will use 1 as default). + fn parse_partition_num(&mut self) -> Result { + if self.parser.parse_keyword(Keyword::PARTITIONS) { + match self.parser.parse_number_value()? 
{ + sqlparser::ast::Value::Number(v, _) => match v.parse::() { + Ok(v) => Ok(v), + Err(e) => parser_err!(format!("invalid partition num, raw:{}, err:{}", v, e)), + }, + v => parser_err!(format!("expect partition number, found:{}", v)), + } + } else { + Ok(1) + } + } + fn parse_and_check_expr_in_hash(&mut self, columns: &[ColumnDef]) -> Result { let expr = self.parser.parse_expr()?; if let Expr::Nested(inner) = expr { @@ -655,12 +710,8 @@ fn check_column_expr_validity_in_hash(column: &Ident, columns: &[ColumnDef]) -> | DataType::UnsignedSmallInt(_) | DataType::UnsignedBigInt(_) ); - - let tag_option = col.options.iter().find(|opt| { - opt.option == ColumnOption::DialectSpecific(vec![Token::make_keyword(TAG)]) - }); - - is_integer && tag_option.is_some() + let tag_column = col.options.iter().any(|opt| is_tag_column(&opt.option)); + is_integer && tag_column } else { false } @@ -731,7 +782,10 @@ fn maybe_convert_table_name(object_name: &mut ObjectName) { #[cfg(test)] mod tests { - use sqlparser::ast::{ColumnOptionDef, DataType, Ident, ObjectName, Value}; + use sqlparser::{ + ast::{ColumnOptionDef, DataType, Ident, ObjectName, Value}, + parser::ParserError::ParserError, + }; use super::*; use crate::ast::TableName; @@ -1190,6 +1244,7 @@ mod tests { } struct HashPartitionTableCases; + impl HashPartitionTableCases { // Basic fn basic() { @@ -1289,4 +1344,54 @@ mod tests { ); } } + + #[test] + fn test_key_partition() { + KeyPartitionTableCases::basic(); + KeyPartitionTableCases::default_key_partition(); + KeyPartitionTableCases::invalid_column_type(); + } + + struct KeyPartitionTableCases; + + impl KeyPartitionTableCases { + fn basic() { + let sql = r#"CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) PARTITION BY KEY(name) PARTITIONS 2 ENGINE=Analytic with (enable_ttl="false")"#; + let stmt = Parser::parse_sql(sql).unwrap(); + assert_eq!(stmt.len(), 1); + match &stmt[0] { + Statement::Create(v) => { + if let Some(Partition::Key(p)) = &v.partition { + assert!(!p.linear); + assert_eq!(&p.partition_key[0], "name"); + assert_eq!(p.partition_num, 2); + } else { + panic!("failed"); + }; + } + _ => panic!("failed"), + } + } + + fn default_key_partition() { + let sql = r#"CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) PARTITION BY KEY() PARTITIONS 2 ENGINE=Analytic with (enable_ttl="false")"#; + let stmt = Parser::parse_sql(sql); + assert_eq!( + stmt.err().unwrap(), + ParserError( + "Fail to parse partition key, err:sql parser error: Expected identifier, found: )".to_string() + ) + ); + } + + fn invalid_column_type() { + let sql = r#"CREATE TABLE `demo` (`name` string TAG, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) PARTITION BY KEY(value) PARTITIONS 2 ENGINE=Analytic with (enable_ttl="false")"#; + let stmt = Parser::parse_sql(sql); + + assert_eq!( + stmt.err().unwrap(), + ParserError(r#"partition key must be tag, key name:"value""#.to_string()) + ) + } + } } diff --git a/sql/src/partition.rs b/sql/src/partition.rs index c7073195c7..5630eaa82a 100644 --- a/sql/src/partition.rs +++ b/sql/src/partition.rs @@ -7,10 +7,10 @@ use datafusion_expr::Expr; use datafusion_proto::bytes::Serializeable; use snafu::ResultExt; use sqlparser::ast::Expr as SqlExpr; -use table_engine::partition::{Definition, HashPartitionInfo, PartitionInfo}; +use table_engine::partition::{Definition, HashPartitionInfo, KeyPartitionInfo, PartitionInfo}; use crate::{ - ast::{HashPartition, Partition}, 
+ ast::{HashPartition, KeyPartition, Partition}, planner::{ParsePartitionWithCause, Result, UnsupportedPartition}, }; @@ -20,6 +20,7 @@ impl PartitionParser { pub fn parse(partition_stmt: Partition) -> Result { Ok(match partition_stmt { Partition::Hash(stmt) => PartitionInfo::Hash(PartitionParser::parse_hash(stmt)?), + Partition::Key(stmt) => PartitionInfo::Key(PartitionParser::parse_key_partition(stmt)?), }) } @@ -30,7 +31,7 @@ impl PartitionParser { expr, } = hash_stmt; - let definitions = parse_to_definition(partition_num); + let definitions = make_partition_definitions(partition_num); if let SqlExpr::Identifier(id) = expr { let expr = Expr::Column(Column::from_name(id.value)); @@ -53,13 +54,29 @@ impl PartitionParser { .fail() } } + + pub fn parse_key_partition(key_partition_stmt: KeyPartition) -> Result { + let KeyPartition { + linear, + partition_num, + partition_key, + } = key_partition_stmt; + + let definitions = make_partition_definitions(partition_num); + + Ok(KeyPartitionInfo { + definitions, + partition_key, + linear, + }) + } } -fn parse_to_definition(partition_num: u64) -> Vec { +fn make_partition_definitions(partition_num: u64) -> Vec { (0..partition_num) .into_iter() .map(|p| Definition { - name: format!("{}", p), + name: p.to_string(), origin_name: None, }) .collect() diff --git a/src/setup.rs b/src/setup.rs index 307176c59f..7f4788849d 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -9,7 +9,7 @@ use analytic_engine::{ setup::{EngineBuilder, KafkaWalEngineBuilder, ObkvWalEngineBuilder, RocksDBWalEngineBuilder}, WalStorageConfig, }; -use catalog::manager::ManagerRef; +use catalog::{manager::ManagerRef, schema::OpenOptions, CatalogRef}; use catalog_impls::{table_based::TableBasedManager, volatile, CatalogManagerImpl}; use cluster::{cluster_impl::ClusterImpl, shard_tables_cache::ShardTablesCache}; use common_util::runtime; @@ -22,6 +22,7 @@ use query_engine::executor::{Executor, ExecutorImpl}; use server::{ config::{Config, DeployMode, RuntimeConfig, StaticTopologyConfig}, limiter::Limiter, + local_tables::LocalTablesRecoverer, route::{ cluster_based::ClusterBasedRouter, rule_based::{ClusterView, RuleBasedRouter}, @@ -221,16 +222,31 @@ async fn build_in_standalone_mode( table_engine: TableEngineRef, engine_proxy: TableEngineRef, ) -> Builder { - let table_based_manager = TableBasedManager::new(table_engine, engine_proxy.clone()) + let mut table_based_manager = TableBasedManager::new(table_engine) .await .expect("Failed to create catalog manager"); + // Get collected table infos. + let table_infos = table_based_manager + .fetch_table_infos() + .await + .expect("Failed to fetch table infos for opening"); + // Create catalog manager, use analytic table as backend let catalog_manager = Arc::new(CatalogManagerImpl::new(Arc::new(table_based_manager))); let table_manipulator = Arc::new(catalog_based::TableManipulatorImpl::new( catalog_manager.clone(), )); + // Iterate the table infos to recover. + let default_catalog = default_catalog(catalog_manager.clone()); + let open_opts = OpenOptions { + table_engine: engine_proxy, + }; + + // Create local tables recoverer. + let local_tables_recoverer = LocalTablesRecoverer::new(table_infos, default_catalog, open_opts); + // Create schema in default catalog. 
create_static_topology_schema( catalog_manager.clone(), @@ -252,6 +268,7 @@ async fn build_in_standalone_mode( .table_manipulator(table_manipulator) .router(router) .schema_config_provider(schema_config_provider) + .local_tables_recoverer(local_tables_recoverer) } async fn create_static_topology_schema( @@ -274,3 +291,11 @@ async fn create_static_topology_schema( ); } } + +fn default_catalog(catalog_manager: ManagerRef) -> CatalogRef { + let default_catalog_name = catalog_manager.default_catalog_name(); + catalog_manager + .catalog_by_name(default_catalog_name) + .expect("fail to get default catalog") + .expect("default catalog is not found") +} diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index 3d074cfafe..ad3f4b1f61 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -520,7 +520,12 @@ impl SysCatalogTable { /// Visit all data in the sys catalog table // TODO(yingwen): Expose read options - pub async fn visit(&self, opts: ReadOptions, visitor: &mut dyn Visitor) -> Result<()> { + pub async fn visit( + &self, + opts: ReadOptions, + visitor_inner: &mut dyn VisitorInner, + options: VisitOptions, + ) -> Result<()> { let read_request = ReadRequest { request_id: RequestId::next_id(), opts, @@ -533,10 +538,15 @@ impl SysCatalogTable { info!("batch_stream schema is:{:?}", batch_stream.schema()); // TODO(yingwen): Check stream schema and table schema? + let mut visitor = Visitor { + inner: visitor_inner, + options, + }; + while let Some(batch) = batch_stream.try_next().await.context(ReadStream)? { // Visit all requests in the record batch info!("real batch_stream schema is:{:?}", batch.schema()); - self.visit_record_batch(batch, visitor).await?; + self.visit_record_batch(batch, &mut visitor).await?; } Ok(()) @@ -546,7 +556,7 @@ impl SysCatalogTable { async fn visit_record_batch( &self, batch: RecordBatch, - visitor: &mut dyn Visitor, + visitor: &mut Visitor<'_>, ) -> Result<()> { let key_column = batch.column(self.key_column_index); let value_column = batch.column(self.value_column_index); @@ -572,33 +582,107 @@ impl SysCatalogTable { let request = decode_one_request(key.as_varbinary().unwrap(), value.as_varbinary().unwrap())?; - Self::call_visitor(request, visitor).await?; + visitor.visit(request)?; } Ok(()) } - - /// Invoke visitor - async fn call_visitor(request: DecodedRequest, visitor: &mut dyn Visitor) -> Result<()> { - match request { - DecodedRequest::CreateCatalog(req) => visitor.visit_catalog(req), - DecodedRequest::CreateSchema(req) => visitor.visit_schema(req), - DecodedRequest::TableEntry(req) => visitor.visit_tables(req).await, - } - } } -/// Visitor for sys catalog requests +/// Visitor inner for sys catalog requests // TODO(yingwen): Define an Error for visitor #[async_trait] -pub trait Visitor { +pub trait VisitorInner { // TODO(yingwen): Use enum another type if need more operation (delete/update) fn visit_catalog(&mut self, request: CreateCatalogRequest) -> Result<()>; fn visit_schema(&mut self, request: CreateSchemaRequest) -> Result<()>; // FIXME(xikai): Should this method be called visit_table? - async fn visit_tables(&mut self, table_info: TableInfo) -> Result<()>; + fn visit_tables(&mut self, table_info: TableInfo) -> Result<()>; +} + +/// Options for visiting sys catalog requests +/// +/// Following infos can be visited: +/// + catalog +/// + schema +/// + table +/// One or more you can select. 
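+/// +/// For example, to visit only the table entries (as `TableBasedManager::fetch_table_infos` does): +/// `VisitOptionsBuilder::default().visit_table().build()`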
+#[derive(Debug)] +pub struct VisitOptions { + pub visit_catalog: bool, + pub visit_schema: bool, + pub visit_table: bool, +} + +/// Builder for [VisitOptions] +#[derive(Debug, Default)] +pub struct VisitOptionsBuilder { + visit_catalog: bool, + visit_schema: bool, + visit_table: bool, +} + +impl VisitOptionsBuilder { + pub fn build(self) -> VisitOptions { + VisitOptions { + visit_catalog: self.visit_catalog, + visit_schema: self.visit_schema, + visit_table: self.visit_table, + } + } + + pub fn visit_catalog(mut self) -> Self { + self.visit_catalog = true; + self + } + + pub fn visit_schema(mut self) -> Self { + self.visit_schema = true; + self + } + + pub fn visit_table(mut self) -> Self { + self.visit_table = true; + self + } +} + +pub struct Visitor<'a> { + inner: &'a mut dyn VisitorInner, + options: VisitOptions, +} + +impl<'a> Visitor<'a> { + fn visit(&mut self, request: DecodedRequest) -> Result<()> { + debug!("Visitor begin to visit, options:{:?}", self.options); + + match request { + DecodedRequest::CreateCatalog(req) => { + if self.options.visit_catalog { + self.inner.visit_catalog(req) + } else { + Ok(()) + } + } + + DecodedRequest::CreateSchema(req) => { + if self.options.visit_schema { + self.inner.visit_schema(req) + } else { + Ok(()) + } + } + DecodedRequest::TableEntry(req) => { + if self.options.visit_table { + self.inner.visit_tables(req) + } else { + Ok(()) + } + } + } + } } /// Build a new table schema for sys catalog diff --git a/table_engine/Cargo.toml b/table_engine/Cargo.toml index a42afeefac..d100268057 100644 --- a/table_engine/Cargo.toml +++ b/table_engine/Cargo.toml @@ -18,7 +18,9 @@ common_types = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } datafusion-expr = { workspace = true } +df_operator = { workspace = true } futures = { workspace = true } +itertools = "0.10.5" log = { workspace = true } parquet = { workspace = true } parquet_ext = { workspace = true } @@ -29,3 +31,6 @@ serde_derive = { workspace = true } smallvec = { workspace = true } snafu = { workspace = true } tokio = { workspace = true } + +[dev-dependencies] +env_logger = { workspace = true } diff --git a/table_engine/src/partition/mod.rs b/table_engine/src/partition/mod.rs index cbcd1e31c5..bcde89a70d 100644 --- a/table_engine/src/partition/mod.rs +++ b/table_engine/src/partition/mod.rs @@ -6,22 +6,56 @@ pub mod rule; use common_types::bytes::Bytes; use proto::meta_update as meta_pb; +use snafu::{Backtrace, Snafu}; + +// TODO: we should refactor for splitting the errors. 
+#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display( + "Failed to build partition rule, msg:{}.\nBacktrace:{}\n", + msg, + backtrace + ))] + BuildPartitionRule { msg: String, backtrace: Backtrace }, + + #[snafu(display( + "Failed to locate partitions for write, msg:{}.\nBacktrace:{}\n", + msg, + backtrace + ))] + LocateWritePartition { msg: String, backtrace: Backtrace }, + + #[snafu(display( + "Failed to locate partitions for read, msg:{}.\nBacktrace:{}\n", + msg, + backtrace + ))] + LocateReadPartition { msg: String, backtrace: Backtrace }, + + #[snafu(display("Internal error occurred, msg:{}", msg,))] + Internal { msg: String }, +} + +define_result!(Error); /// Info for how to partition table #[derive(Clone, Debug, PartialEq, Eq)] pub enum PartitionInfo { Hash(HashPartitionInfo), + Key(KeyPartitionInfo), } impl PartitionInfo { pub fn get_definitions(&self) -> Vec { match self { Self::Hash(v) => v.definitions.clone(), + Self::Key(v) => v.definitions.clone(), } } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Default)] pub struct Definition { pub name: String, pub origin_name: Option, @@ -34,6 +68,13 @@ pub struct HashPartitionInfo { pub linear: bool, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct KeyPartitionInfo { + pub definitions: Vec, + pub partition_key: Vec, + pub linear: bool, +} + impl From for meta_pb::Definition { fn from(definition: Definition) -> Self { Self { @@ -68,6 +109,11 @@ impl From for meta_pb::add_table_meta::PartitionInfo { expr: v.expr.to_vec(), linear: v.linear, }), + PartitionInfo::Key(v) => Self::KeyPartition(meta_pb::KeyPartitionInfo { + definitions: v.definitions.into_iter().map(|v| v.into()).collect(), + partition_key: v.partition_key, + linear: v.linear, + }), } } } @@ -80,6 +126,13 @@ impl From for PartitionInfo { expr: Bytes::from(v.expr), linear: v.linear, }), + meta_pb::add_table_meta::PartitionInfo::KeyPartition(v) => { + Self::Key(KeyPartitionInfo { + definitions: v.definitions.into_iter().map(|v| v.into()).collect(), + partition_key: v.partition_key, + linear: v.linear, + }) + } } } } diff --git a/table_engine/src/partition/rule/df_adapter/extractor.rs b/table_engine/src/partition/rule/df_adapter/extractor.rs new file mode 100644 index 0000000000..06aae8a8fa --- /dev/null +++ b/table_engine/src/partition/rule/df_adapter/extractor.rs @@ -0,0 +1,110 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Partition filter extractor + +use std::collections::HashSet; + +use common_types::datum::Datum; +use datafusion_expr::{Expr, Operator}; +use df_operator::visitor::find_columns_by_expr; + +use crate::partition::rule::filter::{PartitionCondition, PartitionFilter}; + +/// The datafusion filter exprs extractor +/// +/// It's used to extract the meaningful `Expr`s and convert them to +/// [PartitionFilter](the inner filter type in ceresdb). +/// +/// NOTICE: When you implements [PartitionRule] for specific partition strategy, +/// you should implement the corresponding [FilterExtractor], too. +/// +/// For example: [KeyRule] and [KeyExtractor]. +/// If they are not related, [PartitionRule] may not take effect. 
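+/// +/// Concretely, [KeyExtractor] converts a filter like `col1 = 42` (when `col1` is one of the partition key columns) into a [PartitionFilter] with an `Eq` condition, and skips any filter it cannot convert; see `test_key_extractor_basic` below.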
+pub trait FilterExtractor: Send + Sync + 'static { + fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec; +} +pub struct KeyExtractor; + +impl FilterExtractor for KeyExtractor { + fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec { + if filters.is_empty() { + return Vec::default(); + } + + let mut target = Vec::with_capacity(filters.len()); + for filter in filters { + // If no target columns included in `filter`, ignore this `filter`. + let columns_in_filter = find_columns_by_expr(filter) + .into_iter() + .collect::>(); + let find_result = columns + .iter() + .find(|col| columns_in_filter.contains(col.as_str())); + + if find_result.is_none() { + continue; + } + + // If target columns included, now only the situation that only target column in + // filter is supported. Once other type column found here, we ignore it. + // TODO: support above situation. + if columns_in_filter.len() != 1 { + continue; + } + + // Finally, we try to convert `filter` to `PartitionFilter`. + // We just support the simple situation: "colum = value" now. + // TODO: support "colum in [value list]". + // TODO: we need to compare and check the datatype of column and value. + // (Actually, there is type conversion on high-level, but when converted data + // is overflow, it may take no effect). + let partition_filter = match filter.clone() { + Expr::BinaryExpr { left, op, right } => match (*left, op, *right) { + (Expr::Column(col), Operator::Eq, Expr::Literal(val)) + | (Expr::Literal(val), Operator::Eq, Expr::Column(col)) => { + let datum_opt = Datum::from_scalar_value(&val); + datum_opt.map(|d| PartitionFilter::new(col.name, PartitionCondition::Eq(d))) + } + _ => None, + }, + _ => None, + }; + + if let Some(pf) = partition_filter { + target.push(pf); + } + } + + target + } +} + +pub type FilterExtractorRef = Box; + +#[cfg(test)] +mod tests { + use datafusion::scalar::ScalarValue; + use datafusion_expr::col; + + use super::{FilterExtractor, *}; + + #[test] + fn test_key_extractor_basic() { + let extractor = KeyExtractor; + + // `Eq` expr will be accepted. + let columns = vec!["col1".to_string()]; + let accepted_expr = col("col1").eq(Expr::Literal(ScalarValue::Int32(Some(42)))); + let partition_filter = extractor.extract(&[accepted_expr], &columns); + let expected = PartitionFilter { + column: "col1".to_string(), + condition: PartitionCondition::Eq(Datum::Int32(42)), + }; + assert_eq!(partition_filter.get(0).unwrap(), &expected); + + // Other expr will be rejected now. + let rejected_expr = col("col1").gt(Expr::Literal(ScalarValue::Int32(Some(42)))); + let partition_filter = extractor.extract(&[rejected_expr], &columns); + assert!(partition_filter.is_empty()); + } +} diff --git a/table_engine/src/partition/rule/df_adapter/mod.rs b/table_engine/src/partition/rule/df_adapter/mod.rs new file mode 100644 index 0000000000..2fdcc87182 --- /dev/null +++ b/table_engine/src/partition/rule/df_adapter/mod.rs @@ -0,0 +1,273 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! 
Partition rule datafusion adapter + +use common_types::{row::RowGroup, schema::Schema}; +use datafusion_expr::Expr; + +use self::extractor::KeyExtractor; +use super::factory::PartitionRuleFactory; +use crate::partition::{ + rule::{df_adapter::extractor::FilterExtractorRef, PartitionRuleRef}, + BuildPartitionRule, PartitionInfo, Result, +}; + +pub(crate) mod extractor; + +/// Partition rule's adapter for datafusion +pub struct DfPartitionRuleAdapter { + /// Partition rule + rule: PartitionRuleRef, + + /// `PartitionFilter` extractor for datafusion `Expr` + extractor: FilterExtractorRef, +} + +impl DfPartitionRuleAdapter { + pub fn new(partition_info: PartitionInfo, schema: &Schema) -> Result { + let extractor = Self::create_extractor(&partition_info)?; + let rule = PartitionRuleFactory::create(partition_info, schema)?; + + Ok(Self { rule, extractor }) + } + + pub fn columns(&self) -> Vec { + self.rule.columns() + } + + pub fn locate_partitions_for_write(&self, row_group: &RowGroup) -> Result> { + self.rule.locate_partitions_for_write(row_group) + } + + pub fn locate_partitions_for_read(&self, filters: &[Expr]) -> Result> { + // Extract partition filters from datafusion filters. + let columns = self.columns(); + let partition_filters = self.extractor.extract(filters, &columns); + + // Locate partitions from filters. + self.rule.locate_partitions_for_read(&partition_filters) + } + + fn create_extractor(partition_info: &PartitionInfo) -> Result { + match partition_info { + PartitionInfo::Key(_) => Ok(Box::new(KeyExtractor)), + PartitionInfo::Hash(_) => BuildPartitionRule { + msg: format!( + "unsupported partition strategy, strategy:{:?}", + partition_info + ), + } + .fail(), + } + } +} + +#[cfg(test)] +mod tests { + use common_types::{ + bytes::BytesMut, + column_schema, + datum::{Datum, DatumKind}, + row::RowGroupBuilder, + schema::{Builder, Schema, TSID_COLUMN}, + string::StringBytes, + time::Timestamp, + }; + use datafusion_expr::{col, lit}; + + use super::*; + use crate::partition::{rule::key::compute_partition, Definition, KeyPartitionInfo}; + + // TODO: this test maybe not reasonable to place here. 
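+    // The read-path test below checks two things: the partition located from a
+    // full set of `Eq` filters matches `compute_partition` over the same key
+    // values, and two conflicting `Eq` filters on one key column intersect to
+    // an empty partition set.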
+ #[test] + fn test_locate_partitions_for_read() { + let schema = build_schema(); + let partition_num = 16; + let filter1 = col("col1").eq(lit(1_i32)); + let filter2 = col("col2").eq(lit("test".to_string())); + let filter3 = col("col3").eq(lit(42_u64)); + let filter4 = col("col1").eq(lit(3_i32)); + let valid_filters_1 = vec![filter1.clone(), filter2.clone(), filter3.clone()]; + let valid_filters_2 = vec![filter1, filter2, filter3, filter4]; + let ket_partition = KeyPartitionInfo { + definitions: vec![Definition::default(); partition_num], + partition_key: vec!["col1".to_string(), "col2".to_string(), "col3".to_string()], + linear: false, + }; + + // Basic flow + let key_rule_adapter = + DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap(); + let partitions = key_rule_adapter + .locate_partitions_for_read(&valid_filters_1) + .unwrap(); + + let partition_keys = vec![ + Datum::Int32(1), + Datum::String(StringBytes::from("test")), + Datum::UInt64(42), + ]; + let partition_key_refs = partition_keys.iter().collect::>(); + let mut buf = BytesMut::new(); + let expected = compute_partition(&partition_key_refs, partition_num as u64, &mut buf); + + assert_eq!(partitions[0], expected); + + // Conflict filter and empty partitions + let partitions = key_rule_adapter + .locate_partitions_for_read(&valid_filters_2) + .unwrap(); + + assert!(partitions.is_empty()); + } + + // TODO: this test maybe not reasonable to place here. + #[test] + fn test_locate_partitions_for_read_invalid() { + let schema = build_schema(); + let partition_num = 16; + let filter1 = col("col1").eq(lit(1_i32)); + let filter2 = col("col2").eq(lit("test".to_string())); + let filter3 = col("col3").gt(lit(42_u64)); + let filter4 = col("col4").eq(lit(42_u64)); + + let invalid_filters_1 = vec![filter1.clone(), filter2.clone(), filter3]; + let invalid_filters_2 = vec![filter1, filter2, filter4]; + let ket_partition = KeyPartitionInfo { + definitions: vec![Definition::default(); partition_num], + partition_key: vec!["col1".to_string(), "col2".to_string(), "col3".to_string()], + linear: false, + }; + + // Locate for invalid filters + let key_rule_adapter = + DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap(); + + // Partitions located from invalid filters. + let partitions_1 = key_rule_adapter + .locate_partitions_for_read(&invalid_filters_1) + .unwrap(); + let partitions_2 = key_rule_adapter + .locate_partitions_for_read(&invalid_filters_2) + .unwrap(); + + // Expected + let all_partitions = (0..partition_num).into_iter().collect::>(); + assert_eq!(partitions_1, all_partitions); + assert_eq!(partitions_2, all_partitions); + } + + // TODO: this test maybe not reasonable to place here. 
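+    // The write-path test below builds a two-row `RowGroup` and checks that the
+    // partition id located for each row equals `compute_partition` applied to
+    // that row's key columns (col1, col2, col3).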
+ #[test] + fn test_locate_partitions_for_write() { + // Basic flow + let schema = build_schema(); + let partition_num = 16; + let ket_partition = KeyPartitionInfo { + definitions: vec![Definition::default(); partition_num], + partition_key: vec!["col1".to_string(), "col2".to_string(), "col3".to_string()], + linear: false, + }; + + // Build `RowGroup` + let test_datums = vec![ + vec![ + Datum::Int32(1), + Datum::String(StringBytes::from("test1")), + Datum::UInt64(42), + ], + vec![ + Datum::Int32(4), + Datum::String(StringBytes::from("test2")), + Datum::UInt64(4242), + ], + ]; + + let mut row_group_builder = RowGroupBuilder::new(schema.clone()); + row_group_builder + .row_builder() + .append_datum(Datum::UInt64(0)) + .unwrap() + .append_datum(Datum::Timestamp(Timestamp::new(0))) + .unwrap() + .append_datum(test_datums[0][0].clone()) + .unwrap() + .append_datum(test_datums[0][1].clone()) + .unwrap() + .append_datum(test_datums[0][2].clone()) + .unwrap() + .finish() + .unwrap(); + row_group_builder + .row_builder() + .append_datum(Datum::UInt64(1)) + .unwrap() + .append_datum(Datum::Timestamp(Timestamp::new(1))) + .unwrap() + .append_datum(test_datums[1][0].clone()) + .unwrap() + .append_datum(test_datums[1][1].clone()) + .unwrap() + .append_datum(test_datums[1][2].clone()) + .unwrap() + .finish() + .unwrap(); + let row_group = row_group_builder.build(); + + // Basic flow + let key_rule_adapter = + DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap(); + let partitions = key_rule_adapter + .locate_partitions_for_write(&row_group) + .unwrap(); + + // Expected + let partition_keys_1 = test_datums[0].clone(); + let partition_key_refs_1 = partition_keys_1.iter().collect::>(); + let partition_keys_2 = test_datums[1].clone(); + let partition_key_refs_2 = partition_keys_2.iter().collect::>(); + let mut buf = BytesMut::new(); + let expected_1 = compute_partition(&partition_key_refs_1, partition_num as u64, &mut buf); + let expected_2 = compute_partition(&partition_key_refs_2, partition_num as u64, &mut buf); + let expecteds = vec![expected_1, expected_2]; + + assert_eq!(partitions, expecteds); + } + + fn build_schema() -> Schema { + Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_key_column( + column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col1".to_string(), DatumKind::Int32) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col2".to_string(), DatumKind::String) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col3".to_string(), DatumKind::UInt64) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .build() + .expect("should succeed to build schema") + } +} diff --git a/table_engine/src/partition/rule/factory.rs b/table_engine/src/partition/rule/factory.rs new file mode 100644 index 0000000000..b5741bb552 --- /dev/null +++ b/table_engine/src/partition/rule/factory.rs @@ -0,0 +1,51 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! 
Partition rule factory + +use common_types::schema::Schema; +use snafu::OptionExt; + +use super::{key::KeyRule, ColumnWithType}; +use crate::partition::{ + rule::PartitionRuleRef, BuildPartitionRule, KeyPartitionInfo, PartitionInfo, Result, +}; + +pub struct PartitionRuleFactory; + +impl PartitionRuleFactory { + pub fn create(partition_info: PartitionInfo, schema: &Schema) -> Result { + match partition_info { + PartitionInfo::Key(key_info) => Self::create_key_rule(key_info, schema), + _ => BuildPartitionRule { + msg: format!( + "unsupported partition strategy, strategy:{:?}", + partition_info + ), + } + .fail(), + } + } + + fn create_key_rule(key_info: KeyPartitionInfo, schema: &Schema) -> Result { + let typed_key_columns = key_info + .partition_key + .into_iter() + .map(|col| { + schema + .column_with_name(col.as_str()) + .with_context(|| BuildPartitionRule { + msg: format!( + "column in key partition info not found in schema, column:{}", + col + ), + }) + .map(|col_schema| ColumnWithType::new(col, col_schema.data_type)) + }) + .collect::>>()?; + + Ok(Box::new(KeyRule { + typed_key_columns, + partition_num: key_info.definitions.len() as u64, + })) + } +} diff --git a/table_engine/src/partition/rule/filter.rs b/table_engine/src/partition/rule/filter.rs new file mode 100644 index 0000000000..0c87617c9f --- /dev/null +++ b/table_engine/src/partition/rule/filter.rs @@ -0,0 +1,37 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Partition filter + +use common_types::datum::Datum; + +/// Filter using for partition +/// +/// Now, it is same as the `BinaryExpr`in datafusion. +#[allow(dead_code)] +#[derive(Debug, Clone, PartialEq)] +pub enum PartitionCondition { + /// Expressions are equal + Eq(Datum), + /// IN Expressions + In(Vec), + /// Left side is smaller than right side + Lt(Datum), + /// Left side is smaller or equal to right side + LtEq(Datum), + /// Left side is greater than right side + Gt(Datum), + /// Left side is greater or equal to right side + GtEq(Datum), +} + +#[derive(Debug, Clone, PartialEq)] +pub struct PartitionFilter { + pub column: String, + pub condition: PartitionCondition, +} + +impl PartitionFilter { + pub fn new(column: String, condition: PartitionCondition) -> Self { + Self { column, condition } + } +} diff --git a/table_engine/src/partition/rule/key.rs b/table_engine/src/partition/rule/key.rs new file mode 100644 index 0000000000..7cbac57780 --- /dev/null +++ b/table_engine/src/partition/rule/key.rs @@ -0,0 +1,395 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Key partition rule + +use std::collections::{HashMap, HashSet}; + +use common_types::{ + bytes::{BufMut, BytesMut}, + datum::Datum, + hash::hash64, + row::{Row, RowGroup}, +}; +use itertools::Itertools; +use log::{debug, error}; +use snafu::OptionExt; + +use crate::partition::{ + rule::{filter::PartitionCondition, ColumnWithType, PartitionFilter, PartitionRule}, + Internal, LocateWritePartition, Result, +}; + +pub struct KeyRule { + pub typed_key_columns: Vec, + pub partition_num: u64, +} + +impl KeyRule { + /// Check and do cartesian product to get candidate partition groups. + /// + /// for example: + /// key_col1: [f1, f2, f3] + /// key_col2: [f4, f5] + /// will convert to: + /// group1: [key_col1: f1, key_col2: f4] + /// group2: [key_col1: f1, key_col2: f5] + /// group3: [key_col1: f2, key_col2: f4] + /// ... 
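+    /// (the full cartesian product, 3 candidates for key_col1 x 2 for key_col2
+    /// = 6 groups in total; a key column with no candidate filter at all makes
+    /// the result empty, which callers treat as "scan all partitions")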
+ /// + /// Above logics are preparing for implementing something like: + /// fa1 && fa2 && fb = (fa1 && fb) && (fa2 && fb) + /// Partitions about above expression will be calculated by following steps: + /// + partitions about "(fa1 && fb)" will be calculated first, + /// assume "partitions 1" + /// + partitions about "(fa2 && fb)" will be calculated after, + /// assume "partitions 2" + /// + "total partitions" = "partitions 1" intersection "partitions 2" + fn get_candidate_partition_keys_groups( + &self, + filters: &[PartitionFilter], + ) -> Result>> { + let column_name_to_idxs = self + .typed_key_columns + .iter() + .enumerate() + .map(|(col_idx, typed_col)| (typed_col.column.clone(), col_idx)) + .collect::>(); + let mut filter_by_columns = vec![Vec::new(); self.typed_key_columns.len()]; + + // Group the filters by their columns. + for (filter_idx, filter) in filters.iter().enumerate() { + let col_idx = column_name_to_idxs + .get(filter.column.as_str()) + .context(Internal { + msg: format!( + "column in filters but not in target, column:{}, targets:{:?}", + filter.column, self.typed_key_columns + ), + })?; + + filter_by_columns + .get_mut(*col_idx) + .unwrap() + .push(filter_idx); + } + debug!( + "KeyRule get candidate partition keys groups, filter_by_columns:{:?}", + filter_by_columns + ); + + let empty_filter = filter_by_columns.iter().find(|filter| filter.is_empty()); + if empty_filter.is_some() { + return Ok(Vec::default()); + } + + let groups = filter_by_columns + .into_iter() + .map(|filters| filters.into_iter()) + .multi_cartesian_product() + .collect_vec(); + debug!( + "KeyRule get candidate partition keys groups, groups:{:?}", + groups + ); + + Ok(groups) + } + + fn compute_partition_for_inserted_row( + &self, + row: &Row, + target_column_idxs: &[usize], + buf: &mut BytesMut, + ) -> usize { + let partition_keys = target_column_idxs + .iter() + .map(|col_idx| &row[*col_idx]) + .collect_vec(); + compute_partition(&partition_keys, self.partition_num, buf) + } + + fn compute_partition_for_keys_group( + &self, + group: &[usize], + filters: &[PartitionFilter], + buf: &mut BytesMut, + ) -> Result> { + buf.clear(); + + let mut partitions = HashSet::new(); + let expanded_group = expand_partition_keys_group(group, filters)?; + for partition_keys in expanded_group { + let partition_key_refs = partition_keys.iter().collect_vec(); + let partition = compute_partition(&partition_key_refs, self.partition_num, buf); + partitions.insert(partition); + } + + Ok(partitions) + } +} + +impl PartitionRule for KeyRule { + fn columns(&self) -> Vec { + self.typed_key_columns + .iter() + .map(|typed_col| typed_col.column.clone()) + .collect() + } + + fn locate_partitions_for_write(&self, row_group: &RowGroup) -> Result> { + // Extract idxs. + // TODO: we should compare column's related data types in `typed_key_columns` + // and the ones in `row_group`'s schema. + let typed_idxs = self + .typed_key_columns + .iter() + .map(|typed_col| row_group.schema().index_of(typed_col.column.as_str())) + .collect::>>() + .context(LocateWritePartition { + msg: format!( + "not all key columns found in schema when locate partition by key strategy, key columns:{:?}", + self.typed_key_columns + ), + })?; + + // Compute partitions. 
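+        // Each row's key values are serialized into `buf` and hashed together,
+        // so the partition id is `hash64(buf) % partition_num` and rows with
+        // identical key values always land in the same partition.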
+ let mut buf = BytesMut::new(); + let partitions = row_group + .iter() + .map(|row| self.compute_partition_for_inserted_row(row, &typed_idxs, &mut buf)) + .collect(); + Ok(partitions) + } + + fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> Result> { + let all_partitions = (0..self.partition_num as usize).into_iter().collect(); + + // Filters are empty. + if filters.is_empty() { + return Ok(all_partitions); + } + + // Group the filters by their columns. + // If found invalid filter, return all partitions. + let candidate_partition_keys_groups = self + .get_candidate_partition_keys_groups(filters) + .map_err(|e| { + error!("KeyRule locate partition for read, err:{}", e); + }) + .unwrap_or_default(); + if candidate_partition_keys_groups.is_empty() { + return Ok(all_partitions); + } + + let mut buf = BytesMut::new(); + let (first_group, rest_groups) = candidate_partition_keys_groups.split_first().unwrap(); + let mut target_partitions = + self.compute_partition_for_keys_group(first_group, filters, &mut buf)?; + for group in rest_groups { + // Same as above, if found invalid, return all partitions. + let partitions = match self.compute_partition_for_keys_group(group, filters, &mut buf) { + Ok(partitions) => partitions, + Err(e) => { + error!("KeyRule locate partition for read, err:{}", e); + return Ok(all_partitions); + } + }; + + target_partitions = target_partitions + .intersection(&partitions) + .copied() + .collect::>(); + } + + Ok(target_partitions.into_iter().collect()) + } +} + +fn expand_partition_keys_group( + group: &[usize], + filters: &[PartitionFilter], +) -> Result>> { + let mut datum_by_columns = Vec::with_capacity(group.len()); + for filter_idx in group { + let filter = &filters[*filter_idx]; + let datums = match &filter.condition { + // Only `Eq` is supported now. + // TODO: to support `In`'s extracting. 
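+            // Note that `In` is already expanded to its candidate list below;
+            // the TODO refers to the extractor, which does not produce `In`
+            // filters yet.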
+ PartitionCondition::Eq(datum) => vec![datum.clone()], + PartitionCondition::In(datums) => datums.clone(), + _ => { + return Internal { + msg: format!("invalid partition filter found, filter:{:?},", filter), + } + .fail() + } + }; + + datum_by_columns.push(datums); + } + + let expanded_group = datum_by_columns + .into_iter() + .map(|filters| filters.into_iter()) + .multi_cartesian_product() + .collect_vec(); + Ok(expanded_group) +} + +// Compute partition +pub(crate) fn compute_partition( + partition_keys: &[&Datum], + partition_num: u64, + buf: &mut BytesMut, +) -> usize { + buf.clear(); + partition_keys + .iter() + .for_each(|datum| buf.put_slice(&datum.to_bytes())); + + (hash64(buf) % partition_num) as usize +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use common_types::{datum::DatumKind, string::StringBytes}; + + use super::*; + + #[test] + fn test_compute_partition_for_inserted_row() { + let partition_num = 16; + let key_rule = KeyRule { + typed_key_columns: vec![ColumnWithType::new("col1".to_string(), DatumKind::UInt32)], + partition_num, + }; + + let datums = vec![ + Datum::Int32(1), + Datum::Int32(42), + Datum::String(StringBytes::copy_from_str("test")), + Datum::Int64(84), + Datum::Null, + ]; + let row = Row::from_datums(datums.clone()); + let defined_idxs = vec![1_usize, 2, 3, 4]; + + // Actual + let mut buf = BytesMut::new(); + let actual = key_rule.compute_partition_for_inserted_row(&row, &defined_idxs, &mut buf); + + // Expected + buf.clear(); + buf.put_slice(&datums[1].to_bytes()); + buf.put_slice(&datums[2].to_bytes()); + buf.put_slice(&datums[3].to_bytes()); + buf.put_slice(&datums[4].to_bytes()); + let expected = (hash64(&buf) % partition_num) as usize; + + assert_eq!(actual, expected); + } + + #[test] + fn test_get_candidate_partition_keys_groups() { + // Key rule of keys:[col1, col2, col3] + let partition_num = 16; + let key_rule = KeyRule { + typed_key_columns: vec![ + ColumnWithType::new("col1".to_string(), DatumKind::UInt32), + ColumnWithType::new("col2".to_string(), DatumKind::UInt32), + ColumnWithType::new("col3".to_string(), DatumKind::UInt32), + ], + partition_num, + }; + + // Filters(related columns: col1, col2, col3, col1, col2) + let filter1 = PartitionFilter { + column: "col1".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(1)), + }; + + let filter2 = PartitionFilter { + column: "col2".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(2)), + }; + + let filter3 = PartitionFilter { + column: "col3".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(3)), + }; + + let filter4 = PartitionFilter { + column: "col1".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(4)), + }; + + let filter5 = PartitionFilter { + column: "col2".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(5)), + }; + + let filters = vec![filter1, filter2, filter3, filter4, filter5]; + + // Groups' len: 4 + // Col1's filter idxs: [0, 3] + // Col2's filter idxs: [1, 4] + // Col3's filter idxs: [2] + let groups = key_rule + .get_candidate_partition_keys_groups(&filters) + .unwrap(); + assert_eq!(groups.len(), 4); + + let mut filter_in_groups = vec![BTreeSet::new(); 3]; + for group in groups { + filter_in_groups[0].insert(group[0]); + filter_in_groups[1].insert(group[1]); + filter_in_groups[2].insert(group[2]); + } + let filter_in_groups = filter_in_groups + .into_iter() + .map(|filters| filters.into_iter().collect_vec()) + .collect_vec(); + + assert_eq!(filter_in_groups[0], [0, 3]); + 
assert_eq!(filter_in_groups[1], [1, 4]); + assert_eq!(filter_in_groups[2], [2]); + } + + #[test] + fn test_expand_partition_keys_group() { + // Filters(related columns: col1, col2, col3, col1) + let filter1 = PartitionFilter { + column: "col1".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(1)), + }; + + let filter2 = PartitionFilter { + column: "col2".to_string(), + condition: PartitionCondition::In(vec![Datum::UInt32(2), Datum::UInt32(22)]), + }; + + let filter3 = PartitionFilter { + column: "col3".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(3)), + }; + + let filter4 = PartitionFilter { + column: "col1".to_string(), + condition: PartitionCondition::Eq(Datum::UInt32(4)), + }; + let filters = vec![filter1, filter2, filter3, filter4]; + + // Group + let group = vec![0, 1, 2]; + + // Expanded group + let expanded_group = expand_partition_keys_group(&group, &filters).unwrap(); + let expected = vec![ + vec![Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)], + vec![Datum::UInt32(1), Datum::UInt32(22), Datum::UInt32(3)], + ]; + assert_eq!(expanded_group, expected); + } +} diff --git a/table_engine/src/partition/rule/mock.rs b/table_engine/src/partition/rule/mock.rs deleted file mode 100644 index 69e18fbe10..0000000000 --- a/table_engine/src/partition/rule/mock.rs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! Mock partition rule - -use common_types::row::RowGroup; - -use crate::partition::rule::{PartitionFilter, PartitionRule, Result}; - -pub struct MockRule { - pub wanted: usize, -} - -impl PartitionRule for MockRule { - fn locate_partitions_for_write(&self, row_group: &RowGroup) -> Result> { - Ok(vec![self.wanted; row_group.num_rows()]) - } - - fn locate_partitions_for_read(&self, _filters: &[PartitionFilter]) -> Result> { - Ok(vec![self.wanted]) - } -} diff --git a/table_engine/src/partition/rule/mod.rs b/table_engine/src/partition/rule/mod.rs index 7381cac2db..d73dce5604 100644 --- a/table_engine/src/partition/rule/mod.rs +++ b/table_engine/src/partition/rule/mod.rs @@ -2,38 +2,50 @@ //! Partition rules -pub mod mock; +pub mod df_adapter; +pub(crate) mod factory; +pub(crate) mod filter; +pub(crate) mod key; -use common_types::row::RowGroup; -use common_util::define_result; -use datafusion_expr::{Expr, Operator}; -use snafu::Snafu; +use common_types::{datum::DatumKind, row::RowGroup}; -#[derive(Debug, Snafu)] -pub enum Error {} - -define_result!(Error); +use self::filter::PartitionFilter; +use crate::partition::Result; /// Partition rule locate partition -pub trait PartitionRule { +pub trait PartitionRule: Send + Sync + 'static { + fn columns(&self) -> Vec; + /// Locate the partition for each row in `row_group`. /// /// Len of returned value should be equal to the one of rows in `row group`. fn locate_partitions_for_write(&self, row_group: &RowGroup) -> Result>; /// Locate partitions according to `filters`. + /// + /// NOTICE: Exprs which are useless for partitioning in specific partition + /// strategy will be considered to have been filtered by corresponding + /// [Extractor]. + /// + /// For example: + /// In key partition, only filters like "a = 1", "a in [1,2,3]" can be + /// passed here. + /// + /// If unexpected filters still found, all partitions will be returned. fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> Result>; } -/// Filter using for partition -/// -/// Now, it is same as the `BinaryExpr`in datafusion. 
 #[allow(dead_code)]
-pub struct PartitionFilter {
-    /// Left-hand side of the expression
-    left: Box<Expr>,
-    /// The comparison operator
-    op: Operator,
-    /// Right-hand side of the expression
-    right: Box<Expr>,
+#[derive(Debug)]
+pub struct ColumnWithType {
+    column: String,
+    datum_type: DatumKind,
+}
+
+impl ColumnWithType {
+    pub fn new(column: String, datum_type: DatumKind) -> Self {
+        Self { column, datum_type }
+    }
 }
+
+pub type PartitionRuleRef = Box<dyn PartitionRule>;