Skip to content

Commit

Permalink
feat: teach ceresdb to convert the inexecutable partitioned scan to e…
Browse files Browse the repository at this point in the history
…xecutable(resolving process) (#1161)

## Rationale
Part of #1112 
CeresDB is able to generate the specific physical plan for partitioned
table in #1148, but it is inexecutable.
This PR introduces the `resolver` to make it executable.

## Detailed Changes
+ Add `Resolver` to convert the `UnresolvePartitionedScan` to final
executable scan plans of sub tables.
+ Add test for the resolving process.

## Test Plan
Tested by new unit tests.
  • Loading branch information
Rachelint authored Aug 25, 2023
1 parent b835c44 commit 12d8b01
Show file tree
Hide file tree
Showing 11 changed files with 832 additions and 13 deletions.
59 changes: 58 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ workspace = true
[package.edition]
workspace = true

[features]
test = []

[dependencies]
async-trait = { workspace = true }
common_types = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub mod consts;
pub mod manager;
pub mod schema;
pub mod table_operator;
#[cfg(feature = "test")]
pub mod test_util;

use std::sync::Arc;

Expand Down
169 changes: 169 additions & 0 deletions catalog/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use table_engine::table::{SchemaId, TableRef};

use crate::{
manager::{Manager, ManagerRef},
schema::{
CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, NameRef,
Result as SchemaResult, Schema, SchemaRef,
},
Catalog, CatalogRef, Result,
};

/// Mock catalog builder
///
/// Builds a [`ManagerRef`] containing exactly one catalog with one schema,
/// pre-populated with a fixed set of tables. Intended for tests only.
pub struct MockCatalogManagerBuilder {
    // Name of the single catalog; also used as the manager's default catalog.
    catalog: String,
    // Name of the single schema; also used as the manager's default schema.
    schema: String,
    // Tables to register in the schema, later keyed by `Table::name()`.
    tables: Vec<TableRef>,
}

impl MockCatalogManagerBuilder {
    /// Creates a builder for the given catalog/schema names and table set.
    pub fn new(catalog: String, schema: String, tables: Vec<TableRef>) -> Self {
        Self {
            catalog,
            schema,
            tables,
        }
    }

    /// Consumes the builder, producing a manager that contains a single
    /// catalog holding a single schema with the provided tables.
    pub fn build(self) -> ManagerRef {
        let Self {
            catalog,
            schema,
            tables,
        } = self;

        // Key each table by its name for lookup through `table_by_name`.
        let table_map: HashMap<_, _> = tables
            .into_iter()
            .map(|table| (table.name().to_string(), table))
            .collect();

        let mock_schema = Arc::new(MockSchema {
            name: schema.clone(),
            tables: table_map,
        });

        let mock_catalog = Arc::new(MockCatalog {
            name: catalog.clone(),
            schemas: HashMap::from([(schema.clone(), mock_schema as _)]),
        });

        Arc::new(MockCatalogManager {
            catalogs: HashMap::from([(catalog.clone(), mock_catalog as _)]),
            default_catalog: catalog,
            default_schema: schema,
        })
    }
}

/// Mock catalog manager which only support default catalog and schema
///
/// You can set the default catalog and schema when initializing.
struct MockCatalogManager {
    // All catalogs, keyed by catalog name.
    catalogs: HashMap<String, CatalogRef>,
    // Name returned by `default_catalog_name`.
    default_catalog: String,
    // Name returned by `default_schema_name`.
    default_schema: String,
}

impl Manager for MockCatalogManager {
fn default_catalog_name(&self) -> crate::schema::NameRef {
&self.default_catalog
}

fn default_schema_name(&self) -> crate::schema::NameRef {
&self.default_schema
}

fn catalog_by_name(
&self,
name: crate::schema::NameRef,
) -> crate::manager::Result<Option<CatalogRef>> {
Ok(self.catalogs.get(name).cloned())
}

fn all_catalogs(&self) -> crate::manager::Result<Vec<CatalogRef>> {
Ok(self.catalogs.clone().into_values().collect())
}
}

/// Mock catalog holding a fixed set of schemas; schema creation is unsupported.
struct MockCatalog {
    // Catalog name returned by `name()`.
    name: String,
    // All schemas, keyed by schema name.
    schemas: HashMap<String, SchemaRef>,
}

#[async_trait::async_trait]
impl Catalog for MockCatalog {
fn name(&self) -> NameRef {
&self.name
}

fn schema_by_name(&self, name: NameRef) -> Result<Option<SchemaRef>> {
Ok(self.schemas.get(name).cloned())
}

async fn create_schema<'a>(&'a self, _name: NameRef<'a>) -> Result<()> {
unimplemented!()
}

/// All schemas
fn all_schemas(&self) -> Result<Vec<SchemaRef>> {
Ok(self.schemas.clone().into_values().collect())
}
}

/// Mock schema with a fixed, read-only table set; mutating operations panic.
struct MockSchema {
    // Schema name returned by `name()`.
    name: String,
    // All tables, keyed by table name.
    tables: HashMap<String, TableRef>,
}

#[async_trait]
impl Schema for MockSchema {
    fn name(&self) -> NameRef {
        &self.name
    }

    fn id(&self) -> SchemaId {
        // Arbitrary fixed id; the mock never needs a real one.
        SchemaId::from_u32(42)
    }

    /// Looks up a table by name, returning `None` when absent.
    fn table_by_name(&self, name: NameRef) -> SchemaResult<Option<TableRef>> {
        Ok(self.tables.get(name).cloned())
    }

    /// Table creation is not supported by the mock.
    async fn create_table(
        &self,
        _request: CreateTableRequest,
        _opts: CreateOptions,
    ) -> SchemaResult<TableRef> {
        unimplemented!()
    }

    /// Table dropping is not supported by the mock.
    async fn drop_table(
        &self,
        _request: DropTableRequest,
        _opts: DropOptions,
    ) -> SchemaResult<bool> {
        unimplemented!()
    }

    /// Returns all tables.
    fn all_tables(&self) -> SchemaResult<Vec<TableRef>> {
        // Clone only the values; cloning the whole map would needlessly copy
        // the key strings as well.
        Ok(self.tables.values().cloned().collect())
    }

    fn register_table(&self, _table: TableRef) {
        unimplemented!()
    }

    fn unregister_table(&self, _table_name: &str) {
        unimplemented!()
    }
}
7 changes: 7 additions & 0 deletions df_engine_extensions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ workspace = true

[dependencies]
arrow = { workspace = true }
async-trait = { workspace = true }
catalog = { workspace = true, features = ["test"] }
datafusion = { workspace = true }
generic_error = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }

[dev-dependencies]
common_types = { workspace = true, features = ["test"] }
insta = { version = "1.31.0" }
trace_metric = { workspace = true }
32 changes: 32 additions & 0 deletions df_engine_extensions/src/dist_sql_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt, sync::Arc};

use async_trait::async_trait;
use datafusion::{
error::Result as DfResult,
physical_plan::{ExecutionPlan, SendableRecordBatchStream},
};
use table_engine::{
remote::model::TableIdentifier,
table::{ReadRequest, TableRef},
};

pub mod physical_plan;
pub mod resolver;

/// Remote datafusion physical plan executor
///
/// Implementors ship a physical plan to the node owning `table` and return a
/// stream over the resulting record batches.
#[async_trait]
pub trait RemotePhysicalPlanExecutor: Clone + fmt::Debug + Send + Sync + 'static {
    /// Executes `physical_plan` against the remote (sub)table identified by
    /// `table`, returning the result as a record batch stream.
    async fn execute(
        &self,
        table: TableIdentifier,
        physical_plan: Arc<dyn ExecutionPlan>,
    ) -> DfResult<SendableRecordBatchStream>;
}

/// Executable scan's builder
///
/// It is not suitable to restrict the detailed implementation of executable
/// scan, so we define a builder here which returns the general `ExecutionPlan`.
pub trait ExecutableScanBuilder: fmt::Debug + Send + Sync + 'static {
    /// Builds an executable scan plan over `table` for the given read request.
    fn build(&self, table: TableRef, read_request: ReadRequest)
        -> DfResult<Arc<dyn ExecutionPlan>>;
}
Loading

0 comments on commit 12d8b01

Please sign in to comment.