Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: pass the TaskContext when execute physical plan rather than holding it #1163

Merged
merged 4 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions interpreters/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ use catalog::{
};
use catalog_impls::table_based::TableBasedManager;
use common_types::request_id::RequestId;
use datafusion::execution::runtime_env::RuntimeEnv;
use query_engine::{
datafusion_impl::physical_planner::DatafusionPhysicalPlannerImpl, executor::ExecutorImpl,
};
use datafusion::execution::runtime_env::RuntimeConfig;
use df_operator::registry::{FunctionRegistry, FunctionRegistryImpl};
use query_engine::{datafusion_impl::DatafusionQueryEngineImpl, QueryEngineRef};
use query_frontend::{
parser::Parser, plan::Plan, planner::Planner, provider::MetaProvider, tests::MockMetaProvider,
};
Expand Down Expand Up @@ -60,6 +59,7 @@ where
pub meta_provider: M,
pub catalog_manager: ManagerRef,
pub table_manipulator: TableManipulatorRef,
pub query_engine: QueryEngineRef,
}

impl<M> Env<M>
Expand All @@ -77,11 +77,8 @@ where
{
async fn build_factory(&self) -> Factory {
Factory::new(
Arc::new(ExecutorImpl),
Arc::new(DatafusionPhysicalPlannerImpl::new(
query_engine::Config::default(),
Arc::new(RuntimeEnv::default()),
)),
self.query_engine.executor(),
self.query_engine.physical_planner(),
self.catalog_manager.clone(),
self.engine(),
self.table_manipulator.clone(),
Expand Down Expand Up @@ -232,11 +229,8 @@ where
let table_operator = TableOperator::new(catalog_manager.clone());
let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));
let insert_factory = Factory::new(
Arc::new(ExecutorImpl),
Arc::new(DatafusionPhysicalPlannerImpl::new(
query_engine::Config::default(),
Arc::new(RuntimeEnv::default()),
)),
self.query_engine.executor(),
self.query_engine.physical_planner(),
catalog_manager.clone(),
self.engine(),
table_manipulator.clone(),
Expand All @@ -255,11 +249,8 @@ where
let select_sql =
"SELECT key1, key2, field1, field2, field3, field4, field5 from test_missing_columns_table";
let select_factory = Factory::new(
Arc::new(ExecutorImpl),
Arc::new(DatafusionPhysicalPlannerImpl::new(
query_engine::Config::default(),
Arc::new(RuntimeEnv::default()),
)),
self.query_engine.executor(),
self.query_engine.physical_planner(),
catalog_manager,
self.engine(),
table_manipulator,
Expand Down Expand Up @@ -379,12 +370,22 @@ async fn test_interpreters<T: EngineBuildContext>(engine_context: T) {
let catalog_manager = Arc::new(build_catalog_manager(engine.clone()).await);
let table_operator = TableOperator::new(catalog_manager.clone());
let table_manipulator = Arc::new(TableManipulatorImpl::new(table_operator));
let function_registry = Arc::new(FunctionRegistryImpl::default());
let query_engine = Box::new(
DatafusionQueryEngineImpl::new(
query_engine::Config::default(),
RuntimeConfig::default(),
function_registry.to_df_function_registry(),
)
.unwrap(),
);

let env = Env {
engine: test_ctx.clone_engine(),
meta_provider: mock,
catalog_manager,
table_manipulator,
query_engine,
};

env.test_create_table().await;
Expand Down
1 change: 1 addition & 0 deletions query_engine/src/datafusion_impl/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct DataFusionPhysicalPlanEncoderImpl {
}

impl DataFusionPhysicalPlanEncoderImpl {
// TODO: use `SessionContext` to init it.
pub fn new(
runtime_env: Arc<RuntimeEnv>,
function_registry: Arc<dyn FunctionRegistry + Send + Sync>,
Expand Down
78 changes: 78 additions & 0 deletions query_engine/src/datafusion_impl/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{sync::Arc, time::Instant};

use async_trait::async_trait;
use generic_error::BoxError;
use log::{debug, info};
use snafu::ResultExt;
use table_engine::stream::SendableRecordBatchStream;
use time_ext::InstantExt;

use crate::{
context::Context,
datafusion_impl::DfContextBuilder,
error::*,
executor::Executor,
physical_planner::{PhysicalPlanPtr, TaskContext},
};

#[derive(Debug, Clone)]
pub struct DatafusionExecutorImpl {
// Datafuison session context builder
df_ctx_builder: Arc<DfContextBuilder>,
}

impl DatafusionExecutorImpl {
pub fn new(df_ctx_builder: Arc<DfContextBuilder>) -> Self {
Self { df_ctx_builder }
}
}

#[async_trait]
impl Executor for DatafusionExecutorImpl {
async fn execute(
&self,
ctx: &Context,
physical_plan: PhysicalPlanPtr,
) -> Result<SendableRecordBatchStream> {
debug!(
"DatafusionExecutorImpl begin to execute plan, request_id:{}, physical_plan: {:?}",
ctx.request_id, physical_plan
);

let begin_instant = Instant::now();

// TODO: build the `TaskContext` directly rather than through `SessionContext`.
let session_ctx = self.df_ctx_builder.build(ctx);
let df_task_ctx = session_ctx.task_ctx();
let task_ctx = TaskContext::default().with_datafusion_task_ctx(df_task_ctx);
let stream = physical_plan
.execute(&task_ctx)
.box_err()
.with_context(|| ExecutorWithCause {
msg: Some("failed to execute physical plan".to_string()),
})?;

info!(
"DatafusionExecutorImpl finish to execute plan, request_id:{}, cost:{}ms, plan_and_metrics: {}",
ctx.request_id,
begin_instant.saturating_elapsed().as_millis(),
physical_plan.metrics_to_string()
);

Ok(stream)
}
}
124 changes: 115 additions & 9 deletions query_engine/src/datafusion_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{fmt, sync::Arc, time::Instant};

use datafusion::execution::{
runtime_env::{RuntimeConfig, RuntimeEnv},
FunctionRegistry,
use datafusion::{
execution::{
context::{QueryPlanner, SessionState},
runtime_env::{RuntimeConfig, RuntimeEnv},
FunctionRegistry,
},
optimizer::analyzer::Analyzer,
physical_optimizer::PhysicalOptimizerRule,
prelude::{SessionConfig, SessionContext},
};
use table_engine::provider::CeresdbOptions;

use crate::{
codec::PhysicalPlanCodecRef,
context::Context,
datafusion_impl::{
codec::DataFusionPhysicalPlanEncoderImpl, physical_planner::DatafusionPhysicalPlannerImpl,
codec::DataFusionPhysicalPlanEncoderImpl, executor::DatafusionExecutorImpl,
logical_optimizer::type_conversion::TypeConversion,
physical_planner::DatafusionPhysicalPlannerImpl,
physical_planner_extension::QueryPlannerAdapter,
},
executor::{ExecutorImpl, ExecutorRef},
executor::ExecutorRef,
physical_planner::PhysicalPlannerRef,
Config, QueryEngine,
};

pub mod codec;
pub mod executor;
pub mod logical_optimizer;
pub mod physical_optimizer;
pub mod physical_plan;
Expand All @@ -53,12 +65,15 @@ impl DatafusionQueryEngineImpl {
function_registry: Arc<dyn FunctionRegistry + Send + Sync>,
) -> Result<Self> {
let runtime_env = Arc::new(RuntimeEnv::new(runtime_config).unwrap());

let physical_planner = Arc::new(DatafusionPhysicalPlannerImpl::new(
let physical_planner = Arc::new(QueryPlannerAdapter);
let df_ctx_builder = Arc::new(DfContextBuilder::new(
config,
runtime_env.clone(),
physical_planner,
));
let executor = Arc::new(ExecutorImpl);

let physical_planner = Arc::new(DatafusionPhysicalPlannerImpl::new(df_ctx_builder.clone()));
let executor = Arc::new(DatafusionExecutorImpl::new(df_ctx_builder));
let physical_plan_codec = Arc::new(DataFusionPhysicalPlanEncoderImpl::new(
runtime_env,
function_registry,
Expand All @@ -85,3 +100,94 @@ impl QueryEngine for DatafusionQueryEngineImpl {
self.physical_plan_codec.clone()
}
}

/// Datafusion context builder
#[derive(Clone)]
pub struct DfContextBuilder {
config: Config,
runtime_env: Arc<RuntimeEnv>,
physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
}

impl fmt::Debug for DfContextBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DfContextBuilder")
.field("config", &self.config)
.field("runtime_env", &self.runtime_env)
.field("physical_planner", &"QueryPlannerAdapter")
.finish()
}
}

impl DfContextBuilder {
pub fn new(
config: Config,
runtime_env: Arc<RuntimeEnv>,
physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
) -> Self {
Self {
config,
runtime_env,
physical_planner,
}
}

pub fn build(&self, ctx: &Context) -> SessionContext {
let timeout = ctx
.deadline
.map(|deadline| deadline.duration_since(Instant::now()).as_millis() as u64);
let ceresdb_options = CeresdbOptions {
request_id: ctx.request_id.as_u64(),
request_timeout: timeout,
};
let mut df_session_config = SessionConfig::new()
.with_default_catalog_and_schema(
ctx.default_catalog.clone(),
ctx.default_schema.clone(),
)
.with_target_partitions(self.config.read_parallelism);

df_session_config
.options_mut()
.extensions
.insert(ceresdb_options);

// Using default logcial optimizer, if want to add more custom rule, using
// `add_optimizer_rule` to add.
let state = SessionState::with_config_rt(df_session_config, self.runtime_env.clone())
.with_query_planner(self.physical_planner.clone());

// Register analyzer rules
let state = Self::register_analyzer_rules(state);

// Register iox optimizers, used by influxql.
let state = influxql_query::logical_optimizer::register_iox_logical_optimizers(state);

SessionContext::with_state(state)
}

// TODO: this is not used now, bug of RepartitionAdapter is already fixed in
// datafusion itself. Remove this code in future.
#[allow(dead_code)]
fn apply_adapters_for_physical_optimize_rules(
default_rules: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
let mut new_rules = Vec::with_capacity(default_rules.len());
for rule in default_rules {
new_rules.push(physical_optimizer::may_adapt_optimize_rule(rule.clone()))
}

new_rules
}

fn register_analyzer_rules(mut state: SessionState) -> SessionState {
// Our analyzer has high priority, so first add we custom rules, then add the
// default ones.
state = state.with_analyzer_rules(vec![Arc::new(TypeConversion)]);
for rule in Analyzer::new().rules {
state = state.add_analyzer_rule(rule);
}

state
}
}
34 changes: 18 additions & 16 deletions query_engine/src/datafusion_impl/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,25 @@ use std::{
};

use async_trait::async_trait;
use datafusion::{
execution::context::TaskContext,
physical_plan::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
ExecutionPlan,
},
prelude::SessionContext,
use datafusion::physical_plan::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, ExecutionPlan,
};
use generic_error::BoxError;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table_engine::stream::{FromDfStream, SendableRecordBatchStream};

use crate::{error::*, physical_planner::PhysicalPlan};
use crate::{
error::*,
physical_planner::{PhysicalPlan, TaskContext},
};

pub struct DataFusionPhysicalPlanImpl {
ctx: SessionContext,
plan: Arc<dyn ExecutionPlan>,
}

impl DataFusionPhysicalPlanImpl {
pub fn with_plan(ctx: SessionContext, plan: Arc<dyn ExecutionPlan>) -> Self {
Self { ctx, plan }
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self { plan }
}

pub fn as_df_physical_plan(&self) -> Arc<dyn ExecutionPlan> {
Expand All @@ -59,12 +56,17 @@ impl Debug for DataFusionPhysicalPlanImpl {

#[async_trait]
impl PhysicalPlan for DataFusionPhysicalPlanImpl {
fn execute(&self) -> Result<SendableRecordBatchStream> {
let task_context = Arc::new(TaskContext::from(&self.ctx));
fn execute(&self, task_ctx: &TaskContext) -> Result<SendableRecordBatchStream> {
let df_task_ctx =
task_ctx
.try_to_datafusion_task_ctx()
.with_context(|| PhysicalPlanNoCause {
msg: Some("datafusion task ctx not found".to_string()),
})?;
let partition_count = self.plan.output_partitioning().partition_count();
let df_stream = if partition_count <= 1 {
self.plan
.execute(0, task_context)
.execute(0, df_task_ctx)
.box_err()
.context(PhysicalPlanWithCause {
msg: Some(format!("partition_count:{partition_count}")),
Expand All @@ -74,7 +76,7 @@ impl PhysicalPlan for DataFusionPhysicalPlanImpl {
let plan = CoalescePartitionsExec::new(self.plan.clone());
// MergeExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
plan.execute(0, task_context)
plan.execute(0, df_task_ctx)
.box_err()
.context(PhysicalPlanWithCause {
msg: Some(format!("partition_count:{partition_count}")),
Expand Down
Loading
Loading