Skip to content

Commit 58da159

Browse files
authored
Implement metrics for shuffle read and write (#676)
1 parent e036a62 commit 58da159

File tree

6 files changed

+87
-12
lines changed

6 files changed

+87
-12
lines changed

ballista/rust/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ simd = ["datafusion/simd"]
3333
ahash = "0.7"
3434
async-trait = "0.1.36"
3535
futures = "0.3"
36+
hashbrown = "0.11"
3637
log = "0.4"
3738
prost = "0.7"
3839
serde = {version = "1", features = ["derive"]}

ballista/rust/core/src/execution_plans/shuffle_reader.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,17 @@ use async_trait::async_trait;
2828
use datafusion::arrow::datatypes::SchemaRef;
2929
use datafusion::arrow::error::Result as ArrowResult;
3030
use datafusion::arrow::record_batch::RecordBatch;
31-
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
31+
use datafusion::physical_plan::{
32+
DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
33+
};
3234
use datafusion::{
3335
error::{DataFusionError, Result},
3436
physical_plan::RecordBatchStream,
3537
};
3638
use futures::{future, Stream, StreamExt};
39+
use hashbrown::HashMap;
3740
use log::info;
41+
use std::time::Instant;
3842

3943
/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
4044
/// being executed by an executor
@@ -43,6 +47,8 @@ pub struct ShuffleReaderExec {
4347
/// Each partition of a shuffle can read data from multiple locations
4448
pub(crate) partition: Vec<Vec<PartitionLocation>>,
4549
pub(crate) schema: SchemaRef,
50+
/// Time to fetch data from executor
51+
fetch_time: Arc<SQLMetric>,
4652
}
4753

4854
impl ShuffleReaderExec {
@@ -51,7 +57,11 @@ impl ShuffleReaderExec {
5157
partition: Vec<Vec<PartitionLocation>>,
5258
schema: SchemaRef,
5359
) -> Result<Self> {
54-
Ok(Self { partition, schema })
60+
Ok(Self {
61+
partition,
62+
schema,
63+
fetch_time: SQLMetric::time_nanos(),
64+
})
5565
}
5666
}
5767

@@ -88,11 +98,13 @@ impl ExecutionPlan for ShuffleReaderExec {
8898
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
8999
info!("ShuffleReaderExec::execute({})", partition);
90100

101+
let start = Instant::now();
91102
let partition_locations = &self.partition[partition];
92103
let result = future::join_all(partition_locations.iter().map(fetch_partition))
93104
.await
94105
.into_iter()
95106
.collect::<Result<Vec<_>>>()?;
107+
self.fetch_time.add_elapsed(start);
96108

97109
let result = WrappedStream::new(
98110
Box::pin(futures::stream::iter(result).flatten()),
@@ -115,7 +127,7 @@ impl ExecutionPlan for ShuffleReaderExec {
115127
x.iter()
116128
.map(|l| {
117129
format!(
118-
"[executor={} part={}:{}:{} stats={:?}]",
130+
"[executor={} part={}:{}:{} stats={}]",
119131
l.executor_meta.id,
120132
l.partition_id.job_id,
121133
l.partition_id.stage_id,
@@ -127,11 +139,17 @@ impl ExecutionPlan for ShuffleReaderExec {
127139
.join(",")
128140
})
129141
.collect::<Vec<String>>()
130-
.join("\n");
142+
.join(", ");
131143
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
132144
}
133145
}
134146
}
147+
148+
fn metrics(&self) -> HashMap<String, SQLMetric> {
149+
let mut metrics = HashMap::new();
150+
metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
151+
metrics
152+
}
135153
}
136154

137155
async fn fetch_partition(

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
2121
//! will use the ShuffleReaderExec to read these results.
2222
23+
use std::fs::File;
2324
use std::iter::Iterator;
2425
use std::path::PathBuf;
2526
use std::sync::{Arc, Mutex};
@@ -43,11 +44,11 @@ use datafusion::arrow::record_batch::RecordBatch;
4344
use datafusion::error::{DataFusionError, Result};
4445
use datafusion::physical_plan::hash_join::create_hashes;
4546
use datafusion::physical_plan::{
46-
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
47+
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
4748
};
4849
use futures::StreamExt;
50+
use hashbrown::HashMap;
4951
use log::info;
50-
use std::fs::File;
5152
use uuid::Uuid;
5253

5354
/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -66,6 +67,22 @@ pub struct ShuffleWriterExec {
6667
work_dir: String,
6768
/// Optional shuffle output partitioning
6869
shuffle_output_partitioning: Option<Partitioning>,
70+
/// Shuffle write metrics
71+
metrics: ShuffleWriteMetrics,
72+
}
73+
74+
#[derive(Debug, Clone)]
75+
struct ShuffleWriteMetrics {
76+
/// Time spend writing batches to shuffle files
77+
write_time: Arc<SQLMetric>,
78+
}
79+
80+
impl ShuffleWriteMetrics {
81+
fn new() -> Self {
82+
Self {
83+
write_time: SQLMetric::time_nanos(),
84+
}
85+
}
6986
}
7087

7188
impl ShuffleWriterExec {
@@ -83,6 +100,7 @@ impl ShuffleWriterExec {
83100
plan,
84101
work_dir,
85102
shuffle_output_partitioning,
103+
metrics: ShuffleWriteMetrics::new(),
86104
})
87105
}
88106

@@ -150,12 +168,16 @@ impl ExecutionPlan for ShuffleWriterExec {
150168
info!("Writing results to {}", path);
151169

152170
// stream results to disk
153-
let stats = utils::write_stream_to_disk(&mut stream, path)
154-
.await
155-
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
171+
let stats = utils::write_stream_to_disk(
172+
&mut stream,
173+
path,
174+
self.metrics.write_time.clone(),
175+
)
176+
.await
177+
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
156178

157179
info!(
158-
"Executed partition {} in {} seconds. Statistics: {:?}",
180+
"Executed partition {} in {} seconds. Statistics: {}",
159181
partition,
160182
now.elapsed().as_secs(),
161183
stats
@@ -231,6 +253,7 @@ impl ExecutionPlan for ShuffleWriterExec {
231253
RecordBatch::try_new(input_batch.schema(), columns)?;
232254

233255
// write batch out
256+
let start = Instant::now();
234257
match &mut writers[num_output_partition] {
235258
Some(w) => {
236259
w.write(&output_batch)?;
@@ -251,6 +274,7 @@ impl ExecutionPlan for ShuffleWriterExec {
251274
writers[num_output_partition] = Some(writer);
252275
}
253276
}
277+
self.metrics.write_time.add_elapsed(start);
254278
}
255279
}
256280

@@ -310,6 +334,12 @@ impl ExecutionPlan for ShuffleWriterExec {
310334
}
311335
}
312336

337+
fn metrics(&self) -> HashMap<String, SQLMetric> {
338+
let mut metrics = HashMap::new();
339+
metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
340+
metrics
341+
}
342+
313343
fn fmt_as(
314344
&self,
315345
t: DisplayFormatType,

ballista/rust/core/src/serde/scheduler/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{collections::HashMap, sync::Arc};
18+
use std::{collections::HashMap, fmt, sync::Arc};
1919

2020
use datafusion::arrow::array::{
2121
ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
@@ -113,6 +113,16 @@ impl Default for PartitionStats {
113113
}
114114
}
115115

116+
impl fmt::Display for PartitionStats {
117+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118+
write!(
119+
f,
120+
"numBatches={:?}, numRows={:?}, numBytes={:?}",
121+
self.num_batches, self.num_rows, self.num_bytes
122+
)
123+
}
124+
}
125+
116126
impl PartitionStats {
117127
pub fn new(
118128
num_rows: Option<u64>,

ballista/rust/core/src/utils.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,17 @@ use datafusion::physical_plan::parquet::ParquetExec;
5353
use datafusion::physical_plan::projection::ProjectionExec;
5454
use datafusion::physical_plan::sort::SortExec;
5555
use datafusion::physical_plan::{
56-
AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
56+
AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
5757
};
5858
use futures::{future, Stream, StreamExt};
59+
use std::time::Instant;
5960

6061
/// Stream data to disk in Arrow IPC format
6162
6263
pub async fn write_stream_to_disk(
6364
stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
6465
path: &str,
66+
disk_write_metric: Arc<SQLMetric>,
6567
) -> Result<PartitionStats> {
6668
let file = File::create(&path).map_err(|e| {
6769
BallistaError::General(format!(
@@ -86,9 +88,14 @@ pub async fn write_stream_to_disk(
8688
num_batches += 1;
8789
num_rows += batch.num_rows();
8890
num_bytes += batch_size_bytes;
91+
92+
let start = Instant::now();
8993
writer.write(&batch)?;
94+
disk_write_metric.add_elapsed(start);
9095
}
96+
let start = Instant::now();
9197
writer.finish()?;
98+
disk_write_metric.add_elapsed(start);
9299
Ok(PartitionStats::new(
93100
Some(num_rows as u64),
94101
Some(num_batches),

ballista/rust/executor/src/executor.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
2323
use ballista_core::execution_plans::ShuffleWriterExec;
2424
use ballista_core::utils;
2525
use datafusion::arrow::record_batch::RecordBatch;
26+
use datafusion::physical_plan::display::DisplayableExecutionPlan;
2627
use datafusion::physical_plan::ExecutionPlan;
2728

2829
/// Ballista executor
@@ -60,6 +61,14 @@ impl Executor {
6061
)?;
6162
let mut stream = exec.execute(part).await?;
6263
let batches = utils::collect_stream(&mut stream).await?;
64+
65+
println!(
66+
"=== Physical plan with metrics ===\n{}\n",
67+
DisplayableExecutionPlan::with_metrics(&exec)
68+
.indent()
69+
.to_string()
70+
);
71+
6372
// the output should be a single batch containing metadata (path and statistics)
6473
assert!(batches.len() == 1);
6574
Ok(batches[0].clone())

0 commit comments

Comments
 (0)