diff --git a/src/cli.rs b/src/cli.rs index dd93abd..ca6eb57 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -417,8 +417,9 @@ pub struct Vector { #[clap(env, long)] pub log_vector_pass: Option, - /// Vector batch size - #[clap(env, long, default_value = "10000")] + /// Vector batch size in bytes, uncompressed. + /// When it's exceeded then the batch is closed & queued for sending. + #[clap(env, long, default_value = "16MB", value_parser = parse_size_usize)] pub log_vector_batch: usize, /// Vector batch flush interval @@ -439,6 +440,10 @@ pub struct Vector { /// Vector HTTP request timeout for a batch flush #[clap(env, long, default_value = "30s", value_parser = parse_duration)] pub log_vector_timeout: Duration, + + /// ZSTD compression level to use when sending data + #[clap(env, long, default_value = "3")] + pub log_vector_zstd_level: usize, } #[derive(Args)] diff --git a/src/metrics/vector.rs b/src/metrics/vector.rs index 0be5b9f..d252619 100644 --- a/src/metrics/vector.rs +++ b/src/metrics/vector.rs @@ -18,7 +18,8 @@ use reqwest::{ }; use tokio::{ select, - time::{interval, sleep}, + sync::mpsc, + time::{interval, sleep, Interval}, }; use tokio_util::{ codec::{Encoder, LengthDelimitedCodec}, @@ -37,10 +38,13 @@ const CONTENT_ENCODING_ZSTD: HeaderValue = HeaderValue::from_static("zstd"); #[derive(Clone)] struct Metrics { - buffer_size: IntGauge, + sent: IntCounter, + sent_compressed: IntCounter, + buffer_event_size: IntGauge, + buffer_batch_size: IntGauge, batch_size: IntGauge, buffer_drops: IntCounter, - batch_encoding_failures: IntCounter, + encoding_failures: IntCounter, batch_flush_retries: IntCounter, batch_flushes: IntCounterVec, } @@ -48,13 +52,34 @@ struct Metrics { impl Metrics { pub fn new(registry: &Registry) -> Self { Self { - buffer_size: register_int_gauge_with_registry!( - format!("vector_buffer_size"), + sent: register_int_counter_with_registry!( + format!("vector_sent"), + format!("Number of bytes sent"), + registry + ) + .unwrap(), + + 
sent_compressed: register_int_counter_with_registry!( + format!("vector_sent_compressed"), + format!("Number of bytes sent (compressed)"), + registry + ) + .unwrap(), + + buffer_event_size: register_int_gauge_with_registry!( + format!("vector_event_buffer_size"), + format!("Number of events in the incoming buffer"), registry ) .unwrap(), + buffer_batch_size: register_int_gauge_with_registry!( + format!("vector_batch_buffer_size"), + format!("Number of batches in the outgoing buffer"), + registry + ) + .unwrap(), + batch_size: register_int_gauge_with_registry!( format!("vector_batch_size"), format!("Number of events in the outgoing batch"), @@ -69,9 +94,9 @@ impl Metrics { ) .unwrap(), - batch_encoding_failures: register_int_counter_with_registry!( - format!("vector_batch_encoding_failures"), - format!("Number of batches that were dropped due to encoding failure"), + encoding_failures: register_int_counter_with_registry!( + format!("vector_encoding_failures"), + format!("Number of events that were dropped due to encoding failure"), registry ) .unwrap(), @@ -122,20 +147,13 @@ impl EventEncoder { // Add framing let bytes = payload.split().freeze(); - self.framer.encode(bytes, &mut payload)?; + self.framer + .encode(bytes, &mut payload) + .map_err(|e| anyhow!("unable to add framing: {e:#}"))?; buf.unsplit(payload); Ok(()) } - - /// Encodes the provided batch into wire format leaving the provided Vec empty - fn encode_batch(&mut self, batch: &mut Vec) -> Result { - let mut body = BytesMut::new(); - for event in batch.drain(..) 
{ - self.encode_event(event, &mut body)?; - } - Ok(body.freeze()) - } } pub struct Vector { @@ -143,7 +161,7 @@ pub struct Vector { token_flushers: CancellationToken, tracker_batcher: TaskTracker, tracker_flushers: TaskTracker, - tx: Sender, + tx: mpsc::Sender, metrics: Metrics, } @@ -151,18 +169,27 @@ impl Vector { pub fn new(cli: &cli::Vector, client: Arc, registry: &Registry) -> Self { let cli = cli.clone(); - let (tx_event, rx_event) = bounded(cli.log_vector_buffer); + let (tx_event, rx_event) = mpsc::channel(cli.log_vector_buffer); let (tx_batch, rx_batch) = bounded(64); let metrics = Metrics::new(registry); // Start batcher warn!("Vector: starting batcher"); + let batch_capacity = cli.log_vector_batch + cli.log_vector_batch / 10; let token_batcher = CancellationToken::new(); + + let mut interval = interval(cli.log_vector_interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let batcher = Batcher { rx: rx_event, tx: tx_batch, - batch: Vec::with_capacity(cli.log_vector_batch), + // Allocate an extra 10% to make sure that we don't reallocate when pushing into batch + batch: BytesMut::with_capacity(batch_capacity), + batch_capacity, + batch_size: cli.log_vector_batch, + interval, token: token_batcher.child_token(), encoder: EventEncoder::new(), metrics: metrics.clone(), @@ -170,7 +197,7 @@ impl Vector { let tracker_batcher = TaskTracker::new(); tracker_batcher.spawn(async move { - batcher.run(cli.log_vector_interval).await; + batcher.run().await; }); // Start flushers @@ -190,6 +217,7 @@ impl Vector { client: client.clone(), url: cli.log_vector_url.clone().unwrap(), auth: auth.clone(), + zstd_level: cli.log_vector_zstd_level, token: token_flushers.child_token(), timeout: cli.log_vector_timeout, metrics: metrics.clone(), @@ -218,7 +246,7 @@ impl Vector { if self.tx.try_send(event).is_err() { self.metrics.buffer_drops.inc(); } else { - self.metrics.buffer_size.inc(); + self.metrics.buffer_event_size.inc(); }; } @@ -236,21 +264,41 
@@ impl Vector { } struct Batcher { - rx: Receiver, + rx: mpsc::Receiver, tx: Sender, - batch: Vec, + batch: BytesMut, + batch_capacity: usize, + batch_size: usize, encoder: EventEncoder, + interval: Interval, token: CancellationToken, metrics: Metrics, } +impl Display for Batcher { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Vector: Batcher") + } +} + impl Batcher { - async fn add_to_batch(&mut self, event: Event) { - self.batch.push(event); + fn add_to_batch(&mut self, event: Event) { + if let Err(e) = self.encoder.encode_event(event, &mut self.batch) { + warn!("{self}: unable to encode event: {e:#}"); + self.metrics.encoding_failures.inc(); + + // Reclaim back the space that was split off + let additional = self.batch_capacity - self.batch.capacity(); + self.batch.reserve(additional); + + // Clear the batch since the encoding failure might leave it inconsistent + self.batch.clear(); + }; self.metrics.batch_size.set(self.batch.len() as i64); - if self.batch.len() == self.batch.capacity() { - self.flush().await; + if self.batch.len() >= self.batch_size { + // Reset the interval to cause the flushing + self.interval.reset_immediately(); } } @@ -259,26 +307,15 @@ impl Batcher { return; } - // Encode the batch - let mut encoder = self.encoder.clone(); - let batch = match encoder.encode_batch(&mut self.batch) { - Ok(v) => v, - Err(e) => { - warn!("Vector: Batcher: unable to encode batch: {e:#}"); - self.metrics.batch_encoding_failures.inc(); - self.batch.clear(); - return; - } - }; + let batch = self.batch.clone().freeze(); - // In our case the Batcher is dropped before the Flusher, so no error can occur let start = Instant::now(); - info!("Vector: Batcher: queueing batch (len {})", batch.len()); + info!("{self}: queueing batch (len {})", batch.len()); + // In our case the Batcher is dropped before the Flusher, so no error can occur let _ = self.tx.send(batch).await; - info!( - "Vector: Batcher: batch queued in {}s", - 
start.elapsed().as_secs_f64() - ); + info!("{self}: batch queued in {}s", start.elapsed().as_secs_f64()); + self.metrics.buffer_batch_size.inc(); + self.batch.clear(); } async fn drain(&mut self) { @@ -286,37 +323,38 @@ impl Batcher { self.rx.close(); // Drain the buffer - while let Ok(v) = self.rx.recv().await { - self.add_to_batch(v).await; + while let Some(v) = self.rx.recv().await { + self.add_to_batch(v); + + if self.batch.len() >= self.batch_size { + self.flush().await; + } } // Flush the rest if anything left self.flush().await; } - async fn run(mut self, flush_interval: Duration) { - let mut interval = interval(flush_interval); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - warn!("Vector: Batcher started"); + async fn run(mut self) { + warn!("{self}: started"); loop { select! { biased; () = self.token.cancelled() => { - warn!("Vector: Batcher: stopping, draining"); + warn!("{self}: stopping, draining"); self.drain().await; - warn!("Vector: Batcher: stopped"); + warn!("{self}: stopped"); return; } - _ = interval.tick() => { + _ = self.interval.tick() => { self.flush().await; } - Ok(event) = self.rx.recv() => { - self.metrics.buffer_size.dec(); - self.add_to_batch(event).await; + Some(event) = self.rx.recv() => { + self.metrics.buffer_event_size.dec(); + self.add_to_batch(event); } } } @@ -330,13 +368,14 @@ struct Flusher { timeout: Duration, url: Url, auth: Option, + zstd_level: usize, token: CancellationToken, metrics: Metrics, } impl Display for Flusher { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Flusher{}", self.id) + write!(f, "Vector: Flusher{}", self.id) } } @@ -377,7 +416,8 @@ impl Flusher { async fn flush(&self, batch: Bytes) -> Result<(), Error> { let raw_size = batch.len(); - let batch = zstd::encode_all(batch.reader(), 3).context("unable to compress batch")?; + let batch = zstd::encode_all(batch.reader(), self.zstd_level as i32) + .context("unable to compress batch")?; let 
batch = Bytes::from(batch); // Retry @@ -385,10 +425,10 @@ impl Flusher { let mut interval = RETRY_INTERVAL; let mut retries = RETRY_COUNT; - while retries > 0 { + loop { let start = Instant::now(); info!( - "Vector: {self}: sending batch (raw size {raw_size}, compressed {}, retry {})", + "{self}: sending batch (raw size {raw_size}, compressed {}, retry {})", batch.len(), RETRY_COUNT - retries + 1 ); @@ -396,24 +436,27 @@ impl Flusher { // Bytes is cheap to clone if let Err(e) = self.send(batch.clone()).await { warn!( - "Vector: {self}: unable to send (try {}): {e:#}", + "{self}: unable to send (try {}): {e:#}", RETRY_COUNT - retries + 1 ); } else { + self.metrics.sent.inc_by(raw_size as u64); + self.metrics.sent_compressed.inc_by(batch.len() as u64); self.metrics.batch_flushes.with_label_values(&["yes"]).inc(); - info!( - "Vector: {self}: batch sent in {}s", - start.elapsed().as_secs_f64() - ); + + info!("{self}: batch sent in {}s", start.elapsed().as_secs_f64()); return Ok(()); } - self.metrics.batch_flush_retries.inc(); - sleep(interval).await; - // Back off a bit retries -= 1; interval *= 2; + if retries == 0 { + break; + } + + self.metrics.batch_flush_retries.inc(); + sleep(interval).await; } self.metrics.batch_flushes.with_label_values(&["no"]).inc(); @@ -433,31 +476,32 @@ impl Flusher { } async fn run(self) { - warn!("Vector: {self} started"); + warn!("{self} started"); loop { select! 
{ biased; () = self.token.cancelled() => { - warn!("Vector: {self}: stopping, draining"); + warn!("{self}: stopping, draining"); if let Err(e) = self.drain().await { - warn!("Vector: {self}: unable to drain: {e:#}"); + warn!("{self}: unable to drain: {e:#}"); } - warn!("Vector: {self}: stopped"); + warn!("{self}: stopped"); return; } Ok(batch) = self.rx.recv() => { - info!("Vector: {self}: received batch (len {})", batch.len()); + self.metrics.buffer_batch_size.dec(); + info!("{self}: received batch (len {})", batch.len()); if let Err(e) = self.flush(batch).await { - warn!("Vector: {self}: unable to flush: {e:#}"); + warn!("{self}: unable to flush: {e:#}"); }; - info!("Vector: {self}: received batch flushed"); + info!("{self}: received batch flushed"); } } } @@ -490,7 +534,7 @@ mod test { return Ok(resp.into()); } - let body = req.body().unwrap().as_bytes().unwrap(); + let body = zstd::decode_all(req.body().unwrap().as_bytes().unwrap()).unwrap(); self.0.fetch_add(1, Ordering::SeqCst); self.1.fetch_add(body.len() as u64, Ordering::SeqCst); @@ -523,11 +567,12 @@ mod test { log_vector_url: Some(Url::parse("http://127.0.0.1:1234").unwrap()), log_vector_user: None, log_vector_pass: None, - log_vector_batch: 50, + log_vector_batch: 1500, log_vector_buffer: 5000, log_vector_interval: Duration::from_secs(100), log_vector_timeout: Duration::from_secs(10), log_vector_flushers: 4, + log_vector_zstd_level: 3, }; let client = Arc::new(TestClient(AtomicU64::new(0), AtomicU64::new(0))); @@ -544,7 +589,7 @@ mod test { vector.stop().await; // 6k sent, buffer 5k => 1k will be dropped - assert_eq!(client.0.load(Ordering::SeqCst), 100); - assert_eq!(client.1.load(Ordering::SeqCst), 16327); // Compressed size + assert_eq!(client.0.load(Ordering::SeqCst), 131); + assert_eq!(client.1.load(Ordering::SeqCst), 197780); // Uncompressed size } } diff --git a/src/routing/ic/transport.rs b/src/routing/ic/transport.rs index 34b867f..b63c2ff 100644 --- a/src/routing/ic/transport.rs +++ 
b/src/routing/ic/transport.rs @@ -120,7 +120,7 @@ impl ReqwestTransport { body: Option>, ) -> Result<(StatusCode, Vec), AgentError> { // Create the initial request with a fake URL which will be overridden later - let mut http_request = Request::new(method.clone(), Url::parse("http://foo").unwrap()); + let mut http_request = Request::new(method.clone(), Url::parse("https://foo").unwrap()); http_request .headers_mut()