Merge pull request #36 from dfinity/fixes-v4
Fixes v4
blind-oracle authored Sep 11, 2024
2 parents 8d367bd + cba98f0 commit 39ecb62
Showing 5 changed files with 37 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -14,7 +14,7 @@ axum = "0.7"
axum-extra = "0.9"
backoff = { version = "0.4", features = ["tokio"] }
base64 = "0.22"
bytes = "1.6"
bytes = "1.7"
candid = "0.10"
chrono = "0.4"
clap = { version = "4.5", features = ["derive", "string", "env"] }
6 changes: 3 additions & 3 deletions src/cli.rs
@@ -427,13 +427,13 @@ pub struct Vector {

/// Vector buffer size (in number of events) to account for ingest problems.
/// If the buffer is full then new events will be dropped.
#[clap(env, long, default_value = "131072")]
#[clap(env, long, default_value = "524288")]
pub log_vector_buffer: usize,

/// Number of batch flusher tasks to spawn.
/// If there's a big event volume - increasing this number might help.
/// Each task is flushing a single batch which contains time-orderded events.
#[clap(env, long, default_value = "4")]
/// Each task is flushing a single batch which contains time-ordered events.
#[clap(env, long, default_value = "8")]
pub log_vector_flushers: usize,

/// Vector HTTP request timeout for a batch flush
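
For context, a minimal sketch (a hypothetical standalone binary, not this repository's actual CLI module) of how the two defaults raised above are wired up with clap's derive API; with the env feature enabled, as in this Cargo.toml, each flag can also be set through an environment variable derived from the field name.

```rust
use clap::Parser;

/// Hypothetical binary demonstrating the Vector-related flags above.
#[derive(Parser)]
struct Vector {
    /// Vector buffer size (in number of events) to account for ingest problems.
    #[clap(env, long, default_value = "524288")]
    log_vector_buffer: usize,

    /// Number of batch flusher tasks to spawn.
    #[clap(env, long, default_value = "8")]
    log_vector_flushers: usize,
}

fn main() {
    let args = Vector::parse();
    println!(
        "buffer={} flushers={}",
        args.log_vector_buffer, args.log_vector_flushers
    );
}
```

So, for example, `LOG_VECTOR_FLUSHERS=16` or `--log-vector-buffer 1048576` would override the new defaults shown in the diff.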
6 changes: 1 addition & 5 deletions src/metrics/mod.rs
@@ -31,7 +31,6 @@ use prometheus::{
IntCounterVec, Registry,
};
use serde_json::json;
use tokio::sync::RwLock;
use tower_http::compression::CompressionLayer;
use tracing::info;

@@ -51,7 +50,6 @@ pub use {
};

const KB: f64 = 1024.0;
const METRICS_CACHE_CAPACITY: usize = 15 * 1024 * 1024;

pub const HTTP_DURATION_BUCKETS: &[f64] = &[0.05, 0.2, 1.0, 2.0];
pub const HTTP_REQUEST_SIZE_BUCKETS: &[f64] = &[128.0, KB, 2.0 * KB, 4.0 * KB, 8.0 * KB];
@@ -62,9 +60,7 @@ pub fn setup(
tls_session_cache: Arc<sessions::Storage>,
tasks: &mut TaskManager,
) -> Router {
let cache = Arc::new(RwLock::new(runner::MetricsCache::new(
METRICS_CACHE_CAPACITY,
)));
let cache = Arc::new(runner::MetricsCache::new());
let runner = Arc::new(runner::MetricsRunner::new(
cache.clone(),
registry,
34 changes: 18 additions & 16 deletions src/metrics/runner.rs
@@ -4,33 +4,34 @@ use std::{
};

use anyhow::Error;
use arc_swap::ArcSwap;
use axum::{async_trait, extract::State, response::IntoResponse};
use bytes::{BufMut, Bytes, BytesMut};
use http::header::CONTENT_TYPE;
use ic_bn_lib::{tasks::Run, tls::sessions};
use prometheus::{register_int_gauge_with_registry, Encoder, IntGauge, Registry, TextEncoder};
use tikv_jemalloc_ctl::{epoch, stats};
use tokio::{select, sync::RwLock};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

// https://prometheus.io/docs/instrumenting/exposition_formats/#basic-info
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4";

pub struct MetricsCache {
buffer: Vec<u8>,
buffer: ArcSwap<Bytes>,
}

impl MetricsCache {
pub fn new(capacity: usize) -> Self {
pub fn new() -> Self {
Self {
// Preallocate a large enough vector, it'll be expanded if needed
buffer: Vec::with_capacity(capacity),
buffer: ArcSwap::new(Arc::new(Bytes::from(vec![]))),
}
}
}

pub struct MetricsRunner {
metrics_cache: Arc<RwLock<MetricsCache>>,
metrics_cache: Arc<MetricsCache>,
registry: Registry,
tls_session_cache: Arc<sessions::Storage>,
encoder: TextEncoder,
@@ -45,7 +46,7 @@ pub struct MetricsRunner {
// Snapshots & encodes the metrics for the handler to export
impl MetricsRunner {
pub fn new(
metrics_cache: Arc<RwLock<MetricsCache>>,
metrics_cache: Arc<MetricsCache>,
registry: &Registry,
tls_session_cache: Arc<sessions::Storage>,
) -> Self {
@@ -107,12 +108,14 @@ impl MetricsRunner {
// Get a snapshot of metrics
let metric_families = self.registry.gather();

// Take a write lock, truncate the vector and encode the metrics into it
let mut metrics_cache = self.metrics_cache.write().await;
metrics_cache.buffer.clear();
self.encoder
.encode(&metric_families, &mut metrics_cache.buffer)?;
drop(metrics_cache); // clippy
// Encode the metrics into the buffer
let mut buffer = BytesMut::with_capacity(10 * 1024 * 1024).writer();
self.encoder.encode(&metric_families, &mut buffer)?;

// Store the new snapshot
self.metrics_cache
.buffer
.store(Arc::new(buffer.into_inner().freeze()));

Ok(())
}
@@ -147,10 +150,9 @@ impl Run for MetricsRunner {
}
}

pub async fn handler(State(state): State<Arc<RwLock<MetricsCache>>>) -> impl IntoResponse {
// Get a read lock and clone the buffer contents
pub async fn handler(State(state): State<Arc<MetricsCache>>) -> impl IntoResponse {
(
[(CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
state.read().await.buffer.clone(),
state.buffer.load_full().as_ref().clone(),
)
}
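
The substance of this change is the switch from a RwLock-guarded Vec<u8> to an ArcSwap<Bytes> snapshot: the runner encodes metrics into a fresh buffer and publishes it atomically, and the /metrics handler clones an Arc instead of taking a read lock. A minimal standalone sketch of that pattern (simplified types and names, not the project's code):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;

// Holds the latest encoded metrics snapshot; readers never block the writer.
struct Cache {
    buffer: ArcSwap<Bytes>,
}

impl Cache {
    fn new() -> Self {
        Self {
            buffer: ArcSwap::new(Arc::new(Bytes::new())),
        }
    }

    // Writer side (the periodic runner): atomically swap in a new snapshot.
    fn publish(&self, encoded: Bytes) {
        self.buffer.store(Arc::new(encoded));
    }

    // Reader side (the HTTP handler): clone the Arc, a cheap pointer copy.
    fn snapshot(&self) -> Arc<Bytes> {
        self.buffer.load_full()
    }
}

fn main() {
    let cache = Cache::new();
    cache.publish(Bytes::from_static(b"# HELP requests_total ...\n"));
    println!("snapshot is {} bytes", cache.snapshot().len());
}
```

Since Bytes clones are reference-counted, the handler's as_ref().clone() in the diff is itself only a pointer copy, so serving a large metrics page stays cheap even without the preallocated cache buffer that was removed.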
34 changes: 14 additions & 20 deletions src/metrics/vector.rs
@@ -270,26 +270,10 @@ impl Batcher {
return;
}
};
let batch_size = batch.len();

let batch = match zstd::encode_all(batch.reader(), 5) {
Ok(v) => v,
Err(e) => {
warn!("Vector: Batcher: unable to compress batch: {e:#}");
self.metrics.batch_encoding_failures.inc();
self.batch.clear();
return;
}
};
let batch = Bytes::from(batch);

// In our case the Batcher is dropped before the Flusher, so no error can occur
let start = Instant::now();
info!(
"Vector: Batcher: queueing batch (len {}, compressed {})",
batch_size,
batch.len()
);
info!("Vector: Batcher: queueing batch (len {})", batch.len());
let _ = self.tx.send(batch).await;
info!(
"Vector: Batcher: batch queued in {}s",
@@ -391,14 +375,21 @@ impl Flusher {
}

async fn flush(&self, batch: Bytes) -> Result<(), Error> {
let raw_size = batch.len();

let batch = zstd::encode_all(batch.reader(), 3).context("unable to compress batch")?;
let batch = Bytes::from(batch);

// Retry
// TODO make configurable?
let mut interval = RETRY_INTERVAL;
let mut retries = RETRY_COUNT;

while retries > 0 {
let start = Instant::now();
info!(
"Vector: {self}: sending batch (retry {})",
"Vector: {self}: sending batch (raw size {raw_size}, compressed {}, retry {})",
batch.len(),
RETRY_COUNT - retries + 1
);

@@ -410,7 +401,10 @@
);
} else {
self.metrics.batch_flushes.with_label_values(&["yes"]).inc();
info!("Vector: {self}: batch sent");
info!(
"Vector: {self}: batch sent in {}s",
start.elapsed().as_secs_f64()
);
return Ok(());
}

@@ -551,6 +545,6 @@ mod test {

// 6k sent, buffer 5k => 1k will be dropped
assert_eq!(client.0.load(Ordering::SeqCst), 100);
assert_eq!(client.1.load(Ordering::SeqCst), 16981); // Compressed size
assert_eq!(client.1.load(Ordering::SeqCst), 16327); // Compressed size
}
}
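
The other notable change in this file is that compression moves out of the single Batcher task and into the Flusher: the channel now carries raw batches and each flusher compresses just before sending, logging both the raw and compressed sizes. A rough standalone sketch of that step (only the zstd, bytes and anyhow crates are assumed; names are illustrative):

```rust
use anyhow::{Context, Error};
use bytes::{Buf, Bytes};

// Compress a raw batch right before flushing, as the Flusher now does.
fn compress_batch(batch: Bytes) -> Result<(Bytes, usize), Error> {
    let raw_size = batch.len();
    // Level 3 matches the value used in the diff above (the Batcher used 5 before).
    let compressed = zstd::encode_all(batch.reader(), 3).context("unable to compress batch")?;
    Ok((Bytes::from(compressed), raw_size))
}

fn main() -> Result<(), Error> {
    let batch = Bytes::from(vec![b'x'; 64 * 1024]);
    let (compressed, raw_size) = compress_batch(batch)?;
    println!("raw {raw_size} -> compressed {}", compressed.len());
    Ok(())
}
```

Presumably this keeps the Batcher's hot path light and spreads the compression cost across the (now 8 by default) flusher tasks; the expected compressed size in the test changes because both the compression level and the place where it is applied change.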
