Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ async fn explain_analyze_baseline_metrics() {
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"output_bytes="
);
assert_metrics!(
&formatted,
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"selectivity=99% (99/100)"
);
assert_metrics!(
&formatted,
"ProjectionExec: expr=[]",
Expand Down
36 changes: 30 additions & 6 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, PushedDown, PushedDownPredicate,
};
use crate::metrics::{MetricBuilder, MetricType};
use crate::projection::{
make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
ProjectionExec, ProjectionExpr,
};
use crate::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
DisplayFormatType, ExecutionPlan,
};

Expand Down Expand Up @@ -384,12 +385,12 @@ impl ExecutionPlan for FilterExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let metrics = FilterExecMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
schema: self.schema(),
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
metrics,
projection: self.projection.clone(),
}))
}
Expand Down Expand Up @@ -623,11 +624,30 @@ struct FilterExecStream {
/// The input partition to filter.
input: SendableRecordBatchStream,
/// Runtime metrics recording
baseline_metrics: BaselineMetrics,
metrics: FilterExecMetrics,
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
}

/// The metrics for `FilterExec`
struct FilterExecMetrics {
// Common metrics for most operators
baseline_metrics: BaselineMetrics,
// Selectivity of the filter, calculated as output_rows / input_rows
selectivity: RatioMetrics,
}

impl FilterExecMetrics {
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
Self {
baseline_metrics: BaselineMetrics::new(metrics, partition),
selectivity: MetricBuilder::new(metrics)
.with_type(MetricType::SUMMARY)
.ratio_metrics("selectivity", partition),
}
}
}

pub fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
Expand Down Expand Up @@ -679,14 +699,18 @@ impl Stream for FilterExecStream {
loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let timer = self.metrics.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(
&batch,
&self.predicate,
self.projection.as_ref(),
&self.schema,
)?;
timer.done();

self.metrics.selectivity.add_part(filtered_batch.num_rows());
self.metrics.selectivity.add_total(batch.num_rows());

// Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
Expand All @@ -700,7 +724,7 @@ impl Stream for FilterExecStream {
}
}
}
self.baseline_metrics.record_poll(poll)
self.metrics.baseline_metrics.record_poll(poll)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
19 changes: 18 additions & 1 deletion datafusion/physical-plan/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

use std::{borrow::Cow, sync::Arc};

use crate::metrics::{value::PruningMetrics, MetricType};
use crate::metrics::{
value::{PruningMetrics, RatioMetrics},
MetricType,
};

use super::{
Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
Expand Down Expand Up @@ -266,4 +269,18 @@ impl<'a> MetricBuilder<'a> {
});
pruning_metrics
}

/// Consumes self and creates a new [`RatioMetrics`]
pub fn ratio_metrics(
self,
name: impl Into<Cow<'static, str>>,
partition: usize,
) -> RatioMetrics {
let ratio_metrics = RatioMetrics::new();
self.with_partition(partition).build(MetricValue::Ratio {
name: name.into(),
ratio_metrics: ratio_metrics.clone(),
});
ratio_metrics
}
}
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
pub use builder::MetricBuilder;
pub use custom::CustomMetricValue;
pub use value::{
Count, Gauge, MetricValue, PruningMetrics, ScopedTimerGuard, Time, Timestamp,
Count, Gauge, MetricValue, PruningMetrics, RatioMetrics, ScopedTimerGuard, Time,
Timestamp,
};

/// Something that tracks a value of interest (metric) of a DataFusion
Expand Down Expand Up @@ -305,6 +306,7 @@ impl MetricsSet {
MetricValue::StartTimestamp(_) => false,
MetricValue::EndTimestamp(_) => false,
MetricValue::PruningMetrics { name, .. } => name == metric_name,
MetricValue::Ratio { name, .. } => name == metric_name,
MetricValue::Custom { .. } => false,
})
}
Expand Down
165 changes: 162 additions & 3 deletions datafusion/physical-plan/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,92 @@ impl PruningMetrics {
}
}

/// Counters tracking ratio metrics (e.g. matched vs total)
///
/// The counters are thread-safe and shared across clones.
#[derive(Debug, Clone, Default)]
pub struct RatioMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is basically the same as Pruned metrics except the display is different -- I wonder if we could consolidate the two somehow 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed the core logic is mostly duplicated, but I think the tradeoff is more lines of code for better readability, the APIs named add_matched/add_pruned for pruning are easier to understand than the APIs in the ratio metric.

part: Arc<AtomicUsize>,
total: Arc<AtomicUsize>,
}

impl RatioMetrics {
/// Create a new [`RatioMetrics`]
pub fn new() -> Self {
Self {
part: Arc::new(AtomicUsize::new(0)),
total: Arc::new(AtomicUsize::new(0)),
}
}

/// Add `n` to the numerator (`part`) value
pub fn add_part(&self, n: usize) {
self.part.fetch_add(n, Ordering::Relaxed);
}

/// Add `n` to the denominator (`total`) value
pub fn add_total(&self, n: usize) {
self.total.fetch_add(n, Ordering::Relaxed);
}

/// Merge the value from `other` into `self`
pub fn merge(&self, other: &Self) {
self.add_part(other.part());
self.add_total(other.total());
}

/// Return the numerator (`part`) value
pub fn part(&self) -> usize {
self.part.load(Ordering::Relaxed)
}

/// Return the denominator (`total`) value
pub fn total(&self) -> usize {
self.total.load(Ordering::Relaxed)
}
}

impl PartialEq for RatioMetrics {
fn eq(&self, other: &Self) -> bool {
self.part() == other.part() && self.total() == other.total()
}
}

/// Format a float number with `digits` most significant numbers.
///
/// fmt_significant(12.5) -> "12"
/// fmt_significant(0.0543) -> "0.054"
/// fmt_significant(0.000123) -> "0.00012"
fn fmt_significant(mut x: f64, digits: usize) -> String {
if x == 0.0 {
return "0".to_string();
}

let exp = x.abs().log10().floor(); // exponent of first significant digit
let scale = 10f64.powf(-(exp - (digits as f64 - 1.0)));
x = (x * scale).round() / scale; // round to N significant digits
format!("{x}")
}

impl Display for RatioMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let part = self.part();
let total = self.total();

if total == 0 {
if part == 0 {
write!(f, "N/A (0/0)")
} else {
write!(f, "N/A ({part}/0)")
}
} else {
let percentage = (part as f64 / total as f64) * 100.0;

write!(f, "{}% ({part}/{total})", fmt_significant(percentage, 2))
}
}
}

/// Possible values for a [super::Metric].
///
/// Among other differences, the metric types have different ways to
Expand Down Expand Up @@ -499,6 +585,11 @@ pub enum MetricValue {
name: Cow<'static, str>,
pruning_metrics: PruningMetrics,
},
/// Metrics that should be displayed as ratio like (42%)
Ratio {
name: Cow<'static, str>,
ratio_metrics: RatioMetrics,
},
Custom {
/// The provided name of this metric
name: Cow<'static, str>,
Expand Down Expand Up @@ -563,6 +654,30 @@ impl PartialEq for MetricValue {
(MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => {
timestamp == other
}
(
MetricValue::PruningMetrics {
name,
pruning_metrics,
},
MetricValue::PruningMetrics {
name: other_name,
pruning_metrics: other_pruning_metrics,
},
) => {
name == other_name
&& pruning_metrics.pruned() == other_pruning_metrics.pruned()
&& pruning_metrics.matched() == other_pruning_metrics.matched()
}
(
MetricValue::Ratio {
name,
ratio_metrics,
},
MetricValue::Ratio {
name: other_name,
ratio_metrics: other_ratio_metrics,
},
) => name == other_name && ratio_metrics == other_ratio_metrics,
(
MetricValue::Custom { name, value },
MetricValue::Custom {
Expand Down Expand Up @@ -593,6 +708,7 @@ impl MetricValue {
Self::StartTimestamp(_) => "start_timestamp",
Self::EndTimestamp(_) => "end_timestamp",
Self::PruningMetrics { name, .. } => name.borrow(),
Self::Ratio { name, .. } => name.borrow(),
Self::Custom { name, .. } => name.borrow(),
}
}
Expand Down Expand Up @@ -625,6 +741,8 @@ impl MetricValue {
// like `PruningMetrics`, this function is not supposed to get called.
// Metrics aggregation for them are implemented inside `MetricsSet` directly.
Self::PruningMetrics { .. } => 0,
// Should not be used. See comments in `PruningMetrics` for details.
Self::Ratio { .. } => 0,
Self::Custom { value, .. } => value.as_usize(),
}
}
Expand Down Expand Up @@ -658,6 +776,10 @@ impl MetricValue {
name: name.clone(),
pruning_metrics: PruningMetrics::new(),
},
Self::Ratio { name, .. } => Self::Ratio {
name: name.clone(),
ratio_metrics: RatioMetrics::new(),
},
Self::Custom { name, value } => Self::Custom {
name: name.clone(),
value: value.new_empty(),
Expand Down Expand Up @@ -723,6 +845,15 @@ impl MetricValue {
pruning_metrics.add_pruned(pruned);
pruning_metrics.add_matched(matched);
}
(
Self::Ratio { ratio_metrics, .. },
Self::Ratio {
ratio_metrics: other_ratio_metrics,
..
},
) => {
ratio_metrics.merge(other_ratio_metrics);
}
(
Self::Custom { value, .. },
Self::Custom {
Expand Down Expand Up @@ -770,9 +901,10 @@ impl MetricValue {
Self::Count { .. } => 12,
Self::Gauge { .. } => 13,
Self::Time { .. } => 14,
Self::StartTimestamp(_) => 15, // show timestamps last
Self::EndTimestamp(_) => 16,
Self::Custom { .. } => 17,
Self::Ratio { .. } => 15,
Self::StartTimestamp(_) => 16, // show timestamps last
Self::EndTimestamp(_) => 17,
Self::Custom { .. } => 18,
}
}

Expand Down Expand Up @@ -816,6 +948,7 @@ impl Display for MetricValue {
} => {
write!(f, "{pruning_metrics}")
}
Self::Ratio { ratio_metrics, .. } => write!(f, "{ratio_metrics}"),
Self::Custom { name, value } => {
write!(f, "name:{name} {value}")
}
Expand Down Expand Up @@ -970,6 +1103,32 @@ mod tests {
}
}

#[test]
fn test_display_ratio() {
let ratio_metrics = RatioMetrics::new();
let ratio = MetricValue::Ratio {
name: Cow::Borrowed("ratio_metric"),
ratio_metrics: ratio_metrics.clone(),
};

assert_eq!("N/A (0/0)", ratio.to_string());

ratio_metrics.add_part(10);
assert_eq!("N/A (10/0)", ratio.to_string());

ratio_metrics.add_total(40);
assert_eq!("25% (10/40)", ratio.to_string());

let tiny_ratio_metrics = RatioMetrics::new();
let tiny_ratio = MetricValue::Ratio {
name: Cow::Borrowed("tiny_ratio_metric"),
ratio_metrics: tiny_ratio_metrics.clone(),
};
tiny_ratio_metrics.add_part(1);
tiny_ratio_metrics.add_total(3000);
assert_eq!("0.033% (1/3000)", tiny_ratio.to_string());
}

#[test]
fn test_display_timestamp() {
let timestamp = Timestamp::new();
Expand Down