-
Couldn't load subscription status.
- Fork 1.9k
feat(vector sink): Add connection_ttl_secs config for load balancing #24034
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| The `vector` sink now supports a `connection_ttl_secs` configuration option to periodically close and re-establish connections. This enables proper load balancing by preventing long-lived HTTP/2 connections from sticking to a single backend instance. When set, the sink will automatically reconnect after the specified duration, allowing load balancers to distribute traffic across multiple backend instances. | ||
|
|
||
| authors: Donny Xia |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,8 @@ | ||
| use std::task::{Context, Poll}; | ||
| use std::{ | ||
| sync::{Arc, Mutex}, | ||
| task::{Context, Poll}, | ||
| time::{Duration, Instant}, | ||
| }; | ||
|
|
||
| use futures::{TryFutureExt, future::BoxFuture}; | ||
| use http::Uri; | ||
|
|
@@ -16,17 +20,43 @@ use vector_lib::{ | |
| use super::VectorSinkError; | ||
| use crate::{ | ||
| Error, | ||
| config::ProxyConfig, | ||
| event::{EventFinalizers, EventStatus, Finalizable}, | ||
| http::build_proxy_connector, | ||
| internal_events::EndpointBytesSent, | ||
| proto::vector as proto_vector, | ||
| sinks::util::uri, | ||
| tls::MaybeTlsSettings, | ||
| }; | ||
|
|
||
| #[derive(Clone, Debug)] | ||
| struct ClientState { | ||
| client: hyper::Client<ProxyConnector<HttpsConnector<HttpConnector>>, BoxBody>, | ||
| created_at: Instant, | ||
| } | ||
|
|
||
| #[derive(Clone)] | ||
| pub struct VectorService { | ||
| pub client: proto_vector::Client<HyperSvc>, | ||
| pub protocol: String, | ||
| pub endpoint: String, | ||
| uri: Uri, | ||
| compression: bool, | ||
| connection_ttl: Option<Duration>, | ||
| tls_settings: MaybeTlsSettings, | ||
| proxy_config: ProxyConfig, | ||
| client_state: Arc<Mutex<ClientState>>, | ||
| } | ||
|
|
||
| impl std::fmt::Debug for VectorService { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can at least add |
||
| f.debug_struct("VectorService") | ||
| .field("protocol", &self.protocol) | ||
| .field("endpoint", &self.endpoint) | ||
| .field("uri", &self.uri) | ||
| .field("compression", &self.compression) | ||
| .field("connection_ttl", &self.connection_ttl) | ||
| .finish_non_exhaustive() | ||
| } | ||
| } | ||
|
|
||
| pub struct VectorResponse { | ||
|
|
@@ -71,20 +101,67 @@ impl VectorService { | |
| hyper_client: hyper::Client<ProxyConnector<HttpsConnector<HttpConnector>>, BoxBody>, | ||
| uri: Uri, | ||
| compression: bool, | ||
| connection_ttl: Option<Duration>, | ||
| tls_settings: MaybeTlsSettings, | ||
| proxy_config: ProxyConfig, | ||
| ) -> Self { | ||
| let (protocol, endpoint) = uri::protocol_endpoint(uri.clone()); | ||
| let mut proto_client = proto_vector::Client::new(HyperSvc { | ||
| uri, | ||
| client: hyper_client, | ||
| uri: uri.clone(), | ||
| client: hyper_client.clone(), | ||
| }); | ||
|
|
||
| if compression { | ||
| proto_client = proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip); | ||
| } | ||
|
|
||
| let client_state = Arc::new(Mutex::new(ClientState { | ||
| client: hyper_client, | ||
| created_at: Instant::now(), | ||
| })); | ||
|
|
||
| Self { | ||
| client: proto_client, | ||
| protocol, | ||
| endpoint, | ||
| uri, | ||
| compression, | ||
| connection_ttl, | ||
| tls_settings, | ||
| proxy_config, | ||
| client_state, | ||
| } | ||
| } | ||
|
|
||
| fn check_and_recreate_client(&mut self) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This silently ignores errors when rebuilding the proxy connector. If client recreation fails, it continues with the old client without logging or alerting. Also, it is worth considering emitting internal metrics that track client recreations. |
||
| if let Some(ttl) = self.connection_ttl { | ||
| let mut state = self.client_state.lock().unwrap(); | ||
| let elapsed = state.created_at.elapsed(); | ||
|
|
||
| if elapsed >= ttl { | ||
| // Recreate the client | ||
| if let Ok(proxy) = | ||
| build_proxy_connector(self.tls_settings.clone(), &self.proxy_config) | ||
| { | ||
| let new_client = hyper::Client::builder().http2_only(true).build(proxy); | ||
|
|
||
| state.client = new_client.clone(); | ||
| state.created_at = Instant::now(); | ||
|
|
||
| // Update the proto client with the new hyper client | ||
| let mut proto_client = proto_vector::Client::new(HyperSvc { | ||
| uri: self.uri.clone(), | ||
| client: new_client, | ||
| }); | ||
|
|
||
| if self.compression { | ||
| proto_client = | ||
| proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip); | ||
| } | ||
|
|
||
| self.client = proto_client; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -106,6 +183,9 @@ impl Service<VectorRequest> for VectorService { | |
|
|
||
| // Emission of internal events for errors and dropped events is handled upstream by the caller. | ||
| fn call(&mut self, mut list: VectorRequest) -> Self::Future { | ||
| // Check if we need to recreate the client due to TTL expiration | ||
| self.check_and_recreate_client(); | ||
|
|
||
| let mut service = self.clone(); | ||
| let byte_size = list.request.encoded_len(); | ||
| let metadata = std::mem::take(list.metadata_mut()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.