From c9183641c7049878d350763108e25ac2e8fcd496 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Wed, 18 Sep 2024 13:27:58 +0200 Subject: [PATCH 1/2] more metrics --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/cli.rs | 2 +- src/tls/cert/mod.rs | 6 ++- src/tls/cert/storage.rs | 34 ++++++++++++++- src/tls/mod.rs | 12 +++--- src/tls/resolver.rs | 91 +++++++++++++++++++++++++++++++++++++++-- 7 files changed, 134 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6eba42..2818db7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3041,7 +3041,7 @@ dependencies = [ [[package]] name = "ic-bn-lib" version = "0.1.0" -source = "git+https://github.com/dfinity/ic-bn-lib?rev=bded503b282a61cea498ce47bc5990019197367a#bded503b282a61cea498ce47bc5990019197367a" +source = "git+https://github.com/dfinity/ic-bn-lib?rev=4256445ad924a8e4c99ce706318c4671fad7eb90#4256445ad924a8e4c99ce706318c4671fad7eb90" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 6a1e032..1a1fe9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ http-body-util = "0.1" humantime = "2.1" hyper-util = "0.1" ic-agent = { version = "0.37.1", features = ["reqwest"] } -ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "bded503b282a61cea498ce47bc5990019197367a" } +ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "4256445ad924a8e4c99ce706318c4671fad7eb90" } ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b0" } itertools = "0.13" lazy_static = "1.5" diff --git a/src/cli.rs b/src/cli.rs index 4c8d01d..658fb6e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -227,7 +227,7 @@ pub struct Cert { pub cert_provider_issuer_url: Vec, /// How frequently to poll providers for certificates - #[clap(env, long, default_value = "5s", value_parser = parse_duration)] + #[clap(env, long, default_value = "30s", value_parser = parse_duration)] pub cert_provider_poll_interval: Duration, /// Disable OCSP stapling diff --git a/src/tls/cert/mod.rs b/src/tls/cert/mod.rs index 0df21fb..820785a 100644 --- a/src/tls/cert/mod.rs +++ b/src/tls/cert/mod.rs @@ -15,7 +15,6 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, warn}; use providers::{Pem, ProvidesCertificates}; -pub use storage::Storage; use storage::StoresCertificates; // Generic certificate and a list of its SANs @@ -194,6 +193,7 @@ impl Run for Aggregator { pub mod test { use std::sync::atomic::{AtomicUsize, Ordering}; + use prometheus::Registry; use providers::Pem; use super::*; @@ -341,7 +341,9 @@ pub mod test { AtomicUsize::new(0), ); - let storage = Arc::new(storage::StorageKey::new()); + let storage = Arc::new(storage::StorageKey::new(storage::Metrics::new( + &Registry::new(), + ))); let aggregator = Aggregator::new( vec![Arc::new(prov1), Arc::new(prov2)], storage, diff --git a/src/tls/cert/storage.rs b/src/tls/cert/storage.rs index 60e6d15..52d7d4f 100644 --- a/src/tls/cert/storage.rs +++ b/src/tls/cert/storage.rs @@ -6,6 +6,7 @@ use arc_swap::ArcSwapOption; use derive_new::new; use fqdn::{Fqdn, FQDN}; use ic_bn_lib::http::ALPN_ACME; +use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry}; use rustls::{server::ClientHello, sign::CertifiedKey}; use super::Cert; @@ -15,6 +16,25 @@ pub trait StoresCertificates: Send + Sync { fn store(&self, cert_list: Vec>) -> Result<(), Error>; } +#[derive(Debug, Clone)] +pub struct Metrics { + count: IntGaugeVec, +} + +impl Metrics { + pub fn new(registry: &Registry) -> Self { + Self { + count: register_int_gauge_vec_with_registry!( + format!("cert_storage_count_total"), + format!("Counts the number of certificates in the storage"), + &["wildcard"], + registry + ) + .unwrap(), + } + } +} + struct StorageInner { certs: BTreeMap>>, certs_wildcard: BTreeMap>>, @@ -25,6 +45,7 @@ struct StorageInner { pub struct Storage { #[new(default)] inner: ArcSwapOption>, + metrics: Metrics, } impl fmt::Debug for Storage { @@ -90,6 +111,16 @@ impl StoresCertificates for Storage { } } + self.metrics + .count + .with_label_values(&["no"]) + .set(certs.len() as i64); + + self.metrics + .count + .with_label_values(&["yes"]) + .set(certs_wildcard.len() as i64); + let inner = StorageInner { certs, certs_wildcard, @@ -124,11 +155,12 @@ impl ResolvesServerCert for StorageKey { #[cfg(test)] pub mod test { use fqdn::fqdn; + use prometheus::Registry; use super::*; pub fn create_test_storage() -> Storage { - let storage: Storage = Storage::new(); + let storage: Storage = Storage::new(Metrics::new(&Registry::new())); let certs = vec![ Cert { diff --git a/src/tls/mod.rs b/src/tls/mod.rs index 67df825..3b0f2b3 100644 --- a/src/tls/mod.rs +++ b/src/tls/mod.rs @@ -5,7 +5,6 @@ use std::{fs, sync::Arc}; use anyhow::{anyhow, Context, Error}; use async_trait::async_trait; -use cert::Storage; use fqdn::FQDN; use ic_bn_lib::{ http::{dns::Resolves, Client, ALPN_ACME, ALPN_H1, ALPN_H2}, @@ -39,7 +38,7 @@ use crate::{ }, }; -use cert::providers::ProvidesCertificates; +use cert::{providers::ProvidesCertificates, storage}; // Wrapper is needed since we can't implement foreign traits struct OcspStaplerWrapper(Arc); @@ -143,7 +142,7 @@ pub async fn setup( registry: &Registry, ) -> Result<(ServerConfig, Vec>), Error> { // Prepare certificate storage - let cert_storage = Arc::new(Storage::new()); + let cert_storage = Arc::new(storage::Storage::new(storage::Metrics::new(registry))); let mut cert_providers: Vec> = vec![]; let mut custom_domain_providers: Vec> = vec![]; @@ -183,8 +182,11 @@ pub async fn setup( tasks.add("cert_aggregator", cert_aggregator); // Set up certificate resolver - let certificate_resolver = - Arc::new(AggregatingResolver::new(acme_resolver, vec![cert_storage])); + let certificate_resolver = Arc::new(AggregatingResolver::new( + acme_resolver, + vec![cert_storage], + resolver::Metrics::new(registry), + )); // Optionally wrap resolver with OCSP stapler let certificate_resolver: Arc = diff --git a/src/tls/resolver.rs b/src/tls/resolver.rs index 16c2caf..aa22836 100644 --- a/src/tls/resolver.rs +++ b/src/tls/resolver.rs @@ -1,5 +1,9 @@ -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, sync::Arc, time::Instant}; +use prometheus::{ + register_histogram_vec_with_registry, register_int_counter_vec_with_registry, HistogramVec, + IntCounterVec, Registry, +}; use rustls::{ server::{ClientHello, ResolvesServerCert as ResolvesServerCertRustls}, sign::CertifiedKey, @@ -19,17 +23,64 @@ pub trait ResolvesServerCert: Debug + Send + Sync { } } +#[derive(Debug, Clone)] +pub struct Metrics { + resolve_count: IntCounterVec, + supported_scheme: IntCounterVec, + supported_cipher: IntCounterVec, + resolve_duration: HistogramVec, +} + +impl Metrics { + pub fn new(registry: &Registry) -> Self { + Self { + resolve_count: register_int_counter_vec_with_registry!( + format!("tls_resolver_total"), + format!("Counts the number of resolves"), + &["found"], + registry + ) + .unwrap(), + + supported_scheme: register_int_counter_vec_with_registry!( + format!("tls_resolver_supported_scheme"), + format!("Counts the number clients that support given scheme"), + &["scheme"], + registry + ) + .unwrap(), + + supported_cipher: register_int_counter_vec_with_registry!( + format!("tls_resolver_supported_cipher"), + format!("Counts the number clients that support given ciphersuite"), + &["cipher"], + registry + ) + .unwrap(), + + resolve_duration: register_histogram_vec_with_registry!( + format!("tls_resolver_duration_sec"), + format!("Records the duration of resolves in seconds"), + &["found"], + [0.0001, 0.0005, 0.001, 0.002, 0.004, 0.008, 0.016, 0.032].to_vec(), + registry + ) + .unwrap(), + } + } +} + // Combines several certificate resolvers into one. // Only one Rustls-compatible resolver can be used since it consumes ClientHello. #[derive(Debug, derive_new::new)] pub struct AggregatingResolver { rustls: Option>, resolvers: Vec>, + metrics: Metrics, } -// Implement certificate resolving for Rustls -impl ResolvesServerCertRustls for AggregatingResolver { - fn resolve(&self, ch: ClientHello) -> Option> { +impl AggregatingResolver { + fn resolve_inner(&self, ch: ClientHello) -> Option> { // Accept missing SNI e.g. for testing cases when we're accessed over IP directly let sni = ch.server_name().unwrap_or("").to_string(); @@ -58,3 +109,35 @@ impl ResolvesServerCertRustls for AggregatingResolver { self.resolvers.iter().find_map(|x| x.resolve_any()) } } + +// Implement certificate resolving for Rustls +impl ResolvesServerCertRustls for AggregatingResolver { + fn resolve(&self, ch: ClientHello) -> Option> { + for v in ch.signature_schemes() { + self.metrics + .supported_scheme + .with_label_values(&[v.as_str().unwrap_or("unknown")]) + .inc(); + } + + for v in ch.cipher_suites() { + self.metrics + .supported_cipher + .with_label_values(&[v.as_str().unwrap_or("unknown")]) + .inc(); + } + + let start = Instant::now(); + let r = self.resolve_inner(ch); + let found = if r.is_some() { "yes" } else { "no" }; + + self.metrics + .resolve_duration + .with_label_values(&[found]) + .observe(start.elapsed().as_secs_f64()); + + self.metrics.resolve_count.with_label_values(&[found]).inc(); + + r + } +} From b4df1ad400c070c02b1c6329e32b1f33f8468159 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Fri, 20 Sep 2024 09:41:17 +0200 Subject: [PATCH 2/2] make cert-issuer-provider poll itself internally --- src/cli.rs | 6 +- src/tls/cert/providers/issuer/mod.rs | 119 +++++++++++++++++---------- src/tls/mod.rs | 10 ++- 3 files changed, 89 insertions(+), 46 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 658fb6e..637daa5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -226,8 +226,12 @@ pub struct Cert { #[clap(env, long, value_delimiter = ',')] pub cert_provider_issuer_url: Vec, - /// How frequently to poll providers for certificates + /// How frequently to refresh certificate issuers #[clap(env, long, default_value = "30s", value_parser = parse_duration)] + pub cert_provider_issuer_poll_interval: Duration, + + /// How frequently to poll providers for certificates + #[clap(env, long, default_value = "5s", value_parser = parse_duration)] pub cert_provider_poll_interval: Duration, /// Disable OCSP stapling diff --git a/src/tls/cert/providers/issuer/mod.rs b/src/tls/cert/providers/issuer/mod.rs index 91316fe..ab0f4d5 100644 --- a/src/tls/cert/providers/issuer/mod.rs +++ b/src/tls/cert/providers/issuer/mod.rs @@ -7,23 +7,23 @@ use std::{ }; use anyhow::{anyhow, Context as AnyhowContext}; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use candid::Principal; use fqdn::FQDN; -use ic_bn_lib::http; +use ic_bn_lib::{http, tasks::Run}; use mockall::automock; use reqwest::{Method, Request, StatusCode, Url}; use serde::Deserialize; -use tokio::sync::Mutex; -use tracing::info; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; use crate::routing::domain::{CustomDomain, ProvidesCustomDomains}; use verify::{Verify, VerifyError, WithVerify}; use super::{Pem, ProvidesCertificates}; -const CACHE_TTL: Duration = Duration::from_secs(9); - #[derive(Debug, thiserror::Error)] pub enum Error { #[error(transparent)] @@ -52,15 +52,11 @@ pub trait Import: Sync + Send { async fn import(&self) -> Result, Error>; } -struct Cache { - updated_at: Instant, - packages: Vec, -} - pub struct CertificatesImporter { http_client: Arc, exporter_url: Url, - cache: Mutex, + poll_interval: Duration, + snapshot: ArcSwapOption>, } impl std::fmt::Debug for CertificatesImporter { @@ -70,28 +66,46 @@ impl std::fmt::Debug for CertificatesImporter { } impl CertificatesImporter { - pub fn new(http_client: Arc, mut exporter_url: Url) -> Self { + pub fn new( + http_client: Arc, + mut exporter_url: Url, + poll_interval: Duration, + ) -> Self { exporter_url.set_path(""); let exporter_url = exporter_url.join("/certificates").unwrap(); Self { http_client, exporter_url, - cache: Mutex::new(Cache { - updated_at: Instant::now().checked_sub(CACHE_TTL * 2).unwrap(), - packages: vec![], - }), + poll_interval, + snapshot: ArcSwapOption::empty(), } } + + async fn refresh(&self) -> Result<(), Error> { + let start = Instant::now(); + let packages = self.import().await.context("unable to fetch packages")?; + info!( + "{self:?}: {} certs loaded in {}s", + packages.len(), + start.elapsed().as_secs_f64() + ); + + self.snapshot.store(Some(Arc::new(packages))); + Ok(()) + } } #[async_trait] impl ProvidesCustomDomains for CertificatesImporter { async fn get_custom_domains(&self) -> Result, anyhow::Error> { - let domains = self - .import() - .await? - .into_iter() + let packages = self + .snapshot + .load_full() + .ok_or_else(|| anyhow!("no packages fetched yet"))?; + + let domains = packages + .iter() .map(|x| -> Result<_, anyhow::Error> { Ok(CustomDomain { name: FQDN::from_str(&x.name)?, @@ -107,9 +121,14 @@ impl ProvidesCustomDomains for CertificatesImporter { #[async_trait] impl ProvidesCertificates for CertificatesImporter { async fn get_certificates(&self) -> Result, anyhow::Error> { - let certs = self - .import() - .await? + let packages = self + .snapshot + .load_full() + .ok_or_else(|| anyhow!("no packages fetched yet"))?; + + let certs = packages + .as_ref() + .clone() .into_iter() .map(|x| Pem { cert: x.pair.1, @@ -117,28 +136,16 @@ impl ProvidesCertificates for CertificatesImporter { }) .collect::>(); - info!( - "IssuerProvider ({}): {} certs loaded", - self.exporter_url, - certs.len() - ); - Ok(certs) } } -#[allow(clippy::significant_drop_tightening)] #[async_trait] impl Import for CertificatesImporter { async fn import(&self) -> Result, Error> { - // Return result from cache if available - let now = Instant::now(); - let mut cache = self.cache.lock().await; - if cache.updated_at >= now.checked_sub(CACHE_TTL).unwrap() { - return Ok(cache.packages.clone()); - } + let mut req = Request::new(Method::GET, self.exporter_url.clone()); + *req.timeout_mut() = Some(Duration::from_secs(30)); - let req = Request::new(Method::GET, self.exporter_url.clone()); let response = self .http_client .execute(req) @@ -146,24 +153,47 @@ impl Import for CertificatesImporter { .context("failed to make http request")?; if response.status() != StatusCode::OK { - return Err(anyhow!(format!("request failed: {}", response.status())).into()); + return Err(anyhow!("incorrect response code: {}", response.status()).into()); } let bs = response .bytes() .await - .context("failed to consume response")? + .context("failed to fetch response body")? .to_vec(); let pkgs: Vec = serde_json::from_slice(&bs).context("failed to parse json body")?; - cache.packages.clone_from(&pkgs); - cache.updated_at = now; Ok(pkgs) } } +#[async_trait] +impl Run for CertificatesImporter { + async fn run(&self, token: CancellationToken) -> Result<(), anyhow::Error> { + let mut interval = tokio::time::interval(self.poll_interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + select! { + biased; + + () = token.cancelled() => { + warn!("{self:?}: exiting"); + return Ok(()); + }, + + _ = interval.tick() => { + if let Err(e) = self.refresh().await { + warn!("{self:?}: unable to refresh certificates: {e:#}"); + }; + } + } + } + } +} + // Wraps an importer with a verifier // The importer imports a set of packages as usual, but then passes the packages to the verifier. // The verifier parses out the public certificate and compares the common name to the name in the package to make sure they match. @@ -222,8 +252,11 @@ mod tests { .into()) }); - let importer = - CertificatesImporter::new(Arc::new(http_client), Url::from_str("http://foo")?); + let importer = CertificatesImporter::new( + Arc::new(http_client), + Url::from_str("http://foo")?, + Duration::ZERO, + ); let out = importer.import().await?; diff --git a/src/tls/mod.rs b/src/tls/mod.rs index 3b0f2b3..86f27d4 100644 --- a/src/tls/mod.rs +++ b/src/tls/mod.rs @@ -155,9 +155,15 @@ pub async fn setup( // Create CertIssuer providers // It's a custom domain & cert provider at the same time. for v in &cli.cert.cert_provider_issuer_url { - let issuer = Arc::new(providers::Issuer::new(http_client.clone(), v.clone())); + let issuer = Arc::new(providers::Issuer::new( + http_client.clone(), + v.clone(), + cli.cert.cert_provider_issuer_poll_interval, + )); + cert_providers.push(issuer.clone()); - custom_domain_providers.push(issuer); + custom_domain_providers.push(issuer.clone()); + tasks.add(&format!("{issuer:?}"), issuer); } // Prepare ACME if configured