Merged
1 change: 1 addition & 0 deletions ballista/rust/core/Cargo.toml
@@ -33,6 +33,7 @@ simd = ["datafusion/simd"]
ahash = "0.7"
async-trait = "0.1.36"
futures = "0.3"
hashbrown = "0.11"
@houqp (Member) commented on Jul 4, 2021:

From the official readme:

Since Rust 1.36, this is now the HashMap implementation for the Rust standard library. However you may still want to use this crate instead since it works in environments without std, such as embedded systems and kernels.

Are we introducing it because we are expecting to use ballista in embedded systems?

Member Author:

It looks like the dependency was added to DataFusion as a performance optimization in ec06207, and when the metrics method was added to DataFusion it used this hashmap as well, but perhaps we should have had it use the std hashmap instead?

@Dandandan what do you think?

Member:

Filed #677 as a follow-up so we don't block the merge of this PR. I am curious whether the std hashmap also uses ahash as the default hasher.

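For reference, a minimal sketch (not part of this PR, purely illustrative) of the hasher question raised above: the std HashMap defaults to the HashDoS-resistant SipHash via RandomState, while hashbrown's default feature set uses ahash. Since ahash is already a direct dependency here, the std map can also be pointed at it explicitly:

use std::collections::HashMap;

fn main() {
    // Default std hasher: SipHash via std::collections::hash_map::RandomState.
    let mut std_map: HashMap<&str, u64> = HashMap::new();
    std_map.insert("fetchTime", 42);

    // Same std HashMap type, but keyed with ahash's faster (non-DoS-resistant) hasher.
    let mut fast_map: HashMap<&str, u64, ahash::RandomState> =
        HashMap::with_hasher(ahash::RandomState::new());
    fast_map.insert("writeTime", 7);

    println!("{:?} {:?}", std_map.get("fetchTime"), fast_map.get("writeTime"));
}
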
Contributor:

Added some comments on the ticket.

log = "0.4"
prost = "0.7"
serde = {version = "1", features = ["derive"]}
26 changes: 22 additions & 4 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,13 +28,17 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
use futures::{future, Stream, StreamExt};
use hashbrown::HashMap;
use log::info;
use std::time::Instant;

/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
/// being executed by an executor
@@ -43,6 +43,8 @@ pub struct ShuffleReaderExec {
/// Each partition of a shuffle can read data from multiple locations
pub(crate) partition: Vec<Vec<PartitionLocation>>,
pub(crate) schema: SchemaRef,
/// Time to fetch data from executor
fetch_time: Arc<SQLMetric>,
}

impl ShuffleReaderExec {
Expand All @@ -51,7 +57,11 @@ impl ShuffleReaderExec {
partition: Vec<Vec<PartitionLocation>>,
schema: SchemaRef,
) -> Result<Self> {
Ok(Self { partition, schema })
Ok(Self {
partition,
schema,
fetch_time: SQLMetric::time_nanos(),
})
}
}

@@ -88,11 +98,13 @@ impl ExecutionPlan for ShuffleReaderExec {
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
info!("ShuffleReaderExec::execute({})", partition);

let start = Instant::now();
let partition_locations = &self.partition[partition];
let result = future::join_all(partition_locations.iter().map(fetch_partition))
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
self.fetch_time.add_elapsed(start);

let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
@@ -115,7 +127,7 @@ impl ExecutionPlan for ShuffleReaderExec {
x.iter()
.map(|l| {
format!(
"[executor={} part={}:{}:{} stats={:?}]",
"[executor={} part={}:{}:{} stats={}]",
l.executor_meta.id,
l.partition_id.job_id,
l.partition_id.stage_id,
@@ -127,11 +139,17 @@ impl ExecutionPlan for ShuffleReaderExec {
.join(",")
})
.collect::<Vec<String>>()
.join("\n");
.join(", ");
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
}
}
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
metrics
}
}

async fn fetch_partition(
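The metrics plumbing added in this file follows a simple pattern: the operator owns an Arc'd timing metric, records elapsed wall-clock time around the instrumented section with add_elapsed, and exposes the value through the new metrics() method. A self-contained sketch of that pattern, using a hypothetical TimeMetric as a stand-in for DataFusion's SQLMetric:

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

// Hypothetical stand-in for datafusion's SQLMetric: an atomic nanosecond counter.
#[derive(Debug, Default)]
struct TimeMetric {
    nanos: AtomicU64,
}

impl TimeMetric {
    fn add_elapsed(&self, start: Instant) {
        self.nanos
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
    }
    fn value(&self) -> u64 {
        self.nanos.load(Ordering::Relaxed)
    }
}

struct Operator {
    fetch_time: Arc<TimeMetric>,
}

impl Operator {
    fn new() -> Self {
        Self {
            fetch_time: Arc::new(TimeMetric::default()),
        }
    }

    fn execute(&self) {
        // Time the expensive section, as execute() above does around fetch_partition.
        let start = Instant::now();
        // ... fetch partitions from remote executors here ...
        self.fetch_time.add_elapsed(start);
    }

    fn metrics(&self) -> HashMap<String, u64> {
        // Expose the accumulated value under a stable name, like "fetchTime" above.
        let mut m = HashMap::new();
        m.insert("fetchTime".to_owned(), self.fetch_time.value());
        m
    }
}

fn main() {
    let op = Operator::new();
    op.execute();
    println!("{:?}", op.metrics());
}
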
42 changes: 36 additions & 6 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,6 +20,7 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

use std::fs::File;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand All @@ -43,11 +44,11 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
};
use futures::StreamExt;
use hashbrown::HashMap;
use log::info;
use std::fs::File;
use uuid::Uuid;

/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
Expand All @@ -66,6 +67,22 @@ pub struct ShuffleWriterExec {
work_dir: String,
/// Optional shuffle output partitioning
shuffle_output_partitioning: Option<Partitioning>,
/// Shuffle write metrics
metrics: ShuffleWriteMetrics,
}

#[derive(Debug, Clone)]
struct ShuffleWriteMetrics {
/// Time spent writing batches to shuffle files
write_time: Arc<SQLMetric>,
}

impl ShuffleWriteMetrics {
fn new() -> Self {
Self {
write_time: SQLMetric::time_nanos(),
}
}
}

impl ShuffleWriterExec {
@@ -83,6 +100,7 @@ impl ShuffleWriterExec {
plan,
work_dir,
shuffle_output_partitioning,
metrics: ShuffleWriteMetrics::new(),
})
}

@@ -150,12 +168,16 @@ impl ExecutionPlan for ShuffleWriterExec {
info!("Writing results to {}", path);

// stream results to disk
let stats = utils::write_stream_to_disk(&mut stream, path)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
let stats = utils::write_stream_to_disk(
&mut stream,
path,
self.metrics.write_time.clone(),
)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

info!(
"Executed partition {} in {} seconds. Statistics: {:?}",
"Executed partition {} in {} seconds. Statistics: {}",
partition,
now.elapsed().as_secs(),
stats
@@ -231,6 +253,7 @@ impl ExecutionPlan for ShuffleWriterExec {
RecordBatch::try_new(input_batch.schema(), columns)?;

// write batch out
let start = Instant::now();
match &mut writers[num_output_partition] {
Some(w) => {
w.write(&output_batch)?;
@@ -251,6 +274,7 @@ impl ExecutionPlan for ShuffleWriterExec {
writers[num_output_partition] = Some(writer);
}
}
self.metrics.write_time.add_elapsed(start);
}
}

@@ -310,6 +334,12 @@ impl ExecutionPlan for ShuffleWriterExec {
}
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
metrics
}

fn fmt_as(
&self,
t: DisplayFormatType,
12 changes: 11 additions & 1 deletion ballista/rust/core/src/serde/scheduler/mod.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, fmt, sync::Arc};

use datafusion::arrow::array::{
ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
@@ -113,6 +113,16 @@ impl Default for PartitionStats {
}
}

impl fmt::Display for PartitionStats {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"numBatches={:?}, numRows={:?}, numBytes={:?}",
self.num_batches, self.num_rows, self.num_bytes
)
}
}

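Since the new Display impl uses {:?} placeholders, the Option-valued fields print in their Debug form. Assuming the fields are Option<u64>, as the new() signature below suggests, the rendered string looks roughly like this (illustrative sketch, not part of this PR):

fn main() {
    // Mirrors the format string in the Display impl above.
    let s = format!(
        "numBatches={:?}, numRows={:?}, numBytes={:?}",
        Some(2u64),
        Some(100u64),
        None::<u64>
    );
    assert_eq!(s, "numBatches=Some(2), numRows=Some(100), numBytes=None");
    println!("{}", s);
}
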
impl PartitionStats {
pub fn new(
num_rows: Option<u64>,
9 changes: 8 additions & 1 deletion ballista/rust/core/src/utils.rs
@@ -53,15 +53,17 @@ use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
};
use futures::{future, Stream, StreamExt};
use std::time::Instant;

/// Stream data to disk in Arrow IPC format

pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
path: &str,
disk_write_metric: Arc<SQLMetric>,
) -> Result<PartitionStats> {
let file = File::create(&path).map_err(|e| {
BallistaError::General(format!(
@@ -86,9 +88,14 @@ pub async fn write_stream_to_disk(
num_batches += 1;
num_rows += batch.num_rows();
num_bytes += batch_size_bytes;

let start = Instant::now();
writer.write(&batch)?;
disk_write_metric.add_elapsed(start);
}
let start = Instant::now();
writer.finish()?;
disk_write_metric.add_elapsed(start);
Ok(PartitionStats::new(
Some(num_rows as u64),
Some(num_batches),
9 changes: 9 additions & 0 deletions ballista/rust/executor/src/executor.rs
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::utils;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;

/// Ballista executor
@@ -60,6 +61,14 @@ impl Executor {
)?;
let mut stream = exec.execute(part).await?;
let batches = utils::collect_stream(&mut stream).await?;

println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(&exec)
.indent()
.to_string()
);

// the output should be a single batch containing metadata (path and statistics)
assert!(batches.len() == 1);
Ok(batches[0].clone())