Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/vector_sink_connection_ttl.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `vector` sink now supports a `connection_ttl_secs` configuration option to periodically close and re-establish connections. This enables proper load balancing by preventing long-lived HTTP/2 connections from sticking to a single backend instance. When set, the sink will automatically reconnect after the specified duration, allowing load balancers to distribute traffic across multiple backend instances.

authors: Donny Xia
39 changes: 35 additions & 4 deletions src/sinks/vector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ pub struct VectorConfig {
#[serde(default)]
compression: bool,

/// Time-to-live (TTL) for connections in seconds.
///
/// After this duration, the connection will be closed and a new one will be established.
/// This is useful for load balancing scenarios where long-lived connections prevent
/// proper distribution of traffic across backend instances.
///
/// If not set, connections will be reused indefinitely (subject to HTTP/2 keep-alive).
#[configurable(metadata(docs::examples = 300))]
#[configurable(metadata(docs::examples = 600))]
#[serde(default)]
pub(in crate::sinks::vector) connection_ttl_secs: Option<u64>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub(in crate::sinks::vector) connection_ttl_secs: Option<u64>,
pub connection_ttl_secs: Option<u64>,


#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<RealtimeEventBasedDefaultBatchSettings>,
Expand Down Expand Up @@ -98,6 +110,7 @@ fn default_config(address: &str) -> VectorConfig {
version: None,
address: address.to_owned(),
compression: false,
connection_ttl_secs: None,
batch: BatchConfig::default(),
request: TowerRequestConfig::default(),
tls: None,
Expand All @@ -111,18 +124,36 @@ impl SinkConfig for VectorConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSinkType, Healthcheck)> {
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
let uri = with_default_scheme(&self.address, tls.is_tls())?;
let proxy_config = cx.proxy().clone();

let client = new_client(&tls, cx.proxy())?;
let client = new_client(&tls, &proxy_config)?;

let healthcheck_uri = cx
.healthcheck
.uri
.clone()
.map(|uri| uri.uri)
.unwrap_or_else(|| uri.clone());
let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false);
let healthcheck = healthcheck(healthcheck_client, cx.healthcheck);
let service = VectorService::new(client, uri, self.compression);
let healthcheck_options = cx.healthcheck;
let healthcheck_client = VectorService::new(
client.clone(),
healthcheck_uri,
false,
None,
tls.clone(),
proxy_config.clone(),
);
let healthcheck = healthcheck(healthcheck_client, healthcheck_options);

let connection_ttl = self.connection_ttl_secs.map(std::time::Duration::from_secs);
let service = VectorService::new(
client,
uri,
self.compression,
connection_ttl,
tls,
proxy_config,
);
let request_settings = self.request.into_settings();
let batch_settings = self.batch.into_batcher_settings()?;

Expand Down
18 changes: 18 additions & 0 deletions src/sinks/vector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,24 @@ mod tests {
);
}

#[test]
fn test_connection_ttl_config() {
// Test that connection_ttl_secs is parsed correctly
let config = r#"
address = "http://127.0.0.1:9000"
connection_ttl_secs = 300
"#;
let config: VectorConfig = toml::from_str(config).unwrap();
assert_eq!(config.connection_ttl_secs, Some(300));

// Test default (no TTL)
let config = r#"
address = "http://127.0.0.1:9000"
"#;
let config: VectorConfig = toml::from_str(config).unwrap();
assert_eq!(config.connection_ttl_secs, None);
}

async fn get_received(
rx: mpsc::Receiver<(Parts, Bytes)>,
assert_parts: impl Fn(Parts),
Expand Down
88 changes: 84 additions & 4 deletions src/sinks/vector/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::task::{Context, Poll};
use std::{
sync::{Arc, Mutex},
task::{Context, Poll},
time::{Duration, Instant},
};

use futures::{TryFutureExt, future::BoxFuture};
use http::Uri;
Expand All @@ -16,17 +20,43 @@ use vector_lib::{
use super::VectorSinkError;
use crate::{
Error,
config::ProxyConfig,
event::{EventFinalizers, EventStatus, Finalizable},
http::build_proxy_connector,
internal_events::EndpointBytesSent,
proto::vector as proto_vector,
sinks::util::uri,
tls::MaybeTlsSettings,
};

#[derive(Clone, Debug)]
struct ClientState {
client: hyper::Client<ProxyConnector<HttpsConnector<HttpConnector>>, BoxBody>,
created_at: Instant,
}

#[derive(Clone)]
pub struct VectorService {
pub client: proto_vector::Client<HyperSvc>,
pub protocol: String,
pub endpoint: String,
uri: Uri,
compression: bool,
connection_ttl: Option<Duration>,
tls_settings: MaybeTlsSettings,
proxy_config: ProxyConfig,
client_state: Arc<Mutex<ClientState>>,
}

impl std::fmt::Debug for VectorService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can at least add self.client.created_at here but also can we just derive debug here? Is it too verbose?

f.debug_struct("VectorService")
.field("protocol", &self.protocol)
.field("endpoint", &self.endpoint)
.field("uri", &self.uri)
.field("compression", &self.compression)
.field("connection_ttl", &self.connection_ttl)
.finish_non_exhaustive()
}
}

pub struct VectorResponse {
Expand Down Expand Up @@ -71,20 +101,67 @@ impl VectorService {
hyper_client: hyper::Client<ProxyConnector<HttpsConnector<HttpConnector>>, BoxBody>,
uri: Uri,
compression: bool,
connection_ttl: Option<Duration>,
tls_settings: MaybeTlsSettings,
proxy_config: ProxyConfig,
) -> Self {
let (protocol, endpoint) = uri::protocol_endpoint(uri.clone());
let mut proto_client = proto_vector::Client::new(HyperSvc {
uri,
client: hyper_client,
uri: uri.clone(),
client: hyper_client.clone(),
});

if compression {
proto_client = proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip);
}

let client_state = Arc::new(Mutex::new(ClientState {
client: hyper_client,
created_at: Instant::now(),
}));

Self {
client: proto_client,
protocol,
endpoint,
uri,
compression,
connection_ttl,
tls_settings,
proxy_config,
client_state,
}
}

fn check_and_recreate_client(&mut self) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This silently ignores errors when rebuilding the proxy connector. If client recreation fails, it continues with the old client without logging or alerting.

Also, worth considering emitting internal metrics tracking recreations.

if let Some(ttl) = self.connection_ttl {
let mut state = self.client_state.lock().unwrap();
let elapsed = state.created_at.elapsed();

if elapsed >= ttl {
// Recreate the client
if let Ok(proxy) =
build_proxy_connector(self.tls_settings.clone(), &self.proxy_config)
{
let new_client = hyper::Client::builder().http2_only(true).build(proxy);

state.client = new_client.clone();
state.created_at = Instant::now();

// Update the proto client with the new hyper client
let mut proto_client = proto_vector::Client::new(HyperSvc {
uri: self.uri.clone(),
client: new_client,
});

if self.compression {
proto_client =
proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip);
}

self.client = proto_client;
}
}
}
}
}
Expand All @@ -106,6 +183,9 @@ impl Service<VectorRequest> for VectorService {

// Emission of internal events for errors and dropped events is handled upstream by the caller.
fn call(&mut self, mut list: VectorRequest) -> Self::Future {
// Check if we need to recreate the client due to TTL expiration
self.check_and_recreate_client();

let mut service = self.clone();
let byte_size = list.request.encoded_len();
let metadata = std::mem::take(list.metadata_mut());
Expand Down
Loading