Skip to content

Commit 098f0b0

Browse files
Ted-Jiangyangzhongalamb
authored
Add opening, scanning, processing metrics in file stream (#3070)
* Fix file stream metrics * Update datafusion/core/src/physical_plan/file_format/file_stream.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * fix comment * fix ut. Co-authored-by: yangzhong <yangzhong@ebay.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 31381bf commit 098f0b0

File tree

5 files changed

+90
-20
lines changed

5 files changed

+90
-20
lines changed

datafusion/core/src/physical_plan/file_format/avro.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ use crate::physical_plan::{
2424
use arrow::datatypes::SchemaRef;
2525

2626
use crate::execution::context::TaskContext;
27-
#[cfg(feature = "avro")]
28-
use crate::physical_plan::metrics::BaselineMetrics;
2927
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
3028
use std::any::Any;
3129
use std::sync::Arc;
@@ -123,7 +121,7 @@ impl ExecutionPlan for AvroExec {
123121
partition,
124122
context,
125123
opener,
126-
BaselineMetrics::new(&self.metrics, partition),
124+
self.metrics.clone(),
127125
)?;
128126
Ok(Box::pin(stream))
129127
}

datafusion/core/src/physical_plan/file_format/csv.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
2929
use crate::physical_plan::file_format::file_stream::{
3030
FileOpenFuture, FileOpener, FileStream,
3131
};
32-
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
32+
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
3333
use arrow::csv;
3434
use arrow::datatypes::SchemaRef;
3535
use bytes::Buf;
@@ -139,7 +139,7 @@ impl ExecutionPlan for CsvExec {
139139
partition,
140140
context,
141141
opener,
142-
BaselineMetrics::new(&self.metrics, partition),
142+
self.metrics.clone(),
143143
)?;
144144
Ok(Box::pin(stream) as SendableRecordBatchStream)
145145
}

datafusion/core/src/physical_plan/file_format/file_stream.rs

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::collections::VecDeque;
2525
use std::pin::Pin;
2626
use std::sync::Arc;
2727
use std::task::{Context, Poll};
28+
use std::time::Instant;
2829

2930
use arrow::datatypes::SchemaRef;
3031
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
@@ -39,7 +40,9 @@ use crate::datasource::listing::{FileRange, PartitionedFile};
3940
use crate::error::Result;
4041
use crate::execution::context::TaskContext;
4142
use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
42-
use crate::physical_plan::metrics::BaselineMetrics;
43+
use crate::physical_plan::metrics::{
44+
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
45+
};
4346
use crate::physical_plan::RecordBatchStream;
4447

4548
/// A fallible future that resolves to a stream of [`RecordBatch`]
@@ -75,7 +78,9 @@ pub struct FileStream<F: FileOpener> {
7578
object_store: Arc<dyn ObjectStore>,
7679
/// The stream state
7780
state: FileStreamState,
78-
/// runtime metrics recording
81+
/// File stream specific metrics
82+
file_stream_metrics: FileStreamMetrics,
83+
/// runtime baseline metrics
7984
baseline_metrics: BaselineMetrics,
8085
}
8186

@@ -104,13 +109,69 @@ enum FileStreamState {
104109
Limit,
105110
}
106111

112+
struct StartableTime {
113+
metrics: Time,
114+
// use for record each part cost time, will eventually add into 'metrics'.
115+
start: Option<Instant>,
116+
}
117+
118+
impl StartableTime {
119+
fn start(&mut self) {
120+
assert!(self.start.is_none());
121+
self.start = Some(Instant::now());
122+
}
123+
124+
fn stop(&mut self) {
125+
if let Some(start) = self.start.take() {
126+
self.metrics.add_elapsed(start);
127+
}
128+
}
129+
}
130+
131+
struct FileStreamMetrics {
132+
/// Time elapsed for file opening
133+
pub time_opening: StartableTime,
134+
/// Time elapsed for file scanning + first record batch of decompression + decoding
135+
pub time_scanning: StartableTime,
136+
/// Time elapsed for data decompression + decoding
137+
pub time_processing: StartableTime,
138+
}
139+
140+
impl FileStreamMetrics {
141+
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
142+
let time_opening = StartableTime {
143+
metrics: MetricBuilder::new(metrics)
144+
.subset_time("time_elapsed_opening", partition),
145+
start: None,
146+
};
147+
148+
let time_scanning = StartableTime {
149+
metrics: MetricBuilder::new(metrics)
150+
.subset_time("time_elapsed_scanning", partition),
151+
start: None,
152+
};
153+
154+
let time_processing = StartableTime {
155+
metrics: MetricBuilder::new(metrics)
156+
.subset_time("time_elapsed_processing", partition),
157+
start: None,
158+
};
159+
160+
Self {
161+
time_opening,
162+
time_scanning,
163+
time_processing,
164+
}
165+
}
166+
}
167+
107168
impl<F: FileOpener> FileStream<F> {
108169
pub fn new(
109170
config: &FileScanConfig,
110171
partition: usize,
111172
context: Arc<TaskContext>,
112173
file_reader: F,
113-
baseline_metrics: BaselineMetrics,
174+
metrics: ExecutionPlanMetricsSet,
114175
) -> Result<Self> {
115176
let (projected_schema, _) = config.project();
116177
let pc_projector = PartitionColumnProjector::new(
@@ -132,7 +193,8 @@ impl<F: FileOpener> FileStream<F> {
132193
pc_projector,
133194
object_store,
134195
state: FileStreamState::Idle,
135-
baseline_metrics,
196+
file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
197+
baseline_metrics: BaselineMetrics::new(&metrics, partition),
136198
})
137199
}
138200

@@ -148,6 +210,7 @@ impl<F: FileOpener> FileStream<F> {
148210
None => return Poll::Ready(None),
149211
};
150212

213+
self.file_stream_metrics.time_opening.start();
151214
let future = self.file_reader.open(
152215
self.object_store.clone(),
153216
file.object_meta,
@@ -164,6 +227,8 @@ impl<F: FileOpener> FileStream<F> {
164227
partition_values,
165228
} => match ready!(future.poll_unpin(cx)) {
166229
Ok(reader) => {
230+
self.file_stream_metrics.time_opening.stop();
231+
self.file_stream_metrics.time_scanning.start();
167232
self.state = FileStreamState::Scan {
168233
partition_values: std::mem::take(partition_values),
169234
reader,
@@ -179,6 +244,7 @@ impl<F: FileOpener> FileStream<F> {
179244
partition_values,
180245
} => match ready!(reader.poll_next_unpin(cx)) {
181246
Some(result) => {
247+
self.file_stream_metrics.time_scanning.stop();
182248
let result = result
183249
.and_then(|b| self.pc_projector.project(b, partition_values))
184250
.map(|batch| match &mut self.remain {
@@ -202,7 +268,10 @@ impl<F: FileOpener> FileStream<F> {
202268

203269
return Poll::Ready(Some(result));
204270
}
205-
None => self.state = FileStreamState::Idle,
271+
None => {
272+
self.file_stream_metrics.time_scanning.stop();
273+
self.state = FileStreamState::Idle;
274+
}
206275
},
207276
FileStreamState::Error | FileStreamState::Limit => {
208277
return Poll::Ready(None)
@@ -219,10 +288,9 @@ impl<F: FileOpener> Stream for FileStream<F> {
219288
mut self: Pin<&mut Self>,
220289
cx: &mut Context<'_>,
221290
) -> Poll<Option<Self::Item>> {
222-
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
223-
let timer = cloned_time.timer();
291+
self.file_stream_metrics.time_processing.start();
224292
let result = self.poll_inner(cx);
225-
timer.done();
293+
self.file_stream_metrics.time_processing.stop();
226294
self.baseline_metrics.record_poll(result)
227295
}
228296
}
@@ -286,9 +354,14 @@ mod tests {
286354
table_partition_cols: vec![],
287355
};
288356

289-
let metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
290-
let file_stream =
291-
FileStream::new(&config, 0, ctx.task_ctx(), reader, metrics).unwrap();
357+
let file_stream = FileStream::new(
358+
&config,
359+
0,
360+
ctx.task_ctx(),
361+
reader,
362+
ExecutionPlanMetricsSet::new(),
363+
)
364+
.unwrap();
292365

293366
file_stream
294367
.map(|b| b.expect("No error expected in stream"))

datafusion/core/src/physical_plan/file_format/json.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
2727
use crate::physical_plan::file_format::file_stream::{
2828
FileOpenFuture, FileOpener, FileStream,
2929
};
30-
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
30+
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
3131
use crate::physical_plan::{
3232
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
3333
};
@@ -126,7 +126,7 @@ impl ExecutionPlan for NdJsonExec {
126126
partition,
127127
context,
128128
opener,
129-
BaselineMetrics::new(&self.metrics, partition),
129+
self.metrics.clone(),
130130
)?;
131131

132132
Ok(Box::pin(stream) as SendableRecordBatchStream)

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ use crate::datasource::listing::FileRange;
5454
use crate::physical_plan::file_format::file_stream::{
5555
FileOpenFuture, FileOpener, FileStream,
5656
};
57-
use crate::physical_plan::metrics::BaselineMetrics;
5857
use crate::{
5958
error::{DataFusionError, Result},
6059
execution::context::{SessionState, TaskContext},
@@ -232,7 +231,7 @@ impl ExecutionPlan for ParquetExec {
232231
partition_index,
233232
context,
234233
opener,
235-
BaselineMetrics::new(&self.metrics, partition_index),
234+
self.metrics.clone(),
236235
)?;
237236

238237
Ok(Box::pin(stream))

0 commit comments

Comments
 (0)