Skip to content

Commit

Permalink
Recover xDS connection if stream terminates (#566)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Aug 10, 2022
1 parent 2f7b2cb commit cb55cfd
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 61 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ stable-eyre = "0.2.2"
tempdir = "0.3"
thiserror = "1.0.30"
tokio = { version = "1.19.2", features = ["rt-multi-thread", "fs", "signal", "test-util", "parking_lot", "tracing"] }
tokio-stream = "0.1.8"
tokio-stream = { version = "0.1.8", features = ["sync"] }
tonic = "0.6.1"
tracing = "0.1.31"
tracing-subscriber = { version = "0.3.9", features = ["json", "env-filter"] }
Expand Down
7 changes: 6 additions & 1 deletion src/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,15 @@ mod tests {
.map(Arc::new)
.unwrap();

tokio::spawn(manage(config.clone()));
let handle = tokio::spawn(manage(config.clone()));
let client = Client::connect(config.clone()).await.unwrap();
let mut stream = client.stream().await.unwrap();

// Test that the client can handle the manager dropping out.
handle.abort();
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
tokio::spawn(manage(config.clone()));

// Each time, we create a new upstream endpoint and send a cluster update for it.
let concat_bytes = vec![("b", "c,"), ("d", "e")];
for (b1, b2) in concat_bytes.into_iter() {
Expand Down
137 changes: 78 additions & 59 deletions src/xds/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use rand::Rng;
use tokio::sync::mpsc;
use tokio::sync::broadcast;
use tonic::transport::{channel::Channel as TonicChannel, Error as TonicError};
use tracing::Instrument;
use tryhard::{
Expand All @@ -38,16 +39,23 @@ use crate::{
Result,
};

/// Shorthand for the aggregated discovery service client running over a
/// tonic transport channel.
type AdsClient = AggregatedDiscoveryServiceClient<TonicChannel>;

/// Client that can talk to an XDS server using the aDS protocol.
#[derive(Clone)]
pub struct Client {
client: AggregatedDiscoveryServiceClient<TonicChannel>,
client: AdsClient,
config: Arc<Config>,
}

impl Client {
/// Connects to an xDS management server and returns a client for it.
///
/// Connection establishment is delegated to `Self::new_ads_client`; the
/// resulting aDS client is stored alongside `config`.
///
/// # Errors
///
/// Returns an error if `Self::new_ads_client` fails.
#[tracing::instrument(skip_all, fields(servers = ?config.management_servers.load().iter().map(|server| &server.address).collect::<Vec<_>>()))]
pub async fn connect(config: Arc<Config>) -> Result<Self> {
let client = Self::new_ads_client(&config).await?;
Ok(Self { client, config })
}

async fn new_ads_client(config: &Config) -> Result<AdsClient> {
const BACKOFF_INITIAL_DELAY_MILLISECONDS: u64 = 500;
const BACKOFF_MAX_DELAY_SECONDS: u64 = 30;
const BACKOFF_MAX_JITTER_MILLISECONDS: u64 = 2000;
Expand Down Expand Up @@ -117,11 +125,12 @@ impl Client {
}
})
.with_config(retry_config);

let client = connect_to_server
.instrument(tracing::trace_span!("xds_client_connect"))
.await?;
tracing::info!("Connected to xDS server");
Ok(Self { client, config })
Ok(client)
}

/// Starts a new stream to the xDS management server.
Expand All @@ -133,76 +142,86 @@ impl Client {
/// An active xDS gRPC management stream.
pub struct Stream {
config: Arc<Config>,
requests: mpsc::UnboundedSender<DiscoveryRequest>,
requests: broadcast::Sender<DiscoveryRequest>,
handle_discovery_response: tokio::task::JoinHandle<Result<()>>,
}

impl Stream {
#[tracing::instrument(skip_all)]
async fn connect(xds: Client) -> Result<Self> {
let (requests, rx) = tokio::sync::mpsc::unbounded_channel();
let (requests, mut rx) = broadcast::channel(12);
let Client { mut client, config } = xds;

let handle_discovery_response = tokio::spawn({
let requests = requests.clone();
let config = config.clone();
let requests = requests.clone();
async move {
let mut responses = client
.stream_aggregated_resources(
tokio_stream::wrappers::UnboundedReceiverStream::from(rx),
)
.in_current_span()
.await?
.into_inner();

while let Some(response) = responses.message().await? {
let identifier = response
.control_plane
.as_ref()
.map(|cp| cp.identifier.clone())
.unwrap_or_default();
let _stream_metrics = super::metrics::StreamConnectionMetrics::new(&identifier);
tracing::info!(
id = &*response.version_info,
r#type = &*response.type_url,
nonce = &*response.nonce,
control_plane = &*identifier,
"Received response"
);

let result = response
.resources
.iter()
.cloned()
.map(Resource::try_from)
.try_for_each(|resource| {
let resource = resource?;
metrics::DISCOVERY_RESPONSES
.with_label_values(&[&*identifier, resource.type_url()])
loop {
let mut responses = client
.stream_aggregated_resources(
tokio_stream::wrappers::BroadcastStream::from(rx)
// Errors only occur when this receiver has lagged behind the
// channel; we deliberately drop them because we only want the
// latest state of the world.
.filter_map(|result| futures::future::ready(result.ok())),
)
.in_current_span()
.await?
.into_inner();

while let Some(response) = responses.message().await? {
let identifier = response
.control_plane
.as_ref()
.map(|cp| cp.identifier.clone())
.unwrap_or_default();
let _stream_metrics =
super::metrics::StreamConnectionMetrics::new(&identifier);
tracing::info!(
id = &*response.version_info,
r#type = &*response.type_url,
nonce = &*response.nonce,
control_plane = &*identifier,
"Received response"
);

let result = response
.resources
.iter()
.cloned()
.map(Resource::try_from)
.try_for_each(|resource| {
let resource = resource?;
metrics::DISCOVERY_RESPONSES
.with_label_values(&[&*identifier, resource.type_url()])
.inc();
config.apply(&resource)
});

let mut request = DiscoveryRequest::try_from(response)?;
if let Err(error) = result {
metrics::NACKS
.with_label_values(&[&*identifier, &*request.type_url])
.inc();
config.apply(&resource)
});

let mut request = DiscoveryRequest::try_from(response)?;
if let Err(error) = result {
metrics::NACKS
.with_label_values(&[&*identifier, &*request.type_url])
.inc();
request.error_detail = Some(crate::xds::google::rpc::Status {
code: 3,
message: error.to_string(),
..<_>::default()
});
} else {
metrics::ACKS
.with_label_values(&[&*identifier, &*request.type_url])
.inc();
request.error_detail = Some(crate::xds::google::rpc::Status {
code: 3,
message: error.to_string(),
..<_>::default()
});
} else {
metrics::ACKS
.with_label_values(&[&*identifier, &*request.type_url])
.inc();
}

requests.send(request)?;
}

requests.send(request)?;
// Reaching here means the response stream has ended (`message()`
// returned `None`), so we create a new client and restart the stream.
client = Client::new_ads_client(&config).await?;
rx = requests.subscribe();
}

Ok(())
}
.instrument(tracing::trace_span!("handle_discovery_response"))
});
Expand All @@ -228,7 +247,7 @@ impl Stream {
};

tracing::trace!("sending discovery request");
self.requests.send(request).map_err(From::from)
self.requests.send(request).map_err(From::from).map(drop)
}
}

Expand Down

0 comments on commit cb55cfd

Please sign in to comment.