Skip to content

Commit

Permalink
refactor: refactor query engine (#1137)
Browse files Browse the repository at this point in the history
## Rationale
Close #1136 
Part of #1112 

The query engine impl now is too messy, you can #1136 . Worse, it can't
support physical plan's remote execution, which is necessary for #1112.

In this pr, I refactor it for solving problems mentioned above.

## Detailed Changes
+ Split `physical plan` and `physical planner`'s trait definition and
impl.
+ Split `physical plan`'s creation and execution.
+ Modify the call path for making it runable again.

## Test Plan
Test by exist tests.

---------

Co-authored-by: tanruixiang <tanruixiang0104@gmail.com>
  • Loading branch information
Rachelint and tanruixiang authored Aug 7, 2023
1 parent 442dd5f commit cd175e2
Show file tree
Hide file tree
Showing 50 changed files with 746 additions and 619 deletions.
13 changes: 9 additions & 4 deletions interpreters/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! Interpreter factory

use catalog::manager::ManagerRef;
use query_engine::executor::Executor;
use query_engine::{executor::Executor, physical_planner::PhysicalPlanner};
use query_frontend::plan::Plan;
use table_engine::engine::TableEngineRef;

Expand All @@ -35,22 +35,25 @@ use crate::{
};

/// A factory to create interpreters
pub struct Factory<Q> {
pub struct Factory<Q, P> {
query_executor: Q,
physical_planner: P,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
}

impl<Q: Executor + 'static> Factory<Q> {
impl<Q: Executor + 'static, P: PhysicalPlanner> Factory<Q, P> {
pub fn new(
query_executor: Q,
physical_planner: P,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
) -> Self {
Self {
query_executor,
physical_planner,
catalog_manager,
table_engine,
table_manipulator,
Expand All @@ -65,7 +68,9 @@ impl<Q: Executor + 'static> Factory<Q> {
validator.validate(&plan)?;

let interpreter = match plan {
Plan::Query(p) => SelectInterpreter::create(ctx, p, self.query_executor),
Plan::Query(p) => {
SelectInterpreter::create(ctx, p, self.query_executor, self.physical_planner)
}
Plan::Insert(p) => InsertInterpreter::create(ctx, p),
Plan::Create(p) => {
CreateInterpreter::create(ctx, p, self.table_engine, self.table_manipulator)
Expand Down
33 changes: 23 additions & 10 deletions interpreters/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use async_trait::async_trait;
use log::debug;
use macros::define_result;
use query_engine::executor::{Executor, Query};
use query_engine::{executor::Executor, physical_planner::PhysicalPlanner};
use query_frontend::plan::QueryPlan;
use snafu::{ResultExt, Snafu};

Expand All @@ -32,32 +32,37 @@ pub enum Error {
CreateQueryContext { source: crate::context::Error },

#[snafu(display("Failed to execute logical plan, err:{}", source))]
ExecutePlan {
source: query_engine::executor::Error,
},
ExecutePlan { source: query_engine::error::Error },
}

define_result!(Error);

/// Select interpreter
pub struct SelectInterpreter<T> {
pub struct SelectInterpreter<T, P> {
ctx: Context,
plan: QueryPlan,
executor: T,
physical_planner: P,
}

impl<T: Executor + 'static> SelectInterpreter<T> {
pub fn create(ctx: Context, plan: QueryPlan, executor: T) -> InterpreterPtr {
impl<T: Executor + 'static, P: PhysicalPlanner> SelectInterpreter<T, P> {
pub fn create(
ctx: Context,
plan: QueryPlan,
executor: T,
physical_planner: P,
) -> InterpreterPtr {
Box::new(Self {
ctx,
plan,
executor,
physical_planner,
})
}
}

#[async_trait]
impl<T: Executor> Interpreter for SelectInterpreter<T> {
impl<T: Executor, P: PhysicalPlanner> Interpreter for SelectInterpreter<T, P> {
async fn execute(self: Box<Self>) -> InterpreterResult<Output> {
let request_id = self.ctx.request_id();
debug!(
Expand All @@ -70,10 +75,18 @@ impl<T: Executor> Interpreter for SelectInterpreter<T> {
.new_query_context()
.context(CreateQueryContext)
.context(Select)?;
let query = Query::new(self.plan);

// Create physical plan.
let physical_plan = self
.physical_planner
.plan(&query_ctx, self.plan)
.await
.context(ExecutePlan)
.context(Select)?;

let record_batches = self
.executor
.execute_logical_plan(query_ctx, query)
.execute(&query_ctx, physical_plan)
.await
.context(ExecutePlan)
.context(Select)?;
Expand Down
16 changes: 11 additions & 5 deletions interpreters/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use catalog::{
};
use catalog_impls::table_based::TableBasedManager;
use common_types::request_id::RequestId;
use query_engine::{executor::ExecutorImpl, Config as QueryConfig};
use query_engine::{
datafusion_impl::physical_planner::DatafusionPhysicalPlannerImpl, executor::ExecutorImpl,
Config as QueryConfig,
};
use query_frontend::{
parser::Parser, plan::Plan, planner::Planner, provider::MetaProvider, tests::MockMetaProvider,
};
Expand Down Expand Up @@ -72,9 +75,10 @@ impl<M> Env<M>
where
M: MetaProvider,
{
async fn build_factory(&self) -> Factory<ExecutorImpl> {
async fn build_factory(&self) -> Factory<ExecutorImpl, DatafusionPhysicalPlannerImpl> {
Factory::new(
ExecutorImpl::new(query_engine::Config::default()),
ExecutorImpl,
DatafusionPhysicalPlannerImpl::new(query_engine::Config::default()),
self.catalog_manager.clone(),
self.engine(),
self.table_manipulator.clone(),
Expand Down Expand Up @@ -225,7 +229,8 @@ where
let table_operator = TableOperator::new(catalog_manager.clone());
let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));
let insert_factory = Factory::new(
ExecutorImpl::new(QueryConfig::default()),
ExecutorImpl,
DatafusionPhysicalPlannerImpl::new(QueryConfig::default()),
catalog_manager.clone(),
self.engine(),
table_manipulator.clone(),
Expand All @@ -244,7 +249,8 @@ where
let select_sql =
"SELECT key1, key2, field1, field2, field3, field4, field5 from test_missing_columns_table";
let select_factory = Factory::new(
ExecutorImpl::new(QueryConfig::default()),
ExecutorImpl,
DatafusionPhysicalPlannerImpl::new(QueryConfig::default()),
catalog_manager,
self.engine(),
table_manipulator,
Expand Down
7 changes: 5 additions & 2 deletions proxy/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::info;
use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use query_engine::{
executor::{Executor as QueryExecutor, RecordBatchVec},
physical_planner::PhysicalPlanner,
};
use query_frontend::{
frontend::{Context as SqlContext, Error as FrontendError, Frontend},
promql::ColumnNames,
Expand All @@ -47,7 +50,7 @@ use crate::{
Context, Proxy,
};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
/// Implement prometheus query in grpc service.
/// Note: not used in prod now.
pub async fn handle_prom_query(
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/grpc/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

use ceresdbproto::storage::{RouteRequest, RouteResponse};
use query_engine::executor::Executor as QueryExecutor;
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};

use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse {
let routes = self.route(req).await;

Expand Down
4 changes: 2 additions & 2 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::{error, warn};
use query_engine::executor::Executor as QueryExecutor;
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
use router::endpoint::Endpoint;
use snafu::ResultExt;
use tokio::sync::mpsc;
Expand All @@ -47,7 +47,7 @@ use crate::{

const STREAM_QUERY_CHANNEL_LEN: usize = 20;

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub async fn handle_sql_query(&self, ctx: Context, req: SqlQueryRequest) -> SqlQueryResponse {
// Incoming query maybe larger than query_failed + query_succeeded for some
// corner case, like lots of time-consuming queries come in at the same time and
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/grpc/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

use ceresdbproto::storage::{WriteRequest, WriteResponse};
use query_engine::executor::Executor as QueryExecutor;
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};

use crate::{error, error::build_ok_header, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub async fn handle_write(&self, ctx: Context, req: WriteRequest) -> WriteResponse {
self.hotspot_recorder.inc_write_reqs(&req).await;

Expand Down
6 changes: 4 additions & 2 deletions proxy/src/handlers/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::collections::BTreeSet;

use query_engine::physical_planner::PhysicalPlanner;

use crate::{handlers::prelude::*, limiter::BlockRule};

#[derive(Debug, Deserialize)]
Expand All @@ -38,9 +40,9 @@ pub struct BlockResponse {
block_rules: BTreeSet<BlockRule>,
}

pub async fn handle_block<Q: QueryExecutor + 'static>(
pub async fn handle_block<Q: QueryExecutor + 'static, P: PhysicalPlanner>(
_ctx: RequestContext,
instance: InstanceRef<Q>,
instance: InstanceRef<Q, P>,
request: BlockRequest,
) -> Result<BlockResponse> {
let limiter = &instance.limiter;
Expand Down
9 changes: 6 additions & 3 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use prom_remote_api::types::{
Label, LabelMatcher, Query, QueryResult, RemoteStorage, Sample, TimeSeries, WriteRequest,
};
use prost::Message;
use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use query_engine::{
executor::{Executor as QueryExecutor, RecordBatchVec},
physical_planner::PhysicalPlanner,
};
use query_frontend::{
frontend::{Context, Frontend},
promql::{RemoteQueryPlan, DEFAULT_FIELD_COLUMN, NAME_LABEL},
Expand All @@ -62,7 +65,7 @@ use crate::{

impl reject::Reject for Error {}

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
/// Handle write samples to remote storage with remote storage protocol.
async fn handle_prom_remote_write(&self, ctx: RequestContext, req: WriteRequest) -> Result<()> {
let write_table_requests = convert_write_request(req)?;
Expand Down Expand Up @@ -210,7 +213,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
}

#[async_trait]
impl<Q: QueryExecutor + 'static> RemoteStorage for Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> RemoteStorage for Proxy<Q, P> {
type Context = RequestContext;
type Err = Error;

Expand Down
4 changes: 2 additions & 2 deletions proxy/src/http/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use ceresdbproto::storage::RouteRequest;
use query_engine::executor::Executor as QueryExecutor;
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
use router::endpoint::Endpoint;
use serde::Serialize;

Expand All @@ -30,7 +30,7 @@ pub struct RouteItem {
pub endpoint: Option<Endpoint>,
}

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub async fn handle_http_route(
&self,
ctx: &RequestContext,
Expand Down
7 changes: 5 additions & 2 deletions proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use common_types::{
use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use query_engine::{
executor::{Executor as QueryExecutor, RecordBatchVec},
physical_planner::PhysicalPlanner,
};
use serde::{
ser::{SerializeMap, SerializeSeq},
Deserialize, Serialize,
Expand All @@ -40,7 +43,7 @@ use crate::{
Context, Proxy,
};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub async fn handle_http_sql_query(
&self,
ctx: &RequestContext,
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::{debug, info};
use query_engine::executor::Executor as QueryExecutor;
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
use query_frontend::{
frontend::{Context as SqlContext, Frontend},
provider::CatalogMetaProvider,
Expand All @@ -47,7 +47,7 @@ use crate::{
Context, Proxy,
};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub async fn handle_influxdb_query(
&self,
ctx: RequestContext,
Expand Down
7 changes: 5 additions & 2 deletions proxy/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ use crate::limiter::Limiter;
/// A cluster instance. Usually there is only one instance per cluster
///
/// Q: query_engine::executor::Executor
pub struct Instance<Q> {
/// P: query_engine::physical_planner::PhysicalPlanner
pub struct Instance<Q, P> {
pub catalog_manager: ManagerRef,
pub query_executor: Q,
pub physical_planner: P,

pub table_engine: TableEngineRef,
pub partition_table_engine: TableEngineRef,
// User defined functions registry.
Expand All @@ -39,4 +42,4 @@ pub struct Instance<Q> {
}

/// A reference counted instance pointer
pub type InstanceRef<Q> = Arc<Instance<Q>>;
pub type InstanceRef<Q, P> = Arc<Instance<Q, P>>;
Loading

0 comments on commit cd175e2

Please sign in to comment.