Add ExecutionEngine abstraction #687

Merged (11 commits, Mar 2, 2023)
1 change: 1 addition & 0 deletions ballista/executor/src/bin/main.rs
@@ -77,6 +77,7 @@ async fn main() -> Result<()> {
         print_thread_info: opt.print_thread_info,
         job_data_ttl_seconds: opt.job_data_ttl_seconds,
         job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
+        execution_engine: None,
     };

     start_executor_process(config).await
114 changes: 114 additions & 0 deletions ballista/executor/src/execution_engine.rs
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use std::fmt::Debug;
use std::sync::Arc;

/// Execution engine extension point
pub trait ExecutionEngine: Sync + Send {
    fn create_query_stage_exec(
        &self,
        job_id: String,
        stage_id: usize,
        plan: Arc<dyn ExecutionPlan>,
        work_dir: &str,
    ) -> Result<Arc<dyn QueryStageExecutor>>;
}

/// QueryStageExecutor executes a section of a query plan that has consistent partitioning and
/// can be executed as one unit with each partition being executed in parallel. The output of each
/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
/// will use the ShuffleReaderExec to read these results.
#[async_trait]
pub trait QueryStageExecutor: Sync + Send + Debug {
    async fn execute_query_stage(
        &self,
        input_partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<Vec<ShuffleWritePartition>>;

    fn collect_plan_metrics(&self) -> Vec<MetricsSet>;
}

pub struct DefaultExecutionEngine {}

impl ExecutionEngine for DefaultExecutionEngine {
    fn create_query_stage_exec(
        &self,
        job_id: String,
        stage_id: usize,
        plan: Arc<dyn ExecutionPlan>,
        work_dir: &str,
    ) -> Result<Arc<dyn QueryStageExecutor>> {
        // the query plan created by the scheduler always starts with a ShuffleWriterExec
        let exec = if let Some(shuffle_writer) =
            plan.as_any().downcast_ref::<ShuffleWriterExec>()
        {
            // recreate the shuffle writer with the correct working directory
            ShuffleWriterExec::try_new(
                job_id,
                stage_id,
                plan.children()[0].clone(),
                work_dir.to_string(),
                shuffle_writer.shuffle_output_partitioning().cloned(),
            )
        } else {
            Err(DataFusionError::Internal(
                "Plan passed to create_query_stage_exec is not a ShuffleWriterExec"
                    .to_string(),
            ))
        }?;
        Ok(Arc::new(DefaultQueryStageExec::new(exec)))
    }
}

#[derive(Debug)]
pub struct DefaultQueryStageExec {
    shuffle_writer: ShuffleWriterExec,
}

impl DefaultQueryStageExec {
    pub fn new(shuffle_writer: ShuffleWriterExec) -> Self {
        Self { shuffle_writer }
    }
}

#[async_trait]
impl QueryStageExecutor for DefaultQueryStageExec {
    async fn execute_query_stage(
        &self,
        input_partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<Vec<ShuffleWritePartition>> {
        self.shuffle_writer
            .execute_shuffle_write(input_partition, context)
            .await
    }

    fn collect_plan_metrics(&self) -> Vec<MetricsSet> {
        utils::collect_plan_metrics(self.shuffle_writer.children()[0].as_ref())
    }
}
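These two traits are the whole extension surface: an `ExecutionEngine` turns a scheduler-produced plan into a `QueryStageExecutor`, and the executor only ever talks to those traits. To illustrate, here is a minimal sketch of a custom engine that wraps `DefaultExecutionEngine` and times each query stage. Only the trait names and signatures come from this diff; the `Timing*` types are hypothetical, and the sketch assumes the new module is public as `ballista_executor::execution_engine`.

```rust
use async_trait::async_trait;
use ballista_core::serde::protobuf::ShuffleWritePartition;
// Assumed public path for the module added in this PR.
use ballista_executor::execution_engine::{
    DefaultExecutionEngine, ExecutionEngine, QueryStageExecutor,
};
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical engine that delegates to the default DataFusion-based
/// engine but logs how long each query stage takes.
pub struct TimingExecutionEngine {
    inner: DefaultExecutionEngine,
}

impl ExecutionEngine for TimingExecutionEngine {
    fn create_query_stage_exec(
        &self,
        job_id: String,
        stage_id: usize,
        plan: Arc<dyn ExecutionPlan>,
        work_dir: &str,
    ) -> Result<Arc<dyn QueryStageExecutor>> {
        // Build the default executor for this stage, then wrap it.
        let inner = self
            .inner
            .create_query_stage_exec(job_id, stage_id, plan, work_dir)?;
        Ok(Arc::new(TimingQueryStageExec { inner }))
    }
}

#[derive(Debug)]
struct TimingQueryStageExec {
    inner: Arc<dyn QueryStageExecutor>,
}

#[async_trait]
impl QueryStageExecutor for TimingQueryStageExec {
    async fn execute_query_stage(
        &self,
        input_partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<Vec<ShuffleWritePartition>> {
        let start = Instant::now();
        let result = self
            .inner
            .execute_query_stage(input_partition, context)
            .await;
        println!(
            "partition {} finished in {:?}",
            input_partition,
            start.elapsed()
        );
        result
    }

    fn collect_plan_metrics(&self) -> Vec<MetricsSet> {
        self.inner.collect_plan_metrics()
    }
}
```

Because `create_query_stage_exec` receives the raw `ExecutionPlan`, an engine is also free to bypass `DefaultExecutionEngine` entirely and execute the plan on a different backend, as long as it writes shuffle output that `ShuffleReaderExec` can read back.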
15 changes: 9 additions & 6 deletions ballista/executor/src/execution_loop.rs
@@ -29,7 +29,6 @@ use crate::{as_task_status, TaskExecutionTimes};
 use ballista_core::error::BallistaError;
 use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
 use ballista_core::serde::BallistaCodec;
-use ballista_core::utils::collect_plan_metrics;
 use datafusion::execution::context::TaskContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
@@ -209,8 +208,12 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
         plan.schema().as_ref(),
     )?;

-    let shuffle_writer_plan =
-        executor.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
+    let query_stage_exec = executor.execution_engine.create_query_stage_exec(
+        job_id.clone(),
+        stage_id as usize,
+        plan,
+        &executor.work_dir,
+    )?;
     dedicated_executor.spawn(async move {
         use std::panic::AssertUnwindSafe;
         let part = PartitionId {
@@ -219,10 +222,10 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
             partition_id: partition_id as usize,
         };

-        let execution_result = match AssertUnwindSafe(executor.execute_shuffle_write(
+        let execution_result = match AssertUnwindSafe(executor.execute_query_stage(
             task_id as usize,
             part.clone(),
-            shuffle_writer_plan.clone(),
+            query_stage_exec.clone(),
             task_context,
             shuffle_output_partitioning,
         ))
@@ -240,7 +243,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
         info!("Done with task {}", task_identity);
         debug!("Statistics: {:?}", execution_result);

-        let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
+        let plan_metrics = query_stage_exec.collect_plan_metrics();
         let operator_metrics = plan_metrics
             .into_iter()
             .map(|m| m.try_into())
74 changes: 28 additions & 46 deletions ballista/executor/src/executor.rs
@@ -17,28 +17,26 @@

 //! Ballista executor logic

-use dashmap::DashMap;
-use std::collections::HashMap;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
+use crate::execution_engine::DefaultExecutionEngine;
+use crate::execution_engine::ExecutionEngine;
+use crate::execution_engine::QueryStageExecutor;
 use crate::metrics::ExecutorMetricsCollector;
 use ballista_core::error::BallistaError;
-use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf;
 use ballista_core::serde::protobuf::ExecutorRegistration;
-use datafusion::error::DataFusionError;
+use ballista_core::serde::scheduler::PartitionId;
+use dashmap::DashMap;
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;

 use datafusion::physical_plan::udaf::AggregateUDF;
 use datafusion::physical_plan::udf::ScalarUDF;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::Partitioning;
 use futures::future::AbortHandle;

-use ballista_core::serde::scheduler::PartitionId;
+use std::collections::HashMap;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};

 pub struct TasksDrainedFuture(pub Arc<Executor>);
@@ -82,6 +80,10 @@ pub struct Executor {

     /// Handles to abort executing tasks
     abort_handles: AbortHandles,
+
+    /// Execution engine that the executor will delegate to
+    /// for executing query stages
+    pub(crate) execution_engine: Arc<dyn ExecutionEngine>,
 }

 impl Executor {
@@ -92,6 +94,7 @@ impl Executor {
         runtime: Arc<RuntimeEnv>,
         metrics_collector: Arc<dyn ExecutorMetricsCollector>,
         concurrent_tasks: usize,
+        execution_engine: Option<Arc<dyn ExecutionEngine>>,
     ) -> Self {
         Self {
             metadata,
@@ -103,6 +106,8 @@ impl Executor {
             metrics_collector,
             concurrent_tasks,
             abort_handles: Default::default(),
+            execution_engine: execution_engine
+                .unwrap_or_else(|| Arc::new(DefaultExecutionEngine {})),
         }
     }
 }
@@ -111,16 +116,16 @@ impl Executor {
     /// Execute one partition of a query stage and persist the result to disk in IPC format. On
     /// success, return a RecordBatch containing metadata about the results, including path
     /// and statistics.
-    pub async fn execute_shuffle_write(
+    pub async fn execute_query_stage(
         &self,
         task_id: usize,
         partition: PartitionId,
-        shuffle_writer: Arc<ShuffleWriterExec>,
+        query_stage_exec: Arc<dyn QueryStageExecutor>,
         task_ctx: Arc<TaskContext>,
         _shuffle_output_partitioning: Option<Partitioning>,
     ) -> Result<Vec<protobuf::ShuffleWritePartition>, BallistaError> {
         let (task, abort_handle) = futures::future::abortable(
-            shuffle_writer.execute_shuffle_write(partition.partition_id, task_ctx),
+            query_stage_exec.execute_query_stage(partition.partition_id, task_ctx),
         );

         self.abort_handles
@@ -134,39 +139,12 @@
             &partition.job_id,
             partition.stage_id,
             partition.partition_id,
-            shuffle_writer,
+            query_stage_exec,
         );

         Ok(partitions)
     }

-    /// Recreate the shuffle writer with the correct working directory.
-    pub fn new_shuffle_writer(
-        &self,
-        job_id: String,
-        stage_id: usize,
-        plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Arc<ShuffleWriterExec>, BallistaError> {
-        let exec = if let Some(shuffle_writer) =
-            plan.as_any().downcast_ref::<ShuffleWriterExec>()
-        {
-            // recreate the shuffle writer with the correct working directory
-            ShuffleWriterExec::try_new(
-                job_id,
-                stage_id,
-                plan.children()[0].clone(),
-                self.work_dir.clone(),
-                shuffle_writer.shuffle_output_partitioning().cloned(),
-            )
-        } else {
-            Err(DataFusionError::Internal(
-                "Plan passed to execute_shuffle_write is not a ShuffleWriterExec"
-                    .to_string(),
-            ))
-        }?;
-        Ok(Arc::new(exec))
-    }
-
     pub async fn cancel_task(
         &self,
         task_id: usize,
@@ -208,6 +186,7 @@ mod test {
     use ballista_core::serde::protobuf::ExecutorRegistration;
     use datafusion::execution::context::TaskContext;

+    use crate::execution_engine::DefaultQueryStageExec;
     use ballista_core::serde::scheduler::PartitionId;
     use datafusion::error::DataFusionError;
     use datafusion::physical_expr::PhysicalSortExpr;
@@ -307,6 +286,8 @@
         )
         .expect("creating shuffle writer");

+        let query_stage_exec = DefaultQueryStageExec::new(shuffle_write);
+
         let executor_registration = ExecutorRegistration {
             id: "executor".to_string(),
             port: 0,
@@ -323,6 +304,7 @@
             ctx.runtime_env(),
             Arc::new(LoggingMetricsCollector {}),
             2,
+            None,
         );

         let (sender, receiver) = tokio::sync::oneshot::channel();
@@ -336,10 +318,10 @@
             partition_id: 0,
         };
         let task_result = executor_clone
-            .execute_shuffle_write(
+            .execute_query_stage(
                 1,
                 part,
-                Arc::new(shuffle_write),
+                Arc::new(query_stage_exec),
                 ctx.task_ctx(),
                 None,
             )
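For context on the cancellation path touched above: the executor keeps the `AbortHandle` returned by `futures::future::abortable` in its `abort_handles` map, and `cancel_task` stops a running stage by calling `abort()` on the stored handle. A standalone sketch of that pattern, assuming a tokio runtime (the 60-second sleep stands in for a long-running stage):

```rust
use futures::future::abortable;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Split a long-running future into an abortable future plus a handle,
    // mirroring how execute_query_stage wraps the stage execution.
    let (task, abort_handle) = abortable(async {
        tokio::time::sleep(Duration::from_secs(60)).await;
        42
    });

    // Cancelling through the handle (as cancel_task does) makes the
    // wrapped future resolve to Err(Aborted) instead of running to completion.
    abort_handle.abort();
    assert!(task.await.is_err());
}
```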
5 changes: 5 additions & 0 deletions ballista/executor/src/executor_process.rs
@@ -55,6 +55,7 @@ use ballista_core::utils::{
 };
 use ballista_core::BALLISTA_VERSION;

+use crate::execution_engine::ExecutionEngine;
 use crate::executor::{Executor, TasksDrainedFuture};
 use crate::executor_server::TERMINATING;
 use crate::flight_service::BallistaFlightService;
@@ -82,6 +83,9 @@ pub struct ExecutorProcessConfig {
     pub log_rotation_policy: LogRotationPolicy,
     pub job_data_ttl_seconds: u64,
     pub job_data_clean_up_interval_seconds: u64,
+    /// Optional execution engine to use to execute physical plans; defaults to
+    /// DataFusion if none is provided.
+    pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
 }

 pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
@@ -181,6 +185,7 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
         runtime,
         metrics_collector,
         concurrent_tasks,
+        opt.execution_engine,
     ));

     let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
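Downstream crates opt in through the new `execution_engine` field on `ExecutorProcessConfig`. A sketch of the wiring, assuming the crate exposes the `executor_process` and `execution_engine` modules publicly; `base_config()` is a hypothetical helper that fills in the remaining fields (hosts, ports, work dir, log settings) the way `ballista/executor/src/bin/main.rs` does:

```rust
use ballista_executor::execution_engine::ExecutionEngine;
use ballista_executor::executor_process::{
    start_executor_process, ExecutorProcessConfig,
};
use std::sync::Arc;

// Hypothetical helper: builds a config the same way the stock executor
// binary does, with deployment-appropriate values for every other field.
fn base_config() -> ExecutorProcessConfig {
    unimplemented!("fill in the non-engine fields for your deployment")
}

async fn run(engine: Arc<dyn ExecutionEngine>) {
    let config = ExecutorProcessConfig {
        // Route all query-stage execution through the custom engine;
        // None falls back to DefaultExecutionEngine.
        execution_engine: Some(engine),
        ..base_config()
    };
    // Error handling assumes the error type implements Display.
    if let Err(e) = start_executor_process(config).await {
        eprintln!("executor exited with error: {e}");
    }
}
```

The `None` default in `bin/main.rs` above means the stock executor binary is unaffected; only embedders that construct `ExecutorProcessConfig` themselves can swap the engine.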