diff --git a/Cargo.lock b/Cargo.lock index f0a7bfd4ad..c5360466d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1246,6 +1246,7 @@ dependencies = [ "catalog_impls", "clap 3.2.23", "cluster", + "datafusion", "df_operator", "etcd-client", "interpreters", @@ -3297,6 +3298,7 @@ dependencies = [ "datafusion", "datafusion-proto", "df_operator", + "futures 0.3.28", "generic_error", "hash_ext", "log", @@ -5355,6 +5357,7 @@ version = "1.2.6-alpha" dependencies = [ "arrow 43.0.0", "async-trait", + "bytes_ext", "chrono", "common_types", "datafusion", diff --git a/Cargo.toml b/Cargo.toml index 172a7293d5..2e66a3e2bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ arena = { path = "components/arena" } async-stream = "0.3.4" async-trait = "0.1.72" base64 = "0.13" -bytes = "1.1.0" +bytes = "1" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } @@ -188,6 +188,7 @@ catalog = { workspace = true } catalog_impls = { workspace = true } clap = { workspace = true } cluster = { workspace = true } +datafusion = { workspace = true } df_operator = { workspace = true } etcd-client = { workspace = true } interpreters = { workspace = true } diff --git a/df_operator/src/registry.rs b/df_operator/src/registry.rs index a65a29f511..57750b87d1 100644 --- a/df_operator/src/registry.rs +++ b/df_operator/src/registry.rs @@ -14,8 +14,18 @@ //! Function registry. -use std::{collections::HashMap, sync::Arc}; - +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use datafusion::{ + error::{DataFusionError, Result as DfResult}, + execution::FunctionRegistry as DfFunctionRegistry, + logical_expr::{ + AggregateUDF as DfAggregateUDF, ScalarUDF as DfScalarUDF, WindowUDF as DfWindowUDF, + }, +}; use macros::define_result; use snafu::{ensure, Backtrace, Snafu}; @@ -31,6 +41,7 @@ define_result!(Error); /// A registry knows how to build logical expressions out of user-defined /// function' names +// TODO: maybe unnecessary to define inner trait rather than using datafusion's? pub trait FunctionRegistry { fn register_udf(&mut self, udf: ScalarUdf) -> Result<()>; @@ -41,6 +52,9 @@ pub trait FunctionRegistry { fn find_udaf(&self, name: &str) -> Result>; fn list_udfs(&self) -> Result>; + + // TODO: can we remove restriction about `Send` and `Sync`? + fn to_df_function_registry(self: Arc) -> Arc; } /// Default function registry. @@ -96,8 +110,50 @@ impl FunctionRegistry for FunctionRegistryImpl { } fn list_udfs(&self) -> Result> { - let udfs = self.scalar_functions.values().cloned().collect(); - Ok(udfs) + Ok(self.scalar_functions.values().cloned().collect()) + } + + fn to_df_function_registry(self: Arc) -> Arc { + Arc::new(DfFunctionRegistryAdapter(self)) + } +} + +struct DfFunctionRegistryAdapter(FunctionRegistryRef); + +impl DfFunctionRegistry for DfFunctionRegistryAdapter { + fn udfs(&self) -> HashSet { + self.0 + .list_udfs() + .expect("failed to list udfs") + .into_iter() + .map(|f| f.name().to_string()) + .collect() + } + + fn udf(&self, name: &str) -> DfResult> { + self.0 + .find_udf(name) + .map_err(|e| DataFusionError::Internal(format!("failed to find udf, err:{e}")))? + .ok_or(DataFusionError::Internal(format!( + "udf not found, name:{name}" + ))) + .map(|f| f.to_datafusion_udf()) + } + + fn udaf(&self, name: &str) -> DfResult> { + self.0 + .find_udaf(name) + .map_err(|e| DataFusionError::Internal(format!("failed to find udaf, err:{e}")))? 
+ .ok_or(DataFusionError::Internal(format!( + "udaf not found, name:{name}" + ))) + .map(|f| f.to_datafusion_udaf()) + } + + fn udwf(&self, _name: &str) -> DfResult> { + Err(DataFusionError::Internal( + "no udwfs defined now".to_string(), + )) } } diff --git a/interpreters/Cargo.toml b/interpreters/Cargo.toml index 102dd90a8f..41d3f83501 100644 --- a/interpreters/Cargo.toml +++ b/interpreters/Cargo.toml @@ -34,6 +34,7 @@ common_types = { workspace = true } datafusion = { workspace = true } datafusion-proto = { workspace = true } df_operator = { workspace = true } +futures = { workspace = true } generic_error = { workspace = true } hash_ext = { workspace = true } log = { workspace = true } diff --git a/interpreters/src/describe.rs b/interpreters/src/describe.rs index 89e8c6df07..4dd1fe1eea 100644 --- a/interpreters/src/describe.rs +++ b/interpreters/src/describe.rs @@ -21,13 +21,13 @@ use arrow::{ }; use async_trait::async_trait; use macros::define_result; -use query_engine::executor::RecordBatchVec; use query_frontend::plan::DescribeTablePlan; use snafu::{ResultExt, Snafu}; use table_engine::table::TableRef; -use crate::interpreter::{ - Describe, Interpreter, InterpreterPtr, Output, Result as InterpreterResult, +use crate::{ + interpreter::{Describe, Interpreter, InterpreterPtr, Output, Result as InterpreterResult}, + RecordBatchVec, }; #[derive(Debug, Snafu)] diff --git a/interpreters/src/exists.rs b/interpreters/src/exists.rs index 1a50294bbc..70c927f1a1 100644 --- a/interpreters/src/exists.rs +++ b/interpreters/src/exists.rs @@ -21,12 +21,12 @@ use arrow::{ }; use async_trait::async_trait; use macros::define_result; -use query_engine::executor::RecordBatchVec; use query_frontend::plan::ExistsTablePlan; use snafu::{ResultExt, Snafu}; -use crate::interpreter::{ - Exists, Interpreter, InterpreterPtr, Output, Result as InterpreterResult, +use crate::{ + interpreter::{Exists, Interpreter, InterpreterPtr, Output, Result as InterpreterResult}, + RecordBatchVec, }; #[derive(Debug, Snafu)] diff --git a/interpreters/src/factory.rs b/interpreters/src/factory.rs index eb70e1ca4b..4827ee6f27 100644 --- a/interpreters/src/factory.rs +++ b/interpreters/src/factory.rs @@ -15,7 +15,7 @@ //! Interpreter factory use catalog::manager::ManagerRef; -use query_engine::{executor::Executor, physical_planner::PhysicalPlanner}; +use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef}; use query_frontend::plan::Plan; use table_engine::engine::TableEngineRef; @@ -35,18 +35,18 @@ use crate::{ }; /// A factory to create interpreters -pub struct Factory { - query_executor: Q, - physical_planner: P, +pub struct Factory { + query_executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, catalog_manager: ManagerRef, table_engine: TableEngineRef, table_manipulator: TableManipulatorRef, } -impl Factory { +impl Factory { pub fn new( - query_executor: Q, - physical_planner: P, + query_executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, catalog_manager: ManagerRef, table_engine: TableEngineRef, table_manipulator: TableManipulatorRef, diff --git a/interpreters/src/interpreter.rs b/interpreters/src/interpreter.rs index eb1467dd1c..6404882fd8 100644 --- a/interpreters/src/interpreter.rs +++ b/interpreters/src/interpreter.rs @@ -16,9 +16,10 @@ use async_trait::async_trait; use macros::define_result; -use query_engine::executor::RecordBatchVec; use snafu::Snafu; +use crate::RecordBatchVec; + // Make the variant closer to actual error code like invalid arguments. 
#[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] diff --git a/interpreters/src/lib.rs b/interpreters/src/lib.rs index 900e2a27af..6547c5c840 100644 --- a/interpreters/src/lib.rs +++ b/interpreters/src/lib.rs @@ -18,6 +18,8 @@ #![feature(string_remove_matches)] +use common_types::record_batch::RecordBatch; + pub mod alter_table; pub mod context; pub mod create; @@ -36,3 +38,6 @@ mod show_create; #[cfg(test)] mod tests; + +// Use a type alias so that we are able to replace the implementation +pub type RecordBatchVec = Vec; diff --git a/interpreters/src/select.rs b/interpreters/src/select.rs index e957a1151c..198c0c39d3 100644 --- a/interpreters/src/select.rs +++ b/interpreters/src/select.rs @@ -15,15 +15,19 @@ //! Interpreter for select statement use async_trait::async_trait; +use futures::TryStreamExt; +use generic_error::{BoxError, GenericError}; use log::debug; use macros::define_result; -use query_engine::{executor::Executor, physical_planner::PhysicalPlanner}; +use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef}; use query_frontend::plan::QueryPlan; use snafu::{ResultExt, Snafu}; +use table_engine::stream::SendableRecordBatchStream; use crate::{ context::Context, interpreter::{Interpreter, InterpreterPtr, Output, Result as InterpreterResult, Select}, + RecordBatchVec, }; #[derive(Debug, Snafu)] @@ -31,26 +35,26 @@ pub enum Error { #[snafu(display("Failed to create query context, err:{}", source))] CreateQueryContext { source: crate::context::Error }, - #[snafu(display("Failed to execute logical plan, err:{}", source))] - ExecutePlan { source: query_engine::error::Error }, + #[snafu(display("Failed to execute physical plan, msg:{}, err:{}", msg, source))] + ExecutePlan { msg: String, source: GenericError }, } define_result!(Error); /// Select interpreter -pub struct SelectInterpreter { +pub struct SelectInterpreter { ctx: Context, plan: QueryPlan, - executor: T, - physical_planner: P, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, } -impl SelectInterpreter { +impl SelectInterpreter { pub fn create( ctx: Context, plan: QueryPlan, - executor: T, - physical_planner: P, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, ) -> InterpreterPtr { Box::new(Self { ctx, @@ -62,7 +66,7 @@ impl SelectInterpreter { } #[async_trait] -impl Interpreter for SelectInterpreter { +impl Interpreter for SelectInterpreter { async fn execute(self: Box) -> InterpreterResult { let request_id = self.ctx.request_id(); debug!( @@ -81,14 +85,20 @@ impl Interpreter for SelectInterpreter { .physical_planner .plan(&query_ctx, self.plan) .await - .context(ExecutePlan) + .box_err() + .context(ExecutePlan { + msg: "failed to build physical plan", + }) .context(Select)?; - let record_batches = self + let record_batch_stream = self .executor .execute(&query_ctx, physical_plan) .await - .context(ExecutePlan) + .box_err() + .context(ExecutePlan { + msg: "failed to execute physical plan", + }) .context(Select)?; debug!( @@ -96,6 +106,19 @@ impl Interpreter for SelectInterpreter { request_id ); + let record_batches = collect(record_batch_stream).await?; + Ok(Output::Records(record_batches)) } } + +async fn collect(stream: SendableRecordBatchStream) -> InterpreterResult { + stream + .try_collect() + .await + .box_err() + .context(ExecutePlan { + msg: "failed to collect execution results", + }) + .context(Select) +} diff --git a/interpreters/src/show_create.rs b/interpreters/src/show_create.rs index 2881f9007a..7fe28d555b 100644 --- a/interpreters/src/show_create.rs 
+++ b/interpreters/src/show_create.rs @@ -22,7 +22,6 @@ use arrow::{ use datafusion::logical_expr::Expr; use datafusion_proto::bytes::Serializeable; use log::error; -use query_engine::executor::RecordBatchVec; use query_frontend::{ast::ShowCreateObject, plan::ShowCreatePlan}; use snafu::ensure; use table_engine::{partition::PartitionInfo, table::TableRef}; @@ -30,6 +29,7 @@ use table_engine::{partition::PartitionInfo, table::TableRef}; use crate::{ interpreter::Output, show::{Result, UnsupportedType}, + RecordBatchVec, }; pub struct ShowCreateInterpreter { diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index b85178f520..62ec7ea6cc 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -22,9 +22,9 @@ use catalog::{ }; use catalog_impls::table_based::TableBasedManager; use common_types::request_id::RequestId; +use datafusion::execution::runtime_env::RuntimeEnv; use query_engine::{ datafusion_impl::physical_planner::DatafusionPhysicalPlannerImpl, executor::ExecutorImpl, - Config as QueryConfig, }; use query_frontend::{ parser::Parser, plan::Plan, planner::Planner, provider::MetaProvider, tests::MockMetaProvider, @@ -75,10 +75,13 @@ impl Env where M: MetaProvider, { - async fn build_factory(&self) -> Factory { + async fn build_factory(&self) -> Factory { Factory::new( - ExecutorImpl, - DatafusionPhysicalPlannerImpl::new(query_engine::Config::default()), + Arc::new(ExecutorImpl), + Arc::new(DatafusionPhysicalPlannerImpl::new( + query_engine::Config::default(), + Arc::new(RuntimeEnv::default()), + )), self.catalog_manager.clone(), self.engine(), self.table_manipulator.clone(), @@ -229,8 +232,11 @@ where let table_operator = TableOperator::new(catalog_manager.clone()); let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator)); let insert_factory = Factory::new( - ExecutorImpl, - DatafusionPhysicalPlannerImpl::new(QueryConfig::default()), + Arc::new(ExecutorImpl), + Arc::new(DatafusionPhysicalPlannerImpl::new( + query_engine::Config::default(), + Arc::new(RuntimeEnv::default()), + )), catalog_manager.clone(), self.engine(), table_manipulator.clone(), @@ -249,8 +255,11 @@ where let select_sql = "SELECT key1, key2, field1, field2, field3, field4, field5 from test_missing_columns_table"; let select_factory = Factory::new( - ExecutorImpl, - DatafusionPhysicalPlannerImpl::new(QueryConfig::default()), + Arc::new(ExecutorImpl), + Arc::new(DatafusionPhysicalPlannerImpl::new( + query_engine::Config::default(), + Arc::new(RuntimeEnv::default()), + )), catalog_manager, self.engine(), table_manipulator, diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index 9946c59898..ce31d49a2e 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -30,12 +30,8 @@ use common_types::{ }; use generic_error::BoxError; use http::StatusCode; -use interpreters::interpreter::Output; +use interpreters::{interpreter::Output, RecordBatchVec}; use log::info; -use query_engine::{ - executor::{Executor as QueryExecutor, RecordBatchVec}, - physical_planner::PhysicalPlanner, -}; use query_frontend::{ frontend::{Context as SqlContext, Error as FrontendError, Frontend}, promql::ColumnNames, @@ -49,7 +45,7 @@ use crate::{ Context, Proxy, }; -impl Proxy { +impl Proxy { /// Implement prometheus query in grpc service. /// Note: not used in prod now. 
pub async fn handle_prom_query( diff --git a/proxy/src/grpc/route.rs b/proxy/src/grpc/route.rs index e24de63537..bc21fd06cd 100644 --- a/proxy/src/grpc/route.rs +++ b/proxy/src/grpc/route.rs @@ -13,11 +13,10 @@ // limitations under the License. use ceresdbproto::storage::{RouteRequest, RouteResponse}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy}; -impl Proxy { +impl Proxy { pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse { let routes = self.route(req).await; diff --git a/proxy/src/grpc/sql_query.rs b/proxy/src/grpc/sql_query.rs index 43cad296dc..0b7e5d39bc 100644 --- a/proxy/src/grpc/sql_query.rs +++ b/proxy/src/grpc/sql_query.rs @@ -30,7 +30,6 @@ use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; use log::{error, warn}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use router::endpoint::Endpoint; use snafu::ResultExt; use tonic::{transport::Channel, IntoRequest}; @@ -43,7 +42,7 @@ use crate::{ Context, Proxy, }; -impl Proxy { +impl Proxy { pub async fn handle_sql_query(&self, ctx: Context, req: SqlQueryRequest) -> SqlQueryResponse { // Incoming query maybe larger than query_failed + query_succeeded for some // corner case, like lots of time-consuming queries come in at the same time and diff --git a/proxy/src/grpc/write.rs b/proxy/src/grpc/write.rs index fc18ba533d..b9607a3200 100644 --- a/proxy/src/grpc/write.rs +++ b/proxy/src/grpc/write.rs @@ -13,11 +13,10 @@ // limitations under the License. use ceresdbproto::storage::{WriteRequest, WriteResponse}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use crate::{error, error::build_ok_header, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy}; -impl Proxy { +impl Proxy { pub async fn handle_write(&self, ctx: Context, req: WriteRequest) -> WriteResponse { self.hotspot_recorder.inc_write_reqs(&req).await; diff --git a/proxy/src/handlers/admin.rs b/proxy/src/handlers/admin.rs index 75d7efb605..626fbd554b 100644 --- a/proxy/src/handlers/admin.rs +++ b/proxy/src/handlers/admin.rs @@ -14,8 +14,6 @@ use std::collections::BTreeSet; -use query_engine::physical_planner::PhysicalPlanner; - use crate::{handlers::prelude::*, limiter::BlockRule}; #[derive(Debug, Deserialize)] @@ -40,9 +38,9 @@ pub struct BlockResponse { block_rules: BTreeSet, } -pub async fn handle_block( +pub async fn handle_block( _ctx: RequestContext, - instance: InstanceRef, + instance: InstanceRef, request: BlockRequest, ) -> Result { let limiter = &instance.limiter; diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs index b40debf214..6f3c380d7b 100644 --- a/proxy/src/http/prom.rs +++ b/proxy/src/http/prom.rs @@ -35,16 +35,12 @@ use common_types::{ }; use generic_error::BoxError; use http::StatusCode; -use interpreters::interpreter::Output; +use interpreters::{interpreter::Output, RecordBatchVec}; use log::{error, info}; use prom_remote_api::types::{ Label, LabelMatcher, Query, QueryResult, RemoteStorage, Sample, TimeSeries, WriteRequest, }; use prost::Message; -use query_engine::{ - executor::{Executor as QueryExecutor, RecordBatchVec}, - physical_planner::PhysicalPlanner, -}; use query_frontend::{ frontend::{Context, Frontend}, promql::{RemoteQueryPlan, DEFAULT_FIELD_COLUMN, NAME_LABEL}, @@ -64,7 +60,7 @@ use crate::{ impl reject::Reject for Error {} -impl Proxy { +impl Proxy { /// Handle 
write samples to remote storage with remote storage protocol. async fn handle_prom_remote_write(&self, ctx: RequestContext, req: WriteRequest) -> Result<()> { let write_table_requests = convert_write_request(req)?; @@ -206,7 +202,7 @@ impl Proxy { } #[async_trait] -impl RemoteStorage for Proxy { +impl RemoteStorage for Proxy { type Context = RequestContext; type Err = Error; diff --git a/proxy/src/http/route.rs b/proxy/src/http/route.rs index b3018224fa..3a692cc4eb 100644 --- a/proxy/src/http/route.rs +++ b/proxy/src/http/route.rs @@ -13,7 +13,6 @@ // limitations under the License. use ceresdbproto::storage::RouteRequest; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use router::endpoint::Endpoint; use serde::Serialize; @@ -30,7 +29,7 @@ pub struct RouteItem { pub endpoint: Option, } -impl Proxy { +impl Proxy { pub async fn handle_http_route( &self, ctx: &RequestContext, diff --git a/proxy/src/http/sql.rs b/proxy/src/http/sql.rs index 16a3b8439b..41e520f3cc 100644 --- a/proxy/src/http/sql.rs +++ b/proxy/src/http/sql.rs @@ -25,12 +25,8 @@ use common_types::{ }; use generic_error::BoxError; use http::StatusCode; -use interpreters::interpreter::Output; +use interpreters::{interpreter::Output, RecordBatchVec}; use log::error; -use query_engine::{ - executor::{Executor as QueryExecutor, RecordBatchVec}, - physical_planner::PhysicalPlanner, -}; use serde::{ ser::{SerializeMap, SerializeSeq}, Deserialize, Serialize, @@ -44,7 +40,7 @@ use crate::{ Context, Proxy, }; -impl Proxy { +impl Proxy { pub async fn handle_http_sql_query( &self, ctx: &RequestContext, diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index 640ddfcab6..c5d102ef13 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -27,7 +27,6 @@ use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; use log::{debug, info}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use query_frontend::{ frontend::{Context as SqlContext, Frontend}, provider::CatalogMetaProvider, @@ -46,7 +45,7 @@ use crate::{ Context, Proxy, }; -impl Proxy { +impl Proxy { pub async fn handle_influxdb_query( &self, ctx: RequestContext, diff --git a/proxy/src/instance.rs b/proxy/src/instance.rs index 43080f9ff1..653f13a639 100644 --- a/proxy/src/instance.rs +++ b/proxy/src/instance.rs @@ -19,22 +19,19 @@ use std::sync::Arc; use catalog::manager::ManagerRef; use df_operator::registry::FunctionRegistryRef; use interpreters::table_manipulator::TableManipulatorRef; +use query_engine::QueryEngineRef; use table_engine::{engine::TableEngineRef, remote::RemoteEngineRef}; use crate::limiter::Limiter; /// A cluster instance. Usually there is only one instance per cluster -/// -/// Q: query_engine::executor::Executor -/// P: query_engine::physical_planner::PhysicalPlanner -pub struct Instance { +pub struct Instance { pub catalog_manager: ManagerRef, - pub query_executor: Q, - pub physical_planner: P, - + pub query_engine: QueryEngineRef, pub table_engine: TableEngineRef, pub partition_table_engine: TableEngineRef, // User defined functions registry. + // TODO: remove it, it should be part of query engine... 
pub function_registry: FunctionRegistryRef, pub limiter: Limiter, pub table_manipulator: TableManipulatorRef, @@ -42,4 +39,4 @@ pub struct Instance { } /// A reference counted instance pointer -pub type InstanceRef = Arc>; +pub type InstanceRef = Arc; diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 03431f8826..bf28cebe49 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -69,7 +69,6 @@ use interpreters::{ interpreter::{InterpreterPtr, Output}, }; use log::{error, info}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use query_frontend::plan::Plan; use router::{endpoint::Endpoint, Router}; use snafu::{OptionExt, ResultExt}; @@ -93,10 +92,10 @@ use crate::{ // Because the clock may have errors, choose 1 hour as the error buffer const QUERY_EXPIRED_BUFFER: Duration = Duration::from_secs(60 * 60); -pub struct Proxy { +pub struct Proxy { router: Arc, forwarder: ForwarderRef, - instance: InstanceRef, + instance: InstanceRef, resp_compress_min_length: usize, auto_create_table: bool, schema_config_provider: SchemaConfigProviderRef, @@ -105,11 +104,11 @@ pub struct Proxy { cluster_with_meta: bool, } -impl Proxy { +impl Proxy { #[allow(clippy::too_many_arguments)] pub fn new( router: Arc, - instance: InstanceRef, + instance: InstanceRef, forward_config: forward::Config, local_endpoint: Endpoint, resp_compress_min_length: usize, @@ -138,7 +137,7 @@ impl Proxy { } } - pub fn instance(&self) -> InstanceRef { + pub fn instance(&self) -> InstanceRef { self.instance.clone() } @@ -512,8 +511,8 @@ impl Proxy { .enable_partition_table_access(enable_partition_table_access) .build(); let interpreter_factory = Factory::new( - self.instance.query_executor.clone(), - self.instance.physical_planner.clone(), + self.instance.query_engine.executor(), + self.instance.query_engine.physical_planner(), self.instance.catalog_manager.clone(), self.instance.table_engine.clone(), self.instance.table_manipulator.clone(), diff --git a/proxy/src/opentsdb/mod.rs b/proxy/src/opentsdb/mod.rs index bfbc6819e7..c1f5f1254c 100644 --- a/proxy/src/opentsdb/mod.rs +++ b/proxy/src/opentsdb/mod.rs @@ -20,7 +20,6 @@ use ceresdbproto::storage::{ }; use http::StatusCode; use log::debug; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use crate::{ context::RequestContext, @@ -32,7 +31,7 @@ use crate::{ pub mod types; -impl Proxy { +impl Proxy { pub async fn handle_opentsdb_put( &self, ctx: RequestContext, diff --git a/proxy/src/read.rs b/proxy/src/read.rs index e53629a3d9..a9b1750ea3 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -24,7 +24,6 @@ use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; use log::{error, info, warn}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use query_frontend::{ frontend, frontend::{Context as SqlContext, Frontend}, @@ -46,7 +45,7 @@ pub enum SqlResponse { Local(Output), } -impl Proxy { +impl Proxy { pub(crate) async fn handle_sql( &self, ctx: &Context, diff --git a/proxy/src/write.rs b/proxy/src/write.rs index c445fb75f5..ba119fd662 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -39,7 +39,6 @@ use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; use log::{debug, error, info}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use query_frontend::{ frontend::{Context as FrontendContext, Frontend}, plan::{AlterTableOperation, 
AlterTablePlan, InsertPlan, Plan}, @@ -74,7 +73,7 @@ pub(crate) struct WriteResponse { pub failed: u32, } -impl Proxy { +impl Proxy { pub(crate) async fn handle_write_internal( &self, ctx: Context, diff --git a/query_engine/Cargo.toml b/query_engine/Cargo.toml index 8d6a5f6273..20427183a3 100644 --- a/query_engine/Cargo.toml +++ b/query_engine/Cargo.toml @@ -28,6 +28,7 @@ workspace = true # In alphabetical order arrow = { workspace = true } async-trait = { workspace = true } +bytes_ext = { workspace = true } chrono = { workspace = true } common_types = { workspace = true } datafusion = { workspace = true } diff --git a/query_engine/src/codec.rs b/query_engine/src/codec.rs new file mode 100644 index 0000000000..f055ef5755 --- /dev/null +++ b/query_engine/src/codec.rs @@ -0,0 +1,27 @@ +// Copyright 2023 The CeresDB Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{fmt, sync::Arc}; + +use bytes_ext::Bytes; + +use crate::{error::Result, physical_planner::PhysicalPlanPtr}; + +pub trait PhysicalPlanCodec: fmt::Debug + Send + Sync + 'static { + fn encode(&self, plan: &PhysicalPlanPtr) -> Result; + + fn decode(&self, bytes: &[u8]) -> Result; +} + +pub type PhysicalPlanCodecRef = Arc; diff --git a/query_engine/src/datafusion_impl/codec.rs b/query_engine/src/datafusion_impl/codec.rs new file mode 100644 index 0000000000..114aacddf0 --- /dev/null +++ b/query_engine/src/datafusion_impl/codec.rs @@ -0,0 +1,56 @@ +// Copyright 2023 The CeresDB Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{fmt, sync::Arc}; + +use bytes_ext::Bytes; +use datafusion::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; + +use crate::{codec::PhysicalPlanCodec, error::*, physical_planner::PhysicalPlanPtr}; + +/// Datafusion encoder powered by `datafusion-proto` +// TODO: replace `datafusion-proto` with `substrait`? +// TODO: make use of it. 
+pub struct DataFusionPhysicalPlanEncoderImpl { + _runtime_env: Arc, + _function_registry: Arc, +} + +impl DataFusionPhysicalPlanEncoderImpl { + pub fn new( + runtime_env: Arc, + function_registry: Arc, + ) -> Self { + Self { + _runtime_env: runtime_env, + _function_registry: function_registry, + } + } +} + +impl fmt::Debug for DataFusionPhysicalPlanEncoderImpl { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "DataFusionPhysicalPlanEncoderImpl") + } +} + +impl PhysicalPlanCodec for DataFusionPhysicalPlanEncoderImpl { + fn encode(&self, _plan: &crate::physical_planner::PhysicalPlanPtr) -> Result { + todo!() + } + + fn decode(&self, _bytes: &[u8]) -> Result { + todo!() + } +} diff --git a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs b/query_engine/src/datafusion_impl/logical_optimizer/mod.rs index 3ef090d50a..b945ae8f10 100644 --- a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs +++ b/query_engine/src/datafusion_impl/logical_optimizer/mod.rs @@ -17,45 +17,3 @@ #[cfg(test)] pub mod tests; pub mod type_conversion; - -use datafusion::prelude::SessionContext; -use generic_error::BoxError; -use query_frontend::plan::QueryPlan; -use snafu::ResultExt; - -use crate::error::*; - -/// LogicalOptimizer transform the QueryPlan into a potentially more efficient -/// plan -pub trait LogicalOptimizer { - // TODO(yingwen): Maybe support other plans - fn optimize(&mut self, plan: QueryPlan) -> Result; -} - -pub struct LogicalOptimizerImpl { - ctx: SessionContext, -} - -impl LogicalOptimizerImpl { - pub fn with_context(ctx: SessionContext) -> Self { - Self { ctx } - } -} - -impl LogicalOptimizer for LogicalOptimizerImpl { - fn optimize(&mut self, plan: QueryPlan) -> Result { - // TODO(yingwen): Avoid clone the plan multiple times during optimization - let QueryPlan { - mut df_plan, - tables, - } = plan; - df_plan = self - .ctx - .state() - .optimize(&df_plan) - .box_err() - .context(LogicalOptimizerWithCause { msg: None })?; - - Ok(QueryPlan { df_plan, tables }) - } -} diff --git a/query_engine/src/datafusion_impl/mod.rs b/query_engine/src/datafusion_impl/mod.rs index a8498e6e02..72680c6b39 100644 --- a/query_engine/src/datafusion_impl/mod.rs +++ b/query_engine/src/datafusion_impl/mod.rs @@ -12,9 +12,76 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc; + +use datafusion::execution::{ + runtime_env::{RuntimeConfig, RuntimeEnv}, + FunctionRegistry, +}; + +use crate::{ + codec::PhysicalPlanCodecRef, + datafusion_impl::{ + codec::DataFusionPhysicalPlanEncoderImpl, physical_planner::DatafusionPhysicalPlannerImpl, + }, + executor::{ExecutorImpl, ExecutorRef}, + physical_planner::PhysicalPlannerRef, + Config, QueryEngine, +}; + +pub mod codec; pub mod logical_optimizer; pub mod physical_optimizer; pub mod physical_plan; pub mod physical_plan_extension; pub mod physical_planner; pub mod physical_planner_extension; + +use crate::error::*; + +#[derive(Debug)] +pub struct DatafusionQueryEngineImpl { + physical_planner: PhysicalPlannerRef, + executor: ExecutorRef, + physical_plan_codec: PhysicalPlanCodecRef, +} + +impl DatafusionQueryEngineImpl { + pub fn new( + config: Config, + runtime_config: RuntimeConfig, + function_registry: Arc, + ) -> Result { + let runtime_env = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); + + let physical_planner = Arc::new(DatafusionPhysicalPlannerImpl::new( + config, + runtime_env.clone(), + )); + let executor = Arc::new(ExecutorImpl); + let physical_plan_codec = Arc::new(DataFusionPhysicalPlanEncoderImpl::new( + runtime_env, + function_registry, + )); + + Ok(Self { + physical_planner, + executor, + physical_plan_codec, + }) + } +} + +impl QueryEngine for DatafusionQueryEngineImpl { + fn physical_planner(&self) -> PhysicalPlannerRef { + self.physical_planner.clone() + } + + fn executor(&self) -> ExecutorRef { + self.executor.clone() + } + + fn physical_plan_codec(&self) -> PhysicalPlanCodecRef { + self.physical_plan_codec.clone() + } +} diff --git a/query_engine/src/datafusion_impl/physical_plan.rs b/query_engine/src/datafusion_impl/physical_plan.rs index 17ad45a98a..7d3c413641 100644 --- a/query_engine/src/datafusion_impl/physical_plan.rs +++ b/query_engine/src/datafusion_impl/physical_plan.rs @@ -34,9 +34,6 @@ use table_engine::stream::{FromDfStream, SendableRecordBatchStream}; use crate::{error::*, physical_planner::PhysicalPlan}; -/// Datafusion physical plan adapter -/// -/// Because we need to pub struct DataFusionPhysicalPlanImpl { ctx: SessionContext, plan: Arc, diff --git a/query_engine/src/datafusion_impl/physical_planner.rs b/query_engine/src/datafusion_impl/physical_planner.rs index 101ed1e56d..a6b95dbe65 100644 --- a/query_engine/src/datafusion_impl/physical_planner.rs +++ b/query_engine/src/datafusion_impl/physical_planner.rs @@ -38,14 +38,18 @@ use crate::{ }; /// Physical planner based on datafusion -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct DatafusionPhysicalPlannerImpl { config: Config, + runtime_env: Arc, } impl DatafusionPhysicalPlannerImpl { - pub fn new(config: Config) -> Self { - Self { config } + pub fn new(config: Config, runtime_env: Arc) -> Self { + Self { + config, + runtime_env, + } } pub fn build_df_session_ctx(&self, config: &Config, ctx: &Context) -> SessionContext { @@ -70,9 +74,8 @@ impl DatafusionPhysicalPlannerImpl { // Using default logcial optimizer, if want to add more custom rule, using // `add_optimizer_rule` to add. 
- let state = - SessionState::with_config_rt(df_session_config, Arc::new(RuntimeEnv::default())) - .with_query_planner(Arc::new(QueryPlannerAdapter)); + let state = SessionState::with_config_rt(df_session_config, self.runtime_env.clone()) + .with_query_planner(Arc::new(QueryPlannerAdapter)); // Register analyzer rules let state = Self::register_analyzer_rules(state); diff --git a/query_engine/src/executor.rs b/query_engine/src/executor.rs index bd48af05b6..0568f6211f 100644 --- a/query_engine/src/executor.rs +++ b/query_engine/src/executor.rs @@ -14,11 +14,9 @@ //! Query executor -use std::time::Instant; +use std::{fmt, sync::Arc, time::Instant}; use async_trait::async_trait; -use common_types::record_batch::RecordBatch; -use futures::TryStreamExt; use generic_error::BoxError; use log::{debug, info}; use snafu::ResultExt; @@ -27,16 +25,12 @@ use time_ext::InstantExt; use crate::{context::Context, error::*, physical_planner::PhysicalPlanPtr}; -// Use a type alias so that we are able to replace the implementation -pub type RecordBatchVec = Vec; - /// Query executor /// /// Executes the logical plan #[async_trait] -pub trait Executor: Clone + Send + Sync { - // TODO(yingwen): Maybe return a stream - /// Execute the query, returning the query results as RecordBatchVec +pub trait Executor: fmt::Debug + Send + Sync + 'static { + /// Execute the query, return the record batch stream /// /// REQUIRE: The meta data of tables in query should be found from /// ContextRef @@ -44,10 +38,12 @@ pub trait Executor: Clone + Send + Sync { &self, ctx: &Context, physical_plan: PhysicalPlanPtr, - ) -> Result; + ) -> Result; } -#[derive(Clone, Default)] +pub type ExecutorRef = Arc; + +#[derive(Debug, Clone, Default)] pub struct ExecutorImpl; #[async_trait] @@ -56,7 +52,7 @@ impl Executor for ExecutorImpl { &self, ctx: &Context, physical_plan: PhysicalPlanPtr, - ) -> Result { + ) -> Result { let begin_instant = Instant::now(); debug!( @@ -71,10 +67,6 @@ impl Executor for ExecutorImpl { msg: Some("failed to execute physical plan".to_string()), })?; - // Collect all records in the pool, as the stream may perform some costly - // calculation - let record_batches = collect(stream).await?; - info!( "Executor executed plan, request_id:{}, cost:{}ms, plan_and_metrics: {}", ctx.request_id, @@ -82,16 +74,6 @@ impl Executor for ExecutorImpl { physical_plan.metrics_to_string() ); - Ok(record_batches) + Ok(stream) } } - -async fn collect(stream: SendableRecordBatchStream) -> Result { - stream - .try_collect() - .await - .box_err() - .with_context(|| ExecutorWithCause { - msg: Some("failed to collect query results".to_string()), - }) -} diff --git a/query_engine/src/lib.rs b/query_engine/src/lib.rs index f42470360f..18a70e88fe 100644 --- a/query_engine/src/lib.rs +++ b/query_engine/src/lib.rs @@ -16,10 +16,27 @@ //! //! 
Optimizes and executes logical plan +pub mod codec; pub mod config; pub mod context; pub mod datafusion_impl; pub mod error; pub mod executor; pub mod physical_planner; +use std::fmt; + pub use config::Config; + +use crate::{ + codec::PhysicalPlanCodecRef, executor::ExecutorRef, physical_planner::PhysicalPlannerRef, +}; + +pub trait QueryEngine: fmt::Debug + Send + Sync { + fn physical_planner(&self) -> PhysicalPlannerRef; + + fn executor(&self) -> ExecutorRef; + + fn physical_plan_codec(&self) -> PhysicalPlanCodecRef; +} + +pub type QueryEngineRef = Box; diff --git a/query_engine/src/physical_planner.rs b/query_engine/src/physical_planner.rs index 4bca01c1ca..d81df0facd 100644 --- a/query_engine/src/physical_planner.rs +++ b/query_engine/src/physical_planner.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::{fmt, sync::Arc}; + use async_trait::async_trait; use query_frontend::plan::QueryPlan; use table_engine::stream::SendableRecordBatchStream; @@ -25,12 +27,14 @@ use crate::{context::Context, error::*}; /// + Create the initial physical plan from the optimized logical. /// + Optimize and get the final physical plan. #[async_trait] -pub trait PhysicalPlanner: Clone + Send + Sync + 'static { +pub trait PhysicalPlanner: fmt::Debug + Send + Sync + 'static { /// Create a physical plan from a logical plan async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) -> Result; } -pub trait PhysicalPlan: std::fmt::Debug { +pub type PhysicalPlannerRef = Arc; + +pub trait PhysicalPlan: std::fmt::Debug + Send + Sync + 'static { /// execute this plan and returns the result fn execute(&self) -> Result; @@ -38,4 +42,4 @@ pub trait PhysicalPlan: std::fmt::Debug { fn metrics_to_string(&self) -> String; } -pub type PhysicalPlanPtr = Box; +pub type PhysicalPlanPtr = Box; diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index 45f20fda39..40204035b0 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -46,7 +46,6 @@ use log::{error, info, warn}; use meta_client::types::{ShardInfo, TableInfo}; use paste::paste; use proxy::instance::InstanceRef; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use runtime::Runtime; use snafu::{OptionExt, ResultExt}; use table_engine::{engine::TableEngineRef, ANALYTIC_ENGINE_TYPE}; @@ -102,15 +101,15 @@ const RETRY: RetryConfig = RetryConfig { }; /// Builder for [MetaServiceImpl]. -pub struct Builder { +pub struct Builder { pub cluster: ClusterRef, - pub instance: InstanceRef, + pub instance: InstanceRef, pub runtime: Arc, pub opened_wals: OpenedWals, } -impl Builder { - pub fn build(self) -> MetaServiceImpl { +impl Builder { + pub fn build(self) -> MetaServiceImpl { let Self { cluster, instance, @@ -131,9 +130,9 @@ impl Builder { } #[derive(Clone)] -pub struct MetaServiceImpl { +pub struct MetaServiceImpl { cluster: ClusterRef, - instance: InstanceRef, + instance: InstanceRef, runtime: Arc, wal_region_closer: WalRegionCloserRef, } @@ -187,7 +186,7 @@ macro_rules! 
handle_request { }; } -impl MetaServiceImpl { +impl MetaServiceImpl { handle_request!(open_shard, OpenShardRequest, OpenShardResponse); handle_request!(close_shard, CloseShardRequest, CloseShardResponse); @@ -577,7 +576,7 @@ async fn handle_close_table_on_shard( } #[async_trait] -impl MetaEventService for MetaServiceImpl { +impl MetaEventService for MetaServiceImpl { async fn open_shard( &self, request: tonic::Request, diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 0fe5a9e558..c64ea99999 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -40,7 +40,6 @@ use proxy::{ schema_config_provider::{self}, Proxy, }; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use runtime::{JoinHandle, Runtime}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; @@ -153,17 +152,17 @@ pub enum Error { define_result!(Error); /// Rpc services manages all grpc services of the server. -pub struct RpcServices { +pub struct RpcServices { serve_addr: SocketAddr, - rpc_server: StorageServiceServer>, - meta_rpc_server: Option>>, - remote_engine_server: RemoteEngineServiceServer>, + rpc_server: StorageServiceServer, + meta_rpc_server: Option>, + remote_engine_server: RemoteEngineServiceServer, runtime: Arc, stop_tx: Option>, join_handle: Option>, } -impl RpcServices { +impl RpcServices { pub async fn start(&mut self) -> Result<()> { let rpc_server = self.rpc_server.clone(); let meta_rpc_server = self.meta_rpc_server.clone(); @@ -208,19 +207,19 @@ impl RpcServices { } } -pub struct Builder { +pub struct Builder { endpoint: String, timeout: Option, runtimes: Option>, - instance: Option>, + instance: Option, cluster: Option, opened_wals: Option, - proxy: Option>>, + proxy: Option>, request_notifiers: Option>>>, hotspot_recorder: Option>, } -impl Builder { +impl Builder { pub fn new() -> Self { Self { endpoint: "0.0.0.0:8381".to_string(), @@ -245,7 +244,7 @@ impl Builder { self } - pub fn instance(mut self, instance: InstanceRef) -> Self { + pub fn instance(mut self, instance: InstanceRef) -> Self { self.instance = Some(instance); self } @@ -266,7 +265,7 @@ impl Builder { self } - pub fn proxy(mut self, proxy: Arc>) -> Self { + pub fn proxy(mut self, proxy: Arc) -> Self { self.proxy = Some(proxy); self } @@ -284,8 +283,8 @@ impl Builder { } } -impl Builder { - pub fn build(self) -> Result> { +impl Builder { + pub fn build(self) -> Result { let runtimes = self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; let opened_wals = self.opened_wals.context(MissingWals)?; diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index a2cdb1a004..4137011796 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -35,7 +35,6 @@ use proxy::{ hotspot::{HotspotRecorder, Message}, instance::InstanceRef, }; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use snafu::{OptionExt, ResultExt}; use table_engine::{ engine::EngineRuntimes, @@ -110,14 +109,14 @@ impl Drop for ExecutionGuard { } #[derive(Clone)] -pub struct RemoteEngineServiceImpl { - pub instance: InstanceRef, +pub struct RemoteEngineServiceImpl { + pub instance: InstanceRef, pub runtimes: Arc, pub request_notifiers: Option>>>, pub hotspot_recorder: Arc, } -impl RemoteEngineServiceImpl { +impl RemoteEngineServiceImpl { async fn stream_read_internal( &self, request: 
Request, @@ -426,9 +425,7 @@ struct HandlerContext { } #[async_trait] -impl RemoteEngineService - for RemoteEngineServiceImpl -{ +impl RemoteEngineService for RemoteEngineServiceImpl { type ReadStream = BoxStream<'static, std::result::Result>; async fn read( diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 8421b33551..296959b719 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -33,21 +33,20 @@ use ceresdbproto::{ use futures::{stream, stream::BoxStream, StreamExt}; use http::StatusCode; use proxy::{Context, Proxy, FORWARDED_FROM}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use table_engine::engine::EngineRuntimes; use time_ext::InstantExt; use crate::grpc::metrics::GRPC_HANDLER_DURATION_HISTOGRAM_VEC; #[derive(Clone)] -pub struct StorageServiceImpl { - pub proxy: Arc>, +pub struct StorageServiceImpl { + pub proxy: Arc, pub runtimes: Arc, pub timeout: Option, } #[async_trait] -impl StorageService for StorageServiceImpl { +impl StorageService for StorageServiceImpl { type StreamSqlQueryStream = BoxStream<'static, Result>; async fn route( @@ -165,7 +164,7 @@ fn get_forwarded_from(req: &tonic::Request) -> Option { } // TODO: Use macros to simplify duplicate code -impl StorageServiceImpl { +impl StorageServiceImpl { async fn route_internal( &self, req: tonic::Request, @@ -392,7 +391,7 @@ impl StorageServiceImpl { async fn stream_sql_query_internal( &self, ctx: Context, - proxy: Arc>, + proxy: Arc, req: tonic::Request, ) -> Result< tonic::Response>>, diff --git a/server/src/http.rs b/server/src/http.rs index 260154c187..3e48db9c0c 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -37,7 +37,6 @@ use proxy::{ opentsdb::types::{PutParams, PutRequest}, Proxy, }; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use router::endpoint::Endpoint; use runtime::{Runtime, RuntimeRef}; use serde::Serialize; @@ -143,10 +142,10 @@ impl reject::Reject for Error {} /// /// Endpoints beginning with /debug are for internal use, and may subject to /// breaking changes. 
-pub struct Service { +pub struct Service { // In cluster mode, cluster is valid, while in stand-alone mode, cluster is None cluster: Option, - proxy: Arc>, + proxy: Arc, engine_runtimes: Arc, log_runtime: Arc, profiler: Arc, @@ -157,7 +156,7 @@ pub struct Service { opened_wals: OpenedWals, } -impl Service { +impl Service { pub async fn start(&mut self) -> Result<()> { let ip_addr: IpAddr = self .config @@ -195,7 +194,7 @@ impl Service { } } -impl Service { +impl Service { fn routes( &self, ) -> impl Filter + Clone { @@ -279,7 +278,7 @@ impl Service { .and(self.with_proxy()) .and(self.with_read_runtime()) .and_then( - |req, ctx, proxy: Arc>, runtime: RuntimeRef| async move { + |req, ctx, proxy: Arc, runtime: RuntimeRef| async move { let result = runtime .spawn(async move { proxy @@ -312,7 +311,7 @@ impl Service { .and(warp::get()) .and(self.with_context()) .and(self.with_proxy()) - .and_then(|table: String, ctx, proxy: Arc>| async move { + .and_then(|table: String, ctx, proxy: Arc| async move { let result = proxy .handle_http_route(&ctx, table) .await @@ -345,7 +344,7 @@ impl Service { .and(warp::query::()) .and(warp::body::bytes()) .and(self.with_proxy()) - .and_then(|ctx, params, lines, proxy: Arc>| async move { + .and_then(|ctx, params, lines, proxy: Arc| async move { let request = WriteRequest::new(lines, params); let result = proxy.handle_influxdb_write(ctx, request).await; match result { @@ -363,21 +362,19 @@ impl Service { .and(warp::query::()) .and(warp::body::form::>()) .and(self.with_proxy()) - .and_then( - |method, ctx, params, body, proxy: Arc>| async move { - let request = - InfluxqlRequest::try_new(method, body, params).map_err(reject::custom)?; - let result = proxy - .handle_influxdb_query(ctx, request) - .await - .box_err() - .context(HandleRequest); - match result { - Ok(res) => Ok(reply::json(&res)), - Err(e) => Err(reject::custom(e)), - } - }, - ); + .and_then(|method, ctx, params, body, proxy: Arc| async move { + let request = + InfluxqlRequest::try_new(method, body, params).map_err(reject::custom)?; + let result = proxy + .handle_influxdb_query(ctx, request) + .await + .box_err() + .context(HandleRequest); + match result { + Ok(res) => Ok(reply::json(&res)), + Err(e) => Err(reject::custom(e)), + } + }); warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api)) } @@ -395,7 +392,7 @@ impl Service { .and(warp::query::()) .and(warp::body::bytes()) .and(self.with_proxy()) - .and_then(|ctx, params, points, proxy: Arc>| async move { + .and_then(|ctx, params, points, proxy: Arc| async move { let request = PutRequest::new(points, params); let result = proxy.handle_opentsdb_put(ctx, request).await; match result { @@ -414,7 +411,7 @@ impl Service { warp::path!("debug" / "flush_memtable") .and(warp::post()) .and(self.with_instance()) - .and_then(|instance: InstanceRef| async move { + .and_then(|instance: InstanceRef| async move { let get_all_tables = || { let mut tables = Vec::new(); for catalog in instance @@ -651,7 +648,7 @@ impl Service { warp::any().map(move || profiler.clone()) } - fn with_proxy(&self) -> impl Filter>,), Error = Infallible> + Clone { + fn with_proxy(&self) -> impl Filter,), Error = Infallible> + Clone { let proxy = self.proxy.clone(); warp::any().map(move || proxy.clone()) } @@ -668,9 +665,7 @@ impl Service { warp::any().map(move || runtime.clone()) } - fn with_instance( - &self, - ) -> impl Filter,), Error = Infallible> + Clone { + fn with_instance(&self) -> impl Filter + Clone { let instance = self.proxy.instance(); warp::any().map(move || 
instance.clone()) } @@ -696,17 +691,17 @@ impl Service { } /// Service builder -pub struct Builder { +pub struct Builder { config: HttpConfig, engine_runtimes: Option>, log_runtime: Option>, config_content: Option, cluster: Option, - proxy: Option>>, + proxy: Option>, opened_wals: Option, } -impl Builder { +impl Builder { pub fn new(config: HttpConfig) -> Self { Self { config, @@ -739,7 +734,7 @@ impl Builder { self } - pub fn proxy(mut self, proxy: Arc>) -> Self { + pub fn proxy(mut self, proxy: Arc) -> Self { self.proxy = Some(proxy); self } @@ -750,9 +745,9 @@ impl Builder { } } -impl Builder { +impl Builder { /// Build and start the service - pub fn build(self) -> Result> { + pub fn build(self) -> Result { let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?; let log_runtime = self.log_runtime.context(MissingLogRuntime)?; let config_content = self.config_content.context(MissingInstance)?; diff --git a/server/src/mysql/builder.rs b/server/src/mysql/builder.rs index a0f1105300..58c475658d 100644 --- a/server/src/mysql/builder.rs +++ b/server/src/mysql/builder.rs @@ -15,7 +15,6 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use proxy::Proxy; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use snafu::{OptionExt, ResultExt}; use table_engine::engine::EngineRuntimes; @@ -24,10 +23,10 @@ use crate::mysql::{ service::MysqlService, }; -pub struct Builder { +pub struct Builder { config: Config, runtimes: Option>, - proxy: Option>>, + proxy: Option>, } #[derive(Debug)] @@ -37,7 +36,7 @@ pub struct Config { pub timeout: Option, } -impl Builder { +impl Builder { pub fn new(config: Config) -> Self { Self { config, @@ -51,14 +50,14 @@ impl Builder { self } - pub fn proxy(mut self, proxy: Arc>) -> Self { + pub fn proxy(mut self, proxy: Arc) -> Self { self.proxy = Some(proxy); self } } -impl Builder { - pub fn build(self) -> Result> { +impl Builder { + pub fn build(self) -> Result { let runtimes = self.runtimes.context(MissingRuntimes)?; let proxy = self.proxy.context(MissingInstance)?; diff --git a/server/src/mysql/service.rs b/server/src/mysql/service.rs index db059bd86c..36f5d59bff 100644 --- a/server/src/mysql/service.rs +++ b/server/src/mysql/service.rs @@ -17,15 +17,14 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use log::{error, info}; use opensrv_mysql::AsyncMysqlIntermediary; use proxy::Proxy; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use runtime::JoinHandle; use table_engine::engine::EngineRuntimes; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::mysql::{error::Result, worker::MysqlWorker}; -pub struct MysqlService { - proxy: Arc>, +pub struct MysqlService { + proxy: Arc, runtimes: Arc, socket_addr: SocketAddr, join_handler: Option>, @@ -33,13 +32,13 @@ pub struct MysqlService { timeout: Option, } -impl MysqlService { +impl MysqlService { pub fn new( - proxy: Arc>, + proxy: Arc, runtimes: Arc, socket_addr: SocketAddr, timeout: Option, - ) -> MysqlService { + ) -> MysqlService { Self { proxy, runtimes, @@ -51,7 +50,7 @@ impl MysqlService { } } -impl MysqlService { +impl MysqlService { pub async fn start(&mut self) -> Result<()> { let (tx, rx) = oneshot::channel(); @@ -77,7 +76,7 @@ impl MysqlService { } async fn loop_accept( - proxy: Arc>, + proxy: Arc, runtimes: Arc, socket_addr: SocketAddr, timeout: Option, diff --git a/server/src/mysql/worker.rs b/server/src/mysql/worker.rs index 9521c379e4..3f36fe776d 100644 --- 
a/server/src/mysql/worker.rs +++ b/server/src/mysql/worker.rs @@ -19,7 +19,6 @@ use interpreters::interpreter::Output; use log::{error, info}; use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter}; use proxy::{context::RequestContext, http::sql::Request, Proxy}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use snafu::ResultExt; use crate::mysql::{ @@ -27,19 +26,17 @@ use crate::mysql::{ writer::MysqlQueryResultWriter, }; -pub struct MysqlWorker { +pub struct MysqlWorker { generic_hold: PhantomData, - proxy: Arc>, + proxy: Arc, timeout: Option, } -impl MysqlWorker +impl MysqlWorker where W: std::io::Write + Send + Sync, - Q: QueryExecutor + 'static, - P: PhysicalPlanner, { - pub fn new(proxy: Arc>, timeout: Option) -> Self { + pub fn new(proxy: Arc, timeout: Option) -> Self { Self { generic_hold: PhantomData::default(), proxy, @@ -49,11 +46,9 @@ where } #[async_trait::async_trait] -impl AsyncMysqlShim for MysqlWorker +impl AsyncMysqlShim for MysqlWorker where W: std::io::Write + Send + Sync, - Q: QueryExecutor + 'static, - P: PhysicalPlanner, { type Error = crate::mysql::error::Error; @@ -106,11 +101,9 @@ where } } -impl MysqlWorker +impl MysqlWorker where W: std::io::Write + Send + Sync, - Q: QueryExecutor + 'static, - P: PhysicalPlanner, { async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result { let ctx = self.create_ctx()?; diff --git a/server/src/mysql/writer.rs b/server/src/mysql/writer.rs index 7e4cbcad01..acd91b704f 100644 --- a/server/src/mysql/writer.rs +++ b/server/src/mysql/writer.rs @@ -16,9 +16,8 @@ use common_types::{ column_schema::ColumnSchema, datum::{Datum, DatumKind}, }; -use interpreters::interpreter::Output; +use interpreters::{interpreter::Output, RecordBatchVec}; use opensrv_mysql::{Column, ColumnFlags, ColumnType, OkResponse, QueryResultWriter}; -use query_engine::executor::RecordBatchVec; use crate::mysql::error::Result; diff --git a/server/src/postgresql/builder.rs b/server/src/postgresql/builder.rs index e7f5454980..0efbeddfca 100644 --- a/server/src/postgresql/builder.rs +++ b/server/src/postgresql/builder.rs @@ -15,7 +15,6 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use proxy::Proxy; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use snafu::{OptionExt, ResultExt}; use table_engine::engine::EngineRuntimes; @@ -24,15 +23,15 @@ use crate::postgresql::{ PostgresqlService, }; -pub struct Builder { +pub struct Builder { ip: String, port: u16, runtimes: Option>, - proxy: Option>>, + proxy: Option>, timeout: Option, } -impl Builder { +impl Builder { pub fn new() -> Self { Self { ip: "127.0.0.1".to_string(), @@ -43,7 +42,7 @@ impl Builder { } } - pub fn build(self) -> Result> { + pub fn build(self) -> Result { let runtimes = self.runtimes.context(MissingRuntimes)?; let proxy = self.proxy.context(MissingInstance)?; @@ -69,7 +68,7 @@ impl Builder { self } - pub fn proxy(mut self, proxy: Arc>) -> Self { + pub fn proxy(mut self, proxy: Arc) -> Self { self.proxy = Some(proxy); self } diff --git a/server/src/postgresql/handler.rs b/server/src/postgresql/handler.rs index a965d7a099..8743c9e7e2 100644 --- a/server/src/postgresql/handler.rs +++ b/server/src/postgresql/handler.rs @@ -29,19 +29,16 @@ use pgwire::{ error::{PgWireError, PgWireResult}, }; use proxy::{context::RequestContext, http::sql::Request, Proxy}; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use snafu::ResultExt; use 
crate::postgresql::error::{CreateContext, Result}; -pub struct PostgresqlHandler { - pub(crate) proxy: Arc>, +pub struct PostgresqlHandler { + pub(crate) proxy: Arc, pub(crate) timeout: Option, } #[async_trait] -impl SimpleQueryHandler - for PostgresqlHandler -{ +impl SimpleQueryHandler for PostgresqlHandler { async fn do_query<'a, C>(&self, _client: &C, sql: &'a str) -> PgWireResult>> where C: ClientInfo + Unpin + Send + Sync, @@ -66,7 +63,7 @@ impl SimpleQueryHandler } } -impl PostgresqlHandler { +impl PostgresqlHandler { fn create_ctx(&self) -> Result { let default_catalog = self .proxy diff --git a/server/src/postgresql/service.rs b/server/src/postgresql/service.rs index c415b9cbf5..35f296e90a 100644 --- a/server/src/postgresql/service.rs +++ b/server/src/postgresql/service.rs @@ -20,25 +20,24 @@ use pgwire::api::{ StatelessMakeHandler, }; use proxy::Proxy; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use runtime::JoinHandle; use table_engine::engine::EngineRuntimes; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::postgresql::{error::Result, handler::PostgresqlHandler}; -pub struct PostgresqlService { +pub struct PostgresqlService { addr: SocketAddr, - proxy: Arc>, + proxy: Arc, runtimes: Arc, join_handler: Option>, tx: Option>, timeout: Option, } -impl PostgresqlService { +impl PostgresqlService { pub fn new( - proxy: Arc>, + proxy: Arc, runtimes: Arc, addr: SocketAddr, timeout: Option, @@ -78,7 +77,7 @@ impl PostgresqlService { } async fn loop_accept( - proxy: Arc>, + proxy: Arc, timeout: Option, runtimes: Arc, socket_addr: SocketAddr, diff --git a/server/src/server.rs b/server/src/server.rs index f20b96d44c..fcd2be58df 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -32,7 +32,7 @@ use proxy::{ schema_config_provider::SchemaConfigProviderRef, Proxy, }; -use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; +use query_engine::QueryEngineRef; use remote_engine_client::RemoteEngineImpl; use router::{endpoint::Endpoint, RouterRef}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; @@ -66,11 +66,8 @@ pub enum Error { #[snafu(display("Missing catalog manager.\nBacktrace:\n{}", backtrace))] MissingCatalogManager { backtrace: Backtrace }, - #[snafu(display("Missing query executor.\nBacktrace:\n{}", backtrace))] - MissingQueryExecutor { backtrace: Backtrace }, - - #[snafu(display("Missing physical planner.\nBacktrace:\n{}", backtrace))] - MissingPhysicalPlanner { backtrace: Backtrace }, + #[snafu(display("Missing query engine.\nBacktrace:\n{}", backtrace))] + MissingQueryEngine { backtrace: Backtrace }, #[snafu(display("Missing table engine.\nBacktrace:\n{}", backtrace))] MissingTableEngine { backtrace: Backtrace }, @@ -125,17 +122,17 @@ define_result!(Error); // TODO(yingwen): Consider a config manager /// Server -pub struct Server { - http_service: Service, - rpc_services: RpcServices, - mysql_service: mysql::MysqlService, - postgresql_service: postgresql::PostgresqlService, - instance: InstanceRef, +pub struct Server { + http_service: Service, + rpc_services: RpcServices, + mysql_service: mysql::MysqlService, + postgresql_service: postgresql::PostgresqlService, + instance: InstanceRef, cluster: Option, local_tables_recoverer: Option, } -impl Server { +impl Server { pub async fn stop(mut self) { self.rpc_services.shutdown().await; self.http_service.stop(); @@ -212,7 +209,7 @@ impl Server { } #[must_use] -pub struct Builder { +pub struct Builder { server_config: 
ServerConfig, remote_engine_client_config: remote_engine_client::config::Config, node_addr: String, @@ -220,11 +217,7 @@ pub struct Builder { engine_runtimes: Option>, log_runtime: Option>, catalog_manager: Option, - - // TODO: maybe place these two components in a `QueryEngine`. - query_executor: Option, - physical_planner: Option
, - + query_engine: Option, table_engine: Option, table_manipulator: Option, function_registry: Option, @@ -236,7 +229,7 @@ pub struct Builder { opened_wals: Option, } -impl Builder { +impl Builder { pub fn new(config: ServerConfig) -> Self { Self { server_config: config, @@ -246,8 +239,7 @@ impl Builder { engine_runtimes: None, log_runtime: None, catalog_manager: None, - query_executor: None, - physical_planner: None, + query_engine: None, table_engine: None, table_manipulator: None, function_registry: None, @@ -285,13 +277,8 @@ impl Builder { self } - pub fn query_executor(mut self, val: Q) -> Self { - self.query_executor = Some(val); - self - } - - pub fn physical_planner(mut self, val: P) -> Self { - self.physical_planner = Some(val); + pub fn query_engine(mut self, val: QueryEngineRef) -> Self { + self.query_engine = Some(val); self } @@ -344,11 +331,10 @@ impl Builder { } /// Build and run the server - pub fn build(self) -> Result> { + pub fn build(self) -> Result { // Build instance let catalog_manager = self.catalog_manager.context(MissingCatalogManager)?; - let query_executor = self.query_executor.context(MissingQueryExecutor)?; - let physical_planner = self.physical_planner.context(MissingPhysicalPlanner)?; + let query_engine = self.query_engine.context(MissingQueryEngine)?; let table_engine = self.table_engine.context(MissingTableEngine)?; let table_manipulator = self.table_manipulator.context(MissingTableManipulator)?; let function_registry = self.function_registry.context(MissingFunctionRegistry)?; @@ -376,8 +362,7 @@ impl Builder { let instance = { let instance = Instance { catalog_manager, - query_executor, - physical_planner, + query_engine, table_engine, partition_table_engine, function_registry, diff --git a/src/setup.rs b/src/setup.rs index 1faa5ce926..bc99821cbb 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -24,7 +24,8 @@ use analytic_engine::{ use catalog::{manager::ManagerRef, schema::OpenOptions, table_operator::TableOperator}; use catalog_impls::{table_based::TableBasedManager, volatile, CatalogManagerImpl}; use cluster::{cluster_impl::ClusterImpl, config::ClusterConfig, shard_set::ShardSet}; -use df_operator::registry::FunctionRegistryImpl; +use datafusion::execution::runtime_env::RuntimeConfig as DfRuntimeConfig; +use df_operator::registry::{FunctionRegistry, FunctionRegistryImpl}; use interpreters::table_manipulator::{catalog_based, meta_based}; use log::info; use logger::RuntimeLevel; @@ -35,11 +36,7 @@ use proxy::{ cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider, }, }; -use query_engine::{ - datafusion_impl::physical_planner::DatafusionPhysicalPlannerImpl, - executor::{Executor, ExecutorImpl}, - physical_planner::PhysicalPlanner, -}; +use query_engine::datafusion_impl::DatafusionQueryEngineImpl; use router::{rule_based::ClusterView, ClusterBasedRouter, RuleBasedRouter}; use server::{ config::{StaticRouteConfig, StaticTopologyConfig}, @@ -129,9 +126,16 @@ async fn run_server_with_runtimes( .expect("Failed to create function registry"); let function_registry = Arc::new(function_registry); - // Create query executor - let query_executor = ExecutorImpl; - let physical_planner = DatafusionPhysicalPlannerImpl::new(config.query_engine.clone()); + // Create query engine + // TODO: use a builder to support different query engine? 
+ let query_engine = Box::new( + DatafusionQueryEngineImpl::new( + config.query_engine.clone(), + DfRuntimeConfig::default(), + function_registry.clone().to_df_function_registry(), + ) + .expect("Failed to init datafusion query engine"), + ); // Config limiter let limiter = Limiter::new(config.limiter.clone()); @@ -142,8 +146,7 @@ async fn run_server_with_runtimes( .config_content(config_content) .engine_runtimes(engine_runtimes.clone()) .log_runtime(log_runtime.clone()) - .query_executor(query_executor) - .physical_planner(physical_planner) + .query_engine(query_engine) .function_registry(function_registry) .limiter(limiter); @@ -202,13 +205,13 @@ async fn build_table_engine_proxy(engine_builder: EngineBuilder<'_>) -> Arc( +async fn build_with_meta( config: &Config, cluster_config: &ClusterConfig, - builder: Builder, + builder: Builder, runtimes: Arc, wal_opener: T, -) -> Builder { +) -> Builder { // Build meta related modules. let node_meta_info = NodeMetaInfo { addr: config.node.addr.clone(), @@ -274,13 +277,13 @@ async fn build_with_meta( +async fn build_without_meta( config: &Config, static_route_config: &StaticRouteConfig, - builder: Builder, + builder: Builder, runtimes: Arc, wal_builder: T, -) -> Builder { +) -> Builder { let opened_wals = wal_builder .open_wals(&config.analytic.wal, runtimes.clone()) .await
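
Reviewer note, appended after the patch rather than woven into it: the change threading through almost every file above is the removal of the `Q: QueryExecutor` / `P: PhysicalPlanner` type parameters from `Proxy`, `Instance`, `Server`, and each service builder, in favor of trait objects bundled behind the new `QueryEngineRef`. The sketch below is a minimal, self-contained model of that shape, not the real API: every type is a simplified stand-in, and the real `Executor` and `PhysicalPlanner` are async, fallible, and defined in `query_engine`.

```rust
use std::{fmt, sync::Arc};

// Simplified stand-ins for the real plan and result types.
#[derive(Debug)]
struct PhysicalPlan;
#[derive(Debug)]
struct RecordBatchStream;

// Object-safe traits: the `Clone` bound is dropped and
// `fmt::Debug + Send + Sync + 'static` added, mirroring the edits to
// `Executor` and `PhysicalPlanner` in this diff.
trait PhysicalPlanner: fmt::Debug + Send + Sync + 'static {
    fn plan(&self) -> PhysicalPlan;
}
trait Executor: fmt::Debug + Send + Sync + 'static {
    fn execute(&self, plan: PhysicalPlan) -> RecordBatchStream;
}
type PhysicalPlannerRef = Arc<dyn PhysicalPlanner>;
type ExecutorRef = Arc<dyn Executor>;

// The facade introduced in `query_engine/src/lib.rs`, reduced to two accessors.
trait QueryEngine: fmt::Debug + Send + Sync {
    fn physical_planner(&self) -> PhysicalPlannerRef;
    fn executor(&self) -> ExecutorRef;
}
type QueryEngineRef = Box<dyn QueryEngine>;

// Before the patch `Proxy` carried the generics; now it holds one handle.
struct Proxy {
    engine: QueryEngineRef,
}

impl Proxy {
    fn run(&self) -> RecordBatchStream {
        let plan = self.engine.physical_planner().plan();
        self.engine.executor().execute(plan)
    }
}

// No-op implementations, just to make the sketch runnable.
#[derive(Debug)]
struct NoopPlanner;
impl PhysicalPlanner for NoopPlanner {
    fn plan(&self) -> PhysicalPlan {
        PhysicalPlan
    }
}

#[derive(Debug)]
struct NoopExecutor;
impl Executor for NoopExecutor {
    fn execute(&self, _plan: PhysicalPlan) -> RecordBatchStream {
        RecordBatchStream
    }
}

#[derive(Debug)]
struct NoopEngine;
impl QueryEngine for NoopEngine {
    fn physical_planner(&self) -> PhysicalPlannerRef {
        Arc::new(NoopPlanner)
    }
    fn executor(&self) -> ExecutorRef {
        Arc::new(NoopExecutor)
    }
}

fn main() {
    let proxy = Proxy {
        engine: Box::new(NoopEngine),
    };
    println!("{:?}", proxy.run());
}
```

The cost of the trait-object form is dynamic dispatch and the loss of the `Clone` bound (hence the `Arc`/`Box` aliases); the payoff is visible throughout the diff as generic `impl` headers in `proxy`, `server`, `mysql`, and `postgresql` collapsing to plain `impl Proxy`, `impl Builder`, and so on.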
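A second pattern worth calling out: `Executor::execute` now returns the record batch stream itself instead of a collected `RecordBatchVec`, and the `collect` helper moves out of `query_engine/src/executor.rs` into `interpreters/src/select.rs`, where `futures::TryStreamExt::try_collect` drains it. A minimal sketch of that call-site pattern, assuming only the `futures` crate and using plain vectors as stand-ins for record batches:

```rust
use futures::{executor::block_on, stream, Stream, TryStreamExt};

// Stand-in for common_types::record_batch::RecordBatch.
type RecordBatch = Vec<u32>;

// Stand-in for the `SendableRecordBatchStream` the executor now hands back.
fn execute() -> impl Stream<Item = Result<RecordBatch, String>> {
    stream::iter(vec![Ok(vec![1, 2]), Ok(vec![3])])
}

fn main() -> Result<(), String> {
    // Collecting is now an explicit decision at the call site; `try_collect`
    // stops at the first `Err`, like the new `collect` helper in select.rs.
    let batches: Vec<RecordBatch> = block_on(execute().try_collect())?;
    assert_eq!(batches, vec![vec![1, 2], vec![3]]);
    Ok(())
}
```

Pushing collection out to the interpreter means callers that can forward a stream no longer have to buffer the whole result set before responding.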
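Last, the `DfFunctionRegistryAdapter` added to `df_operator/src/registry.rs` is a plain newtype adapter: wrap our `FunctionRegistryRef`, implement DataFusion's registry trait on the wrapper, and map internal lookup results into the external error type (`DataFusionError::Internal` in the real code). A stripped-down sketch of the pattern; both traits here are hypothetical stand-ins and match neither project's actual signatures:

```rust
use std::sync::Arc;

// Stand-in for this project's own registry trait.
trait FunctionRegistry: Send + Sync {
    fn find_udf(&self, name: &str) -> Option<String>;
}
type FunctionRegistryRef = Arc<dyn FunctionRegistry>;

// Stand-in for the external (DataFusion-side) trait to satisfy.
trait DfFunctionRegistry {
    fn udf(&self, name: &str) -> Result<String, String>;
}

// The adapter holds a handle to ours and speaks theirs.
struct DfFunctionRegistryAdapter(FunctionRegistryRef);

impl DfFunctionRegistry for DfFunctionRegistryAdapter {
    fn udf(&self, name: &str) -> Result<String, String> {
        // An internal "not found" becomes the external trait's error, just as
        // the real adapter wraps misses in DataFusionError::Internal.
        self.0
            .find_udf(name)
            .ok_or_else(|| format!("udf not found, name:{name}"))
    }
}

fn main() {
    struct Empty;
    impl FunctionRegistry for Empty {
        fn find_udf(&self, _name: &str) -> Option<String> {
            None
        }
    }
    let adapter = DfFunctionRegistryAdapter(Arc::new(Empty));
    assert!(adapter.udf("my_udf").is_err());
}
```

Taking `self: Arc<Self>` in `to_df_function_registry` lets the registry hand out an owning handle to itself without requiring `Clone` on the trait object.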