feat: teach ceresdb to run the whole dist query process (#1204)
## Rationale
Closes #1112 
This is the final part of #1112: after this PR, distributed queries can run through the new process.

## Detailed Changes
+ Implement the RPC service that supports remote execution of physical plans (see the trait sketch below).
+ Refactor the query engine to support the new query process.
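
The central interface change, shown in the `dist_sql_query/mod.rs` hunk below: `RemotePhysicalPlanExecutor::execute` now receives the physical plan itself instead of a pre-encoded `EncodedPlan`, moving serialization into the executor implementation. A minimal sketch of the trait after this PR, assembled from the truncated hunk (treat the surrounding details as approximate):

```rust
use std::{fmt, sync::Arc};

use datafusion::{
    error::Result as DfResult,
    execution::TaskContext,
    physical_plan::{ExecutionPlan, SendableRecordBatchStream},
};
use futures::future::BoxFuture;
use table_engine::remote::model::TableIdentifier;

pub trait RemotePhysicalPlanExecutor: fmt::Debug + Send + Sync + 'static {
    // Before: `encoded_plan: EncodedPlan` (plan bytes + record schema).
    // After: the plan is passed as-is; the executor encodes it itself.
    fn execute(
        &self,
        table: TableIdentifier,
        task_context: &TaskContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>>;
}
```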

## Test Plan
Covered by existing tests.
Rachelint authored Sep 27, 2023
1 parent 0e79f9c commit 28b37a3
Showing 27 changed files with 1,001 additions and 284 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


9 changes: 1 addition & 8 deletions df_engine_extensions/src/dist_sql_query/mod.rs
@@ -15,14 +15,12 @@
use std::{fmt, sync::Arc};

use async_trait::async_trait;
use common_types::schema::RecordSchema;
use datafusion::{
error::Result as DfResult,
execution::TaskContext,
physical_plan::{ExecutionPlan, SendableRecordBatchStream},
};
use futures::future::BoxFuture;
use prost::bytes::Bytes;
use table_engine::{
remote::model::TableIdentifier,
table::{ReadRequest, TableRef},
@@ -40,15 +38,10 @@ pub trait RemotePhysicalPlanExecutor: fmt::Debug + Send + Sync + 'static {
&self,
table: TableIdentifier,
task_context: &TaskContext,
encoded_plan: EncodedPlan,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>>;
}

pub struct EncodedPlan {
pub plan: Bytes,
pub schema: RecordSchema,
}

type RemotePhysicalPlanExecutorRef = Arc<dyn RemotePhysicalPlanExecutor>;

/// Executable scan's builder
26 changes: 2 additions & 24 deletions df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -23,7 +23,6 @@ use std::{
};

use arrow::{datatypes::SchemaRef as ArrowSchemaRef, record_batch::RecordBatch};
use common_types::schema::RecordSchema;
use datafusion::{
error::{DataFusionError, Result as DfResult},
execution::TaskContext,
@@ -33,13 +32,10 @@ use datafusion::{
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
};
use datafusion_proto::{
bytes::physical_plan_to_bytes_with_extension_codec, physical_plan::PhysicalExtensionCodec,
};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use table_engine::{remote::model::TableIdentifier, table::ReadRequest};

use crate::dist_sql_query::{EncodedPlan, RemotePhysicalPlanExecutor};
use crate::dist_sql_query::RemotePhysicalPlanExecutor;

/// Placeholder for a partitioned table's scan plan
/// It is not actually executable and just carries the necessary information
@@ -117,7 +113,6 @@ impl DisplayAs for UnresolvedPartitionedScan {
pub struct ResolvedPartitionedScan {
pub remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
pub remote_exec_plans: Vec<(TableIdentifier, Arc<dyn ExecutionPlan>)>,
pub extension_codec: Arc<dyn PhysicalExtensionCodec>,
}

impl ResolvedPartitionedScan {
@@ -139,7 +134,6 @@ impl ResolvedPartitionedScan {
let plan = ResolvedPartitionedScan {
remote_executor: self.remote_executor.clone(),
remote_exec_plans: new_plans,
extension_codec: self.extension_codec.clone(),
};

Ok(Arc::new(plan))
@@ -187,26 +181,10 @@ impl ExecutionPlan for ResolvedPartitionedScan {
) -> DfResult<DfSendableRecordBatchStream> {
let (sub_table, plan) = &self.remote_exec_plans[partition];

// Encode to build `EncodedPlan`.
let plan_bytes = physical_plan_to_bytes_with_extension_codec(
plan.clone(),
self.extension_codec.as_ref(),
)?;
let record_schema = RecordSchema::try_from(plan.schema()).map_err(|e| {
DataFusionError::Internal(format!(
"failed to convert arrow_schema to record_schema, arrow_schema:{}, err:{e}",
plan.schema()
))
})?;
let encoded_plan = EncodedPlan {
plan: plan_bytes,
schema: record_schema,
};

// Send plan for remote execution.
let stream_future =
self.remote_executor
.execute(sub_table.clone(), &context, encoded_plan)?;
.execute(sub_table.clone(), &context, plan.clone())?;
let record_stream = PartitionedScanStream::new(stream_future, plan.schema());

Ok(Box::pin(record_stream))
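
With encoding removed from `ResolvedPartitionedScan::execute`, a concrete `RemotePhysicalPlanExecutor` becomes the place that serializes the plan before shipping it. A hypothetical sketch of such an implementation (the `GrpcRemoteExecutor` type, its `client` field, and `send_exec_request` are illustrative, not part of this PR); the encoding helper is the same `datafusion_proto` function this hunk deletes:

```rust
use datafusion_proto::bytes::physical_plan_to_bytes_with_extension_codec;
use futures::FutureExt;

impl RemotePhysicalPlanExecutor for GrpcRemoteExecutor {
    fn execute(
        &self,
        table: TableIdentifier,
        _task_context: &TaskContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
        // Encoding now happens here, on the executor side, instead of
        // inside the scan plan as before this PR.
        let plan_bytes =
            physical_plan_to_bytes_with_extension_codec(plan, self.extension_codec.as_ref())?;

        // Hypothetical RPC plumbing; the real service lives in the
        // remote engine implementation added elsewhere in this PR.
        let client = self.client.clone();
        Ok(async move { client.send_exec_request(table, plan_bytes).await }.boxed())
    }
}
```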
67 changes: 9 additions & 58 deletions df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -18,24 +18,13 @@ use async_recursion::async_recursion;
use catalog::manager::ManagerRef as CatalogManagerRef;
use datafusion::{
error::{DataFusionError, Result as DfResult},
execution::{runtime_env::RuntimeEnv, FunctionRegistry},
physical_plan::ExecutionPlan,
};
use datafusion_proto::{
physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
protobuf,
};
use prost::Message;
use table_engine::{remote::model::TableIdentifier, table::TableRef};

use crate::{
codec::PhysicalExtensionCodecImpl,
dist_sql_query::{
physical_plan::{
ResolvedPartitionedScan, UnresolvedPartitionedScan, UnresolvedSubTableScan,
},
ExecutableScanBuilderRef, RemotePhysicalPlanExecutorRef,
},
use crate::dist_sql_query::{
physical_plan::{ResolvedPartitionedScan, UnresolvedPartitionedScan, UnresolvedSubTableScan},
ExecutableScanBuilderRef, RemotePhysicalPlanExecutorRef,
};

/// Resolver which makes datafusion dist query related plans executable.
@@ -50,29 +39,18 @@ pub struct Resolver {
remote_executor: RemotePhysicalPlanExecutorRef,
catalog_manager: CatalogManagerRef,
scan_builder: ExecutableScanBuilderRef,

// TODO: hold `SessionContext` here rather than these two parts.
runtime_env: Arc<RuntimeEnv>,
function_registry: Arc<dyn FunctionRegistry + Send + Sync>,

extension_codec: Arc<dyn PhysicalExtensionCodec>,
}

impl Resolver {
pub fn new(
remote_executor: RemotePhysicalPlanExecutorRef,
catalog_manager: CatalogManagerRef,
scan_builder: ExecutableScanBuilderRef,
runtime_env: Arc<RuntimeEnv>,
function_registry: Arc<dyn FunctionRegistry + Send + Sync>,
) -> Self {
Self {
remote_executor,
catalog_manager,
scan_builder,
runtime_env,
function_registry,
extension_codec: Arc::new(PhysicalExtensionCodecImpl::new()),
}
}

@@ -98,7 +76,6 @@ impl Resolver {
return Ok(Arc::new(ResolvedPartitionedScan {
remote_executor: self.remote_executor.clone(),
remote_exec_plans: remote_plans,
extension_codec: self.extension_codec.clone(),
}));
}

@@ -119,25 +96,8 @@ impl Resolver {
plan.with_new_children(new_children)
}

/// Resolve encoded sub table scanning plan.
// TODO: because we need to async init the `ScanTable` plan before executing,
// resolving the sub scan plan has to be async for now...
pub async fn resolve_sub_scan(&self, encoded_plan: &[u8]) -> DfResult<Arc<dyn ExecutionPlan>> {
// Decode to datafusion physical plan.
let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan).map_err(|e| {
DataFusionError::Plan(format!("failed to decode bytes to physical plan, err:{e}"))
})?;
let plan = protobuf.try_into_physical_plan(
self.function_registry.as_ref(),
&self.runtime_env,
self.extension_codec.as_ref(),
)?;

self.resolve_sub_scan_internal(plan).await
}

#[async_recursion]
async fn resolve_sub_scan_internal(
pub async fn resolve_sub_scan(
&self,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
@@ -165,7 +125,7 @@ impl Resolver {
// Resolve children if exist.
let mut new_children = Vec::with_capacity(children.len());
for child in children {
let child = self.resolve_sub_scan_internal(child).await?;
let child = self.resolve_sub_scan(child).await?;

new_children.push(child);
}
@@ -215,15 +175,9 @@ mod test {
let ctx = TestContext::new();
let plan = ctx.build_basic_sub_table_plan();
let resolver = ctx.resolver();
let new_plan = displayable(
resolver
.resolve_sub_scan_internal(plan)
.await
.unwrap()
.as_ref(),
)
.indent(true)
.to_string();
let new_plan = displayable(resolver.resolve_sub_scan(plan).await.unwrap().as_ref())
.indent(true)
.to_string();
insta::assert_snapshot!(new_plan);
}

@@ -243,10 +197,7 @@
assert_eq!(original_plan_display, new_plan_display);

// It should not be processed by `resolve_sub_scan`.
let new_plan = resolver
.resolve_sub_scan_internal(plan.clone())
.await
.unwrap();
let new_plan = resolver.resolve_sub_scan(plan.clone()).await.unwrap();

let new_plan_display = displayable(new_plan.as_ref()).indent(true).to_string();

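
Since `resolve_sub_scan` now takes an already-decoded `Arc<dyn ExecutionPlan>`, decoding moves to the caller, i.e. the new remote-execution RPC service (not shown in this truncated diff). A sketch of that caller side, reusing the decoding logic deleted from the resolver above (the free function and its parameter list are illustrative):

```rust
use std::sync::Arc;

use datafusion::{
    error::{DataFusionError, Result as DfResult},
    execution::{runtime_env::RuntimeEnv, FunctionRegistry},
    physical_plan::ExecutionPlan,
};
use datafusion_proto::{
    physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
    protobuf,
};
use prost::Message;

async fn resolve_encoded_sub_scan(
    resolver: &Resolver,
    function_registry: &dyn FunctionRegistry,
    runtime_env: &RuntimeEnv,
    extension_codec: &dyn PhysicalExtensionCodec,
    encoded_plan: &[u8],
) -> DfResult<Arc<dyn ExecutionPlan>> {
    // Decode bytes -> protobuf node -> datafusion physical plan, exactly
    // as the deleted `resolve_sub_scan` body above used to do.
    let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan).map_err(|e| {
        DataFusionError::Plan(format!("failed to decode bytes to physical plan, err:{e}"))
    })?;
    let plan =
        protobuf.try_into_physical_plan(function_registry, runtime_env, extension_codec)?;

    // Hand the decoded plan to the resolver.
    resolver.resolve_sub_scan(plan).await
}
```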
8 changes: 3 additions & 5 deletions df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -27,7 +27,7 @@ use catalog::{manager::ManagerRef, test_util::MockCatalogManagerBuilder};
use common_types::{projected_schema::ProjectedSchema, tests::build_schema_for_cpu};
use datafusion::{
error::{DataFusionError, Result as DfResult},
execution::{runtime_env::RuntimeEnv, FunctionRegistry, TaskContext},
execution::{FunctionRegistry, TaskContext},
logical_expr::{expr_fn, Literal, Operator},
physical_plan::{
expressions::{binary, col, lit},
@@ -51,7 +51,7 @@
use crate::dist_sql_query::{
physical_plan::{PartitionedScanStream, UnresolvedPartitionedScan, UnresolvedSubTableScan},
resolver::Resolver,
EncodedPlan, ExecutableScanBuilder, RemotePhysicalPlanExecutor,
ExecutableScanBuilder, RemotePhysicalPlanExecutor,
};

// Test context
@@ -187,8 +187,6 @@ impl TestContext {
Arc::new(MockRemotePhysicalPlanExecutor),
self.catalog_manager.clone(),
Box::new(MockScanBuilder),
Arc::new(RuntimeEnv::default()),
Arc::new(MockFunctionRegistry),
)
}

@@ -348,7 +346,7 @@ impl RemotePhysicalPlanExecutor for MockRemotePhysicalPlanExecutor {
&self,
_table: TableIdentifier,
_task_context: &TaskContext,
_encoded_plan: EncodedPlan,
_plan: Arc<dyn ExecutionPlan>,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
unimplemented!()
}
17 changes: 13 additions & 4 deletions interpreters/src/tests.rs
@@ -23,11 +23,12 @@ use catalog::{
use catalog_impls::table_based::TableBasedManager;
use common_types::request_id::RequestId;
use datafusion::execution::runtime_env::RuntimeConfig;
use df_operator::registry::{FunctionRegistry, FunctionRegistryImpl};
use query_engine::{datafusion_impl::DatafusionQueryEngineImpl, QueryEngineRef};
use query_frontend::{
parser::Parser, plan::Plan, planner::Planner, provider::MetaProvider, tests::MockMetaProvider,
};
use table_engine::engine::TableEngineRef;
use table_engine::{engine::TableEngineRef, memory::MockRemoteEngine};

use crate::{
context::Context,
@@ -369,9 +370,17 @@ async fn test_interpreters<T: EngineBuildContext>(engine_context: T) {
let catalog_manager = Arc::new(build_catalog_manager(engine.clone()).await);
let table_operator = TableOperator::new(catalog_manager.clone());
let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));
let query_engine = Box::new(
DatafusionQueryEngineImpl::new(query_engine::Config::default(), RuntimeConfig::default())
.unwrap(),
let function_registry = Arc::new(FunctionRegistryImpl::default());
let remote_engine = Arc::new(MockRemoteEngine);
let query_engine = Arc::new(
DatafusionQueryEngineImpl::new(
query_engine::config::Config::default(),
RuntimeConfig::default(),
function_registry.to_df_function_registry(),
remote_engine,
catalog_manager.clone(),
)
.unwrap(),
);

let env = Env {
1 change: 1 addition & 0 deletions query_engine/Cargo.toml
@@ -29,6 +29,7 @@ workspace = true
arrow = { workspace = true }
async-trait = { workspace = true }
bytes_ext = { workspace = true }
catalog = { workspace = true }
chrono = { workspace = true }
common_types = { workspace = true }
datafusion = { workspace = true }
36 changes: 28 additions & 8 deletions query_engine/src/datafusion_impl/executor.rs
@@ -23,21 +23,42 @@ use time_ext::InstantExt;

use crate::{
context::Context,
datafusion_impl::DfContextBuilder,
datafusion_impl::{
task_context::{DatafusionTaskExecContext, Preprocessor},
DfContextBuilder,
},
error::*,
executor::Executor,
physical_planner::{PhysicalPlanPtr, TaskContext},
physical_planner::{PhysicalPlanPtr, TaskExecContext},
};

#[derive(Debug, Clone)]
pub struct DatafusionExecutorImpl {
// Datafusion session context builder
/// Datafusion session context builder
df_ctx_builder: Arc<DfContextBuilder>,

/// Preprocessor for processing some physical plans before execution
preprocessor: Arc<Preprocessor>,
}

impl DatafusionExecutorImpl {
pub fn new(df_ctx_builder: Arc<DfContextBuilder>) -> Self {
Self { df_ctx_builder }
pub fn new(df_ctx_builder: Arc<DfContextBuilder>, preprocessor: Arc<Preprocessor>) -> Self {
Self {
df_ctx_builder,
preprocessor,
}
}

fn task_exec_context(&self, ctx: &Context) -> TaskExecContext {
let session_ctx = self.df_ctx_builder.build(ctx);
let task_ctx = session_ctx.task_ctx();

let df_ctx = DatafusionTaskExecContext {
task_ctx,
preprocessor: self.preprocessor.clone(),
};

TaskExecContext::default().with_datafusion_context(df_ctx)
}
}

@@ -56,11 +77,10 @@ impl Executor for DatafusionExecutorImpl {
let begin_instant = Instant::now();

// TODO: build the `TaskContext` directly rather than through `SessionContext`.
let session_ctx = self.df_ctx_builder.build(ctx);
let df_task_ctx = session_ctx.task_ctx();
let task_ctx = TaskContext::default().with_datafusion_task_ctx(df_task_ctx);
let task_ctx = self.task_exec_context(ctx);
let stream = physical_plan
.execute(&task_ctx)
.await
.box_err()
.with_context(|| ExecutorWithCause {
msg: Some("failed to execute physical plan".to_string()),
(The remaining 19 changed files are not shown.)