Skip to content

Commit 13dfdd6

Browse files
authored
[BugFix] Fix `explain analyze` not showing metrics for csv/json/avro exec (#5018)
When executing `explain analyze select * from tablexxx`, the csv/json/avro exec metrics are empty. This bug was caused by the csv/json/avro execs not implementing the `metrics` method of the `ExecutionPlan` trait. Signed-off-by: xyz <a997647204@gmail.com> Signed-off-by: xyz <a997647204@gmail.com>
1 parent 0820eb9 commit 13dfdd6

File tree

5 files changed

+43
-18
lines changed

5 files changed

+43
-18
lines changed

datafusion/core/src/physical_plan/file_format/avro.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::physical_plan::{
2424
use arrow::datatypes::SchemaRef;
2525

2626
use crate::execution::context::TaskContext;
27-
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
27+
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2828
use std::any::Any;
2929
use std::sync::Arc;
3030

@@ -122,7 +122,7 @@ impl ExecutionPlan for AvroExec {
122122
let opener = private::AvroOpener { config };
123123

124124
let stream =
125-
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
125+
FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
126126
Ok(Box::pin(stream))
127127
}
128128

@@ -146,6 +146,10 @@ impl ExecutionPlan for AvroExec {
146146
fn statistics(&self) -> Statistics {
147147
self.projected_statistics.clone()
148148
}
149+
150+
fn metrics(&self) -> Option<MetricsSet> {
151+
Some(self.metrics.clone_inner())
152+
}
149153
}
150154

151155
#[cfg(feature = "avro")]

datafusion/core/src/physical_plan/file_format/csv.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::physical_plan::file_format::file_stream::{
2626
FileOpenFuture, FileOpener, FileStream,
2727
};
2828
use crate::physical_plan::file_format::FileMeta;
29-
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
29+
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3030
use crate::physical_plan::{
3131
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
3232
};
@@ -153,7 +153,7 @@ impl ExecutionPlan for CsvExec {
153153
file_compression_type: self.file_compression_type.to_owned(),
154154
};
155155
let stream =
156-
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
156+
FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
157157
Ok(Box::pin(stream) as SendableRecordBatchStream)
158158
}
159159

@@ -179,6 +179,10 @@ impl ExecutionPlan for CsvExec {
179179
fn statistics(&self) -> Statistics {
180180
self.projected_statistics.clone()
181181
}
182+
183+
fn metrics(&self) -> Option<MetricsSet> {
184+
Some(self.metrics.clone_inner())
185+
}
182186
}
183187

184188
#[derive(Debug, Clone)]
@@ -515,6 +519,13 @@ mod tests {
515519
"+----+------------+",
516520
];
517521
crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
522+
523+
let metrics = csv.metrics().expect("doesn't found metrics");
524+
let time_elapsed_processing = get_value(&metrics, "time_elapsed_processing");
525+
assert!(
526+
time_elapsed_processing > 0,
527+
"Expected time_elapsed_processing greater than 0",
528+
);
518529
Ok(())
519530
}
520531

@@ -676,4 +687,15 @@ mod tests {
676687

677688
Ok(())
678689
}
690+
691+
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
692+
match metrics.sum_by_name(metric_name) {
693+
Some(v) => v.as_usize(),
694+
_ => {
695+
panic!(
696+
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
697+
);
698+
}
699+
}
700+
}
679701
}

datafusion/core/src/physical_plan/file_format/file_stream.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ impl<F: FileOpener> FileStream<F> {
166166
config: &FileScanConfig,
167167
partition: usize,
168168
file_reader: F,
169-
metrics: ExecutionPlanMetricsSet,
169+
metrics: &ExecutionPlanMetricsSet,
170170
) -> Result<Self> {
171171
let (projected_schema, _) = config.project();
172172
let pc_projector = PartitionColumnProjector::new(
@@ -187,8 +187,8 @@ impl<F: FileOpener> FileStream<F> {
187187
file_reader,
188188
pc_projector,
189189
state: FileStreamState::Idle,
190-
file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
191-
baseline_metrics: BaselineMetrics::new(&metrics, partition),
190+
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
191+
baseline_metrics: BaselineMetrics::new(metrics, partition),
192192
})
193193
}
194194

@@ -353,9 +353,8 @@ mod tests {
353353
output_ordering: None,
354354
infinite_source: false,
355355
};
356-
357-
let file_stream =
358-
FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap();
356+
let metrics_set = ExecutionPlanMetricsSet::new();
357+
let file_stream = FileStream::new(&config, 0, reader, &metrics_set).unwrap();
359358

360359
file_stream
361360
.map(|b| b.expect("No error expected in stream"))

datafusion/core/src/physical_plan/file_format/json.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::physical_plan::file_format::file_stream::{
2626
FileOpenFuture, FileOpener, FileStream,
2727
};
2828
use crate::physical_plan::file_format::FileMeta;
29-
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
29+
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3030
use crate::physical_plan::{
3131
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
3232
};
@@ -134,7 +134,7 @@ impl ExecutionPlan for NdJsonExec {
134134
};
135135

136136
let stream =
137-
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
137+
FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
138138

139139
Ok(Box::pin(stream) as SendableRecordBatchStream)
140140
}
@@ -159,6 +159,10 @@ impl ExecutionPlan for NdJsonExec {
159159
fn statistics(&self) -> Statistics {
160160
self.projected_statistics.clone()
161161
}
162+
163+
fn metrics(&self) -> Option<MetricsSet> {
164+
Some(self.metrics.clone_inner())
165+
}
162166
}
163167

164168
struct JsonOpener {

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -306,12 +306,8 @@ impl ExecutionPlan for ParquetExec {
306306
enable_page_index: self.enable_page_index(config_options),
307307
};
308308

309-
let stream = FileStream::new(
310-
&self.base_config,
311-
partition_index,
312-
opener,
313-
self.metrics.clone(),
314-
)?;
309+
let stream =
310+
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;
315311

316312
Ok(Box::pin(stream))
317313
}

0 commit comments

Comments
 (0)