diff --git a/Cargo.lock b/Cargo.lock index 4eb26376f6..f18f26d713 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5394,6 +5394,7 @@ dependencies = [ "arrow 43.0.0", "async-trait", "bytes_ext", + "catalog", "chrono", "common_types", "datafusion", diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs b/df_engine_extensions/src/dist_sql_query/mod.rs index d0e4b2fdd2..fd777d8483 100644 --- a/df_engine_extensions/src/dist_sql_query/mod.rs +++ b/df_engine_extensions/src/dist_sql_query/mod.rs @@ -15,14 +15,12 @@ use std::{fmt, sync::Arc}; use async_trait::async_trait; -use common_types::schema::RecordSchema; use datafusion::{ error::Result as DfResult, execution::TaskContext, physical_plan::{ExecutionPlan, SendableRecordBatchStream}, }; use futures::future::BoxFuture; -use prost::bytes::Bytes; use table_engine::{ remote::model::TableIdentifier, table::{ReadRequest, TableRef}, @@ -40,15 +38,10 @@ pub trait RemotePhysicalPlanExecutor: fmt::Debug + Send + Sync + 'static { &self, table: TableIdentifier, task_context: &TaskContext, - encoded_plan: EncodedPlan, + plan: Arc<dyn ExecutionPlan>, ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>>; } -pub struct EncodedPlan { - pub plan: Bytes, - pub schema: RecordSchema, -} - type RemotePhysicalPlanExecutorRef = Arc<dyn RemotePhysicalPlanExecutor>; /// Executable scan's builder diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/df_engine_extensions/src/dist_sql_query/physical_plan.rs index ad253b8711..0e838294d9 100644 --- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs +++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs @@ -23,7 +23,6 @@ use std::{ }; use arrow::{datatypes::SchemaRef as ArrowSchemaRef, record_batch::RecordBatch}; -use common_types::schema::RecordSchema; use datafusion::{ error::{DataFusionError, Result as DfResult}, execution::TaskContext, @@ -33,13 +32,10 @@ use datafusion::{ SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, }, }; -use datafusion_proto::{ - bytes::physical_plan_to_bytes_with_extension_codec, physical_plan::PhysicalExtensionCodec, -}; use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use table_engine::{remote::model::TableIdentifier, table::ReadRequest}; -use crate::dist_sql_query::{EncodedPlan, RemotePhysicalPlanExecutor}; +use crate::dist_sql_query::RemotePhysicalPlanExecutor; /// Placeholder of partitioned table's scan plan /// It is inexecutable actually and just for carrying the necessary information @@ -117,7 +113,6 @@ impl DisplayAs for UnresolvedPartitionedScan { pub struct ResolvedPartitionedScan { pub remote_executor: Arc<dyn RemotePhysicalPlanExecutor>, pub remote_exec_plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>, - pub extension_codec: Arc<dyn PhysicalExtensionCodec>, } impl ResolvedPartitionedScan { @@ -139,7 +134,6 @@ impl ResolvedPartitionedScan { let plan = ResolvedPartitionedScan { remote_executor: self.remote_executor.clone(), remote_exec_plans: new_plans, - extension_codec: self.extension_codec.clone(), }; Ok(Arc::new(plan)) @@ -187,26 +181,10 @@ impl ExecutionPlan for ResolvedPartitionedScan { ) -> DfResult<DfSendableRecordBatchStream> { let (sub_table, plan) = &self.remote_exec_plans[partition]; - // Encode to build `EncodedPlan`. - let plan_bytes = physical_plan_to_bytes_with_extension_codec( - plan.clone(), - self.extension_codec.as_ref(), - )?; - let record_schema = RecordSchema::try_from(plan.schema()).map_err(|e| { - DataFusionError::Internal(format!( - "failed to convert arrow_schema to record_schema, arrow_schema:{}, err:{e}", - plan.schema() - )) - })?; - let encoded_plan = EncodedPlan { - plan: plan_bytes, - schema: record_schema, - }; - // Send plan for remote execution.
let stream_future = self.remote_executor - .execute(sub_table.clone(), &context, encoded_plan)?; + .execute(sub_table.clone(), &context, plan.clone())?; let record_stream = PartitionedScanStream::new(stream_future, plan.schema()); Ok(Box::pin(record_stream)) diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs b/df_engine_extensions/src/dist_sql_query/resolver.rs index 7c8823c584..3634d53ad9 100644 --- a/df_engine_extensions/src/dist_sql_query/resolver.rs +++ b/df_engine_extensions/src/dist_sql_query/resolver.rs @@ -18,24 +18,13 @@ use async_recursion::async_recursion; use catalog::manager::ManagerRef as CatalogManagerRef; use datafusion::{ error::{DataFusionError, Result as DfResult}, - execution::{runtime_env::RuntimeEnv, FunctionRegistry}, physical_plan::ExecutionPlan, }; -use datafusion_proto::{ - physical_plan::{AsExecutionPlan, PhysicalExtensionCodec}, - protobuf, -}; -use prost::Message; use table_engine::{remote::model::TableIdentifier, table::TableRef}; -use crate::{ - codec::PhysicalExtensionCodecImpl, - dist_sql_query::{ - physical_plan::{ - ResolvedPartitionedScan, UnresolvedPartitionedScan, UnresolvedSubTableScan, - }, - ExecutableScanBuilderRef, RemotePhysicalPlanExecutorRef, - }, +use crate::dist_sql_query::{ + physical_plan::{ResolvedPartitionedScan, UnresolvedPartitionedScan, UnresolvedSubTableScan}, + ExecutableScanBuilderRef, RemotePhysicalPlanExecutorRef, }; /// Resolver which makes datafuison dist query related plan executable. @@ -50,12 +39,6 @@ pub struct Resolver { remote_executor: RemotePhysicalPlanExecutorRef, catalog_manager: CatalogManagerRef, scan_builder: ExecutableScanBuilderRef, - - // TODO: hold `SessionContext` here rather than these two parts. - runtime_env: Arc<RuntimeEnv>, - function_registry: Arc<dyn FunctionRegistry>, - - extension_codec: Arc<dyn PhysicalExtensionCodec>, } impl Resolver { @@ -63,16 +46,11 @@ impl Resolver { remote_executor: RemotePhysicalPlanExecutorRef, catalog_manager: CatalogManagerRef, scan_builder: ExecutableScanBuilderRef, - runtime_env: Arc<RuntimeEnv>, - function_registry: Arc<dyn FunctionRegistry>, ) -> Self { Self { remote_executor, catalog_manager, scan_builder, - runtime_env, - function_registry, - extension_codec: Arc::new(PhysicalExtensionCodecImpl::new()), } } @@ -98,7 +76,6 @@ impl Resolver { return Ok(Arc::new(ResolvedPartitionedScan { remote_executor: self.remote_executor.clone(), remote_exec_plans: remote_plans, - extension_codec: self.extension_codec.clone(), })); } @@ -119,25 +96,8 @@ impl Resolver { plan.with_new_children(new_children) } - /// Resolve encoded sub table scanning plan. - // TODO: because we need to async init the `ScanTable` plan before executing, - // so sub scan plan's resolving can just be async now... - pub async fn resolve_sub_scan(&self, encoded_plan: &[u8]) -> DfResult<Arc<dyn ExecutionPlan>> { - // Decode to datafusion physical plan. - let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan).map_err(|e| { - DataFusionError::Plan(format!("failed to decode bytes to physical plan, err:{e}")) - })?; - let plan = protobuf.try_into_physical_plan( - self.function_registry.as_ref(), - &self.runtime_env, - self.extension_codec.as_ref(), - )?; - - self.resolve_sub_scan_internal(plan).await - } - #[async_recursion] - async fn resolve_sub_scan_internal( + pub async fn resolve_sub_scan( &self, plan: Arc<dyn ExecutionPlan>, ) -> DfResult<Arc<dyn ExecutionPlan>> { @@ -165,7 +125,7 @@ impl Resolver { // Resolve children if exist.
let mut new_children = Vec::with_capacity(children.len()); for child in children { - let child = self.resolve_sub_scan_internal(child).await?; + let child = self.resolve_sub_scan(child).await?; new_children.push(child); } @@ -215,15 +175,9 @@ mod test { let ctx = TestContext::new(); let plan = ctx.build_basic_sub_table_plan(); let resolver = ctx.resolver(); - let new_plan = displayable( - resolver - .resolve_sub_scan_internal(plan) - .await - .unwrap() - .as_ref(), - ) - .indent(true) - .to_string(); + let new_plan = displayable(resolver.resolve_sub_scan(plan).await.unwrap().as_ref()) + .indent(true) + .to_string(); insta::assert_snapshot!(new_plan); } @@ -243,10 +197,7 @@ assert_eq!(original_plan_display, new_plan_display); // It should not be processed by `resolve_sub_scan_internal`. - let new_plan = resolver - .resolve_sub_scan_internal(plan.clone()) - .await - .unwrap(); + let new_plan = resolver.resolve_sub_scan(plan.clone()).await.unwrap(); let new_plan_display = displayable(new_plan.as_ref()).indent(true).to_string(); diff --git a/df_engine_extensions/src/dist_sql_query/test_util.rs b/df_engine_extensions/src/dist_sql_query/test_util.rs index 3299b210b4..c0f297310b 100644 --- a/df_engine_extensions/src/dist_sql_query/test_util.rs +++ b/df_engine_extensions/src/dist_sql_query/test_util.rs @@ -27,7 +27,7 @@ use catalog::{manager::ManagerRef, test_util::MockCatalogManagerBuilder}; use common_types::{projected_schema::ProjectedSchema, tests::build_schema_for_cpu}; use datafusion::{ error::{DataFusionError, Result as DfResult}, - execution::{runtime_env::RuntimeEnv, FunctionRegistry, TaskContext}, + execution::{FunctionRegistry, TaskContext}, logical_expr::{expr_fn, Literal, Operator}, physical_plan::{ expressions::{binary, col, lit}, @@ -51,7 +51,7 @@ use trace_metric::MetricsCollector; use crate::dist_sql_query::{ physical_plan::{PartitionedScanStream, UnresolvedPartitionedScan, UnresolvedSubTableScan}, resolver::Resolver, - EncodedPlan, ExecutableScanBuilder, RemotePhysicalPlanExecutor, + ExecutableScanBuilder, RemotePhysicalPlanExecutor, }; // Test context @@ -187,8 +187,6 @@ impl TestContext { Arc::new(MockRemotePhysicalPlanExecutor), self.catalog_manager.clone(), Box::new(MockScanBuilder), - Arc::new(RuntimeEnv::default()), - Arc::new(MockFunctionRegistry), ) } @@ -348,7 +346,7 @@ impl RemotePhysicalPlanExecutor for MockRemotePhysicalPlanExecutor { &self, _table: TableIdentifier, _task_context: &TaskContext, - _encoded_plan: EncodedPlan, + _plan: Arc<dyn ExecutionPlan>, ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> { unimplemented!() } diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs index 3ff078ad68..1db445ff58 100644 --- a/interpreters/src/tests.rs +++ b/interpreters/src/tests.rs @@ -23,11 +23,12 @@ use catalog::{ use catalog_impls::table_based::TableBasedManager; use common_types::request_id::RequestId; use datafusion::execution::runtime_env::RuntimeConfig; +use df_operator::registry::{FunctionRegistry, FunctionRegistryImpl}; use query_engine::{datafusion_impl::DatafusionQueryEngineImpl, QueryEngineRef}; use query_frontend::{ parser::Parser, plan::Plan, planner::Planner, provider::MetaProvider, tests::MockMetaProvider, }; -use table_engine::engine::TableEngineRef; +use table_engine::{engine::TableEngineRef, memory::MockRemoteEngine}; use crate::{ context::Context, @@ -369,9 +370,17 @@ async fn test_interpreters(engine_context: T) { let catalog_manager = Arc::new(build_catalog_manager(engine.clone()).await); let table_operator = TableOperator::new(catalog_manager.clone()); let table_manipulator =
Arc::new(TableManipulatorImpl::new(table_operator)); - let query_engine = Box::new( - DatafusionQueryEngineImpl::new(query_engine::Config::default(), RuntimeConfig::default()) - .unwrap(), + let function_registry = Arc::new(FunctionRegistryImpl::default()); + let remote_engine = Arc::new(MockRemoteEngine); + let query_engine = Arc::new( + DatafusionQueryEngineImpl::new( + query_engine::config::Config::default(), + RuntimeConfig::default(), + function_registry.to_df_function_registry(), + remote_engine, + catalog_manager.clone(), + ) + .unwrap(), ); let env = Env { diff --git a/query_engine/Cargo.toml b/query_engine/Cargo.toml index f6e2c89533..586788730a 100644 --- a/query_engine/Cargo.toml +++ b/query_engine/Cargo.toml @@ -29,6 +29,7 @@ workspace = true arrow = { workspace = true } async-trait = { workspace = true } bytes_ext = { workspace = true } +catalog = { workspace = true } chrono = { workspace = true } common_types = { workspace = true } datafusion = { workspace = true } diff --git a/query_engine/src/datafusion_impl/executor.rs b/query_engine/src/datafusion_impl/executor.rs index 1695fe1dba..9ed423752c 100644 --- a/query_engine/src/datafusion_impl/executor.rs +++ b/query_engine/src/datafusion_impl/executor.rs @@ -23,21 +23,42 @@ use time_ext::InstantExt; use crate::{ context::Context, - datafusion_impl::DfContextBuilder, + datafusion_impl::{ + task_context::{DatafusionTaskExecContext, Preprocessor}, + DfContextBuilder, + }, error::*, executor::Executor, - physical_planner::{PhysicalPlanPtr, TaskContext}, + physical_planner::{PhysicalPlanPtr, TaskExecContext}, }; #[derive(Debug, Clone)] pub struct DatafusionExecutorImpl { - // Datafuison session context builder + /// Datafusion session context builder df_ctx_builder: Arc<DfContextBuilder>, + + /// Preprocessor for processing some physical plans before execution + preprocessor: Arc<Preprocessor>, } impl DatafusionExecutorImpl { - pub fn new(df_ctx_builder: Arc<DfContextBuilder>) -> Self { - Self { df_ctx_builder } + pub fn new(df_ctx_builder: Arc<DfContextBuilder>, preprocessor: Arc<Preprocessor>) -> Self { + Self { + df_ctx_builder, + preprocessor, + } + } + + fn task_exec_context(&self, ctx: &Context) -> TaskExecContext { + let session_ctx = self.df_ctx_builder.build(ctx); + let task_ctx = session_ctx.task_ctx(); + + let df_ctx = DatafusionTaskExecContext { + task_ctx, + preprocessor: self.preprocessor.clone(), + }; + + TaskExecContext::default().with_datafusion_context(df_ctx) + } } @@ -56,11 +77,10 @@ impl Executor for DatafusionExecutorImpl { let begin_instant = Instant::now(); // TODO: build the `TaskContext` directly rather than through `SessionContext`.
- let session_ctx = self.df_ctx_builder.build(ctx); - let df_task_ctx = session_ctx.task_ctx(); - let task_ctx = TaskContext::default().with_datafusion_task_ctx(df_task_ctx); + let task_ctx = self.task_exec_context(ctx); let stream = physical_plan .execute(&task_ctx) + .await .box_err() .with_context(|| ExecutorWithCause { msg: Some("failed to execute physical plan".to_string()), diff --git a/query_engine/src/datafusion_impl/mod.rs b/query_engine/src/datafusion_impl/mod.rs index 0972c92793..b3afd1ed34 100644 --- a/query_engine/src/datafusion_impl/mod.rs +++ b/query_engine/src/datafusion_impl/mod.rs @@ -14,23 +14,26 @@ use std::{fmt, sync::Arc, time::Instant}; +use catalog::manager::ManagerRef as CatalogManager; use datafusion::{ execution::{ context::{QueryPlanner, SessionState}, runtime_env::{RuntimeConfig, RuntimeEnv}, + FunctionRegistry, }, optimizer::analyzer::Analyzer, physical_optimizer::PhysicalOptimizerRule, prelude::{SessionConfig, SessionContext}, }; -use table_engine::provider::CeresdbOptions; +use df_engine_extensions::codec::PhysicalExtensionCodecImpl; +use table_engine::{provider::CeresdbOptions, remote::RemoteEngineRef}; use crate::{ context::Context, datafusion_impl::{ executor::DatafusionExecutorImpl, logical_optimizer::type_conversion::TypeConversion, physical_planner::DatafusionPhysicalPlannerImpl, - physical_planner_extension::QueryPlannerAdapter, + physical_planner_extension::QueryPlannerAdapter, task_context::Preprocessor, }, executor::ExecutorRef, physical_planner::PhysicalPlannerRef, @@ -44,6 +47,7 @@ pub mod physical_plan; pub mod physical_plan_extension; pub mod physical_planner; pub mod physical_planner_extension; +pub mod task_context; use crate::error::*; @@ -54,13 +58,34 @@ pub struct DatafusionQueryEngineImpl { } impl DatafusionQueryEngineImpl { - pub fn new(config: Config, runtime_config: RuntimeConfig) -> Result<Self> { + pub fn new( + config: Config, + runtime_config: RuntimeConfig, + function_registry: Arc<dyn FunctionRegistry>, + remote_engine: RemoteEngineRef, + catalog_manager: CatalogManager, + ) -> Result<Self> { let runtime_env = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); - let physical_planner = Arc::new(QueryPlannerAdapter); - let df_ctx_builder = Arc::new(DfContextBuilder::new(config, runtime_env, physical_planner)); + let df_physical_planner = Arc::new(QueryPlannerAdapter); + let df_ctx_builder = Arc::new(DfContextBuilder::new( + config, + runtime_env.clone(), + df_physical_planner, + )); + // Physical planner let physical_planner = Arc::new(DatafusionPhysicalPlannerImpl::new(df_ctx_builder.clone())); - let executor = Arc::new(DatafusionExecutorImpl::new(df_ctx_builder)); + + // Executor + let extension_codec = Arc::new(PhysicalExtensionCodecImpl::new()); + let preprocessor = Arc::new(Preprocessor::new( + remote_engine, + catalog_manager, + runtime_env.clone(), + function_registry.clone(), + extension_codec, + )); + let executor = Arc::new(DatafusionExecutorImpl::new(df_ctx_builder, preprocessor)); Ok(Self { physical_planner, @@ -117,6 +142,8 @@ impl DfContextBuilder { let ceresdb_options = CeresdbOptions { request_id: ctx.request_id.as_u64(), request_timeout: timeout, + default_catalog: ctx.default_catalog.clone(), + default_schema: ctx.default_schema.clone(), }; let mut df_session_config = SessionConfig::new() .with_default_catalog_and_schema( diff --git a/query_engine/src/datafusion_impl/physical_plan.rs b/query_engine/src/datafusion_impl/physical_plan.rs index a9f54ca123..672520a986 100644 --- a/query_engine/src/datafusion_impl/physical_plan.rs +++
b/query_engine/src/datafusion_impl/physical_plan.rs @@ -16,8 +16,8 @@ use std::{ any::Any, - fmt::{Debug, Formatter}, - sync::Arc, + fmt::{self, Debug, Formatter}, + sync::{Arc, RwLock}, }; use async_trait::async_trait; @@ -29,28 +29,61 @@ use snafu::{OptionExt, ResultExt}; use table_engine::stream::{FromDfStream, SendableRecordBatchStream}; use crate::{ + datafusion_impl::task_context::Preprocessor, error::*, - physical_planner::{PhysicalPlan, TaskContext}, + physical_planner::{PhysicalPlan, TaskExecContext}, }; -pub struct DataFusionPhysicalPlanAdapter { - plan: Arc<dyn ExecutionPlan>, +pub enum TypedPlan { + Normal(Arc<dyn ExecutionPlan>), + Partitioned(Arc<dyn ExecutionPlan>), + Remote(Vec<u8>), } -impl DataFusionPhysicalPlanAdapter { - pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self { - Self { plan } +impl TypedPlan { + async fn maybe_preprocess( + &self, + preprocessor: &Preprocessor, + ) -> Result<Arc<dyn ExecutionPlan>> { + preprocessor.process(self).await } +} - pub fn as_df_physical_plan(&self) -> Arc<dyn ExecutionPlan> { - self.plan.clone() +impl fmt::Debug for TypedPlan { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + Self::Normal(plan) => f.debug_tuple("Normal").field(plan).finish(), + Self::Partitioned(plan) => f.debug_tuple("Partitioned").field(plan).finish(), + Self::Remote(_) => f.debug_tuple("Remote").finish(), + } + } +} + +/// Datafusion physical plan adapter +/// Some conversion work on the `original_plan` will be done before executing, +/// and the converted plan will be kept in `executed_plan` for inspecting the +/// execution status later. +pub struct DataFusionPhysicalPlanAdapter { + /// Original plan + original_plan: TypedPlan, + + /// Executed plan converted from `original_plan` + executed_plan: RwLock<Option<Arc<dyn ExecutionPlan>>>, +} + +impl DataFusionPhysicalPlanAdapter { + pub fn new(typed_plan: TypedPlan) -> Self { + Self { + original_plan: typed_plan, + executed_plan: RwLock::new(None), + } + } } impl Debug for DataFusionPhysicalPlanAdapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DataFusionPhysicalPlan") - .field("plan", &self.plan) + .field("typed_plan", &self.original_plan) .finish() } } @@ -61,33 +94,44 @@ impl PhysicalPlan for DataFusionPhysicalPlanAdapter { self } - fn execute(&self, task_ctx: &TaskContext) -> Result<SendableRecordBatchStream> { + async fn execute(&self, task_ctx: &TaskExecContext) -> Result<SendableRecordBatchStream> { + // Get datafusion task context. let df_task_ctx = task_ctx - .try_to_datafusion_task_ctx() + .as_datafusion_task_ctx() .with_context(|| PhysicalPlanNoCause { msg: Some("datafusion task ctx not found".to_string()), })?; - let partition_count = self.plan.output_partitioning().partition_count(); - let df_stream = if partition_count <= 1 { - self.plan - .execute(0, df_task_ctx) - .box_err() - .context(PhysicalPlanWithCause { - msg: Some(format!("partition_count:{partition_count}")), - })? + + // Preprocess if needed to get an executable plan. + let executable = self + .original_plan + .maybe_preprocess(&df_task_ctx.preprocessor) + .await?; + + // Coalesce the plan if it has multiple output partitions. + let partition_count = executable.output_partitioning().partition_count(); + let executable = if partition_count <= 1 { + executable } else { - // merge into a single partition - let plan = CoalescePartitionsExec::new(self.plan.clone()); - // MergeExec must produce a single partition - assert_eq!(1, plan.output_partitioning().partition_count()); - plan.execute(0, df_task_ctx) - .box_err() - .context(PhysicalPlanWithCause { - msg: Some(format!("partition_count:{partition_count}")), - })? + Arc::new(CoalescePartitionsExec::new(executable)) }; + // Keep the executed plan.
+ { + let executed = &mut *self.executed_plan.write().unwrap(); + *executed = Some(executable.clone()); + } + + // Execute the plan. + // `executed_plan` is ensured to be `Some` at this point. + let df_stream = executable + .execute(0, df_task_ctx.task_ctx.clone()) + .box_err() + .context(PhysicalPlanWithCause { + msg: Some(format!("partition_count:{partition_count}")), + })?; + let stream = FromDfStream::new(df_stream) .box_err() .context(PhysicalPlanWithCause { msg: None })?; @@ -96,8 +140,13 @@ impl PhysicalPlan for DataFusionPhysicalPlanAdapter { } fn metrics_to_string(&self) -> String { - DisplayableExecutionPlan::with_metrics(&*self.plan) - .indent(true) - .to_string() + let executed_opt = { self.executed_plan.read().unwrap().clone() }; + + match executed_opt { + Some(plan) => DisplayableExecutionPlan::with_metrics(plan.as_ref()) + .indent(true) + .to_string(), + None => "plan is not executed yet".to_string(), + } } } diff --git a/query_engine/src/datafusion_impl/physical_planner.rs b/query_engine/src/datafusion_impl/physical_planner.rs index 24868e6bae..5af560cae4 100644 --- a/query_engine/src/datafusion_impl/physical_planner.rs +++ b/query_engine/src/datafusion_impl/physical_planner.rs @@ -21,7 +21,10 @@ use snafu::ResultExt; use crate::{ context::Context, - datafusion_impl::{physical_plan::DataFusionPhysicalPlanAdapter, DfContextBuilder}, + datafusion_impl::{ + physical_plan::{DataFusionPhysicalPlanAdapter, TypedPlan}, + DfContextBuilder, + }, error::*, physical_planner::{PhysicalPlanPtr, PhysicalPlanner}, }; @@ -36,10 +39,25 @@ impl DatafusionPhysicalPlannerImpl { pub fn new(df_ctx_builder: Arc<DfContextBuilder>) -> Self { Self { df_ctx_builder } } + + fn has_partitioned_table(logical_plan: &QueryPlan) -> bool { + let mut has_partitioned_table = false; + let _ = logical_plan + .tables + .visit(|_, table| -> std::result::Result<(), ()> { + if table.partition_info().is_some() { + has_partitioned_table = true; + } + Ok(()) + }); + + has_partitioned_table + } } #[async_trait] impl PhysicalPlanner for DatafusionPhysicalPlannerImpl { + // TODO: we should modify `QueryPlan` to support creating remote plans here. async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) -> Result<PhysicalPlanPtr> { // Register catalogs to datafusion execution context. let catalogs = CatalogProviderAdapter::new_adapters(logical_plan.tables.clone()); @@ -59,7 +77,16 @@ impl PhysicalPlanner for DatafusionPhysicalPlannerImpl { .await .box_err() .context(PhysicalPlannerWithCause { msg: None })?; - let physical_plan = DataFusionPhysicalPlanAdapter::new(exec_plan); + + // Decide if a partitioned table exists. + let has_partitioned_table = + DatafusionPhysicalPlannerImpl::has_partitioned_table(&logical_plan); + let typed_plan = if has_partitioned_table { + TypedPlan::Partitioned(exec_plan) + } else { + TypedPlan::Normal(exec_plan) + }; + let physical_plan = DataFusionPhysicalPlanAdapter::new(typed_plan); Ok(Box::new(physical_plan)) } diff --git a/query_engine/src/datafusion_impl/task_context.rs b/query_engine/src/datafusion_impl/task_context.rs new file mode 100644 index 0000000000..acd8b404a9 --- /dev/null +++ b/query_engine/src/datafusion_impl/task_context.rs @@ -0,0 +1,245 @@ +// Copyright 2023 The CeresDB Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fmt, + sync::Arc, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use catalog::manager::ManagerRef as CatalogManagerRef; +use common_types::{request_id::RequestId, schema::RecordSchema}; +use datafusion::{ + error::{DataFusionError, Result as DfResult}, + execution::{runtime_env::RuntimeEnv, FunctionRegistry, TaskContext}, + physical_plan::{ExecutionPlan, SendableRecordBatchStream}, +}; +use datafusion_proto::{ + bytes::physical_plan_to_bytes_with_extension_codec, + physical_plan::{AsExecutionPlan, PhysicalExtensionCodec}, + protobuf, +}; +use df_engine_extensions::dist_sql_query::{ + resolver::Resolver, ExecutableScanBuilder, RemotePhysicalPlanExecutor, +}; +use futures::future::BoxFuture; +use generic_error::BoxError; +use prost::Message; +use snafu::ResultExt; +use table_engine::{ + provider::{CeresdbOptions, ScanTable}, + remote::{ + model::{ + ExecContext, ExecutePlanRequest, PhysicalPlan, RemoteExecuteRequest, TableIdentifier, + }, + RemoteEngineRef, + }, + stream::ToDfStream, + table::{ReadRequest, TableRef}, +}; + +use crate::{datafusion_impl::physical_plan::TypedPlan, error::*}; + +#[allow(dead_code)] +pub struct DatafusionTaskExecContext { + pub task_ctx: Arc<TaskContext>, + pub preprocessor: Arc<Preprocessor>, +} + +/// Preprocessor for datafusion physical plan +#[allow(dead_code)] +pub struct Preprocessor { + dist_query_resolver: Resolver, + runtime_env: Arc<RuntimeEnv>, + function_registry: Arc<dyn FunctionRegistry>, + extension_codec: Arc<dyn PhysicalExtensionCodec>, +} + +impl Preprocessor { + pub fn new( + remote_engine: RemoteEngineRef, + catalog_manager: CatalogManagerRef, + runtime_env: Arc<RuntimeEnv>, + function_registry: Arc<dyn FunctionRegistry>, + extension_codec: Arc<dyn PhysicalExtensionCodec>, + ) -> Self { + let remote_executor = Arc::new(RemotePhysicalPlanExecutorImpl { + remote_engine, + extension_codec: extension_codec.clone(), + }); + let scan_builder = Box::new(ExecutableScanBuilderImpl); + let resolver = Resolver::new(remote_executor, catalog_manager, scan_builder); + + Self { + dist_query_resolver: resolver, + runtime_env, + function_registry, + extension_codec, + } + } + + pub async fn process(&self, typed_plan: &TypedPlan) -> Result<Arc<dyn ExecutionPlan>> { + match typed_plan { + TypedPlan::Normal(plan) => Ok(plan.clone()), + TypedPlan::Partitioned(plan) => self.preprocess_partitioned_table_plan(plan).await, + TypedPlan::Remote(plan) => self.preprocess_remote_plan(plan).await, + } + } + + async fn preprocess_remote_plan(&self, encoded_plan: &[u8]) -> Result<Arc<dyn ExecutionPlan>> { + // Decode to datafusion physical plan.
+ let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan) + .box_err() + .with_context(|| ExecutorWithCause { + msg: Some("failed to decode plan".to_string()), + })?; + let plan = protobuf + .try_into_physical_plan( + self.function_registry.as_ref(), + &self.runtime_env, + self.extension_codec.as_ref(), + ) + .box_err() + .with_context(|| ExecutorWithCause { + msg: Some("failed to rebuild physical plan from the decoded plan".to_string()), + })?; + + self.dist_query_resolver + .resolve_sub_scan(plan) + .await + .box_err() + .with_context(|| ExecutorWithCause { + msg: Some("failed to preprocess remote plan".to_string()), + }) + } + + async fn preprocess_partitioned_table_plan( + &self, + plan: &Arc<dyn ExecutionPlan>, + ) -> Result<Arc<dyn ExecutionPlan>> { + self.dist_query_resolver + .resolve_partitioned_scan(plan.clone()) + .box_err() + .with_context(|| ExecutorWithCause { + msg: Some(format!("failed to preprocess partitioned table plan, plan:{plan:?}")), + }) + } +} + +impl fmt::Debug for Preprocessor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Preprocessor") + .field("dist_query_resolver", &"preprocess partitioned table plan") + .field("extension_codec", &self.extension_codec) + .finish() + } +} + +/// Remote physical plan executor impl +#[derive(Debug)] +struct RemotePhysicalPlanExecutorImpl { + remote_engine: RemoteEngineRef, + extension_codec: Arc<dyn PhysicalExtensionCodec>, +} + +impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl { + fn execute( + &self, + table: TableIdentifier, + task_context: &TaskContext, + plan: Arc<dyn ExecutionPlan>, + ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> { + // Get the custom context to rebuild execution context. + let ceresdb_options = task_context + .session_config() + .options() + .extensions + .get::<CeresdbOptions>(); + assert!(ceresdb_options.is_some()); + let ceresdb_options = ceresdb_options.unwrap(); + let request_id = RequestId::from(ceresdb_options.request_id); + let deadline = ceresdb_options + .request_timeout + .map(|n| Instant::now() + Duration::from_millis(n)); + let default_catalog = ceresdb_options.default_catalog.clone(); + let default_schema = ceresdb_options.default_schema.clone(); + + let exec_ctx = ExecContext { + request_id, + deadline, + default_catalog, + default_schema, + }; + + // Encode plan and schema + let plan_schema = RecordSchema::try_from(plan.schema()).map_err(|e| { + DataFusionError::Internal(format!( + "failed to convert arrow_schema to record_schema, arrow_schema:{}, err:{e}", + plan.schema() + )) + })?; + + let encoded_plan = + physical_plan_to_bytes_with_extension_codec(plan, self.extension_codec.as_ref())?; + + // Build returned stream future. + let remote_engine = self.remote_engine.clone(); + let future = Box::pin(async move { + let remote_request = RemoteExecuteRequest { + context: exec_ctx, + physical_plan: PhysicalPlan::Datafusion(encoded_plan), + }; + + let request = ExecutePlanRequest { + table, + plan_schema, + remote_request, + }; + + // Remote execute.
+ let stream = remote_engine + .execute_physical_plan(request) + .await + .map_err(|e| { + DataFusionError::Internal(format!( + "failed to execute physical plan by remote engine, err:{e}" + )) + })?; + + Ok(Box::pin(ToDfStream(stream)) as _) + }); + + Ok(future) + } +} + +/// Executable scan builder impl +#[derive(Debug)] +struct ExecutableScanBuilderImpl; + +#[async_trait] +impl ExecutableScanBuilder for ExecutableScanBuilderImpl { + async fn build( + &self, + table: TableRef, + read_request: ReadRequest, + ) -> DfResult<Arc<dyn ExecutionPlan>> { + let mut scan = ScanTable::new(table, read_request); + scan.maybe_init_stream().await.map_err(|e| { + DataFusionError::Internal(format!("failed to build executable table scan, err:{e}")) + })?; + Ok(Arc::new(scan)) + } +} diff --git a/query_engine/src/error.rs b/query_engine/src/error.rs index b808e8fecd..14097cbb1e 100644 --- a/query_engine/src/error.rs +++ b/query_engine/src/error.rs @@ -19,13 +19,19 @@ use snafu::{Backtrace, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Physical plan err with cause,, msg:{msg:?}, err:{source}",))] + #[snafu(display("Failed to init query engine, msg:{msg}.\nBacktrace:\n{backtrace}"))] + InitNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to init query engine, msg:{msg}, err:{source}"))] + InitWithCause { msg: String, source: GenericError }, + + #[snafu(display("Physical plan err with cause, msg:{msg:?}, err:{source}"))] PhysicalPlanWithCause { msg: Option<String>, source: GenericError, }, - #[snafu(display("Physical plan err with no cause, msg:{msg:?}.\nBacktrace:\n{backtrace}",))] + #[snafu(display("Physical plan err with no cause, msg:{msg:?}.\nBacktrace:\n{backtrace}"))] PhysicalPlanNoCause { msg: Option<String>, backtrace: Backtrace, diff --git a/query_engine/src/lib.rs b/query_engine/src/lib.rs index f2088142df..dc41a58764 100644 --- a/query_engine/src/lib.rs +++ b/query_engine/src/lib.rs @@ -22,16 +22,108 @@ pub mod datafusion_impl; pub mod error; pub mod executor; pub mod physical_planner; -use std::fmt; +use std::{fmt, sync::Arc}; -pub use config::Config; +use catalog::manager::ManagerRef; +use datafusion::execution::{runtime_env::RuntimeConfig, FunctionRegistry}; +use snafu::OptionExt; +use table_engine::remote::RemoteEngineRef; -use crate::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef}; +use crate::{ + config::Config, datafusion_impl::DatafusionQueryEngineImpl, error::*, executor::ExecutorRef, + physical_planner::PhysicalPlannerRef, +}; +/// Query engine pub trait QueryEngine: fmt::Debug + Send + Sync { fn physical_planner(&self) -> PhysicalPlannerRef; fn executor(&self) -> ExecutorRef; } -pub type QueryEngineRef = Box<dyn QueryEngine>; +pub type QueryEngineRef = Arc<dyn QueryEngine>; + +/// Query engine builder +#[derive(Default)] +pub struct QueryEngineBuilder { + config: Option<Config>, + df_function_registry: Option<Arc<dyn FunctionRegistry>>, + df_runtime_config: Option<RuntimeConfig>, + catalog_manager: Option<ManagerRef>, + remote_engine: Option<RemoteEngineRef>, +} + +#[derive(Debug)] +pub enum QueryEngineType { + Datafusion, +} + +impl QueryEngineBuilder { + pub fn config(mut self, config: Config) -> Self { + self.config = Some(config); + self + } + + pub fn df_function_registry( + mut self, + df_function_registry: Arc<dyn FunctionRegistry>, + ) -> Self { + self.df_function_registry = Some(df_function_registry); + self + } + + pub fn df_runtime_config(mut self, df_runtime_config: RuntimeConfig) -> Self { + self.df_runtime_config = Some(df_runtime_config); + self + } + + pub fn catalog_manager(mut self, catalog_manager: ManagerRef) -> Self { + self.catalog_manager =
Some(catalog_manager); + self + } + + pub fn remote_engine(mut self, remote_engine: RemoteEngineRef) -> Self { + self.remote_engine = Some(remote_engine); + self + } + + fn build_datafusion_query_engine(self) -> Result<QueryEngineRef> { + // Check if the necessary components exist. + let config = self.config.with_context(|| InitNoCause { + msg: "config not found", + })?; + + let df_function_registry = self.df_function_registry.with_context(|| InitNoCause { + msg: "df_function_registry not found", + })?; + + let df_runtime_config = self.df_runtime_config.with_context(|| InitNoCause { + msg: "df_runtime_config not found", + })?; + + let catalog_manager = self.catalog_manager.with_context(|| InitNoCause { + msg: "catalog_manager not found", + })?; + + let remote_engine = self.remote_engine.with_context(|| InitNoCause { + msg: "remote_engine not found", + })?; + + // Build engine. + let df_query_engine = DatafusionQueryEngineImpl::new( + config, + df_runtime_config, + df_function_registry, + remote_engine, + catalog_manager, + )?; + + Ok(Arc::new(df_query_engine)) + } + + pub fn build(self, engine_type: QueryEngineType) -> Result<QueryEngineRef> { + match engine_type { + QueryEngineType::Datafusion => self.build_datafusion_query_engine(), + } + } +} diff --git a/query_engine/src/physical_planner.rs b/query_engine/src/physical_planner.rs index 8d46f83af4..7c6aa65406 100644 --- a/query_engine/src/physical_planner.rs +++ b/query_engine/src/physical_planner.rs @@ -15,11 +15,10 @@ use std::{any::Any, fmt, sync::Arc}; use async_trait::async_trait; -use datafusion::execution::TaskContext as DfTaskContext; use query_frontend::plan::QueryPlan; use table_engine::stream::SendableRecordBatchStream; -use crate::{context::Context, error::*}; +use crate::{context::Context, datafusion_impl::task_context::DatafusionTaskExecContext, error::*}; /// Physical query planner that converts a logical plan to a /// physical plan suitable for execution. @@ -35,11 +34,12 @@ pub trait PhysicalPlanner: fmt::Debug + Send + Sync + 'static { pub type PhysicalPlannerRef = Arc<dyn PhysicalPlanner>; -pub trait PhysicalPlan: std::fmt::Debug + Send + Sync + 'static { +#[async_trait] +pub trait PhysicalPlan: std::fmt::Debug + Sync + Send + 'static { fn as_any(&self) -> &dyn Any; /// execute this plan and returns the result - fn execute(&self, task_ctx: &TaskContext) -> Result<SendableRecordBatchStream>; + async fn execute(&self, task_ctx: &TaskExecContext) -> Result<SendableRecordBatchStream>; /// Convert internal metrics to string. fn metrics_to_string(&self) -> String; @@ -49,17 +49,17 @@ pub type PhysicalPlanPtr = Box<dyn PhysicalPlan>; /// Task context, just a wrapper of datafusion task context now #[derive(Default)] -pub struct TaskContext { - df_task_context: Option<Arc<DfTaskContext>>, +pub struct TaskExecContext { + df_context: Option<DatafusionTaskExecContext>, } -impl TaskContext { - pub fn with_datafusion_task_ctx(mut self, df_task_ctx: Arc<DfTaskContext>) -> Self { - self.df_task_context = Some(df_task_ctx); +impl TaskExecContext { + pub fn with_datafusion_context(mut self, df_task_ctx: DatafusionTaskExecContext) -> Self { + self.df_context = Some(df_task_ctx); self } - pub fn try_to_datafusion_task_ctx(&self) -> Option<Arc<DfTaskContext>> { - self.df_task_context.clone() + pub fn as_datafusion_task_ctx(&self) -> Option<&DatafusionTaskExecContext> { + self.df_context.as_ref() } } diff --git a/query_frontend/src/provider.rs b/query_frontend/src/provider.rs index 5756aa6a9f..ef75a38876 100644 --- a/query_frontend/src/provider.rs +++ b/query_frontend/src/provider.rs @@ -100,7 +100,7 @@ pub struct ResolvedTable { // testing.
fn enable_dedicated_partitioned_table_provider() -> bool { std::env::var("CERESDB_DEDICATED_PARTITIONED_TABLE_PROVIDER") - .unwrap_or_else(|_| "false".to_string()) + .unwrap_or_else(|_| "true".to_string()) == "true" } diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs index 3ee3c7e52b..0771b6907a 100644 --- a/remote_engine_client/src/client.rs +++ b/remote_engine_client/src/client.rs @@ -29,9 +29,7 @@ use ceresdbproto::{ remote_engine::{self, read_response::Output::Arrow, remote_engine_service_client::*}, storage::arrow_payload, }; -use common_types::{ - projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema, -}; +use common_types::{record_batch::RecordBatch, schema::RecordSchema}; use futures::{Stream, StreamExt}; use generic_error::BoxError; use log::info; @@ -40,8 +38,8 @@ use runtime::Runtime; use snafu::{ensure, OptionExt, ResultExt}; use table_engine::{ remote::model::{ - GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, WriteBatchRequest, - WriteBatchResult, WriteRequest, + ExecutePlanRequest, GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, + WriteBatchRequest, WriteBatchResult, WriteRequest, }, table::{SchemaId, TableId}, }; @@ -79,7 +77,7 @@ impl Client { // Read from remote. let table_ident = request.table.clone(); - let projected_schema = request.read_request.projected_schema.clone(); + let record_schema = request.read_request.projected_schema.to_record_schema(); let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel); let request_pb = ceresdbproto::remote_engine::ReadRequest::try_from(request) .box_err() @@ -110,7 +108,7 @@ impl Client { // evict cache entry. let response = response.into_inner(); let remote_read_record_batch_stream = - ClientReadRecordBatchStream::new(table_ident, response, projected_schema); + ClientReadRecordBatchStream::new(table_ident, response, record_schema); Ok(remote_read_record_batch_stream) } @@ -323,6 +321,52 @@ impl Client { } } + pub async fn execute_physical_plan( + &self, + request: ExecutePlanRequest, + ) -> Result<ClientReadRecordBatchStream> { + // Find the channel from the router first. + let route_context = self.cached_router.route(&request.table).await?; + + // Execute plan from remote. + let table_ident = request.table; + let plan_schema = request.plan_schema; + let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel); + let request_pb = + ceresdbproto::remote_engine::ExecutePlanRequest::try_from(request.remote_request) + .box_err() + .context(Convert { + msg: "Failed to convert RemoteExecuteRequest to pb", + })?; + + let result = rpc_client + .execute_physical_plan(Request::new(request_pb)) + .await + .with_context(|| Rpc { + table_idents: vec![table_ident.clone()], + msg: "Failed to execute physical plan on remote engine", + }); + + let response = match result { + Ok(response) => response, + + Err(e) => { + // If an error occurred, we simply evict the corresponding channel now. + // TODO: evict according to the type of error. + self.evict_route_from_cache(&[table_ident]).await; + return Err(e); + } + }; + + // When the stream is got successfully, the table has been found on the remote node; no + // need to evict the cache entry.
+ let response = response.into_inner(); + let remote_execute_plan_stream = + ClientReadRecordBatchStream::new(table_ident, response, plan_schema); + + Ok(remote_execute_plan_stream) + } + async fn evict_route_from_cache(&self, table_idents: &[TableIdentifier]) { info!( "Remote engine client evict route from cache, table_ident:{:?}", @@ -335,22 +379,19 @@ pub struct ClientReadRecordBatchStream { pub table_ident: TableIdentifier, pub response_stream: Streaming<remote_engine::ReadResponse>, - pub projected_schema: ProjectedSchema, - pub projected_record_schema: RecordSchema, + pub record_schema: RecordSchema, } impl ClientReadRecordBatchStream { pub fn new( table_ident: TableIdentifier, response_stream: Streaming<remote_engine::ReadResponse>, - projected_schema: ProjectedSchema, + record_schema: RecordSchema, ) -> Self { - let projected_record_schema = projected_schema.to_record_schema(); Self { table_ident, response_stream, - projected_schema, - projected_record_schema, + record_schema, } } } diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs index 94238ee47e..cf57315c89 100644 --- a/remote_engine_client/src/lib.rs +++ b/remote_engine_client/src/lib.rs @@ -40,7 +40,10 @@ use snafu::ResultExt; use table_engine::{ remote::{ self, - model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteBatchResult, WriteRequest}, + model::{ + ExecutePlanRequest, GetTableInfoRequest, ReadRequest, TableInfo, WriteBatchResult, + WriteRequest, + }, RemoteEngine, }, stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream}, @@ -167,6 +170,19 @@ impl RemoteEngine for RemoteEngineImpl { .box_err() .context(remote::GetTableInfo) } + + async fn execute_physical_plan( + &self, + request: ExecutePlanRequest, + ) -> remote::Result<SendableRecordBatchStream> { + let client_read_stream = self + .client + .execute_physical_plan(request) + .await + .box_err() + .context(remote::ExecutePhysicalPlan)?; + Ok(Box::pin(RemoteReadRecordBatchStream(client_read_stream))) + } } impl fmt::Debug for RemoteEngineImpl { @@ -199,6 +215,6 @@ impl Stream for RemoteReadRecordBatchStream { impl RecordBatchStream for RemoteReadRecordBatchStream { fn schema(&self) -> &RecordSchema { - &self.0.projected_record_schema + &self.0.record_schema } } diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 101df3a8e0..8938f4e400 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -27,14 +27,15 @@ use async_trait::async_trait; use catalog::{manager::ManagerRef, schema::SchemaRef}; use ceresdbproto::{ remote_engine::{ - read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService, row_group, + execute_plan_request, read_response::Output::Arrow, + remote_engine_service_server::RemoteEngineService, row_group, ExecContext, ExecutePlanRequest, GetTableInfoRequest, GetTableInfoResponse, ReadRequest, ReadResponse, WriteBatchRequest, WriteRequest, WriteResponse, }, storage::{arrow_payload, ArrowPayload}, }; -use common_types::record_batch::RecordBatch; -use futures::stream::{self, BoxStream, FuturesUnordered, Stream, StreamExt}; +use common_types::{record_batch::RecordBatch, request_id::RequestId}; +use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt}; use generic_error::BoxError; use log::{error, info}; use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult}; @@ -42,16 +43,21 @@ use proxy::{ hotspot::{HotspotRecorder, Message}, instance::InstanceRef, }; +use query_engine::{
datafusion_impl::physical_plan::{DataFusionPhysicalPlanAdapter, TypedPlan}, + QueryEngineRef, QueryEngineType, +}; use snafu::{OptionExt, ResultExt}; use table_engine::{ engine::EngineRuntimes, predicate::PredicateRef, remote::model::{self, TableIdentifier}, - stream::PartitionedStreams, + stream::{PartitionedStreams, SendableRecordBatchStream}, table::TableRef, }; use time_ext::InstantExt; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_stream::Stream; use tonic::{Request, Response, Status}; use super::metrics::REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM; @@ -118,6 +124,69 @@ impl Drop for StreamWithMetric { } } +macro_rules! record_stream_to_response_stream { + ($record_stream_result:ident, $StreamType:ident) => { + match $record_stream_result { + Ok(stream) => { + let new_stream: Self::$StreamType = Box::pin(stream.map(|res| match res { + Ok(record_batch) => { + let resp = match ipc::encode_record_batch( + &record_batch.into_arrow_record_batch(), + CompressOptions { + compress_min_length: DEFAULT_COMPRESS_MIN_LENGTH, + method: CompressionMethod::Zstd, + }, + ) + .box_err() + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "encode record batch failed", + }) { + Err(e) => ReadResponse { + header: Some(error::build_err_header(e)), + ..Default::default() + }, + Ok(CompressOutput { payload, method }) => { + let compression = match method { + CompressionMethod::None => arrow_payload::Compression::None, + CompressionMethod::Zstd => arrow_payload::Compression::Zstd, + }; + + ReadResponse { + header: Some(error::build_ok_header()), + output: Some(Arrow(ArrowPayload { + record_batches: vec![payload], + compression: compression as i32, + })), + } + } + }; + + Ok(resp) + } + Err(e) => { + let resp = ReadResponse { + header: Some(error::build_err_header(e)), + ..Default::default() + }; + Ok(resp) + } + })); + + Ok(Response::new(new_stream)) + } + Err(e) => { + let resp = ReadResponse { + header: Some(error::build_err_header(e)), + ..Default::default() + }; + let stream = stream::once(async { Ok(resp) }); + Ok(Response::new(Box::pin(stream))) + } + } + }; +} + #[derive(Clone)] pub struct QueryDedup { pub config: QueryDedupConfig, @@ -450,6 +519,32 @@ impl RemoteEngineServiceImpl { Ok(Response::new(batch_resp)) } + async fn execute_physical_plan_internal( + &self, + request: Request<ExecutePlanRequest>, + ) -> Result<SendableRecordBatchStream> { + let instant = Instant::now(); + let request = request.into_inner(); + let query_engine = self.instance.query_engine.clone(); + + let stream_result = self + .runtimes + .read_runtime + .spawn(async move { handle_execute_plan(request, query_engine).await }) + .await + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: "failed to run execute physical plan task", + })?; + + REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC + .stream_read + .observe(instant.saturating_elapsed().as_secs_f64()); + + stream_result + } + fn handler_ctx(&self) -> HandlerContext { HandlerContext { catalog_manager: self.instance.catalog_manager.clone(), @@ -489,64 +584,7 @@ impl RemoteEngineService for RemoteEngineServiceImpl { None => self.stream_read_internal(request).await, }; - match result { - Ok(stream) => { - let new_stream: Self::ReadStream = Box::pin(stream.map(|res| match res { - Ok(record_batch) => { - let resp = match ipc::encode_record_batch( - &record_batch.into_arrow_record_batch(), - CompressOptions { - compress_min_length: DEFAULT_COMPRESS_MIN_LENGTH, - method: CompressionMethod::Zstd, - }, - ) - .box_err() - .context(ErrWithCause { - code:
StatusCode::Internal, - msg: "encode record batch failed", - }) { - Err(e) => ReadResponse { - header: Some(error::build_err_header(e)), - ..Default::default() - }, - Ok(CompressOutput { payload, method }) => { - let compression = match method { - CompressionMethod::None => arrow_payload::Compression::None, - CompressionMethod::Zstd => arrow_payload::Compression::Zstd, - }; - - ReadResponse { - header: Some(error::build_ok_header()), - output: Some(Arrow(ArrowPayload { - record_batches: vec![payload], - compression: compression as i32, - })), - } - } - }; - - Ok(resp) - } - Err(e) => { - let resp = ReadResponse { - header: Some(error::build_err_header(e)), - ..Default::default() - }; - Ok(resp) - } - })); - - Ok(Response::new(new_stream)) - } - Err(e) => { - let resp = ReadResponse { - header: Some(error::build_err_header(e)), - ..Default::default() - }; - let stream = stream::once(async { Ok(resp) }); - Ok(Response::new(Box::pin(stream))) - } - } + record_stream_to_response_stream!(result, ReadStream) } async fn write( @@ -572,9 +610,21 @@ impl RemoteEngineService for RemoteEngineServiceImpl { async fn execute_physical_plan( &self, - _request: Request<ExecutePlanRequest>, + request: Request<ExecutePlanRequest>, ) -> std::result::Result<Response<Self::ExecutePhysicalPlanStream>, Status> { - todo!() + let record_stream_result = + self.execute_physical_plan_internal(request) + .await + .map(|stream| { + stream.map(|batch_result| { + batch_result.box_err().with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: "failed to poll record batch", + }) + }) + }); + + record_stream_to_response_stream!(record_stream_result, ExecutePhysicalPlanStream) } } @@ -780,6 +830,72 @@ async fn handle_get_table_info( }) } +async fn handle_execute_plan( + request: ExecutePlanRequest, + query_engine: QueryEngineRef, +) -> Result<SendableRecordBatchStream> { + // Build execution context. + let ctx_in_req = request.context.with_context(|| ErrNoCause { + code: StatusCode::Internal, + msg: "execution context not found in physical plan request", + })?; + + let ExecContext { + request_id, + default_catalog, + default_schema, + timeout_ms, + } = ctx_in_req; + + let request_id = RequestId::from(request_id); + let deadline = if timeout_ms >= 0 { + Some(Instant::now() + Duration::from_millis(timeout_ms as u64)) + } else { + None + }; + + let exec_ctx = query_engine::context::Context { + request_id, + deadline, + default_catalog, + default_schema, + }; + + // Build physical plan. + let typed_plan_in_req = request.physical_plan.with_context(|| ErrNoCause { + code: StatusCode::Internal, + msg: "plan not found in physical plan request", + })?; + // FIXME: return the type from query engine. + let valid_plan = check_and_extract_plan(typed_plan_in_req, QueryEngineType::Datafusion)?; + // TODO: Build remote plan in physical planner. + let physical_plan = Box::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote( + valid_plan, + ))); + + // Execute plan.
+ let executor = query_engine.executor(); + executor + .execute(&exec_ctx, physical_plan) + .await + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::Internal, + msg: "failed to execute remote plan", + }) +} + +fn check_and_extract_plan( + typed_plan: execute_plan_request::PhysicalPlan, + engine_type: QueryEngineType, +) -> Result<Vec<u8>> { + match (typed_plan, engine_type) { + (execute_plan_request::PhysicalPlan::Datafusion(plan), QueryEngineType::Datafusion) => { + Ok(plan) + } + } +} + fn find_table_by_identifier( ctx: &HandlerContext, table_identifier: &TableIdentifier, diff --git a/server/src/server.rs b/server/src/server.rs index 25928b0cca..9e6e364343 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use analytic_engine::setup::OpenedWals; use catalog::manager::ManagerRef; use cluster::ClusterRef; +use datafusion::execution::{runtime_env::RuntimeConfig, FunctionRegistry}; use df_operator::registry::FunctionRegistryRef; use interpreters::table_manipulator::TableManipulatorRef; use log::{info, warn}; @@ -33,11 +34,14 @@ use proxy::{ schema_config_provider::SchemaConfigProviderRef, Proxy, }; -use query_engine::QueryEngineRef; +use query_engine::{QueryEngineBuilder, QueryEngineType}; use remote_engine_client::RemoteEngineImpl; use router::{endpoint::Endpoint, RouterRef}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use table_engine::engine::{EngineRuntimes, TableEngineRef}; +use table_engine::{ + engine::{EngineRuntimes, TableEngineRef}, + remote::RemoteEngineRef, +}; use crate::{ config::ServerConfig, @@ -85,6 +89,15 @@ pub enum Error { #[snafu(display("Missing limiter.\nBacktrace:\n{}", backtrace))] MissingLimiter { backtrace: Backtrace }, + #[snafu(display("Missing datafusion context.\nBacktrace:\n{}", backtrace))] + MissingDatafusionContext { backtrace: Backtrace }, + + #[snafu(display("Missing query engine config.\nBacktrace:\n{}", backtrace))] + MissingQueryEngineConfig { backtrace: Backtrace }, + + #[snafu(display("Missing config content.\nBacktrace:\n{}", backtrace))] + MissingConfigContent { backtrace: Backtrace }, + #[snafu(display("Http service failed, msg:{}, err:{}", msg, source))] HttpService { msg: String, @@ -117,6 +130,9 @@ pub enum Error { #[snafu(display("Failed to open tables in standalone mode, err:{}", source))] OpenLocalTables { source: local_tables::Error }, + + #[snafu(display("Failed to build query engine, err:{source}"))] + BuildQueryEngine { source: query_engine::error::Error }, } define_result!(Error); @@ -212,13 +228,12 @@ impl Server { #[must_use] pub struct Builder { server_config: ServerConfig, - remote_engine_client_config: remote_engine_client::config::Config, node_addr: String, + query_engine_config: Option<query_engine::config::Config>, config_content: Option<String>, engine_runtimes: Option<Arc<EngineRuntimes>>, log_runtime: Option<Arc<RuntimeLevel>>, catalog_manager: Option<ManagerRef>, - query_engine: Option<QueryEngineRef>, table_engine: Option<TableEngineRef>, table_manipulator: Option<TableManipulatorRef>, function_registry: Option<FunctionRegistryRef>, @@ -228,19 +243,20 @@ pub struct Builder { schema_config_provider: Option<SchemaConfigProviderRef>, local_tables_recoverer: Option<LocalTablesRecoverer>, opened_wals: Option<OpenedWals>, + remote_engine: Option<RemoteEngineRef>, + datafusion_context: Option<DatafusionContext>, } impl Builder { pub fn new(config: ServerConfig) -> Self { Self { server_config: config, - remote_engine_client_config: remote_engine_client::Config::default(), node_addr: "".to_string(), + query_engine_config: None, config_content: None, engine_runtimes: None, log_runtime: None, catalog_manager: None, - query_engine: None, table_engine: None, table_manipulator: None, function_registry: None, @@ -250,6
+266,8 @@ impl Builder { schema_config_provider: None, local_tables_recoverer: None, opened_wals: None, + remote_engine: None, + datafusion_context: None, } } @@ -263,6 +281,14 @@ impl Builder { self } + pub fn query_engine_config( + mut self, + query_engine_config: query_engine::config::Config, + ) -> Self { + self.query_engine_config = Some(query_engine_config); + self + } + pub fn engine_runtimes(mut self, engine_runtimes: Arc<EngineRuntimes>) -> Self { self.engine_runtimes = Some(engine_runtimes); self @@ -278,11 +304,6 @@ self } - pub fn query_engine(mut self, val: QueryEngineRef) -> Self { - self.query_engine = Some(val); - self - } - pub fn table_engine(mut self, val: TableEngineRef) -> Self { self.table_engine = Some(val); self @@ -331,11 +352,20 @@ self } + pub fn remote_engine(mut self, remote_engine: RemoteEngineRef) -> Self { + self.remote_engine = Some(remote_engine); + self + } + + pub fn datafusion_context(mut self, datafusion_context: DatafusionContext) -> Self { + self.datafusion_context = Some(datafusion_context); + self + } + /// Build and run the server pub fn build(self) -> Result<Server> { // Build instance let catalog_manager = self.catalog_manager.context(MissingCatalogManager)?; - let query_engine = self.query_engine.context(MissingQueryEngine)?; let table_engine = self.table_engine.context(MissingTableEngine)?; let table_manipulator = self.table_manipulator.context(MissingTableManipulator)?; let function_registry = self.function_registry.context(MissingFunctionRegistry)?; @@ -346,20 +376,37 @@ .context(MissingSchemaConfigProvider)?; let log_runtime = self.log_runtime.context(MissingLogRuntime)?; let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?; - let config_content = self.config_content.expect("Missing config content"); + let config_content = self.config_content.context(MissingConfigContent)?; + let query_engine_config = self.query_engine_config.context(MissingQueryEngineConfig)?; + let datafusion_context = self.datafusion_context.context(MissingDatafusionContext)?; let hotspot_recorder = Arc::new(HotspotRecorder::new( self.server_config.hotspot, engine_runtimes.default_runtime.clone(), )); + + // Build remote engine. let remote_engine_ref = Arc::new(RemoteEngineImpl::new( - self.remote_engine_client_config.clone(), + self.server_config.remote_client.clone(), router.clone(), engine_runtimes.io_runtime.clone(), )); + // Build partitioned table engine. + // TODO: remove the partitioned table engine. let partition_table_engine = Arc::new(PartitionTableEngine::new(remote_engine_ref.clone())); + // Build query engine. + let query_engine_builder = QueryEngineBuilder::default() + .config(query_engine_config) + .catalog_manager(catalog_manager.clone()) + .df_function_registry(datafusion_context.function_registry) + .df_runtime_config(datafusion_context.runtime_config) + .remote_engine(remote_engine_ref.clone()); + let query_engine = query_engine_builder + .build(QueryEngineType::Datafusion) + .context(BuildQueryEngine)?; + let instance = { let instance = Instance { catalog_manager, @@ -469,3 +516,8 @@ Ok(server) } } + +pub struct DatafusionContext { + pub function_registry: Arc<dyn FunctionRegistry>, + pub runtime_config: RuntimeConfig, +} diff --git a/src/ceresdb/src/config.rs b/src/ceresdb/src/config.rs index 5659f25274..8d991524c1 100644 --- a/src/ceresdb/src/config.rs +++ b/src/ceresdb/src/config.rs @@ -63,7 +63,7 @@ pub struct Config { pub analytic: analytic_engine::Config, /// Query engine config.
- pub query_engine: query_engine::Config, + pub query_engine: query_engine::config::Config, /// The deployment of the server. pub cluster_deployment: Option<ClusterDeployment>, diff --git a/src/ceresdb/src/setup.rs b/src/ceresdb/src/setup.rs index bb2f489448..9ec3dbdd53 100644 --- a/src/ceresdb/src/setup.rs +++ b/src/ceresdb/src/setup.rs @@ -25,7 +25,7 @@ use catalog::{manager::ManagerRef, schema::OpenOptions, table_operator::TableOperator}; use catalog_impls::{table_based::TableBasedManager, volatile, CatalogManagerImpl}; use cluster::{cluster_impl::ClusterImpl, config::ClusterConfig, shard_set::ShardSet}; use datafusion::execution::runtime_env::RuntimeConfig as DfRuntimeConfig; -use df_operator::registry::FunctionRegistryImpl; +use df_operator::registry::{FunctionRegistry, FunctionRegistryImpl}; use interpreters::table_manipulator::{catalog_based, meta_based}; use log::info; use logger::RuntimeLevel; @@ -36,12 +36,11 @@ use proxy::{ cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider, }, }; -use query_engine::datafusion_impl::DatafusionQueryEngineImpl; use router::{rule_based::ClusterView, ClusterBasedRouter, RuleBasedRouter}; use server::{ config::{StaticRouteConfig, StaticTopologyConfig}, local_tables::LocalTablesRecoverer, - server::Builder, + server::{Builder, DatafusionContext}, }; use table_engine::{engine::EngineRuntimes, memory::MemoryTableEngine, proxy::TableEngineProxy}; use tracing_util::{ @@ -125,13 +124,10 @@ async fn run_server_with_runtimes( .load_functions() .expect("Failed to create function registry"); let function_registry = Arc::new(function_registry); - - // Create query engine - // TODO: use a builder to support different query engine? - let query_engine = Box::new( - DatafusionQueryEngineImpl::new(config.query_engine.clone(), DfRuntimeConfig::default()) - .expect("Failed to init datafusion query engine"), - ); + let datafusion_context = DatafusionContext { + function_registry: function_registry.clone().to_df_function_registry(), + runtime_config: DfRuntimeConfig::default(), + }; // Config limiter let limiter = Limiter::new(config.limiter.clone()); @@ -142,9 +138,10 @@ .config_content(config_content) .engine_runtimes(engine_runtimes.clone()) .log_runtime(log_runtime.clone()) - .query_engine(query_engine) .function_registry(function_registry) - .limiter(limiter); + .limiter(limiter) + .datafusion_context(datafusion_context) + .query_engine_config(config.query_engine.clone()); let wal_builder = T::default(); let builder = match &config.cluster_deployment { diff --git a/table_engine/src/memory.rs b/table_engine/src/memory.rs index 4ae5dba9fd..678e2276e5 100644 --- a/table_engine/src/memory.rs +++ b/table_engine/src/memory.rs @@ -41,7 +41,7 @@ use crate::{ }, remote::{ self, - model::{self, GetTableInfoRequest, WriteBatchResult}, + model::{self, ExecutePlanRequest, GetTableInfoRequest, WriteBatchResult}, RemoteEngine, }, stream::{ @@ -347,24 +347,31 @@ impl RemoteEngine for MockRemoteEngine { &self, _request: remote::model::ReadRequest, ) -> remote::Result<SendableRecordBatchStream> { - todo!() + unimplemented!() } async fn write(&self, _request: remote::model::WriteRequest) -> remote::Result<usize> { - todo!() + unimplemented!() } async fn write_batch( &self, _requests: Vec<model::WriteRequest>, ) -> remote::Result<Vec<WriteBatchResult>> { - todo!() + unimplemented!() } async fn get_table_info( &self, _request: GetTableInfoRequest, ) -> remote::Result<model::TableInfo> { - todo!() + unimplemented!() } + + async fn execute_physical_plan( + &self, + _request: ExecutePlanRequest, + ) -> remote::Result<SendableRecordBatchStream> { + unimplemented!() + } } diff
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index d3dfd3febd..b0ab4116e1 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -53,6 +53,8 @@ const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table";
 pub struct CeresdbOptions {
     pub request_id: u64,
     pub request_timeout: Option<u64>,
+    pub default_schema: String,
+    pub default_catalog: String,
 }

 impl ConfigExtension for CeresdbOptions {
@@ -341,7 +343,7 @@ impl ScanTable {
         }
     }

-    async fn maybe_init_stream(&mut self) -> Result<()> {
+    pub async fn maybe_init_stream(&mut self) -> Result<()> {
         let read_res = self.table.partitioned_read(self.request.clone()).await;

         let mut stream_state = self.stream_state.lock().unwrap();
diff --git a/table_engine/src/remote/mod.rs b/table_engine/src/remote/mod.rs
index b9cc470498..e48d3e3284 100644
--- a/table_engine/src/remote/mod.rs
+++ b/table_engine/src/remote/mod.rs
@@ -25,7 +25,7 @@ use model::{ReadRequest, WriteRequest};
 use snafu::Snafu;

 use crate::{
-    remote::model::{GetTableInfoRequest, TableInfo, WriteBatchResult},
+    remote::model::{ExecutePlanRequest, GetTableInfoRequest, TableInfo, WriteBatchResult},
     stream::SendableRecordBatchStream,
 };

@@ -40,6 +40,9 @@ pub enum Error {

     #[snafu(display("Failed to get table info from remote, err:{}", source))]
     GetTableInfo { source: GenericError },
+
+    #[snafu(display("Failed to execute physical plan from remote, err:{}", source))]
+    ExecutePhysicalPlan { source: GenericError },
 }

 define_result!(Error);
@@ -56,6 +59,11 @@ pub trait RemoteEngine: fmt::Debug + Send + Sync {
     async fn write_batch(&self, requests: Vec<WriteRequest>) -> Result<Vec<WriteBatchResult>>;

     async fn get_table_info(&self, request: GetTableInfoRequest) -> Result<TableInfo>;
+
+    async fn execute_physical_plan(
+        &self,
+        request: ExecutePlanRequest,
+    ) -> Result<SendableRecordBatchStream>;
 }

 /// Remote engine reference
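Review note: the two fields added to `CeresdbOptions` let the remote side recover the catalog/schema context a decoded plan should run under. Since `CeresdbOptions` implements `ConfigExtension`, datafusion's typed extension lookup applies; a sketch of reading the fields back out, assuming the options were registered on the session config (e.g. via `with_option_extension`) when the executing context was built:

```rust
use datafusion::execution::TaskContext;
// Assumed import path for the struct extended above.
use table_engine::provider::CeresdbOptions;

// Fetch the defaults the plan should execute under; returns None when the
// extension was never registered on the session config.
fn default_catalog_and_schema(task_ctx: &TaskContext) -> Option<(String, String)> {
    let opts = task_ctx
        .session_config()
        .options()
        .extensions
        .get::<CeresdbOptions>()?;
    Some((opts.default_catalog.clone(), opts.default_schema.clone()))
}
```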
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 3aea5789bb..5e8a0deab4 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -14,16 +14,17 @@
 //! Model for remote table engine

-use std::collections::HashMap;
+use std::{collections::HashMap, time::Instant};

-use bytes_ext::ByteVec;
-use ceresdbproto::remote_engine::{self, row_group::Rows::Contiguous};
+use bytes_ext::{ByteVec, Bytes};
+use ceresdbproto::remote_engine::{self, execute_plan_request, row_group::Rows::Contiguous};
 use common_types::{
+    request_id::RequestId,
     row::{
         contiguous::{ContiguousRow, ContiguousRowReader, ContiguousRowWriter},
         Row, RowGroup, RowGroupBuilder,
     },
-    schema::{IndexInWriterSchema, Schema},
+    schema::{IndexInWriterSchema, RecordSchema, Schema},
 };
 use generic_error::{BoxError, GenericError, GenericResult};
 use macros::define_result;
@@ -33,6 +34,7 @@ use crate::{
     partition::PartitionInfo,
     table::{
         ReadRequest as TableReadRequest, SchemaId, TableId, WriteRequest as TableWriteRequest,
+        NO_TIMEOUT,
     },
 };

@@ -277,3 +279,82 @@ pub struct TableInfo {
     /// Partition Info
     pub partition_info: Option<PartitionInfo>,
 }
+
+/// Request for remote executing physical plan
+pub struct ExecutePlanRequest {
+    /// Table information for routing
+    pub table: TableIdentifier,
+
+    /// Schema of the encoded physical plan
+    pub plan_schema: RecordSchema,
+
+    /// Remote plan execution request
+    pub remote_request: RemoteExecuteRequest,
+}
+
+impl ExecutePlanRequest {
+    pub fn new(
+        table: TableIdentifier,
+        plan_schema: RecordSchema,
+        context: ExecContext,
+        physical_plan: PhysicalPlan,
+    ) -> Self {
+        let remote_request = RemoteExecuteRequest {
+            context,
+            physical_plan,
+        };
+
+        Self {
+            table,
+            plan_schema,
+            remote_request,
+        }
+    }
+}
+
+pub struct RemoteExecuteRequest {
+    /// Execution Context
+    pub context: ExecContext,
+
+    /// Physical plan for remote execution
+    pub physical_plan: PhysicalPlan,
+}
+
+pub struct ExecContext {
+    pub request_id: RequestId,
+    pub deadline: Option<Instant>,
+    pub default_catalog: String,
+    pub default_schema: String,
+}
+
+pub enum PhysicalPlan {
+    Datafusion(Bytes),
+}
+
+impl From<RemoteExecuteRequest> for ceresdbproto::remote_engine::ExecutePlanRequest {
+    fn from(value: RemoteExecuteRequest) -> Self {
+        let rest_duration_ms = if let Some(deadline) = value.context.deadline {
+            deadline.duration_since(Instant::now()).as_millis() as i64
+        } else {
+            NO_TIMEOUT
+        };
+
+        let pb_context = ceresdbproto::remote_engine::ExecContext {
+            request_id: value.context.request_id.as_u64(),
+            default_catalog: value.context.default_catalog,
+            default_schema: value.context.default_schema,
+            timeout_ms: rest_duration_ms,
+        };
+
+        let pb_plan = match value.physical_plan {
+            PhysicalPlan::Datafusion(plan) => {
+                execute_plan_request::PhysicalPlan::Datafusion(plan.to_vec())
+            }
+        };

+        Self {
+            context: Some(pb_context),
+            physical_plan: Some(pb_plan),
+        }
+    }
+}
diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs
index c79d0f2cac..df2af7a725 100644
--- a/table_engine/src/table.rs
+++ b/table_engine/src/table.rs
@@ -164,7 +164,7 @@ define_result!(Error);
 /// Default partition num to scan in parallelism.
 pub const DEFAULT_READ_PARALLELISM: usize = 8;

-const NO_TIMEOUT: i64 = -1;
+pub const NO_TIMEOUT: i64 = -1;

 /// Schema id (24 bits)
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
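Review note on the `From<RemoteExecuteRequest>` conversion above: `deadline.duration_since(Instant::now())` saturates to zero on recent Rust when the deadline has already passed (older toolchains panicked), so an expired deadline silently becomes `timeout_ms: 0`. A small standalone sketch of the same computation that makes the expired case explicit; the `-1` sentinel matches the now-public `NO_TIMEOUT`:

```rust
use std::time::{Duration, Instant};

// Mirrors table_engine::table::NO_TIMEOUT made public above.
const NO_TIMEOUT: i64 = -1;

// Remaining budget until `deadline` in milliseconds. `checked_duration_since`
// yields None when the deadline already passed, so "expired" (0) and
// "no deadline" (NO_TIMEOUT) stay distinguishable without relying on
// duration_since's saturate-or-panic behavior across toolchains.
fn remaining_timeout_ms(deadline: Option<Instant>) -> i64 {
    match deadline {
        Some(deadline) => deadline
            .checked_duration_since(Instant::now())
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0),
        None => NO_TIMEOUT,
    }
}

fn main() {
    // Usage sketch: a deadline 150ms out reports roughly 150; None reports -1.
    let deadline = Instant::now() + Duration::from_millis(150);
    println!("{}", remaining_timeout_ms(Some(deadline)));
    println!("{}", remaining_timeout_ms(None));
}
```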