From 12d8b015175e74cfde3b34ec9edb013a2eeed36c Mon Sep 17 00:00:00 2001 From: kamille <34352236+Rachelint@users.noreply.github.com> Date: Fri, 25 Aug 2023 16:34:08 +0800 Subject: [PATCH] feat: teach ceresdb to convert the inexecutable partitioned scan to executable (resolving process) (#1161) ## Rationale Part of #1112. CeresDB is able to generate the specific physical plan for a partitioned table since #1148, but that plan is inexecutable. This PR introduces the `Resolver` to make it executable. ## Detailed Changes + Add `Resolver` to convert the `UnresolvedPartitionedScan` into the final executable scan plans of the sub tables. + Add tests for the resolving process. ## Test Plan Covered by the new unit tests. Usage sketches for the new traits are appended after the diff. --- Cargo.lock | 59 ++- catalog/Cargo.toml | 3 + catalog/src/lib.rs | 2 + catalog/src/test_util.rs | 169 +++++++ df_engine_extensions/Cargo.toml | 7 + .../src/dist_sql_query/mod.rs | 32 ++ .../src/dist_sql_query/physical_plan.rs | 44 +- .../src/dist_sql_query/resolver.rs | 474 ++++++++++++++++++ ...test__resolve_simple_partitioned_scan.snap | 9 + ...solver__test__resolve_simple_sub_scan.snap | 9 + table_engine/src/memory.rs | 37 ++ 11 files changed, 832 insertions(+), 13 deletions(-) create mode 100644 catalog/src/test_util.rs create mode 100644 df_engine_extensions/src/dist_sql_query/resolver.rs create mode 100644 df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_partitioned_scan.snap create mode 100644 df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_sub_scan.snap diff --git a/Cargo.lock b/Cargo.lock index ef8b1d4b6d..4ab07ca518 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1612,6 +1612,18 @@ dependencies = [ "crossbeam-utils 0.8.15", ] +[[package]] +name = "console" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c926e00cc70edefdc64d3a5ff31cc65bb97a3460097762bd23afb4d8145fccf8" +dependencies = [ + "encode_unicode 0.3.6", + "lazy_static", + "libc", + "windows-sys 0.45.0", +] + [[package]] name = "console-api" version = "0.5.0" @@ -2328,10 +2340,15 @@ name = "df_engine_extensions" version = "1.2.6-alpha" dependencies = [ "arrow 43.0.0", + "async-trait", + "catalog", + "common_types", "datafusion", "generic_error", + "insta", "snafu 0.6.10", "table_engine", + "trace_metric", ] [[package]] @@ -2407,6 +2424,12 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encode_unicode" version = "1.0.0" @@ -3270,6 +3293,19 @@ dependencies = [ "once_cell", ] +[[package]] +name = "insta" +version = "1.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0770b0a3d4c70567f0d58331f3088b0e4c4f56c9b8d764efe654b4a5d46de3a" +dependencies = [ + "console", + "lazy_static", + "linked-hash-map", + "similar", + "yaml-rust", +] + [[package]] name = "instant" version = "0.1.12" @@ -3639,6 +3675,12 @@ dependencies = [ "cc", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.3.1" @@ -4997,7 +5039,7 @@
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a" dependencies = [ "csv", - "encode_unicode", + "encode_unicode 1.0.0", "is-terminal", "lazy_static", "term", @@ -6333,6 +6375,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "similar" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" + [[package]] name = "siphasher" version = "0.3.10" @@ -8155,6 +8203,15 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "yatp" version = "0.0.1" diff --git a/catalog/Cargo.toml b/catalog/Cargo.toml index cf8ce65763..2703177224 100644 --- a/catalog/Cargo.toml +++ b/catalog/Cargo.toml @@ -24,6 +24,9 @@ workspace = true [package.edition] workspace = true +[features] +test = [] + [dependencies] async-trait = { workspace = true } common_types = { workspace = true } diff --git a/catalog/src/lib.rs b/catalog/src/lib.rs index cd01f65150..2ed96de614 100644 --- a/catalog/src/lib.rs +++ b/catalog/src/lib.rs @@ -18,6 +18,8 @@ pub mod consts; pub mod manager; pub mod schema; pub mod table_operator; +#[cfg(feature = "test")] +pub mod test_util; use std::sync::Arc; diff --git a/catalog/src/test_util.rs b/catalog/src/test_util.rs new file mode 100644 index 0000000000..a4eba4e1e1 --- /dev/null +++ b/catalog/src/test_util.rs @@ -0,0 +1,169 @@ +// Copyright 2023 The CeresDB Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::{collections::HashMap, sync::Arc}; + +use async_trait::async_trait; +use table_engine::table::{SchemaId, TableRef}; + +use crate::{ + manager::{Manager, ManagerRef}, + schema::{ + CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, NameRef, + Result as SchemaResult, Schema, SchemaRef, + }, + Catalog, CatalogRef, Result, +}; + +/// Mock catalog builder +pub struct MockCatalogManagerBuilder { + catalog: String, + schema: String, + tables: Vec, +} + +impl MockCatalogManagerBuilder { + pub fn new(catalog: String, schema: String, tables: Vec) -> Self { + Self { + catalog, + schema, + tables, + } + } + + pub fn build(self) -> ManagerRef { + let schema = Arc::new(MockSchema { + name: self.schema.clone(), + tables: self + .tables + .into_iter() + .map(|t| (t.name().to_string(), t)) + .collect(), + }); + + let catalog = Arc::new(MockCatalog { + name: self.catalog.clone(), + schemas: HashMap::from([(self.schema.clone(), schema as _)]), + }); + + Arc::new(MockCatalogManager { + catalogs: HashMap::from([(self.catalog.clone(), catalog as _)]), + default_catalog: self.catalog, + default_schema: self.schema, + }) + } +} + +/// Mock catalog manager which only support default catalog and schema +/// +/// You can set the default catalog and schema when initializing. +struct MockCatalogManager { + catalogs: HashMap, + default_catalog: String, + default_schema: String, +} + +impl Manager for MockCatalogManager { + fn default_catalog_name(&self) -> crate::schema::NameRef { + &self.default_catalog + } + + fn default_schema_name(&self) -> crate::schema::NameRef { + &self.default_schema + } + + fn catalog_by_name( + &self, + name: crate::schema::NameRef, + ) -> crate::manager::Result> { + Ok(self.catalogs.get(name).cloned()) + } + + fn all_catalogs(&self) -> crate::manager::Result> { + Ok(self.catalogs.clone().into_values().collect()) + } +} + +struct MockCatalog { + name: String, + schemas: HashMap, +} + +#[async_trait::async_trait] +impl Catalog for MockCatalog { + fn name(&self) -> NameRef { + &self.name + } + + fn schema_by_name(&self, name: NameRef) -> Result> { + Ok(self.schemas.get(name).cloned()) + } + + async fn create_schema<'a>(&'a self, _name: NameRef<'a>) -> Result<()> { + unimplemented!() + } + + /// All schemas + fn all_schemas(&self) -> Result> { + Ok(self.schemas.clone().into_values().collect()) + } +} + +struct MockSchema { + name: String, + tables: HashMap, +} + +#[async_trait] +impl Schema for MockSchema { + fn name(&self) -> NameRef { + &self.name + } + + fn id(&self) -> SchemaId { + SchemaId::from_u32(42) + } + + fn table_by_name(&self, name: NameRef) -> SchemaResult> { + Ok(self.tables.get(name).cloned()) + } + + async fn create_table( + &self, + _request: CreateTableRequest, + _opts: CreateOptions, + ) -> SchemaResult { + unimplemented!() + } + + async fn drop_table( + &self, + _request: DropTableRequest, + _opts: DropOptions, + ) -> SchemaResult { + unimplemented!() + } + + fn all_tables(&self) -> SchemaResult> { + Ok(self.tables.clone().into_values().collect()) + } + + fn register_table(&self, _table: TableRef) { + unimplemented!() + } + + fn unregister_table(&self, _table_name: &str) { + unimplemented!() + } +} diff --git a/df_engine_extensions/Cargo.toml b/df_engine_extensions/Cargo.toml index ee3947888e..35639dd80b 100644 --- a/df_engine_extensions/Cargo.toml +++ b/df_engine_extensions/Cargo.toml @@ -26,7 +26,14 @@ workspace = true [dependencies] arrow = { workspace = true } +async-trait = { workspace = true } +catalog = { workspace = true, features = 
["test"] } datafusion = { workspace = true } generic_error = { workspace = true } snafu = { workspace = true } table_engine = { workspace = true } + +[dev-dependencies] +common_types = { workspace = true, features = ["test"] } +insta = { version = "1.31.0" } +trace_metric = { workspace = true } diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs b/df_engine_extensions/src/dist_sql_query/mod.rs index 4961adff0e..534b5a2b31 100644 --- a/df_engine_extensions/src/dist_sql_query/mod.rs +++ b/df_engine_extensions/src/dist_sql_query/mod.rs @@ -12,4 +12,36 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::{fmt, sync::Arc}; + +use async_trait::async_trait; +use datafusion::{ + error::Result as DfResult, + physical_plan::{ExecutionPlan, SendableRecordBatchStream}, +}; +use table_engine::{ + remote::model::TableIdentifier, + table::{ReadRequest, TableRef}, +}; + pub mod physical_plan; +pub mod resolver; + +/// Remote datafusion physical plan executor +#[async_trait] +pub trait RemotePhysicalPlanExecutor: Clone + fmt::Debug + Send + Sync + 'static { + async fn execute( + &self, + table: TableIdentifier, + physical_plan: Arc, + ) -> DfResult; +} + +/// Executable scan's builder +/// +/// It is not suitable to restrict the detailed implementation of executable +/// scan, so we define a builder here which return the general `ExecutionPlan`. +pub trait ExecutableScanBuilder: fmt::Debug + Send + Sync + 'static { + fn build(&self, table: TableRef, read_request: ReadRequest) + -> DfResult>; +} diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/df_engine_extensions/src/dist_sql_query/physical_plan.rs index 1d44d0a89c..04b14c8dbd 100644 --- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs +++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs @@ -26,11 +26,9 @@ use datafusion::{ SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, }, }; -use table_engine::{ - provider::ScanTable, - remote::{model::TableIdentifier, RemoteEngineRef}, - table::ReadRequest, -}; +use table_engine::{remote::model::TableIdentifier, table::ReadRequest}; + +use crate::dist_sql_query::RemotePhysicalPlanExecutor; /// Placeholder of partitioned table's scan plan /// It is inexecutable actually and just for carrying the necessary information @@ -105,12 +103,37 @@ impl DisplayAs for UnresolvedPartitionedScan { /// It includes remote execution plans of sub tables, and will send them to /// related nodes to execute. #[derive(Debug)] -pub struct ResolvedPartitionedScan { - pub remote_engine: RemoteEngineRef, +pub struct ResolvedPartitionedScan { + pub remote_executor: R, pub remote_exec_plans: Vec<(TableIdentifier, Arc)>, } -impl ExecutionPlan for ResolvedPartitionedScan { +impl ResolvedPartitionedScan { + pub fn extend_remote_exec_plans( + &self, + extended_node: Arc, + ) -> DfResult>> { + let new_plans = self + .remote_exec_plans + .iter() + .map(|(table, plan)| { + extended_node + .clone() + .with_new_children(vec![plan.clone()]) + .map(|extended_plan| (table.clone(), extended_plan)) + }) + .collect::>>()?; + + let plan = ResolvedPartitionedScan { + remote_executor: self.remote_executor.clone(), + remote_exec_plans: new_plans, + }; + + Ok(Arc::new(plan)) + } +} + +impl ExecutionPlan for ResolvedPartitionedScan { fn as_any(&self) -> &dyn Any { self } @@ -158,7 +181,7 @@ impl ExecutionPlan for ResolvedPartitionedScan { } // TODO: make display for the plan more pretty. 
-impl DisplayAs for ResolvedPartitionedScan {
+impl<R: RemotePhysicalPlanExecutor> DisplayAs for ResolvedPartitionedScan<R> {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
         write!(
             f,
@@ -236,6 +259,3 @@ impl DisplayAs for UnresolvedSubTableScan {
         )
     }
 }
-
-/// `ResolvedSubTableScan` is `ScanTable` actually.
-pub type ResolvedSubTableScan = ScanTable;
diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs b/df_engine_extensions/src/dist_sql_query/resolver.rs
new file mode 100644
index 0000000000..a0da8af06f
--- /dev/null
+++ b/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -0,0 +1,474 @@
+// Copyright 2023 The CeresDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use catalog::manager::ManagerRef as CatalogManagerRef;
+use datafusion::{
+    error::{DataFusionError, Result as DfResult},
+    physical_plan::ExecutionPlan,
+};
+use table_engine::{remote::model::TableIdentifier, table::TableRef};
+
+use crate::dist_sql_query::{
+    physical_plan::{ResolvedPartitionedScan, UnresolvedPartitionedScan, UnresolvedSubTableScan},
+    ExecutableScanBuilder, RemotePhysicalPlanExecutor,
+};
+
+/// Resolver which makes the datafusion dist query related plans executable.
+///
+/// The reason we define a `Resolver` rather than a `physical optimization rule`
+/// is: a physical optimization rule is responsible for optimizing a bad plan
+/// into a good one, not for making an inexecutable plan executable.
+/// So we define `Resolver` for that purpose; it is something like a task
+/// generator, responsible for generating executable tasks for executors based
+/// on the physical plan.
+pub trait Resolver {
+    fn resolve(&self, plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>>;
+}
+
+/// Resolver which makes the partitioned table scan plan executable
+struct PartitionedScanResolver<R> {
+    remote_executor: R,
+}
+
+impl<R: RemotePhysicalPlanExecutor> PartitionedScanResolver<R> {
+    #[allow(dead_code)]
+    pub fn new(remote_executor: R) -> Self {
+        Self { remote_executor }
+    }
+
+    fn resolve_plan(&self, plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
+        // Leaf node, resolve it and return.
+        if let Some(unresolved) = plan.as_any().downcast_ref::<UnresolvedPartitionedScan>() {
+            let sub_tables = unresolved.sub_tables.clone();
+            let remote_plans = sub_tables
+                .into_iter()
+                .map(|table| {
+                    let plan = Arc::new(UnresolvedSubTableScan {
+                        table: table.clone(),
+                        read_request: unresolved.read_request.clone(),
+                    });
+                    (table, plan as _)
+                })
+                .collect::<Vec<_>>();
+
+            return Ok(Arc::new(ResolvedPartitionedScan {
+                remote_executor: self.remote_executor.clone(),
+                remote_exec_plans: remote_plans,
+            }));
+        }
+
+        let children = plan.children().clone();
+        // A node that is not a table scan but has no children? Just return it, too.
+        if children.is_empty() {
+            return Ok(plan);
+        }
+
+        // Resolve the children if they exist.
+        let mut new_children = Vec::with_capacity(children.len());
+        for child in children {
+            let child = self.resolve_plan(child)?;
+
+            new_children.push(child);
+        }
+
+        // TODO: Push down the computation physical node here rather than simply
+        // rebuild.
+        plan.with_new_children(new_children)
+    }
+}
+
+impl<R: RemotePhysicalPlanExecutor> Resolver for PartitionedScanResolver<R> {
+    fn resolve(&self, plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
+        self.resolve_plan(plan)
+    }
+}
+
+/// Resolver which makes the sub table scan plan executable
+struct SubScanResolver<B> {
+    catalog_manager: CatalogManagerRef,
+    scan_builder: B,
+}
+
+impl<B: ExecutableScanBuilder> SubScanResolver<B> {
+    #[allow(dead_code)]
+    pub fn new(catalog_manager: CatalogManagerRef, scan_builder: B) -> Self {
+        Self {
+            catalog_manager,
+            scan_builder,
+        }
+    }
+
+    fn resolve_plan(&self, plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
+        // Leaf node, resolve it and return.
+        if let Some(unresolved) = plan.as_any().downcast_ref::<UnresolvedSubTableScan>() {
+            let table = self.find_table(&unresolved.table)?;
+            return self
+                .scan_builder
+                .build(table, unresolved.read_request.clone());
+        }
+
+        let children = plan.children().clone();
+        // A node that is not a table scan but has no children? Just return it, too.
+        if children.is_empty() {
+            return Ok(plan);
+        }
+
+        // Resolve the children if they exist.
+        let mut new_children = Vec::with_capacity(children.len());
+        for child in children {
+            let child = self.resolve_plan(child)?;
+
+            new_children.push(child);
+        }
+
+        plan.with_new_children(new_children)
+    }
+
+    fn find_table(&self, table_ident: &TableIdentifier) -> DfResult<TableRef> {
+        let catalog = self
+            .catalog_manager
+            .catalog_by_name(&table_ident.catalog)
+            .map_err(|e| DataFusionError::Internal(format!("failed to find catalog, err:{e}")))?
+            .ok_or(DataFusionError::Internal("catalog not found".to_string()))?;
+
+        let schema = catalog
+            .schema_by_name(&table_ident.schema)
+            .map_err(|e| DataFusionError::Internal(format!("failed to find schema, err:{e}")))?
+            .ok_or(DataFusionError::Internal("schema not found".to_string()))?;
+
+        schema
+            .table_by_name(&table_ident.table)
+            .map_err(|e| DataFusionError::Internal(format!("failed to find table, err:{e}")))?
+ .ok_or(DataFusionError::Internal("table not found".to_string())) + } +} + +impl Resolver for SubScanResolver { + fn resolve(&self, plan: Arc) -> DfResult> { + self.resolve_plan(plan) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use async_trait::async_trait; + use catalog::{manager::ManagerRef, test_util::MockCatalogManagerBuilder}; + use common_types::{projected_schema::ProjectedSchema, tests::build_schema_for_cpu}; + use datafusion::{ + error::Result as DfResult, + logical_expr::{expr_fn, Literal, Operator}, + physical_plan::{ + displayable, + expressions::{binary, col, lit}, + filter::FilterExec, + projection::ProjectionExec, + DisplayAs, ExecutionPlan, PhysicalExpr, SendableRecordBatchStream, + }, + scalar::ScalarValue, + }; + use table_engine::{ + memory::MemoryTable, + predicate::PredicateBuilder, + remote::model::TableIdentifier, + table::{ReadOptions, ReadRequest, TableId, TableRef}, + ANALYTIC_ENGINE_TYPE, + }; + use trace_metric::MetricsCollector; + + use crate::dist_sql_query::{ + physical_plan::{UnresolvedPartitionedScan, UnresolvedSubTableScan}, + resolver::{PartitionedScanResolver, RemotePhysicalPlanExecutor, SubScanResolver}, + ExecutableScanBuilder, + }; + + #[test] + fn test_resolve_simple_partitioned_scan() { + let ctx = TestContext::new(); + let plan = ctx.build_basic_partitioned_table_plan(); + let resolver = PartitionedScanResolver { + remote_executor: MockRemotePhysicalPlanExecutor, + }; + let new_plan = displayable(resolver.resolve_plan(plan).unwrap().as_ref()) + .indent(true) + .to_string(); + insta::assert_snapshot!(new_plan); + } + + #[test] + fn test_resolve_simple_sub_scan() { + let ctx = TestContext::new(); + let plan = ctx.build_basic_sub_table_plan(); + let resolver = SubScanResolver { + catalog_manager: ctx.catalog_manager(), + scan_builder: MockScanBuilder, + }; + let new_plan = displayable(resolver.resolve_plan(plan).unwrap().as_ref()) + .indent(true) + .to_string(); + insta::assert_snapshot!(new_plan); + } + + // Mock scan and its builder + #[derive(Debug)] + struct MockScanBuilder; + + impl ExecutableScanBuilder for MockScanBuilder { + fn build( + &self, + _table: TableRef, + read_request: ReadRequest, + ) -> datafusion::error::Result> { + Ok(Arc::new(MockScan { + request: read_request, + })) + } + } + + #[derive(Debug)] + struct MockScan { + request: ReadRequest, + } + + impl ExecutionPlan for MockScan { + fn as_any(&self) -> &dyn std::any::Any { + unimplemented!() + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + self.request.projected_schema.to_projected_arrow_schema() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + datafusion::physical_plan::Partitioning::UnknownPartitioning( + self.request.opts.read_parallelism, + ) + } + + fn output_ordering(&self) -> Option<&[datafusion::physical_expr::PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion::error::Result> { + unimplemented!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion::error::Result + { + unimplemented!() + } + + fn statistics(&self) -> datafusion::physical_plan::Statistics { + unimplemented!() + } + } + + impl DisplayAs for MockScan { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "MockScan") + } + } + + // Mock remote executor + #[derive(Debug, Clone)] + struct MockRemotePhysicalPlanExecutor; + + 
#[async_trait] + impl RemotePhysicalPlanExecutor for MockRemotePhysicalPlanExecutor { + async fn execute( + &self, + _table: TableIdentifier, + _physical_plan: Arc, + ) -> DfResult { + unimplemented!() + } + } + + // Test context + struct TestContext { + request: ReadRequest, + sub_tables: Vec, + physical_filter: Arc, + physical_projection: Vec<(Arc, String)>, + catalog_manager: ManagerRef, + } + + impl TestContext { + fn new() -> Self { + let test_schema = build_schema_for_cpu(); + let sub_tables = vec![ + "__test_1".to_string(), + "__test_2".to_string(), + "__test_3".to_string(), + ] + .into_iter() + .map(|table| TableIdentifier { + catalog: "test_catalog".to_string(), + schema: "test_schema".to_string(), + table, + }) + .collect::>(); + + // Logical exprs. + // Projection: [time, tag1, tag2, value, field2] + let projection = vec![1_usize, 2, 3, 4, 5]; + let projected_schema = + ProjectedSchema::new(test_schema.clone(), Some(projection)).unwrap(); + // Filter: time < 1691974518000 and tag1 == 'test_tag' + let logical_filters = vec![(expr_fn::col("time") + .lt(ScalarValue::TimestampMillisecond(Some(1691974518000), None).lit())) + .and(expr_fn::col("tag1").eq("test_tag".lit()))]; + + // Physical exprs. + let arrow_projected_schema = projected_schema.to_projected_arrow_schema(); + let physical_projection = vec![ + ( + col("time", &arrow_projected_schema).unwrap(), + "time".to_string(), + ), + ( + col("tag1", &arrow_projected_schema).unwrap(), + "tag1".to_string(), + ), + ( + col("tag2", &arrow_projected_schema).unwrap(), + "tag2".to_string(), + ), + ( + col("value", &arrow_projected_schema).unwrap(), + "value".to_string(), + ), + ( + col("field2", &arrow_projected_schema).unwrap(), + "field2".to_string(), + ), + ]; + + let physical_filter1: Arc = binary( + col("time", &arrow_projected_schema).unwrap(), + Operator::Lt, + lit(ScalarValue::TimestampMillisecond(Some(1691974518000), None)), + &arrow_projected_schema, + ) + .unwrap(); + let physical_filter2: Arc = binary( + col("tag1", &arrow_projected_schema).unwrap(), + Operator::Eq, + lit("test_tag"), + &arrow_projected_schema, + ) + .unwrap(); + let physical_filter: Arc = binary( + physical_filter1, + Operator::And, + physical_filter2, + &arrow_projected_schema, + ) + .unwrap(); + + // Build the physical plan. 
+ let predicate = PredicateBuilder::default() + .add_pushdown_exprs(&logical_filters) + .extract_time_range(&test_schema, &logical_filters) + .build(); + let read_request = ReadRequest { + request_id: 42.into(), + opts: ReadOptions::default(), + projected_schema, + predicate, + metrics_collector: MetricsCollector::default(), + }; + + // Build the test catalog + let table = Arc::new(MemoryTable::new( + "__test_1".to_string(), + TableId::from(42), + build_schema_for_cpu(), + ANALYTIC_ENGINE_TYPE.to_string(), + )); + + let catalog_manager_builder = MockCatalogManagerBuilder::new( + "test_catalog".to_string(), + "test_schema".to_string(), + vec![table], + ); + let catalog_manager = catalog_manager_builder.build(); + + Self { + request: read_request, + sub_tables, + physical_filter, + physical_projection, + catalog_manager, + } + } + + // Return test catalog manager + fn catalog_manager(&self) -> ManagerRef { + self.catalog_manager.clone() + } + + // Basic plan includes: + // Projection + // Filter + // Scan + fn build_basic_partitioned_table_plan(&self) -> Arc { + let unresolved_scan = Arc::new(UnresolvedPartitionedScan { + sub_tables: self.sub_tables.clone(), + read_request: self.request.clone(), + }); + + let filter: Arc = Arc::new( + FilterExec::try_new(self.physical_filter.clone(), unresolved_scan).unwrap(), + ); + + Arc::new(ProjectionExec::try_new(self.physical_projection.clone(), filter).unwrap()) + } + + // Basic plan includes: + // Projection + // Filter + // Scan + fn build_basic_sub_table_plan(&self) -> Arc { + let unresolved_scan = Arc::new(UnresolvedSubTableScan { + table: self.sub_tables[0].clone(), + read_request: self.request.clone(), + }); + + let filter: Arc = Arc::new( + FilterExec::try_new(self.physical_filter.clone(), unresolved_scan).unwrap(), + ); + + Arc::new(ProjectionExec::try_new(self.physical_projection.clone(), filter).unwrap()) + } + } +} diff --git a/df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_partitioned_scan.snap b/df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_partitioned_scan.snap new file mode 100644 index 0000000000..0c08870a36 --- /dev/null +++ b/df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_partitioned_scan.snap @@ -0,0 +1,9 @@ +--- +source: df_engine_extensions/src/dist_sql_query/resolver.rs +assertion_line: 229 +expression: new_plan +--- +ProjectionExec: expr=[time@0 as time, tag1@1 as tag1, tag2@2 as tag2, value@3 as value, field2@4 as field2] + FilterExec: time@0 < 1691974518000 AND tag1@1 = test_tag + ResolvedPartitionedScan: remote_exec_plans:[(TableIdentifier { catalog: "test_catalog", schema: "test_schema", table: "__test_1" }, UnresolvedSubTableScan { table: TableIdentifier { catalog: "test_catalog", schema: "test_schema", table: "__test_1" }, read_request: ReadRequest { request_id: RequestId(42), opts: ReadOptions { batch_size: 10000, read_parallelism: 8, deadline: None }, projected_schema: ProjectedSchema { original_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: "tsid", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "tsid", default_value: None }, ColumnSchema { id: 2, name: "time", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "time", 
default_value: None }, ColumnSchema { id: 3, name: "tag1", data_type: String, is_nullable: false, is_tag: true, is_dictionary: false, comment: "", escaped_name: "tag1", default_value: None }, ColumnSchema { id: 4, name: "tag2", data_type: String, is_nullable: false, is_tag: true, is_dictionary: false, comment: "", escaped_name: "tag2", default_value: None }, ColumnSchema { id: 5, name: "value", data_type: Int8, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "value", default_value: None }, ColumnSchema { id: 6, name: "field2", data_type: Float, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "field2", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, projection: Some([1, 2, 3, 4, 5]) }, predicate: Predicate { exprs: [BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "time" }), op: Lt, right: Literal(TimestampMillisecond(1691974518000, None)) }), op: And, right: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "tag1" }), op: Eq, right: Literal(Utf8("test_tag")) }) })], time_range: TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(1691974518000) } }, metrics_collector: MetricsCollector { name: "", metrics: Mutex { data: [], poisoned: false, .. }, children: Mutex { data: [], poisoned: false, .. } } } }), (TableIdentifier { catalog: "test_catalog", schema: "test_schema", table: "__test_2" }, UnresolvedSubTableScan { table: TableIdentifier { catalog: "test_catalog", schema: "test_schema", table: "__test_2" }, read_request: ReadRequest { request_id: RequestId(42), opts: ReadOptions { batch_size: 10000, read_parallelism: 8, deadline: None }, projected_schema: ProjectedSchema { original_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: "tsid", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "tsid", default_value: None }, ColumnSchema { id: 2, name: "time", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "time", default_value: None }, ColumnSchema { id: 3, name: "tag1", data_type: String, is_nullable: false, is_tag: true, is_dictionary: false, comment: "", escaped_name: "tag1", default_value: None }, ColumnSchema { id: 4, name: "tag2", data_type: String, is_nullable: false, is_tag: true, is_dictionary: false, comment: "", escaped_name: "tag2", default_value: None }, ColumnSchema { id: 5, name: "value", data_type: Int8, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "value", default_value: None }, ColumnSchema { id: 6, name: "field2", data_type: Float, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "field2", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, projection: Some([1, 2, 3, 4, 5]) }, predicate: Predicate { exprs: [BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "time" }), op: Lt, right: Literal(TimestampMillisecond(1691974518000, None)) }), op: And, right: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "tag1" }), op: Eq, right: Literal(Utf8("test_tag")) }) })], time_range: TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(1691974518000) } }, metrics_collector: MetricsCollector { name: "", metrics: Mutex { data: 
[], poisoned: false, .. }, children: Mutex { data: [], poisoned: false, .. } } } }), (TableIdentifier { catalog: "test_catalog", schema: "test_schema", table: "__test_3" }, UnresolvedSubTableScan { table: TableIdentifier { catalog: "test_catalog", schema: "test_schema", table: "__test_3" }, read_request: ReadRequest { request_id: RequestId(42), opts: ReadOptions { batch_size: 10000, read_parallelism: 8, deadline: None }, projected_schema: ProjectedSchema { original_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: "tsid", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "tsid", default_value: None }, ColumnSchema { id: 2, name: "time", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "time", default_value: None }, ColumnSchema { id: 3, name: "tag1", data_type: String, is_nullable: false, is_tag: true, is_dictionary: false, comment: "", escaped_name: "tag1", default_value: None }, ColumnSchema { id: 4, name: "tag2", data_type: String, is_nullable: false, is_tag: true, is_dictionary: false, comment: "", escaped_name: "tag2", default_value: None }, ColumnSchema { id: 5, name: "value", data_type: Int8, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "value", default_value: None }, ColumnSchema { id: 6, name: "field2", data_type: Float, is_nullable: false, is_tag: false, is_dictionary: false, comment: "", escaped_name: "field2", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, projection: Some([1, 2, 3, 4, 5]) }, predicate: Predicate { exprs: [BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "time" }), op: Lt, right: Literal(TimestampMillisecond(1691974518000, None)) }), op: And, right: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "tag1" }), op: Eq, right: Literal(Utf8("test_tag")) }) })], time_range: TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(1691974518000) } }, metrics_collector: MetricsCollector { name: "", metrics: Mutex { data: [], poisoned: false, .. }, children: Mutex { data: [], poisoned: false, .. 
} } } })], partition_count=3 + diff --git a/df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_sub_scan.snap b/df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_sub_scan.snap new file mode 100644 index 0000000000..a4bf35ad57 --- /dev/null +++ b/df_engine_extensions/src/dist_sql_query/snapshots/df_engine_extensions__dist_sql_query__resolver__test__resolve_simple_sub_scan.snap @@ -0,0 +1,9 @@ +--- +source: df_engine_extensions/src/dist_sql_query/resolver.rs +assertion_line: 218 +expression: new_plan +--- +ProjectionExec: expr=[time@0 as time, tag1@1 as tag1, tag2@2 as tag2, value@3 as value, field2@4 as field2] + FilterExec: time@0 < 1691974518000 AND tag1@1 = test_tag + MockScan + diff --git a/table_engine/src/memory.rs b/table_engine/src/memory.rs index 3d81c31726..f3c685134d 100644 --- a/table_engine/src/memory.rs +++ b/table_engine/src/memory.rs @@ -39,6 +39,11 @@ use crate::{ CloseShardRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenShardRequest, OpenShardResult, OpenTableRequest, TableEngine, }, + remote::{ + self, + model::{self, GetTableInfoRequest, WriteBatchResult}, + RemoteEngine, + }, stream::{ self, ErrNoSource, ErrWithSource, PartitionedStreams, RecordBatchStream, SendableRecordBatchStream, @@ -324,3 +329,35 @@ impl TableEngine for MemoryTableEngine { vec![Ok("".to_string())] } } + +/// Mock remote engine +#[derive(Debug)] +pub struct MockRemoteEngine; + +#[async_trait] +impl RemoteEngine for MockRemoteEngine { + async fn read( + &self, + _request: remote::model::ReadRequest, + ) -> remote::Result { + todo!() + } + + async fn write(&self, _request: remote::model::WriteRequest) -> remote::Result { + todo!() + } + + async fn write_batch( + &self, + _requests: Vec, + ) -> remote::Result> { + todo!() + } + + async fn get_table_info( + &self, + _request: GetTableInfoRequest, + ) -> remote::Result { + todo!() + } +}
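## Usage sketches

The sketches below illustrate how the pieces added in this PR are intended to be wired together; they are illustrations written against the new traits, not code contained in the diff. On the query node, resolving happens right after normal physical planning. This first sketch assumes the resolver types get re-exported from `df_engine_extensions::dist_sql_query` (they are still crate-private here) and that some concrete `RemotePhysicalPlanExecutor` exists; the type parameter `R` stands in for it.

```rust
use std::sync::Arc;

use datafusion::{error::Result as DfResult, physical_plan::ExecutionPlan};
// Hypothetical imports: `PartitionedScanResolver` is crate-private in this PR,
// so this only compiles once it is re-exported.
use df_engine_extensions::dist_sql_query::{
    resolver::{PartitionedScanResolver, Resolver},
    RemotePhysicalPlanExecutor,
};

/// Rewrite a plan produced by the physical planner so that every
/// `UnresolvedPartitionedScan` leaf becomes a `ResolvedPartitionedScan`
/// carrying one `UnresolvedSubTableScan` per sub table.
fn make_partitioned_scan_executable<R: RemotePhysicalPlanExecutor>(
    plan: Arc<dyn ExecutionPlan>,
    remote_executor: R,
) -> DfResult<Arc<dyn ExecutionPlan>> {
    let resolver = PartitionedScanResolver::new(remote_executor);
    resolver.resolve(plan)
}
```

Plans without an `UnresolvedPartitionedScan` pass through unchanged, so the extra walk is cheap for non-partitioned queries.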
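`RemotePhysicalPlanExecutor` only requires taking a plan for one sub table and returning a record batch stream. A loopback implementation that runs the plan in-process is enough for wiring tests; the sketch below is such a stand-in, not a proposal for the real remote (gRPC-based) executor.

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::{
    error::Result as DfResult,
    physical_plan::{ExecutionPlan, SendableRecordBatchStream},
    prelude::SessionContext,
};
use df_engine_extensions::dist_sql_query::RemotePhysicalPlanExecutor;
use table_engine::remote::model::TableIdentifier;

/// Executes the "remote" plan in the current process instead of shipping it
/// to the node that owns the sub table.
#[derive(Debug, Clone)]
struct LoopbackExecutor;

#[async_trait]
impl RemotePhysicalPlanExecutor for LoopbackExecutor {
    async fn execute(
        &self,
        _table: TableIdentifier,
        physical_plan: Arc<dyn ExecutionPlan>,
    ) -> DfResult<SendableRecordBatchStream> {
        // A real implementation would serialize the plan and call the remote
        // engine service of the target node; here we just run partition 0 locally.
        let ctx = SessionContext::new();
        physical_plan.execute(0, ctx.task_ctx())
    }
}
```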
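On the node that owns a sub table, the received plan still contains an `UnresolvedSubTableScan`. `SubScanResolver` looks the table up through the catalog manager and asks an `ExecutableScanBuilder` to build the real scan; in the tests that builder is `MockScanBuilder`, and a production builder backed by the table engine's scan implementation is expected to follow in a later PR. A sketch of the remote-side entry point, under the same re-export assumption as above:

```rust
use std::sync::Arc;

use catalog::manager::ManagerRef as CatalogManagerRef;
use datafusion::{error::Result as DfResult, physical_plan::ExecutionPlan};
// Hypothetical imports: `SubScanResolver` is crate-private in this PR.
use df_engine_extensions::dist_sql_query::{
    resolver::{Resolver, SubScanResolver},
    ExecutableScanBuilder,
};

/// Rewrite a plan received from the query node so that its
/// `UnresolvedSubTableScan` leaf becomes an executable scan of the local sub table.
fn resolve_sub_table_plan<B: ExecutableScanBuilder>(
    catalog_manager: CatalogManagerRef,
    scan_builder: B,
    plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
    let resolver = SubScanResolver::new(catalog_manager, scan_builder);
    resolver.resolve(plan)
}
```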
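The new `test` feature of the `catalog` crate exposes `MockCatalogManagerBuilder`, so other crates can stand up a single-catalog, single-schema manager for unit tests without a real meta service. The following sketch uses only items added or touched in this diff and mirrors what the resolver tests do:

```rust
use std::sync::Arc;

use catalog::{manager::ManagerRef, test_util::MockCatalogManagerBuilder};
use common_types::tests::build_schema_for_cpu;
use table_engine::{
    memory::MemoryTable,
    table::{TableId, TableRef},
    ANALYTIC_ENGINE_TYPE,
};

/// Builds a catalog manager containing a single in-memory table registered
/// under `test_catalog.test_schema`.
fn build_test_catalog_manager() -> ManagerRef {
    let table: TableRef = Arc::new(MemoryTable::new(
        "__test_1".to_string(),
        TableId::from(42),
        build_schema_for_cpu(),
        ANALYTIC_ENGINE_TYPE.to_string(),
    ));

    MockCatalogManagerBuilder::new(
        "test_catalog".to_string(),
        "test_schema".to_string(),
        vec![table],
    )
    .build()
}
```

The resolver tests assert the resulting plan display with `insta` snapshots; when the display changes intentionally, the updated `.snap` files can be reviewed and accepted with `cargo insta review` from the `cargo-insta` CLI.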