Skip to content

Commit

Permalink
zstd configurable compr, retry fail early
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed Sep 13, 2024
1 parent 56a5f2c commit 1da16f0
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ pub struct Vector {
/// Vector HTTP request timeout for a batch flush
#[clap(env, long, default_value = "30s", value_parser = parse_duration)]
pub log_vector_timeout: Duration,

/// ZSTD compression level to use when sending data
#[clap(env, long, default_value = "3")]
pub log_vector_zstd_level: usize,
}

#[derive(Args)]
Expand Down
17 changes: 12 additions & 5 deletions src/metrics/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ impl Vector {
client: client.clone(),
url: cli.log_vector_url.clone().unwrap(),
auth: auth.clone(),
zstd_level: cli.log_vector_zstd_level,
token: token_flushers.child_token(),
timeout: cli.log_vector_timeout,
metrics: metrics.clone(),
Expand Down Expand Up @@ -367,6 +368,7 @@ struct Flusher {
timeout: Duration,
url: Url,
auth: Option<HeaderValue>,
zstd_level: usize,
token: CancellationToken,
metrics: Metrics,
}
Expand Down Expand Up @@ -414,15 +416,16 @@ impl Flusher {
async fn flush(&self, batch: Bytes) -> Result<(), Error> {
let raw_size = batch.len();

let batch = zstd::encode_all(batch.reader(), 3).context("unable to compress batch")?;
let batch = zstd::encode_all(batch.reader(), self.zstd_level as i32)
.context("unable to compress batch")?;
let batch = Bytes::from(batch);

// Retry
// TODO make configurable?
let mut interval = RETRY_INTERVAL;
let mut retries = RETRY_COUNT;

while retries > 0 {
loop {
let start = Instant::now();
info!(
"{self}: sending batch (raw size {raw_size}, compressed {}, retry {})",
Expand All @@ -445,12 +448,15 @@ impl Flusher {
return Ok(());
}

self.metrics.batch_flush_retries.inc();
sleep(interval).await;

// Back off a bit
retries -= 1;
interval *= 2;
if retries == 0 {
break;
}

self.metrics.batch_flush_retries.inc();
sleep(interval).await;
}

self.metrics.batch_flushes.with_label_values(&["no"]).inc();
Expand Down Expand Up @@ -566,6 +572,7 @@ mod test {
log_vector_interval: Duration::from_secs(100),
log_vector_timeout: Duration::from_secs(10),
log_vector_flushers: 4,
log_vector_zstd_level: 3,
};

let client = Arc::new(TestClient(AtomicU64::new(0), AtomicU64::new(0)));
Expand Down
2 changes: 1 addition & 1 deletion src/routing/ic/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl ReqwestTransport {
body: Option<Vec<u8>>,
) -> Result<(StatusCode, Vec<u8>), AgentError> {
// Create the initial request with a fake URL which will be overridden later
let mut http_request = Request::new(method.clone(), Url::parse("http://foo").unwrap());
let mut http_request = Request::new(method.clone(), Url::parse("https://foo").unwrap());

http_request
.headers_mut()
Expand Down

0 comments on commit 1da16f0

Please sign in to comment.