Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: teach ceresdb to generate UnresolvedPartitionedScan for partitioned table #1148

Merged
merged 19 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ members = [
"components/trace_metric_derive",
"components/trace_metric_derive_tests",
"components/tracing_util",
"df_engine_extensions",
"df_operator",
"integration_tests",
"integration_tests/sdk/rust",
Expand Down Expand Up @@ -108,6 +109,7 @@ common_types = { path = "common_types" }
datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
df_operator = { path = "df_operator" }
df_engine_extensions = { path = "df_engine_extensions" }
future_ext = { path = "components/future_ext" }
etcd-client = "0.10.3"
env_logger = "0.6"
Expand Down
4 changes: 4 additions & 0 deletions common_types/src/projected_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ impl ProjectedSchema {
/// Returns the Arrow schema built from the projected record schema.
pub fn to_projected_arrow_schema(&self) -> ArrowSchemaRef {
    self.0.record_schema.to_arrow_schema_ref()
}

/// Returns a reference to the original schema (the schema before the
/// projection was applied).
pub fn original_schema(&self) -> &Schema {
    &self.0.original_schema
}
}

impl From<ProjectedSchema> for ceresdbproto::schema::ProjectedSchema {
Expand Down
32 changes: 32 additions & 0 deletions df_engine_extensions/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2023 The CeresDB Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[package]
name = "df_engine_extensions"
# Inherit shared metadata from the workspace root.
version.workspace = true
authors.workspace = true
edition.workspace = true

[dependencies]
arrow = { workspace = true }
datafusion = { workspace = true }
generic_error = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
16 changes: 16 additions & 0 deletions df_engine_extensions/src/dist_sql_query/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Physical plan nodes used in distributed SQL queries: scans over a
//! partitioned table and scans over its individual sub tables.

pub mod partitioned_table_scan;
pub mod sub_table_scan;
164 changes: 164 additions & 0 deletions df_engine_extensions/src/dist_sql_query/partitioned_table_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{any::Any, fmt, sync::Arc};

use arrow::datatypes::SchemaRef;
use datafusion::{
error::{DataFusionError, Result as DfResult},
execution::TaskContext,
physical_expr::PhysicalSortExpr,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
};
use table_engine::{
remote::{model::TableIdentifier, RemoteEngineRef},
table::ReadRequest,
};

/// Unresolved partitioned table scan which can't be executed before resolving
#[derive(Debug)]
pub struct UnresolvedPartitionedScan {
    /// Identifiers of the sub tables making up the partitioned table; one
    /// output partition is reported per sub table.
    pub sub_tables: Vec<TableIdentifier>,
    /// Read request carried along to the resolving phase; also supplies the
    /// plan's projected schema.
    pub read_request: ReadRequest,
}

impl ExecutionPlan for UnresolvedPartitionedScan {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The plan's schema is the projected schema from the read request.
    fn schema(&self) -> SchemaRef {
        let projected_schema = &self.read_request.projected_schema;
        projected_schema.to_projected_arrow_schema()
    }

    /// One (unknown) output partition per sub table.
    fn output_partitioning(&self) -> Partitioning {
        let partition_count = self.sub_tables.len();
        Partitioning::UnknownPartitioning(partition_count)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Leaf node: replacing children is always an error.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        let msg = "UnresolvedPartitionedScan should not have children".to_string();
        Err(DataFusionError::Internal(msg))
    }

    /// Unresolved plans are placeholders only, so execution is rejected.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DfResult<DfSendableRecordBatchStream> {
        let msg = "UnresolvedPartitionedScan can not be executed".to_string();
        Err(DataFusionError::Internal(msg))
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

impl DisplayAs for UnresolvedPartitionedScan {
    /// Renders the node with its sub tables, read request and partition count.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        let partition_count = self.output_partitioning().partition_count();
        write!(
            f,
            "UnresolvedPartitionedScan: sub_tables={:?}, read_request:{:?}, partition_count={}",
            self.sub_tables, self.read_request, partition_count,
        )
    }
}

/// Executable partitioned table scan
///
/// It will send the `remote_exec_plan`s to corresponding nodes to execute.
#[derive(Debug)]
pub struct ResolvedPartitionedScan {
    /// Remote engine through which the sub plans are to be executed.
    pub remote_engine: RemoteEngineRef,
    /// Pairs of (sub table, plan to run on it); one output partition each.
    /// Expected to be non-empty (see `schema`).
    pub remote_exec_plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
}

impl ExecutionPlan for ResolvedPartitionedScan {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the first sub plan; all sub plans are expected to share the
    /// same schema.
    ///
    /// # Panics
    ///
    /// Panics when `remote_exec_plans` is empty.
    fn schema(&self) -> SchemaRef {
        self.remote_exec_plans
            .first()
            .expect("remote_exec_plans should not be empty")
            .1
            .schema()
    }

    /// One (unknown) output partition per remote sub plan.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.remote_exec_plans.len())
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// Leaf node: replacing children is always an error.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        // Fixed: the message previously named `UnresolvedPartitionedScan`,
        // which was a copy-paste error from the other plan node.
        Err(DataFusionError::Internal(
            "ResolvedPartitionedScan should not have children".to_string(),
        ))
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DfResult<DfSendableRecordBatchStream> {
        // TODO: dispatch the sub plan for `_partition` to its node via
        // `remote_engine`; intentionally unimplemented for now.
        todo!()
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

// TODO: make display for the plan more pretty.
impl DisplayAs for ResolvedPartitionedScan {
    /// Renders the node with its remote sub plans and partition count.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        let partition_count = self.output_partitioning().partition_count();
        write!(
            f,
            "ResolvedPartitionedScan: remote_exec_plans:{:?}, partition_count={}",
            self.remote_exec_plans, partition_count,
        )
    }
}
96 changes: 96 additions & 0 deletions df_engine_extensions/src/dist_sql_query/sub_table_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{any::Any, fmt, sync::Arc};

use arrow::datatypes::SchemaRef;
use datafusion::{
error::{DataFusionError, Result as DfResult},
execution::TaskContext,
physical_expr::PhysicalSortExpr,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
};
use table_engine::{provider::ScanTable, remote::model::TableIdentifier, table::ReadRequest};

/// Unresolved sub table scan which can't be executed before resolving
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug)]
pub struct UnresolvedSubTableScan {
pub table: TableIdentifier,
pub read_request: ReadRequest,
}

impl ExecutionPlan for UnresolvedSubTableScan {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The plan's schema is the projected schema from the read request.
    fn schema(&self) -> SchemaRef {
        let projected_schema = &self.read_request.projected_schema;
        projected_schema.to_projected_arrow_schema()
    }

    /// Partition count follows the requested read parallelism.
    fn output_partitioning(&self) -> Partitioning {
        let partition_count = self.read_request.opts.read_parallelism;
        Partitioning::UnknownPartitioning(partition_count)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Leaf node: replacing children is always an error.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        let msg = "UnresolvedSubTableScan should not have children".to_string();
        Err(DataFusionError::Internal(msg))
    }

    /// Unresolved plans are placeholders only, so execution is rejected.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DfResult<DfSendableRecordBatchStream> {
        let msg = "UnresolvedSubTableScan can not be executed".to_string();
        Err(DataFusionError::Internal(msg))
    }

    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

impl DisplayAs for UnresolvedSubTableScan {
    /// Renders the node with its target table, read request and partition
    /// count.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "UnresolvedSubTableScan: table={:?}, read_request:{:?}, partition_count={}",
            self.table,
            self.read_request,
            self.output_partitioning().partition_count(),
        )
    }
}

/// `ResolvedSubTableScan` is `ScanTable` actually.
///
/// The resolved form of a sub table scan reuses the existing
/// `table_engine::provider::ScanTable` plan rather than defining a new node.
pub type ResolvedSubTableScan = ScanTable;
17 changes: 17 additions & 0 deletions df_engine_extensions/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: a redundant short-form copyright line duplicating the full Apache
// header above has been removed.

//! DataFusion engine extensions, e.g. physical plan nodes used in
//! distributed SQL queries.

pub mod dist_sql_query;
Loading
Loading