Add ExecutionEngine abstraction #687

Merged · 11 commits · Mar 2, 2023
ballista/core/src/execution_plans/mod.rs (2 changes: 1 addition & 1 deletion)

@@ -25,5 +25,5 @@ mod unresolved_shuffle;

 pub use distributed_query::DistributedQueryExec;
 pub use shuffle_reader::ShuffleReaderExec;
-pub use shuffle_writer::ShuffleWriterExec;
+pub use shuffle_writer::{ShuffleWriter, ShuffleWriterExec};
 pub use unresolved_shuffle::UnresolvedShuffleExec;
ballista/core/src/execution_plans/shuffle_writer.rs (37 changes: 35 additions & 2 deletions)

@@ -22,7 +22,9 @@
 
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 
+use async_trait::async_trait;
 use std::any::Any;
+use std::fmt::Debug;
 use std::future::Future;
 use std::iter::Iterator;
 use std::path::PathBuf;
@@ -57,6 +59,21 @@ use datafusion::physical_plan::repartition::BatchPartitioner;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use log::{debug, info};
 
+/// ShuffleWriter represents a section of a query plan that has consistent partitioning and
+/// can be executed as one unit with each partition being executed in parallel. The output of each
+/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
+/// will use the ShuffleReaderExec to read these results.
+#[async_trait]
+pub trait ShuffleWriter: Sync + Send + Debug {
+    async fn execute_shuffle_write(
+        &self,
+        input_partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<Vec<ShuffleWritePartition>>;
+
+    fn collect_plan_metrics(&self) -> Vec<MetricsSet>;
+}
+
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
 /// can be executed as one unit with each partition being executed in parallel. The output of each
 /// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query

@@ -139,7 +156,7 @@ impl ShuffleWriterExec {
         self.shuffle_output_partitioning.as_ref()
     }
 
-    pub fn execute_shuffle_write(
+    fn execute_shuffle_write_internal(
         &self,
         input_partition: usize,
         context: Arc<TaskContext>,

@@ -288,6 +305,22 @@ impl ShuffleWriterExec {
     }
 }
 
+#[async_trait]
+impl ShuffleWriter for ShuffleWriterExec {
thinkharderdev (Contributor): This seems inverted to me. Shouldn't ShuffleWriterExec have an Arc<dyn ShuffleWriter> instead of implementing the trait?

I know this is a step toward the pluggable execution engine, but I'm also thinking about how this fits with allowing for different shuffle methods (write to disk, write to object store, stream to network, etc.), and it seems like we would need a second layer of indirection for that.

andygrove (Member, Author): That's a good point. Maybe what I want for now is just an ExecutionEngine trait with an execute method that returns the metadata with shuffle file locations. I will give this some more thought.

andygrove (Member, Author): @thinkharderdev I have renamed the trait and made some other naming changes. I also added the ExecutionEngine abstraction in this PR. PTAL when you have time.
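A minimal sketch of the ExecutionEngine idea discussed in this thread might look like the following. This is an illustration only: the trait body, the create_shuffle_writer method name, and the import paths are assumptions, not the code merged in this PR.

use std::sync::Arc;

use ballista_core::error::Result;
use ballista_core::execution_plans::ShuffleWriter;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical execution-engine abstraction: the executor asks the engine
/// for an opaque ShuffleWriter per stage instead of hard-coding
/// ShuffleWriterExec.
pub trait ExecutionEngine: Send + Sync {
    /// Build the writer that executes one stage of a job and, when run,
    /// returns metadata including shuffle file locations.
    fn create_shuffle_writer(
        &self,
        job_id: String,
        stage_id: usize,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ShuffleWriter>>;
}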

+    async fn execute_shuffle_write(
+        &self,
+        input_partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<Vec<ShuffleWritePartition>> {
+        self.execute_shuffle_write_internal(input_partition, context)
+            .await
+    }
+
+    fn collect_plan_metrics(&self) -> Vec<MetricsSet> {
+        utils::collect_plan_metrics(self.plan.as_ref())
+    }
+}
 
 impl ExecutionPlan for ShuffleWriterExec {
     fn as_any(&self) -> &dyn Any {
         self

@@ -335,7 +368,7 @@ impl ExecutionPlan for ShuffleWriterExec {

         let schema_captured = schema.clone();
         let fut_stream = self
-            .execute_shuffle_write(partition, context)
+            .execute_shuffle_write_internal(partition, context)
             .and_then(|part_loc| async move {
                 // build metadata result batch
                 let num_writers = part_loc.len();
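With the trait in place, the executor can hold any writer as Arc<dyn ShuffleWriter>. A minimal sketch of an alternative implementation is shown below; the NoopShuffleWriter type is hypothetical, and the Result and ShuffleWritePartition import paths are assumed from the surrounding code.

use std::sync::Arc;

use async_trait::async_trait;
use ballista_core::execution_plans::ShuffleWriter;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::MetricsSet;

/// Hypothetical writer that produces no shuffle output; it exists only to
/// illustrate the trait surface.
#[derive(Debug)]
struct NoopShuffleWriter;

#[async_trait]
impl ShuffleWriter for NoopShuffleWriter {
    async fn execute_shuffle_write(
        &self,
        _input_partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<Vec<ShuffleWritePartition>> {
        // A real implementation would execute its slice of the plan,
        // repartition the output, and report where each partition landed.
        Ok(vec![])
    }

    fn collect_plan_metrics(&self) -> Vec<MetricsSet> {
        // No underlying plan, so there are no operator metrics to report.
        vec![]
    }
}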
ballista/executor/src/execution_loop.rs (7 changes: 3 additions & 4 deletions)

@@ -29,7 +29,6 @@ use crate::{as_task_status, TaskExecutionTimes};
 use ballista_core::error::BallistaError;
 use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
 use ballista_core::serde::BallistaCodec;
-use ballista_core::utils::collect_plan_metrics;
 use datafusion::execution::context::TaskContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;

@@ -209,7 +208,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
         plan.schema().as_ref(),
     )?;
 
-    let shuffle_writer_plan =
+    let shuffle_writer =
         executor.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
     dedicated_executor.spawn(async move {
         use std::panic::AssertUnwindSafe;

@@ -222,7 +221,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
         let execution_result = match AssertUnwindSafe(executor.execute_shuffle_write(
             task_id as usize,
             part.clone(),
-            shuffle_writer_plan.clone(),
+            shuffle_writer.clone(),
             task_context,
             shuffle_output_partitioning,
         ))

@@ -240,7 +239,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
info!("Done with task {}", task_identity);
debug!("Statistics: {:?}", execution_result);

let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
let plan_metrics = shuffle_writer.collect_plan_metrics();
let operator_metrics = plan_metrics
.into_iter()
.map(|m| m.try_into())
Expand Down
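A side note on the AssertUnwindSafe wrapper in this loop: it lets a panic inside a task future be caught so one failing task does not take down the executor process. The surrounding code is collapsed above, so the sketch below is a generic version of the pattern with assumed details, not the exact merged code.

use std::panic::AssertUnwindSafe;

use futures::FutureExt; // provides catch_unwind() on futures

/// Run a fallible task future, converting a panic into an ordinary error.
async fn run_guarded<F, T>(fut: F) -> Result<T, String>
where
    F: std::future::Future<Output = Result<T, String>>,
{
    match AssertUnwindSafe(fut).catch_unwind().await {
        Ok(result) => result,
        Err(panic) => {
            // The panic payload is a Box<dyn Any + Send>; recover the message
            // when it is a string, otherwise report an opaque panic.
            let msg = panic
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| panic.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            Err(format!("task panicked: {}", msg))
        }
    }
}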
ballista/executor/src/executor.rs (6 changes: 3 additions & 3 deletions)

@@ -23,13 +23,13 @@ use std::sync::Arc;

 use crate::metrics::ExecutorMetricsCollector;
 use ballista_core::error::BallistaError;
+use ballista_core::execution_plans::ShuffleWriter;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf;
 use ballista_core::serde::protobuf::ExecutorRegistration;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
 
 use datafusion::physical_plan::udaf::AggregateUDF;
 use datafusion::physical_plan::udf::ScalarUDF;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};

@@ -98,7 +98,7 @@ impl Executor {
         &self,
         task_id: usize,
         partition: PartitionId,
-        shuffle_writer: Arc<ShuffleWriterExec>,
+        shuffle_writer: Arc<dyn ShuffleWriter>,
         task_ctx: Arc<TaskContext>,
         _shuffle_output_partitioning: Option<Partitioning>,
     ) -> Result<Vec<protobuf::ShuffleWritePartition>, BallistaError> {

@@ -129,7 +129,7 @@ impl Executor {
         job_id: String,
         stage_id: usize,
         plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Arc<ShuffleWriterExec>, BallistaError> {
+    ) -> Result<Arc<dyn ShuffleWriter>, BallistaError> {
         let exec = if let Some(shuffle_writer) =
             plan.as_any().downcast_ref::<ShuffleWriterExec>()
         {
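The remainder of new_shuffle_writer is collapsed above. The standalone sketch below shows what such a factory plausibly does with the visible downcast; the ShuffleWriterExec::try_new argument list, the work_dir parameter, and the error message are assumptions rather than the merged code.

use std::sync::Arc;

use ballista_core::error::BallistaError;
use ballista_core::execution_plans::{ShuffleWriter, ShuffleWriterExec};
use datafusion::physical_plan::ExecutionPlan;

/// Sketch: rebuild the stage's shuffle writer so its output goes to this
/// executor's local work directory.
fn make_shuffle_writer(
    job_id: String,
    stage_id: usize,
    plan: Arc<dyn ExecutionPlan>,
    work_dir: String,
) -> Result<Arc<dyn ShuffleWriter>, BallistaError> {
    if let Some(exec) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
        let writer = ShuffleWriterExec::try_new(
            job_id,
            stage_id,
            plan.children()[0].clone(),
            work_dir,
            exec.shuffle_output_partitioning().cloned(),
        )?;
        Ok(Arc::new(writer))
    } else {
        Err(BallistaError::General(
            "plan passed to the shuffle-writer factory is not a ShuffleWriterExec".to_string(),
        ))
    }
}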
ballista/executor/src/executor_server.rs (10 changes: 4 additions & 6 deletions)

@@ -41,9 +41,7 @@ use ballista_core::serde::protobuf::{
 use ballista_core::serde::scheduler::PartitionId;
 use ballista_core::serde::scheduler::TaskDefinition;
 use ballista_core::serde::BallistaCodec;
-use ballista_core::utils::{
-    collect_plan_metrics, create_grpc_client_connection, create_grpc_server,
-};
+use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
 use dashmap::DashMap;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;

@@ -341,7 +339,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T, U>
         let stage_id = task.stage_id;
         let stage_attempt_num = task.stage_attempt_num;
         let partition_id = task.partition_id;
-        let shuffle_writer_plan =
+        let shuffle_writer =
             self.executor
                 .new_shuffle_writer(job_id.clone(), stage_id, plan)?;
 

@@ -358,15 +356,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T, U>
                 .execute_shuffle_write(
                     task_id,
                     part.clone(),
-                    shuffle_writer_plan.clone(),
+                    shuffle_writer.clone(),
                     task_context,
                     shuffle_output_partitioning,
                 )
                 .await;
             info!("Done with task {}", task_identity);
             debug!("Statistics: {:?}", execution_result);
 
-            let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
+            let plan_metrics = shuffle_writer.collect_plan_metrics();
             let operator_metrics = plan_metrics
                 .into_iter()
                 .map(|m| m.try_into())
ballista/executor/src/metrics/mod.rs (14 changes: 5 additions & 9 deletions)

@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista_core::execution_plans::ShuffleWriterExec;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use ballista_core::execution_plans::ShuffleWriter;
 use log::info;
 use std::sync::Arc;
 

@@ -32,7 +31,7 @@ pub trait ExecutorMetricsCollector: Send + Sync {
         job_id: &str,
         stage_id: usize,
         partition: usize,
-        plan: Arc<ShuffleWriterExec>,
+        plan: Arc<dyn ShuffleWriter>,
     );
 }
 

@@ -47,14 +46,11 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
         job_id: &str,
         stage_id: usize,
         partition: usize,
-        plan: Arc<ShuffleWriterExec>,
+        plan: Arc<dyn ShuffleWriter>,
     ) {
         info!(
-            "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
-            job_id,
-            stage_id,
-            partition,
-            DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent()
+            "=== [{}/{}/{}] Physical plan with metrics ===\n{:?}\n",
+            job_id, stage_id, partition, plan
         );
     }
 }
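Because the collector now receives an Arc<dyn ShuffleWriter>, custom collectors depend only on the trait surface. Below is a hypothetical alternative implementation; the record_stage method name is assumed (the hunk above elides it), and the collector type is illustrative only.

use std::sync::Arc;

use ballista_core::execution_plans::ShuffleWriter;

use crate::metrics::ExecutorMetricsCollector;

/// Hypothetical collector that prints per-operator metrics instead of
/// logging the whole plan.
#[derive(Default)]
pub struct PrintingMetricsCollector;

impl ExecutorMetricsCollector for PrintingMetricsCollector {
    // Method name assumed; only the parameter list is visible in the diff.
    fn record_stage(
        &self,
        job_id: &str,
        stage_id: usize,
        partition: usize,
        plan: Arc<dyn ShuffleWriter>,
    ) {
        // collect_plan_metrics() comes from the ShuffleWriter trait, so any
        // writer implementation can be inspected the same way.
        for metrics in plan.collect_plan_metrics() {
            println!("[{}/{}/{}] {}", job_id, stage_id, partition, metrics);
        }
    }
}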