Skip to content

Commit

Permalink
feat: refactor Resolver in dist sql query (#1186)
Browse files Browse the repository at this point in the history
## Rationale
Part of #1112 
The old resolver has some problems, need to refactor before adding it
into the main query process.

## Detailed Changes
+ Refactor `Resolver` in dist sql query.

## Test Plan
Test by richer unit tests.
  • Loading branch information
Rachelint authored Sep 8, 2023
1 parent 0451a9c commit 6f90a7c
Show file tree
Hide file tree
Showing 8 changed files with 590 additions and 189 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion df_engine_extensions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ workspace = true

[dependencies]
arrow = { workspace = true }
async-recursion = "1.0.4"
async-trait = { workspace = true }
catalog = { workspace = true, features = ["test"] }
ceresdbproto = { workspace = true }
common_types = { workspace = true, features = ["test"] }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
futures = { workspace = true }
generic_error = { workspace = true }
prost = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }

[dev-dependencies]
common_types = { workspace = true, features = ["test"] }
insta = { version = "1.31.0" }
tokio = { workspace = true }
trace_metric = { workspace = true }
31 changes: 24 additions & 7 deletions df_engine_extensions/src/dist_sql_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
use std::{fmt, sync::Arc};

use async_trait::async_trait;
use common_types::schema::RecordSchema;
use datafusion::{
error::Result as DfResult,
execution::TaskContext,
physical_plan::{ExecutionPlan, SendableRecordBatchStream},
};
use futures::future::BoxFuture;
use prost::bytes::Bytes;
use table_engine::{
remote::model::TableIdentifier,
table::{ReadRequest, TableRef},
Expand All @@ -31,20 +35,33 @@ pub mod resolver;
pub mod test_util;

/// Remote datafusion physical plan executor
#[async_trait]
pub trait RemotePhysicalPlanExecutor: Clone + fmt::Debug + Send + Sync + 'static {
async fn execute(
pub trait RemotePhysicalPlanExecutor: fmt::Debug + Send + Sync + 'static {
fn execute(
&self,
table: TableIdentifier,
physical_plan: Arc<dyn ExecutionPlan>,
) -> DfResult<SendableRecordBatchStream>;
task_context: &TaskContext,
encoded_plan: EncodedPlan,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>>;
}

pub struct EncodedPlan {
pub plan: Bytes,
pub schema: RecordSchema,
}

type RemotePhysicalPlanExecutorRef = Arc<dyn RemotePhysicalPlanExecutor>;

/// Executable scan's builder
///
/// It is not suitable to restrict the detailed implementation of executable
/// scan, so we define a builder here which return the general `ExecutionPlan`.
#[async_trait]
pub trait ExecutableScanBuilder: fmt::Debug + Send + Sync + 'static {
fn build(&self, table: TableRef, read_request: ReadRequest)
-> DfResult<Arc<dyn ExecutionPlan>>;
async fn build(
&self,
table: TableRef,
read_request: ReadRequest,
) -> DfResult<Arc<dyn ExecutionPlan>>;
}

type ExecutableScanBuilderRef = Box<dyn ExecutableScanBuilder>;
Loading

0 comments on commit 6f90a7c

Please sign in to comment.