diff --git a/build/Makefile b/build/Makefile index 699db5ced6..ac63f1518c 100644 --- a/build/Makefile +++ b/build/Makefile @@ -86,7 +86,7 @@ test-quilkin: ensure-build-image docker run --rm $(common_rust_args) \ --entrypoint=cargo $(BUILD_IMAGE_TAG) fmt -- --check docker run --rm $(common_rust_args) \ - --entrypoint=cargo $(BUILD_IMAGE_TAG) test + -e RUST_BACKTRACE=1 --entrypoint=cargo $(BUILD_IMAGE_TAG) test -- --nocapture # Run tests against the examples test-examples: ensure-build-image diff --git a/docs/src/services/agent.md b/docs/src/services/agent.md new file mode 100644 index 0000000000..4e2b610c4f --- /dev/null +++ b/docs/src/services/agent.md @@ -0,0 +1,48 @@ +# Control Plane Relay + +| services | ports | Protocol | +|----------|-------|-----------| +| QCMP | 7600 | UDP(IPv4 && IPv6) | + +> **Note:** This service is currently in active experimentation and development + so there may be bugs which cause it to be unusable for production, as always + all bug reports are welcome and appreciated. + +For multi-cluster integration, Quilkin provides a `agent` service, that can be +deployed to a cluster to act as a beacon for QCMP pings and forward cluster +configuration information to a `relay` service + +To view all options for the `agent` subcommand, run: + +```shell +$ quilkin agent --help +{{#include ../../../target/quilkin.agent.commands}} +``` + +## Quickstart +The simplest version of the `agent` service is just running `quilkin agent`, +this will setup just the QCMP service allowing the agent to be pinged for +measuring round-time-trips (RTT). + +``` +quilkin agent +``` + +To run an agent with the relay (see [`relay` quickstart](./relay.md#quickstart) +for more information), you just need to specify the relay endpoint with the +`--relay` flag **and** provide a configuration discovery provider such as a +configuration file or Agones. + +``` +quilkin --admin-adress http://localhost:8001 agent --relay http://localhost:7900 file quilkin.yaml +``` + +Now if we run cURL on both the relay and the control plane we should see that +they both contain the same set of endpoints. + +```bash +# Check Agent +curl localhost:8001/config +# Check Relay +curl localhost:8000/config +``` diff --git a/src/cli.rs b/src/cli.rs index 488d25ce8a..96c0ae01e2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -25,7 +25,8 @@ use tokio::{signal, sync::watch}; use crate::{admin::Mode, Config}; pub use self::{ - generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy, relay::Relay, + agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy, + relay::Relay, }; macro_rules! define_port { @@ -38,6 +39,7 @@ macro_rules! define_port { }; } +pub mod agent; pub mod generate_config_schema; pub mod manage; pub mod proxy; @@ -70,16 +72,17 @@ pub struct Cli { /// The various Quilkin commands. #[derive(Clone, Debug, clap::Subcommand)] pub enum Commands { - Proxy(Proxy), + Agent(Agent), GenerateConfigSchema(GenerateConfigSchema), Manage(Manage), + Proxy(Proxy), Relay(Relay), } impl Commands { pub fn admin_mode(&self) -> Option { match self { - Self::Proxy(_) => Some(Mode::Proxy), + Self::Proxy(_) | Self::Agent(_) => Some(Mode::Proxy), Self::Relay(_) | Self::Manage(_) => Some(Mode::Xds), Self::GenerateConfigSchema(_) => None, } @@ -148,6 +151,13 @@ impl Cli { let fut = tryhard::retry_fn({ let shutdown_rx = shutdown_rx.clone(); move || match self.command.clone() { + Commands::Agent(agent) => { + let config = config.clone(); + let shutdown_rx = shutdown_rx.clone(); + tokio::spawn( + async move { agent.run(config.clone(), shutdown_rx.clone()).await }, + ) + } Commands::Proxy(runner) => { let config = config.clone(); let shutdown_rx = shutdown_rx.clone(); diff --git a/src/cli/agent.rs b/src/cli/agent.rs new file mode 100644 index 0000000000..9ef76506e1 --- /dev/null +++ b/src/cli/agent.rs @@ -0,0 +1,101 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::sync::Arc; + +use crate::config::Config; + +define_port!(7600); + +/// Runs Quilkin as a relay service that runs a Manager Discovery Service +/// (mDS) for accepting cluster and configuration information from xDS +/// management services, and exposing it as a single merged xDS service for +/// proxy services. +#[derive(clap::Args, Clone, Debug)] +pub struct Agent { + /// Port for QCMP service. + #[clap(short, long, env = "QCMP_PORT", default_value_t = PORT)] + pub qcmp_port: u16, + /// One or more `quilkin relay` endpoints to push configuration changes to. + #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")] + pub relay: Vec, + /// The `region` to set in the cluster map for any provider + /// endpoints discovered. + #[clap(long, env = "QUILKIN_REGION")] + pub region: Option, + /// The `zone` in the `region` to set in the cluster map for any provider + /// endpoints discovered. + #[clap(long, env = "QUILKIN_ZONE")] + pub zone: Option, + /// The `sub_zone` in the `zone` in the `region` to set in the cluster map + /// for any provider endpoints discovered. + #[clap(long, env = "QUILKIN_SUB_ZONE")] + pub sub_zone: Option, + /// The configuration source for a management server. + #[clap(subcommand)] + pub provider: Option, +} + +impl Default for Agent { + fn default() -> Self { + Self { + qcmp_port: PORT, + relay: <_>::default(), + region: <_>::default(), + zone: <_>::default(), + sub_zone: <_>::default(), + provider: <_>::default(), + } + } +} + +impl Agent { + pub async fn run( + &self, + config: Arc, + mut shutdown_rx: tokio::sync::watch::Receiver<()>, + ) -> crate::Result<()> { + let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some()) + .then(|| crate::endpoint::Locality { + region: self.region.clone().unwrap_or_default(), + zone: self.zone.clone().unwrap_or_default(), + sub_zone: self.sub_zone.clone().unwrap_or_default(), + }); + + let _mds_task = if !self.relay.is_empty() { + let _provider_task = match self.provider.as_ref() { + Some(provider) => Some(provider.spawn(config.clone(), locality.clone())), + None => return Err(eyre::eyre!("no configuration provider given")), + }; + + let task = crate::xds::client::MdsClient::connect( + String::clone(&config.id.load()), + self.relay.clone(), + ); + + tokio::select! { + result = task => Some(result?.mds_client_stream(config.clone())), + _ = shutdown_rx.changed() => return Ok(()), + } + } else { + tracing::info!("no relay servers given"); + None + }; + + crate::protocol::spawn(self.qcmp_port).await?; + shutdown_rx.changed().await.map_err(From::from) + } +} diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 739dd904c5..4b22d7a196 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -14,9 +14,8 @@ * limitations under the License. */ -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{sync::watch, time::Duration}; use tonic::transport::Endpoint; use crate::{proxy::SessionMap, utils::net, xds::ResourceType, Config, Result}; @@ -115,7 +114,7 @@ impl Proxy { None }; - self.run_recv_from(&config, sessions.clone(), shutdown_rx.clone())?; + self.run_recv_from(&config, sessions.clone())?; tracing::info!("Quilkin is ready"); shutdown_rx @@ -138,12 +137,7 @@ impl Proxy { /// This function also spawns the set of worker tasks responsible for consuming packets /// off the aforementioned queue and processing them through the filter chain and session /// pipeline. - fn run_recv_from( - &self, - config: &Arc, - sessions: SessionMap, - shutdown_rx: watch::Receiver<()>, - ) -> Result<()> { + fn run_recv_from(&self, config: &Arc, sessions: SessionMap) -> Result<()> { // The number of worker tasks to spawn. Each task gets a dedicated queue to // consume packets off. let num_workers = num_cpus::get(); @@ -155,7 +149,6 @@ impl Proxy { workers.push(crate::proxy::DownstreamReceiveWorkerConfig { worker_id, socket: socket.clone(), - shutdown_rx: shutdown_rx.clone(), config: config.clone(), sessions: sessions.clone(), }) @@ -315,7 +308,6 @@ mod tests { let socket = Arc::new(create_socket().await); let addr = socket.local_addr().unwrap(); - let (_shutdown_tx, shutdown_rx) = watch::channel(()); let endpoint = t.open_socket_and_recv_single_packet().await; let msg = "hello"; let config = Arc::new(Config::default()); @@ -329,7 +321,6 @@ mod tests { socket: socket.clone(), config, sessions: <_>::default(), - shutdown_rx, } .spawn(); @@ -348,7 +339,6 @@ mod tests { #[tokio::test] async fn run_recv_from() { let t = TestHelper::default(); - let (_shutdown_tx, shutdown_rx) = watch::channel(()); let msg = "hello"; let endpoint = t.open_socket_and_recv_single_packet().await; @@ -363,9 +353,7 @@ mod tests { clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()]) }); - proxy - .run_recv_from(&config, <_>::default(), shutdown_rx) - .unwrap(); + proxy.run_recv_from(&config, <_>::default()).unwrap(); let socket = create_socket().await; socket.send_to(msg.as_bytes(), &local_addr).await.unwrap(); diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs index 599eba90b7..353391ea64 100644 --- a/src/config/providers/k8s.rs +++ b/src/config/providers/k8s.rs @@ -145,12 +145,7 @@ pub fn update_endpoints_from_gameservers( } Event::Deleted(server) => { - let found = if let Some(status) = &server.status { - let port = status.ports.as_ref() - .and_then(|ports| ports.first().map(|status| status.port)) - .unwrap_or_default(); - - let endpoint = Endpoint::from((status.address.clone(), port)); + let found = if let Some(endpoint) = server.endpoint() { config.clusters.value().remove_endpoint(&endpoint) } else { config.clusters.value().remove_endpoint_if(|endpoint| { @@ -159,7 +154,11 @@ pub fn update_endpoints_from_gameservers( }; if found.is_none() { - tracing::warn!(?server, "received unknown gameserver to delete from k8s"); + tracing::warn!( + endpoint=%serde_json::to_value(server.endpoint()).unwrap(), + name=%serde_json::to_value(server.metadata.name).unwrap(), + "received unknown gameserver to delete from k8s" + ); } } }; diff --git a/src/config/providers/k8s/agones.rs b/src/config/providers/k8s/agones.rs index 6112a20095..80db134b15 100644 --- a/src/config/providers/k8s/agones.rs +++ b/src/config/providers/k8s/agones.rs @@ -39,6 +39,53 @@ pub struct GameServer { pub status: Option, } +impl GameServer { + pub fn endpoint(&self) -> Option { + self.status.as_ref().map(|status| { + let port = status + .ports + .as_ref() + .and_then(|ports| ports.first().map(|status| status.port)) + .unwrap_or_default(); + + let tokens = self.tokens(); + let extra_metadata = { + let mut map = serde_json::Map::default(); + map.insert( + "name".into(), + self.metadata.name.clone().unwrap_or_default().into(), + ); + map + }; + + Endpoint::with_metadata( + (status.address.clone(), port).into(), + crate::metadata::MetadataView::with_unknown( + crate::endpoint::Metadata { tokens }, + extra_metadata, + ), + ) + }) + } + + pub fn tokens(&self) -> std::collections::BTreeSet> { + match self.metadata.annotations.as_ref() { + Some(annotations) => annotations + .get(QUILKIN_TOKEN_LABEL) + .map(|value| { + value + .split(',') + .map(String::from) + .map(crate::utils::base64_decode) + .filter_map(Result::ok) + .collect() + }) + .unwrap_or_default(), + None => <_>::default(), + } + } +} + #[derive(Clone, Debug, Deserialize, schemars::JsonSchema)] #[serde(rename_all = "camelCase")] pub struct Inner { @@ -246,42 +293,9 @@ impl TryFrom for Endpoint { type Error = tonic::Status; fn try_from(server: GameServer) -> Result { - let status = server - .status - .as_ref() - .ok_or_else(|| tonic::Status::internal("No status found for game server"))?; - let mut extra_metadata = serde_json::Map::default(); - extra_metadata.insert( - "name".into(), - server.metadata.name.clone().unwrap_or_default().into(), - ); - - let tokens = match server.metadata.annotations.as_ref() { - Some(annotations) => annotations - .get(QUILKIN_TOKEN_LABEL) - .map(|value| { - value - .split(',') - .map(String::from) - .map(crate::utils::base64_decode) - .filter_map(Result::ok) - .collect::>() - }) - .unwrap_or_default(), - None => <_>::default(), - }; - - let address = status.address.clone(); - let port = status - .ports - .as_ref() - .and_then(|ports| ports.first().map(|status| status.port)) - .unwrap_or_default(); - let filter_metadata = crate::endpoint::Metadata { tokens }; - Ok(Self::with_metadata( - (address, port).into(), - crate::metadata::MetadataView::with_unknown(filter_metadata, extra_metadata), - )) + server + .endpoint() + .ok_or_else(|| tonic::Status::internal("No status found for game server")) } } diff --git a/src/protocol.rs b/src/protocol.rs index f0476670c0..be7e5ae6fb 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -17,6 +17,11 @@ //! Logic for parsing and generating Quilkin Control Message Protocol (QCMP) messages. use nom::bytes::complete; +use std::net::SocketAddr; +use tracing::Instrument; + +use crate::utils::net; + // Magic number to distinguish control packets from regular traffic. const MAGIC_NUMBER: &[u8] = b"QLKN"; const TIMESTAMP_LEN: usize = (i64::BITS / 8) as usize; @@ -28,6 +33,64 @@ const DISCRIMINANT_LEN: usize = 1; type Result = std::result::Result; +pub async fn spawn(port: u16) -> crate::Result<()> { + let socket = net::DualStackLocalSocket::new(port)?; + let v4_addr = socket.local_ipv4_addr()?; + let v6_addr = socket.local_ip6_addr()?; + tokio::spawn( + async move { + // Initialize a buffer for the UDP packet. We use the maximum size of a UDP + // packet, which is the maximum value of 16 a bit integer. + let mut v4_buf = vec![0; 1 << 16]; + let mut v6_buf = vec![0; 1 << 16]; + let mut output_buf = Vec::new(); + + loop { + tracing::debug!(%v4_addr, %v6_addr, "awaiting qcmp packets"); + + match socket.recv_from(&mut v4_buf, &mut v6_buf).await { + Ok((size, source)) => { + let received_at = chrono::Utc::now().timestamp_nanos(); + let contents = match source { + SocketAddr::V4(_) => &v4_buf[..size], + SocketAddr::V6(_) => &v6_buf[..size], + }; + + let command = match Protocol::parse(contents) { + Ok(Some(command)) => command, + Ok(None) => { + tracing::debug!("rejected non-qcmp packet"); + continue; + } + Err(error) => { + tracing::debug!(%error, "rejected malformed packet"); + continue; + } + }; + + let Protocol::Ping { client_timestamp, nonce, } = command else { + tracing::warn!("rejected unsupported QCMP packet"); + continue; + }; + + Protocol::ping_reply(nonce, client_timestamp, received_at) + .encode_into_buffer(&mut output_buf); + + if let Err(error) = socket.send_to(&output_buf, &source).await { + tracing::warn!(%error, "error responding to ping"); + } + + output_buf.clear(); + } + Err(error) => tracing::warn!(%error, "error receiving packet"), + } + } + } + .instrument(tracing::info_span!("qcmp_task", %v4_addr, %v6_addr)), + ); + Ok(()) +} + /// The set of possible QCMP commands. #[derive(Clone, Copy, Debug)] pub enum Protocol { @@ -84,21 +147,25 @@ impl Protocol { /// Encodes the protocol command into a buffer of bytes for network transmission. pub fn encode(&self) -> Vec { - let mut buffer = Vec::from(MAGIC_NUMBER); + let mut buffer = Vec::new(); + self.encode_into_buffer(&mut buffer); + buffer + } + /// Encodes the protocol command into a buffer of bytes for network transmission. + pub fn encode_into_buffer(&self, buffer: &mut Vec) { + buffer.extend(MAGIC_NUMBER); buffer.push(VERSION); buffer.push(self.discriminant()); buffer.extend_from_slice(&self.discriminant_length().to_be_bytes()); let length = buffer.len(); - self.encode_payload(&mut buffer); + self.encode_payload(buffer); debug_assert_eq!( buffer.len(), length + usize::from(self.discriminant_length()) ); - - buffer } /// Returns the packet's nonce. diff --git a/src/proxy.rs b/src/proxy.rs index c6c44e9fbd..a45e97fc33 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -18,7 +18,7 @@ mod sessions; use std::sync::Arc; -use tokio::{net::UdpSocket, sync::watch}; +use tokio::net::UdpSocket; use crate::{ endpoint::{Endpoint, EndpointAddress}, @@ -47,8 +47,6 @@ pub(crate) struct DownstreamReceiveWorkerConfig { pub socket: Arc, pub config: Arc, pub sessions: SessionMap, - /// The worker task exits when a value is received from this shutdown channel. - pub shutdown_rx: watch::Receiver<()>, } impl DownstreamReceiveWorkerConfig { @@ -58,7 +56,6 @@ impl DownstreamReceiveWorkerConfig { socket, config, sessions, - mut shutdown_rx, } = self; tokio::spawn(async move { @@ -71,26 +68,20 @@ impl DownstreamReceiveWorkerConfig { addr = ?socket.local_addr(), "Awaiting packet" ); - tokio::select! { - result = socket.recv_from(&mut buf) => { - match result { - Ok((size, source)) => { - let packet = DownstreamPacket { - received_at: chrono::Utc::now().timestamp_nanos(), - source: source.into(), - contents: buf[..size].to_vec(), - }; - - Self::spawn_process_task(packet, source, worker_id, &socket, &config, &sessions) - } - Err(error) => { - tracing::error!(%error, "error receiving packet"); - return; - } - } + match socket.recv_from(&mut buf).await { + Ok((size, source)) => { + let packet = DownstreamPacket { + received_at: chrono::Utc::now().timestamp_nanos(), + source: source.into(), + contents: buf[..size].to_vec(), + }; + + Self::spawn_process_task( + packet, source, worker_id, &socket, &config, &sessions, + ) } - _ = shutdown_rx.changed() => { - tracing::debug!(id = worker_id, "Received shutdown signal"); + Err(error) => { + tracing::error!(%error, "error receiving packet"); return; } } diff --git a/src/utils/net.rs b/src/utils/net.rs index d94e1826cb..d35c82b2f9 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -14,17 +14,28 @@ * limitations under the License. */ -use crate::Result; -use socket2::{Protocol, Socket, Type}; use std::{io, net::SocketAddr}; -use tokio::net::UdpSocket; + +use socket2::{Protocol, Socket, Type}; +use tokio::{net::UdpSocket, select}; + +use crate::Result; /// returns a UdpSocket with address and port reuse. pub fn socket_with_reuse(port: u16) -> Result { - let sock = Socket::new(socket2::Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + socket_with_reuse_and_address((std::net::Ipv4Addr::UNSPECIFIED, port).into()) +} + +fn socket_with_reuse_and_address(addr: SocketAddr) -> Result { + let domain = match addr { + SocketAddr::V4(_) => socket2::Domain::IPV4, + SocketAddr::V6(_) => socket2::Domain::IPV6, + }; + + let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; enable_reuse(&sock)?; sock.set_nonblocking(true)?; - sock.bind(&SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, port)).into())?; + sock.bind(&addr.into())?; UdpSocket::from_std(sock.into()).map_err(|error| eyre::eyre!(error)) } @@ -40,9 +51,74 @@ fn enable_reuse(sock: &Socket) -> io::Result<()> { Ok(()) } +/// Socket that can accept and send data from either a local ipv4 address or ipv6 address. +pub struct DualStackLocalSocket { + v4: UdpSocket, + v6: UdpSocket, +} + +impl DualStackLocalSocket { + pub fn new(port: u16) -> Result { + // if ephemeral port, make sure they are on the same ports. + if port == 0 { + let v4 = socket_with_reuse_and_address((std::net::Ipv4Addr::UNSPECIFIED, port).into())?; + let port = v4.local_addr()?.port(); + + return Ok(Self { + v4, + v6: socket_with_reuse_and_address((std::net::Ipv6Addr::UNSPECIFIED, port).into())?, + }); + } + + Ok(Self { + v4: socket_with_reuse_and_address((std::net::Ipv4Addr::UNSPECIFIED, port).into())?, + v6: socket_with_reuse_and_address((std::net::Ipv6Addr::UNSPECIFIED, port).into())?, + }) + } + + // Receives datagrams from either an ipv4 address or ipv6. Match on the returned [`SocketAddr`] to + // determine if the received data is in the ipv4_buf or ipv6_buf on a successful result. + pub async fn recv_from( + &self, + v4_buf: &mut [u8], + v6_buf: &mut [u8], + ) -> io::Result<(usize, SocketAddr)> { + select! { + v4 = self.v4.recv_from(v4_buf) => { + v4 + } + v6 = self.v6.recv_from(v6_buf) => { + v6 + } + } + } + + pub fn local_ipv4_addr(&self) -> io::Result { + self.v4.local_addr() + } + + pub fn local_ip6_addr(&self) -> io::Result { + self.v6.local_addr() + } + + pub async fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result { + match target { + SocketAddr::V4(_) => self.v4.send_to(buf, target).await, + SocketAddr::V6(_) => self.v6.send_to(buf, target).await, + } + } +} + #[cfg(test)] mod tests { - use crate::test_utils::available_addr; + use crate::test_utils::{available_addr, TestHelper}; + use crate::utils::net::DualStackLocalSocket; + use std::net::SocketAddr; + use std::str::from_utf8; + use std::sync::Arc; + use std::time::Duration; + use tokio::sync::oneshot; + use tokio::time::timeout; #[tokio::test] async fn socket_with_reuse() { @@ -57,4 +133,48 @@ mod tests { let addr2 = socket.local_addr().unwrap(); assert_eq!(addr, addr2); } + + #[tokio::test] + async fn dual_domain_socket() { + let mut t = TestHelper::default(); + + let expected = available_addr().await; + let socket = Arc::new(DualStackLocalSocket::new(expected.port()).unwrap()); + + // TODO: when DualStackSocket is used everywhere, add a test for Ipv6 as well. + let echo_addr = t.run_echo_server().await; + + let (packet_tx, packet_rx) = oneshot::channel::(); + let socket_recv = socket.clone(); + tokio::spawn(async move { + let mut v4_buf = vec![0; 1024]; + let mut v6_buf = vec![0; 1024]; + let (size, addr) = socket_recv + .recv_from(&mut v4_buf, &mut v6_buf) + .await + .unwrap(); + + let contents = match addr { + SocketAddr::V4(_) => &v4_buf[..size], + SocketAddr::V6(_) => &v6_buf[..size], + }; + + packet_tx + .send(from_utf8(contents).unwrap().to_string()) + .unwrap(); + }); + + let msg = "hello"; + socket + .send_to(msg.as_bytes(), &echo_addr.to_socket_addr().await.unwrap()) + .await + .unwrap(); + assert_eq!( + msg, + timeout(Duration::from_secs(5), packet_rx) + .await + .expect("should not timeout") + .unwrap() + ); + } } diff --git a/tests/qcmp.rs b/tests/qcmp.rs index f3e7146c2a..bb1ad968ba 100644 --- a/tests/qcmp.rs +++ b/tests/qcmp.rs @@ -21,9 +21,9 @@ use tokio::time::Duration; use quilkin::{protocol::Protocol, test_utils::TestHelper}; #[tokio::test] -async fn ping() { +async fn proxy_ping() { let mut t = TestHelper::default(); - let server_port = 12348; + let server_port = quilkin::test_utils::available_addr().await.port(); let server_proxy = quilkin::cli::Proxy { port: server_port, to: vec![(Ipv4Addr::UNSPECIFIED, 0).into()], @@ -31,11 +31,32 @@ async fn ping() { }; let server_config = std::sync::Arc::new(quilkin::Config::default()); t.run_server(server_config, server_proxy, None); + ping(server_port).await; +} + +#[tokio::test] +async fn agent_ping() { + let qcmp_port = quilkin::test_utils::available_addr().await.port(); + let agent = quilkin::cli::Agent { + qcmp_port, + ..<_>::default() + }; + let server_config = std::sync::Arc::new(quilkin::Config::default()); + let (_tx, rx) = tokio::sync::watch::channel(()); + tokio::spawn(async move { + agent + .run(server_config, rx) + .await + .expect("Agent should run") + }); + ping(qcmp_port).await; +} +async fn ping(port: u16) { let socket = tokio::net::UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)) .await .unwrap(); - let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port); + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let ping = Protocol::ping(); socket.send_to(&ping.encode(), &local_addr).await.unwrap(); @@ -48,9 +69,15 @@ async fn ping() { let reply = Protocol::parse(&buf[..size]).unwrap().unwrap(); assert_eq!(ping.nonce(), reply.nonce()); - const TEN_MILLIS_IN_NANOS: i64 = 10_000_000; + const FIFTY_MILLIS_IN_NANOS: i64 = 50_000_000; - // If it takes longer than 10 milliseconds locally, it's likely that there + // If it takes longer than 50 milliseconds locally, it's likely that there // is bug. - assert!(TEN_MILLIS_IN_NANOS > reply.round_trip_delay(recv_time).unwrap()); + let delay = reply.round_trip_delay(recv_time).unwrap(); + assert!( + FIFTY_MILLIS_IN_NANOS > delay, + "Delay {}ns greater than {}ns", + delay, + FIFTY_MILLIS_IN_NANOS + ); }