
Commit 8cbb750

Add End-to-end test for parquet pruning + metrics for ParquetExec (#657)
* End to end tests for parquet pruning
* Remove unused dep
* Make the separation of per-partition and per-exec metrics clearer
* Account for statistics once rather than per row group
* Fix timestamps to use UTC time
1 parent 58da159 commit 8cbb750

5 files changed: +508 −37 lines


datafusion/src/physical_optimizer/repartition.rs

Lines changed: 13 additions & 9 deletions
```diff
@@ -110,7 +110,9 @@ mod tests {
 
     use super::*;
    use crate::datasource::datasource::Statistics;
-    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
+    use crate::physical_plan::parquet::{
+        ParquetExec, ParquetExecMetrics, ParquetPartition,
+    };
     use crate::physical_plan::projection::ProjectionExec;
 
     #[test]
@@ -119,12 +121,13 @@ mod tests {
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
-                vec![ParquetPartition {
-                    filenames: vec!["x".to_string()],
-                    statistics: Statistics::default(),
-                }],
+                vec![ParquetPartition::new(
+                    vec!["x".to_string()],
+                    Statistics::default(),
+                )],
                 schema,
                 None,
+                ParquetExecMetrics::new(),
                 None,
                 2048,
                 None,
@@ -156,12 +159,13 @@ mod tests {
             Arc::new(ProjectionExec::try_new(
                 vec![],
                 Arc::new(ParquetExec::new(
-                    vec![ParquetPartition {
-                        filenames: vec!["x".to_string()],
-                        statistics: Statistics::default(),
-                    }],
+                    vec![ParquetPartition::new(
+                        vec!["x".to_string()],
+                        Statistics::default(),
+                    )],
                     schema,
                     None,
+                    ParquetExecMetrics::new(),
                     None,
                     2048,
                     None,
```
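Since `ParquetPartition` now carries a private `metrics` field, it can no longer be built with a struct literal outside its module; construction goes through `ParquetPartition::new`, and `ParquetExec::new` takes an extra `ParquetExecMetrics` argument. A minimal construction sketch mirroring the updated tests (`schema` is assumed to be a `SchemaRef` built elsewhere):

```rust
use std::sync::Arc;

use crate::datasource::datasource::Statistics;
use crate::physical_plan::parquet::{ParquetExec, ParquetExecMetrics, ParquetPartition};

// Sketch only: one partition over a single file, default (unknown) statistics.
let exec = Arc::new(ParquetExec::new(
    vec![ParquetPartition::new(
        vec!["x".to_string()],
        Statistics::default(),
    )],
    schema,                    // SchemaRef, assumed built elsewhere
    None,                      // projection: read all columns
    ParquetExecMetrics::new(), // fresh per-exec metrics
    None,                      // no pruning predicate
    2048,                      // batch size
    None,                      // no row limit
));
```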

datafusion/src/physical_plan/mod.rs

Lines changed: 14 additions & 0 deletions
```diff
@@ -297,6 +297,20 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
     Ok(())
 }
 
+/// Recursively gather all execution metrics from this plan and all of its input plans
+pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric> {
+    fn get_metrics_inner(
+        plan: &dyn ExecutionPlan,
+        mut metrics: HashMap<String, SQLMetric>,
+    ) -> HashMap<String, SQLMetric> {
+        metrics.extend(plan.metrics().into_iter());
+        plan.children().into_iter().fold(metrics, |metrics, child| {
+            get_metrics_inner(child.as_ref(), metrics)
+        })
+    }
+    get_metrics_inner(plan.as_ref(), HashMap::new())
+}
+
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
     match plan.output_partitioning().partition_count() {
```
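A minimal usage sketch of the new helper (variable names hypothetical; assumes `SQLMetric::value()` as the counter accessor): once a plan has been executed, `plan_metrics` folds every node's metrics into one map keyed by metric name:

```rust
use datafusion::physical_plan::plan_metrics;

// Sketch: `physical_plan` is assumed to be an Arc<dyn ExecutionPlan> that has
// already been executed; metrics are only populated during execution.
let metrics = plan_metrics(physical_plan.clone());
for (name, metric) in &metrics {
    println!("{} = {}", name, metric.value());
}
```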

datafusion/src/physical_plan/parquet.rs

Lines changed: 133 additions & 23 deletions
```diff
@@ -40,6 +40,8 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
+use hashbrown::HashMap;
+use log::debug;
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
@@ -59,6 +61,8 @@ use crate::datasource::datasource::{ColumnStatistics, Statistics};
 use async_trait::async_trait;
 use futures::stream::{Stream, StreamExt};
 
+use super::SQLMetric;
+
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -72,6 +76,8 @@ pub struct ParquetExec {
     batch_size: usize,
     /// Statistics for the data set (sum of statistics for all partitions)
     statistics: Statistics,
+    /// Metrics for the overall execution
+    metrics: ParquetExecMetrics,
     /// Optional predicate builder
     predicate_builder: Option<PruningPredicate>,
     /// Optional limit of the number of rows
@@ -93,6 +99,24 @@ pub struct ParquetPartition {
     pub filenames: Vec<String>,
     /// Statistics for this partition
     pub statistics: Statistics,
+    /// Execution metrics
+    metrics: ParquetPartitionMetrics,
+}
+
+/// Stores metrics about the overall parquet execution
+#[derive(Debug, Clone)]
+pub struct ParquetExecMetrics {
+    /// Number of times the pruning predicate could not be created
+    pub predicate_creation_errors: Arc<SQLMetric>,
+}
+
+/// Stores metrics about the parquet execution for a particular ParquetPartition
+#[derive(Debug, Clone)]
+struct ParquetPartitionMetrics {
+    /// Number of times the predicate could not be evaluated
+    pub predicate_evaluation_errors: Arc<SQLMetric>,
+    /// Number of row groups pruned by the predicate
+    pub row_groups_pruned: Arc<SQLMetric>,
 }
 
 impl ParquetExec {
@@ -140,6 +164,8 @@ impl ParquetExec {
         max_concurrency: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
+        debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
+               filenames, projection, predicate, limit);
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
@@ -205,10 +231,7 @@ impl ParquetExec {
             };
             // remove files that are not needed in case of limit
             filenames.truncate(total_files);
-            partitions.push(ParquetPartition {
-                filenames,
-                statistics,
-            });
+            partitions.push(ParquetPartition::new(filenames, statistics));
             if limit_exhausted {
                 break;
             }
@@ -225,14 +248,27 @@ impl ParquetExec {
             )));
         }
         let schema = Arc::new(schemas.pop().unwrap());
+        let metrics = ParquetExecMetrics::new();
+
         let predicate_builder = predicate.and_then(|predicate_expr| {
-            PruningPredicate::try_new(&predicate_expr, schema.clone()).ok()
+            match PruningPredicate::try_new(&predicate_expr, schema.clone()) {
+                Ok(predicate_builder) => Some(predicate_builder),
+                Err(e) => {
+                    debug!(
+                        "Could not create pruning predicate for {:?}: {}",
+                        predicate_expr, e
+                    );
+                    metrics.predicate_creation_errors.add(1);
+                    None
+                }
+            }
         });
 
         Ok(Self::new(
             partitions,
             schema,
             projection,
+            metrics,
             predicate_builder,
             batch_size,
             limit,
@@ -244,6 +280,7 @@ impl ParquetExec {
         partitions: Vec<ParquetPartition>,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
+        metrics: ParquetExecMetrics,
         predicate_builder: Option<PruningPredicate>,
         batch_size: usize,
         limit: Option<usize>,
@@ -307,6 +344,7 @@ impl ParquetExec {
             partitions,
             schema: Arc::new(projected_schema),
             projection,
+            metrics,
             predicate_builder,
             batch_size,
             statistics,
@@ -341,6 +379,7 @@ impl ParquetPartition {
         Self {
             filenames,
             statistics,
+            metrics: ParquetPartitionMetrics::new(),
         }
     }
 
@@ -355,6 +394,25 @@ impl ParquetPartition {
     }
 }
 
+impl ParquetExecMetrics {
+    /// Create new metrics
+    pub fn new() -> Self {
+        Self {
+            predicate_creation_errors: SQLMetric::counter(),
+        }
+    }
+}
+
+impl ParquetPartitionMetrics {
+    /// Create new metrics
+    pub fn new() -> Self {
+        Self {
+            predicate_evaluation_errors: SQLMetric::counter(),
+            row_groups_pruned: SQLMetric::counter(),
+        }
+    }
+}
+
 #[async_trait]
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
@@ -398,7 +456,9 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let filenames = self.partitions[partition].filenames.clone();
+        let partition = &self.partitions[partition];
+        let filenames = partition.filenames.clone();
+        let metrics = partition.metrics.clone();
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
         let batch_size = self.batch_size;
@@ -407,6 +467,7 @@ impl ExecutionPlan for ParquetExec {
         task::spawn_blocking(move || {
             if let Err(e) = read_files(
                 &filenames,
+                metrics,
                 &projection,
                 &predicate_builder,
                 batch_size,
@@ -448,6 +509,31 @@ impl ExecutionPlan for ParquetExec {
             }
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        self.partitions
+            .iter()
+            .flat_map(|p| {
+                [
+                    (
+                        format!(
+                            "numPredicateEvaluationErrors for {}",
+                            p.filenames.join(",")
+                        ),
+                        p.metrics.predicate_evaluation_errors.as_ref().clone(),
+                    ),
+                    (
+                        format!("numRowGroupsPruned for {}", p.filenames.join(",")),
+                        p.metrics.row_groups_pruned.as_ref().clone(),
+                    ),
+                ]
+            })
+            .chain(std::iter::once((
+                "numPredicateCreationErrors".to_string(),
+                self.metrics.predicate_creation_errors.as_ref().clone(),
+            )))
+            .collect()
+    }
 }
 
 fn send_result(
@@ -547,6 +633,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 
 fn build_row_group_predicate(
     predicate_builder: &PruningPredicate,
+    metrics: ParquetPartitionMetrics,
     row_group_metadata: &[RowGroupMetaData],
 ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
     let parquet_schema = predicate_builder.schema().as_ref();
@@ -555,21 +642,28 @@ fn build_row_group_predicate(
         row_group_metadata,
         parquet_schema,
     };
-
     let predicate_values = predicate_builder.prune(&pruning_stats);
 
-    let predicate_values = match predicate_values {
-        Ok(values) => values,
+    match predicate_values {
+        Ok(values) => {
+            // NB: false means don't scan row group
+            let num_pruned = values.iter().filter(|&v| !v).count();
+            metrics.row_groups_pruned.add(num_pruned);
+            Box::new(move |_, i| values[i])
+        }
         // stats filter array could not be built
         // return a closure which will not filter out any row groups
-        _ => return Box::new(|_r, _i| true),
-    };
-
-    Box::new(move |_, i| predicate_values[i])
+        Err(e) => {
+            debug!("Error evaluating row group predicate values {}", e);
+            metrics.predicate_evaluation_errors.add(1);
+            Box::new(|_r, _i| true)
+        }
+    }
 }
 
 fn read_files(
     filenames: &[String],
+    metrics: ParquetPartitionMetrics,
     projection: &[usize],
     predicate_builder: &Option<PruningPredicate>,
     batch_size: usize,
@@ -583,6 +677,7 @@ fn read_files(
     if let Some(predicate_builder) = predicate_builder {
         let row_group_predicate = build_row_group_predicate(
             predicate_builder,
+            metrics.clone(),
             file_reader.metadata().row_groups(),
         );
         file_reader.filter_row_groups(&row_group_predicate);
@@ -757,8 +852,11 @@ mod tests {
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -787,8 +885,11 @@
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -832,8 +933,11 @@
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -847,8 +951,11 @@
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
         let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -891,8 +998,11 @@
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
```
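The upshot is that pruning activity becomes observable per partition. A hedged sketch of the kind of assertion the new end-to-end test enables (key names taken from `ParquetExec::metrics()` above; the prefix match is illustrative, not the test's exact code):

```rust
// Sketch: after running a query over a parquet file with a selective
// predicate, count the row groups skipped across all partitions.
// Keys look like "numRowGroupsPruned for <file1,file2,...>".
let metrics = plan_metrics(physical_plan);
let row_groups_pruned: usize = metrics
    .iter()
    .filter(|(name, _)| name.starts_with("numRowGroupsPruned"))
    .map(|(_, metric)| metric.value())
    .sum();
assert!(row_groups_pruned > 0, "expected some row groups to be pruned");
```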
