Skip to content

Commit 1c63759

Browse files
authored
Add spill_count and spilled_bytes to baseline metrics, test sort with spill of metrics (#1641)
1 parent af8786e commit 1c63759

File tree

5 files changed

+186
-27
lines changed

5 files changed

+186
-27
lines changed

datafusion/src/physical_plan/metrics/baseline.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ pub struct BaselineMetrics {
5050
/// amount of time the operator was actively trying to use the CPU
5151
elapsed_compute: Time,
5252

53+
/// count of spills during the execution of the operator
54+
spill_count: Count,
55+
56+
/// total spilled bytes during the execution of the operator
57+
spilled_bytes: Count,
58+
5359
/// output rows: the total output rows
5460
output_rows: Count,
5561
}
@@ -63,6 +69,8 @@ impl BaselineMetrics {
6369
Self {
6470
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
6571
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
72+
spill_count: MetricBuilder::new(metrics).spill_count(partition),
73+
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
6674
output_rows: MetricBuilder::new(metrics).output_rows(partition),
6775
}
6876
}
@@ -72,6 +80,22 @@ impl BaselineMetrics {
7280
&self.elapsed_compute
7381
}
7482

83+
/// return the metric for the total number of spills triggered during execution
84+
pub fn spill_count(&self) -> &Count {
85+
&self.spill_count
86+
}
87+
88+
/// return the metric for the total spilled bytes during execution
89+
pub fn spilled_bytes(&self) -> &Count {
90+
&self.spilled_bytes
91+
}
92+
93+
/// Record a spill of `spilled_bytes` size.
94+
pub fn record_spill(&self, spilled_bytes: usize) {
95+
self.spill_count.add(1);
96+
self.spilled_bytes.add(spilled_bytes);
97+
}
98+
7599
/// return the metric for the total number of output rows produced
76100
pub fn output_rows(&self) -> &Count {
77101
&self.output_rows

datafusion/src/physical_plan/metrics/builder.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,24 @@ impl<'a> MetricBuilder<'a> {
105105
count
106106
}
107107

108+
/// Consume self and create a new counter for recording the number of spills
109+
/// triggered by an operator
110+
pub fn spill_count(self, partition: usize) -> Count {
111+
let count = Count::new();
112+
self.with_partition(partition)
113+
.build(MetricValue::SpillCount(count.clone()));
114+
count
115+
}
116+
117+
/// Consume self and create a new counter for recording the total spilled bytes
118+
/// triggered by an operator
119+
pub fn spilled_bytes(self, partition: usize) -> Count {
120+
let count = Count::new();
121+
self.with_partition(partition)
122+
.build(MetricValue::SpilledBytes(count.clone()));
123+
count
124+
}
125+
108126
/// Consumes self and creates a new [`Count`] for recording some
109127
/// arbitrary metric of an operator.
110128
pub fn counter(

datafusion/src/physical_plan/metrics/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,20 @@ impl MetricsSet {
191191
.map(|v| v.as_usize())
192192
}
193193

194+
/// convenience: return the count of spills, aggregated
195+
/// across partitions or None if no metric is present
196+
pub fn spill_count(&self) -> Option<usize> {
197+
self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
198+
.map(|v| v.as_usize())
199+
}
200+
201+
/// convenience: return the total byte size of spills, aggregated
202+
/// across partitions or None if no metric is present
203+
pub fn spilled_bytes(&self) -> Option<usize> {
204+
self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
205+
.map(|v| v.as_usize())
206+
}
207+
194208
/// convenience: return the amount of elapsed CPU time spent,
195209
/// aggregated across partitions or None if no metric is present
196210
pub fn elapsed_compute(&self) -> Option<usize> {

datafusion/src/physical_plan/metrics/value.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ pub enum MetricValue {
283283
/// classical defintion of "cpu_time", which is the time reported
284284
/// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
285285
ElapsedCompute(Time),
286+
/// Number of spills produced: "spill_count" metric
287+
SpillCount(Count),
288+
/// Total size of spilled bytes produced: "spilled_bytes" metric
289+
SpilledBytes(Count),
286290
/// Operator defined count.
287291
Count {
288292
/// The provided name of this metric
@@ -308,6 +312,8 @@ impl MetricValue {
308312
pub fn name(&self) -> &str {
309313
match self {
310314
Self::OutputRows(_) => "output_rows",
315+
Self::SpillCount(_) => "spill_count",
316+
Self::SpilledBytes(_) => "spilled_bytes",
311317
Self::ElapsedCompute(_) => "elapsed_compute",
312318
Self::Count { name, .. } => name.borrow(),
313319
Self::Time { name, .. } => name.borrow(),
@@ -320,6 +326,8 @@ impl MetricValue {
320326
pub fn as_usize(&self) -> usize {
321327
match self {
322328
Self::OutputRows(count) => count.value(),
329+
Self::SpillCount(count) => count.value(),
330+
Self::SpilledBytes(bytes) => bytes.value(),
323331
Self::ElapsedCompute(time) => time.value(),
324332
Self::Count { count, .. } => count.value(),
325333
Self::Time { time, .. } => time.value(),
@@ -339,6 +347,8 @@ impl MetricValue {
339347
pub fn new_empty(&self) -> Self {
340348
match self {
341349
Self::OutputRows(_) => Self::OutputRows(Count::new()),
350+
Self::SpillCount(_) => Self::SpillCount(Count::new()),
351+
Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
342352
Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
343353
Self::Count { name, .. } => Self::Count {
344354
name: name.clone(),
@@ -365,6 +375,8 @@ impl MetricValue {
365375
pub fn aggregate(&mut self, other: &Self) {
366376
match (self, other) {
367377
(Self::OutputRows(count), Self::OutputRows(other_count))
378+
| (Self::SpillCount(count), Self::SpillCount(other_count))
379+
| (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
368380
| (
369381
Self::Count { count, .. },
370382
Self::Count {
@@ -401,10 +413,12 @@ impl MetricValue {
401413
match self {
402414
Self::OutputRows(_) => 0, // show first
403415
Self::ElapsedCompute(_) => 1, // show second
404-
Self::Count { .. } => 2,
405-
Self::Time { .. } => 3,
406-
Self::StartTimestamp(_) => 4, // show timestamps last
407-
Self::EndTimestamp(_) => 5,
416+
Self::SpillCount(_) => 2,
417+
Self::SpilledBytes(_) => 3,
418+
Self::Count { .. } => 4,
419+
Self::Time { .. } => 5,
420+
Self::StartTimestamp(_) => 6, // show timestamps last
421+
Self::EndTimestamp(_) => 7,
408422
}
409423
}
410424

@@ -418,7 +432,10 @@ impl std::fmt::Display for MetricValue {
418432
/// Prints the value of this metric
419433
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
420434
match self {
421-
Self::OutputRows(count) | Self::Count { count, .. } => {
435+
Self::OutputRows(count)
436+
| Self::SpillCount(count)
437+
| Self::SpilledBytes(count)
438+
| Self::Count { count, .. } => {
422439
write!(f, "{}", count)
423440
}
424441
Self::ElapsedCompute(time) | Self::Time { time, .. } => {

0 commit comments

Comments
 (0)