Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lading/src/generator/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{

use hyper::{HeaderMap, Request, Uri, header::CONTENT_LENGTH};
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use metrics::counter;
use metrics::{counter, histogram};
use once_cell::sync::OnceCell;
use rand::{SeedableRng, prelude::StdRng};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -258,6 +258,7 @@ impl Http {
match client.request(request).await {
Ok(response) => {
counter!("bytes_written", &labels).increment(block_length as u64);
histogram!("bytes_written_histogram", &labels).record(block_length as f64);

if let Some(dp) = data_points {
counter!("data_points_transmitted", &labels).increment(dp);
Expand Down
2 changes: 1 addition & 1 deletion lading_capture/src/formats/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum Error {

/// Multi-format writer
///
/// Writes metrics to both JSONL and Parquet formats, uses fail-fast sematics on
/// Writes metrics to both JSONL and Parquet formats, uses fail-fast semantics on
/// errors from either format.
#[derive(Debug)]
pub struct Format<W1: Write, W2: Write + Seek + Send> {
Expand Down
11 changes: 9 additions & 2 deletions lading_capture/src/formats/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ struct ColumnBuffers {
values_histogram: Vec<Vec<u8>>,
}

const EMPTY_HISTOGRAM: Vec<u8> = Vec::<_>::new();

impl ColumnBuffers {
/// Create new column buffers with default capacity
fn new() -> Self {
Expand Down Expand Up @@ -111,10 +113,17 @@ impl ColumnBuffers {
line::LineValue::Int(v) => {
self.values_int.push(Some(v));
self.values_float.push(None);
self.values_histogram.push(EMPTY_HISTOGRAM);
}
line::LineValue::Float(v) => {
self.values_int.push(None);
self.values_float.push(Some(v));
self.values_histogram.push(EMPTY_HISTOGRAM);
}
line::LineValue::ExternalHistogram => {
self.values_int.push(None);
self.values_float.push(None);
self.values_histogram.push(line.value_histogram.clone());
}
}

Expand All @@ -125,8 +134,6 @@ impl ColumnBuffers {
}
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
self.label_offsets.push(self.label_keys.len() as i32);

self.values_histogram.push(line.value_histogram.clone());
}

/// Check if buffers are empty
Expand Down
4 changes: 4 additions & 0 deletions lading_capture/src/line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ pub enum LineValue {
Int(u64),
/// A floating point, 64 bits wide
Float(f64),
/// Value given externally
ExternalHistogram,
}

impl LineValue {
Expand All @@ -134,6 +136,7 @@ impl LineValue {
match self {
LineValue::Int(int) => *int as f64,
LineValue::Float(float) => *float,
LineValue::ExternalHistogram => -1.0,
}
}
}
Expand All @@ -143,6 +146,7 @@ impl std::fmt::Display for LineValue {
match self {
LineValue::Int(int) => write!(f, "{int}"),
LineValue::Float(float) => write!(f, "{float}"),
LineValue::ExternalHistogram => write!(f, "[hist]"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lading_capture/src/manager/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ impl<F: OutputFormat, C: Clock> StateMachine<F, C> {
fetch_index: tick,
metric_name: key.name().into(),
metric_kind: line::MetricKind::Histogram,
value: line::LineValue::Float(0.0),
value: line::LineValue::ExternalHistogram,
labels,
value_histogram: sketch_bytes.clone(),
};
Expand Down
Loading