Skip to content

Commit

Permalink
logging
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed Sep 12, 2024
1 parent 35632bf commit 56a5f2c
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions src/metrics/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,16 @@ struct Batcher {
metrics: Metrics,
}

impl Display for Batcher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Vector: Batcher")
}
}

impl Batcher {
fn add_to_batch(&mut self, event: Event) {
if let Err(e) = self.encoder.encode_event(event, &mut self.batch) {
warn!("Vector: Batcher: unable to encode event: {e:#}");
warn!("{self}: unable to encode event: {e:#}");
self.metrics.encoding_failures.inc();

// Reclaim back the space that was split off
Expand All @@ -303,13 +309,10 @@ impl Batcher {
let batch = self.batch.clone().freeze();

let start = Instant::now();
info!("Vector: Batcher: queueing batch (len {})", batch.len());
info!("{self}: queueing batch (len {})", batch.len());
// In our case the Batcher is dropped before the Flusher, so no error can occur
let _ = self.tx.send(batch).await;
info!(
"Vector: Batcher: batch queued in {}s",
start.elapsed().as_secs_f64()
);
info!("{self}: batch queued in {}s", start.elapsed().as_secs_f64());
self.metrics.buffer_batch_size.inc();
self.batch.clear();
}
Expand All @@ -332,15 +335,15 @@ impl Batcher {
}

async fn run(mut self) {
warn!("Vector: Batcher started");
warn!("{self}: started");
loop {
select! {
biased;

() = self.token.cancelled() => {
warn!("Vector: Batcher: stopping, draining");
warn!("{self}: stopping, draining");
self.drain().await;
warn!("Vector: Batcher: stopped");
warn!("{self}: stopped");
return;
}

Expand Down Expand Up @@ -370,7 +373,7 @@ struct Flusher {

impl Display for Flusher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Flusher{}", self.id)
write!(f, "Vector: Flusher{}", self.id)
}
}

Expand Down Expand Up @@ -422,26 +425,23 @@ impl Flusher {
while retries > 0 {
let start = Instant::now();
info!(
"Vector: {self}: sending batch (raw size {raw_size}, compressed {}, retry {})",
"{self}: sending batch (raw size {raw_size}, compressed {}, retry {})",
batch.len(),
RETRY_COUNT - retries + 1
);

// Bytes is cheap to clone
if let Err(e) = self.send(batch.clone()).await {
warn!(
"Vector: {self}: unable to send (try {}): {e:#}",
"{self}: unable to send (try {}): {e:#}",
RETRY_COUNT - retries + 1
);
} else {
self.metrics.sent.inc_by(raw_size as u64);
self.metrics.sent_compressed.inc_by(batch.len() as u64);
self.metrics.batch_flushes.with_label_values(&["yes"]).inc();

info!(
"Vector: {self}: batch sent in {}s",
start.elapsed().as_secs_f64()
);
info!("{self}: batch sent in {}s", start.elapsed().as_secs_f64());
return Ok(());
}

Expand Down Expand Up @@ -470,32 +470,32 @@ impl Flusher {
}

async fn run(self) {
warn!("Vector: {self} started");
warn!("{self} started");

loop {
select! {
biased;

() = self.token.cancelled() => {
warn!("Vector: {self}: stopping, draining");
warn!("{self}: stopping, draining");

if let Err(e) = self.drain().await {
warn!("Vector: {self}: unable to drain: {e:#}");
warn!("{self}: unable to drain: {e:#}");
}

warn!("Vector: {self}: stopped");
warn!("{self}: stopped");
return;
}

Ok(batch) = self.rx.recv() => {
self.metrics.buffer_batch_size.dec();
info!("Vector: {self}: received batch (len {})", batch.len());
info!("{self}: received batch (len {})", batch.len());

if let Err(e) = self.flush(batch).await {
warn!("Vector: {self}: unable to flush: {e:#}");
warn!("{self}: unable to flush: {e:#}");
};

info!("Vector: {self}: received batch flushed");
info!("{self}: received batch flushed");
}
}
}
Expand Down

0 comments on commit 56a5f2c

Please sign in to comment.