Skip to content

Commit

Permalink
chore: define QueryEngine and wrap all things into it (#1160)
Browse files Browse the repository at this point in the history
## Rationale
While adding `PhysicalPlanEncoder`, I found that the `query_engine` crate
has accumulated too many sub-modules. Since in practice we always use them
together, I decided to define a `QueryEngine` trait to integrate them.

## Detailed Changes
+ Add `PhysicalPlanEncoder` and its datafusion impl.
+ Refactor `Executor`.
+ Define `QueryEngine` to integrate `PhysicalPlanner`,
`PhysicalPlanEncoder`, `Executor`.

## Test Plan
Covered by existing tests.
  • Loading branch information
Rachelint authored Aug 19, 2023
1 parent 422bba7 commit 50f8a72
Show file tree
Hide file tree
Showing 50 changed files with 495 additions and 348 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ arena = { path = "components/arena" }
async-stream = "0.3.4"
async-trait = "0.1.72"
base64 = "0.13"
bytes = "1.1.0"
bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
Expand Down Expand Up @@ -188,6 +188,7 @@ catalog = { workspace = true }
catalog_impls = { workspace = true }
clap = { workspace = true }
cluster = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
etcd-client = { workspace = true }
interpreters = { workspace = true }
Expand Down
64 changes: 60 additions & 4 deletions df_operator/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@

//! Function registry.

use std::{collections::HashMap, sync::Arc};

use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use datafusion::{
error::{DataFusionError, Result as DfResult},
execution::FunctionRegistry as DfFunctionRegistry,
logical_expr::{
AggregateUDF as DfAggregateUDF, ScalarUDF as DfScalarUDF, WindowUDF as DfWindowUDF,
},
};
use macros::define_result;
use snafu::{ensure, Backtrace, Snafu};

Expand All @@ -31,6 +41,7 @@ define_result!(Error);

/// A registry knows how to build logical expressions out of user-defined
functions' names
// TODO: maybe unnecessary to define inner trait rather than using datafusion's?
pub trait FunctionRegistry {
fn register_udf(&mut self, udf: ScalarUdf) -> Result<()>;

Expand All @@ -41,6 +52,9 @@ pub trait FunctionRegistry {
fn find_udaf(&self, name: &str) -> Result<Option<AggregateUdf>>;

fn list_udfs(&self) -> Result<Vec<ScalarUdf>>;

// TODO: can we remove the `Send` and `Sync` restrictions?
fn to_df_function_registry(self: Arc<Self>) -> Arc<dyn DfFunctionRegistry + Send + Sync>;
}

/// Default function registry.
Expand Down Expand Up @@ -96,8 +110,50 @@ impl FunctionRegistry for FunctionRegistryImpl {
}

fn list_udfs(&self) -> Result<Vec<ScalarUdf>> {
let udfs = self.scalar_functions.values().cloned().collect();
Ok(udfs)
Ok(self.scalar_functions.values().cloned().collect())
}

fn to_df_function_registry(self: Arc<Self>) -> Arc<dyn DfFunctionRegistry + Send + Sync> {
Arc::new(DfFunctionRegistryAdapter(self))
}
}

struct DfFunctionRegistryAdapter(FunctionRegistryRef);

impl DfFunctionRegistry for DfFunctionRegistryAdapter {
fn udfs(&self) -> HashSet<String> {
self.0
.list_udfs()
.expect("failed to list udfs")
.into_iter()
.map(|f| f.name().to_string())
.collect()
}

fn udf(&self, name: &str) -> DfResult<Arc<DfScalarUDF>> {
self.0
.find_udf(name)
.map_err(|e| DataFusionError::Internal(format!("failed to find udf, err:{e}")))?
.ok_or(DataFusionError::Internal(format!(
"udf not found, name:{name}"
)))
.map(|f| f.to_datafusion_udf())
}

fn udaf(&self, name: &str) -> DfResult<Arc<DfAggregateUDF>> {
self.0
.find_udaf(name)
.map_err(|e| DataFusionError::Internal(format!("failed to find udaf, err:{e}")))?
.ok_or(DataFusionError::Internal(format!(
"udaf not found, name:{name}"
)))
.map(|f| f.to_datafusion_udaf())
}

fn udwf(&self, _name: &str) -> DfResult<Arc<DfWindowUDF>> {
Err(DataFusionError::Internal(
"no udwfs defined now".to_string(),
))
}
}

Expand Down
1 change: 1 addition & 0 deletions interpreters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ common_types = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
df_operator = { workspace = true }
futures = { workspace = true }
generic_error = { workspace = true }
hash_ext = { workspace = true }
log = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions interpreters/src/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use arrow::{
};
use async_trait::async_trait;
use macros::define_result;
use query_engine::executor::RecordBatchVec;
use query_frontend::plan::DescribeTablePlan;
use snafu::{ResultExt, Snafu};
use table_engine::table::TableRef;

use crate::interpreter::{
Describe, Interpreter, InterpreterPtr, Output, Result as InterpreterResult,
use crate::{
interpreter::{Describe, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
RecordBatchVec,
};

#[derive(Debug, Snafu)]
Expand Down
6 changes: 3 additions & 3 deletions interpreters/src/exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use arrow::{
};
use async_trait::async_trait;
use macros::define_result;
use query_engine::executor::RecordBatchVec;
use query_frontend::plan::ExistsTablePlan;
use snafu::{ResultExt, Snafu};

use crate::interpreter::{
Exists, Interpreter, InterpreterPtr, Output, Result as InterpreterResult,
use crate::{
interpreter::{Exists, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
RecordBatchVec,
};

#[derive(Debug, Snafu)]
Expand Down
14 changes: 7 additions & 7 deletions interpreters/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! Interpreter factory

use catalog::manager::ManagerRef;
use query_engine::{executor::Executor, physical_planner::PhysicalPlanner};
use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef};
use query_frontend::plan::Plan;
use table_engine::engine::TableEngineRef;

Expand All @@ -35,18 +35,18 @@ use crate::{
};

/// A factory to create interpreters
pub struct Factory<Q, P> {
query_executor: Q,
physical_planner: P,
pub struct Factory {
query_executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
}

impl<Q: Executor + 'static, P: PhysicalPlanner> Factory<Q, P> {
impl Factory {
pub fn new(
query_executor: Q,
physical_planner: P,
query_executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
Expand Down
3 changes: 2 additions & 1 deletion interpreters/src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

use async_trait::async_trait;
use macros::define_result;
use query_engine::executor::RecordBatchVec;
use snafu::Snafu;

use crate::RecordBatchVec;

// Make the variant closer to actual error code like invalid arguments.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
Expand Down
5 changes: 5 additions & 0 deletions interpreters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#![feature(string_remove_matches)]

use common_types::record_batch::RecordBatch;

pub mod alter_table;
pub mod context;
pub mod create;
Expand All @@ -36,3 +38,6 @@ mod show_create;

#[cfg(test)]
mod tests;

// Use a type alias so that we are able to replace the implementation
pub type RecordBatchVec = Vec<RecordBatch>;
49 changes: 36 additions & 13 deletions interpreters/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,46 @@
//! Interpreter for select statement

use async_trait::async_trait;
use futures::TryStreamExt;
use generic_error::{BoxError, GenericError};
use log::debug;
use macros::define_result;
use query_engine::{executor::Executor, physical_planner::PhysicalPlanner};
use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef};
use query_frontend::plan::QueryPlan;
use snafu::{ResultExt, Snafu};
use table_engine::stream::SendableRecordBatchStream;

use crate::{
context::Context,
interpreter::{Interpreter, InterpreterPtr, Output, Result as InterpreterResult, Select},
RecordBatchVec,
};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to create query context, err:{}", source))]
CreateQueryContext { source: crate::context::Error },

#[snafu(display("Failed to execute logical plan, err:{}", source))]
ExecutePlan { source: query_engine::error::Error },
#[snafu(display("Failed to execute physical plan, msg:{}, err:{}", msg, source))]
ExecutePlan { msg: String, source: GenericError },
}

define_result!(Error);

/// Select interpreter
pub struct SelectInterpreter<T, P> {
pub struct SelectInterpreter {
ctx: Context,
plan: QueryPlan,
executor: T,
physical_planner: P,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
}

impl<T: Executor + 'static, P: PhysicalPlanner> SelectInterpreter<T, P> {
impl SelectInterpreter {
pub fn create(
ctx: Context,
plan: QueryPlan,
executor: T,
physical_planner: P,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
) -> InterpreterPtr {
Box::new(Self {
ctx,
Expand All @@ -62,7 +66,7 @@ impl<T: Executor + 'static, P: PhysicalPlanner> SelectInterpreter<T, P> {
}

#[async_trait]
impl<T: Executor, P: PhysicalPlanner> Interpreter for SelectInterpreter<T, P> {
impl Interpreter for SelectInterpreter {
async fn execute(self: Box<Self>) -> InterpreterResult<Output> {
let request_id = self.ctx.request_id();
debug!(
Expand All @@ -81,21 +85,40 @@ impl<T: Executor, P: PhysicalPlanner> Interpreter for SelectInterpreter<T, P> {
.physical_planner
.plan(&query_ctx, self.plan)
.await
.context(ExecutePlan)
.box_err()
.context(ExecutePlan {
msg: "failed to build physical plan",
})
.context(Select)?;

let record_batches = self
let record_batch_stream = self
.executor
.execute(&query_ctx, physical_plan)
.await
.context(ExecutePlan)
.box_err()
.context(ExecutePlan {
msg: "failed to execute physical plan",
})
.context(Select)?;

debug!(
"Interpreter execute select finish, request_id:{}",
request_id
);

let record_batches = collect(record_batch_stream).await?;

Ok(Output::Records(record_batches))
}
}

async fn collect(stream: SendableRecordBatchStream) -> InterpreterResult<RecordBatchVec> {
stream
.try_collect()
.await
.box_err()
.context(ExecutePlan {
msg: "failed to collect execution results",
})
.context(Select)
}
2 changes: 1 addition & 1 deletion interpreters/src/show_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ use arrow::{
use datafusion::logical_expr::Expr;
use datafusion_proto::bytes::Serializeable;
use log::error;
use query_engine::executor::RecordBatchVec;
use query_frontend::{ast::ShowCreateObject, plan::ShowCreatePlan};
use snafu::ensure;
use table_engine::{partition::PartitionInfo, table::TableRef};

use crate::{
interpreter::Output,
show::{Result, UnsupportedType},
RecordBatchVec,
};

pub struct ShowCreateInterpreter {
Expand Down
Loading

0 comments on commit 50f8a72

Please sign in to comment.