Skip to content

Commit

Permalink
Merge pull request #41 from dfinity/improvements
Browse files Browse the repository at this point in the history
Improvement to the issuer querying
  • Loading branch information
blind-oracle authored Sep 20, 2024
2 parents aca9be7 + b4df1ad commit 26de8ca
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ http-body-util = "0.1"
humantime = "2.1"
hyper-util = "0.1"
ic-agent = { version = "0.37.1", features = ["reqwest"] }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "bded503b282a61cea498ce47bc5990019197367a" }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "4256445ad924a8e4c99ce706318c4671fad7eb90" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b0" }
itertools = "0.13"
lazy_static = "1.5"
Expand Down
4 changes: 4 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ pub struct Cert {
#[clap(env, long, value_delimiter = ',')]
pub cert_provider_issuer_url: Vec<Url>,

/// How frequently to refresh certificate issuers
#[clap(env, long, default_value = "30s", value_parser = parse_duration)]
pub cert_provider_issuer_poll_interval: Duration,

/// How frequently to poll providers for certificates
#[clap(env, long, default_value = "5s", value_parser = parse_duration)]
pub cert_provider_poll_interval: Duration,
Expand Down
6 changes: 4 additions & 2 deletions src/tls/cert/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

use providers::{Pem, ProvidesCertificates};
pub use storage::Storage;
use storage::StoresCertificates;

// Generic certificate and a list of its SANs
Expand Down Expand Up @@ -194,6 +193,7 @@ impl Run for Aggregator {
pub mod test {
use std::sync::atomic::{AtomicUsize, Ordering};

use prometheus::Registry;
use providers::Pem;

use super::*;
Expand Down Expand Up @@ -341,7 +341,9 @@ pub mod test {
AtomicUsize::new(0),
);

let storage = Arc::new(storage::StorageKey::new());
let storage = Arc::new(storage::StorageKey::new(storage::Metrics::new(
&Registry::new(),
)));
let aggregator = Aggregator::new(
vec![Arc::new(prov1), Arc::new(prov2)],
storage,
Expand Down
119 changes: 76 additions & 43 deletions src/tls/cert/providers/issuer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ use std::{
};

use anyhow::{anyhow, Context as AnyhowContext};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use candid::Principal;
use fqdn::FQDN;
use ic_bn_lib::http;
use ic_bn_lib::{http, tasks::Run};
use mockall::automock;
use reqwest::{Method, Request, StatusCode, Url};
use serde::Deserialize;
use tokio::sync::Mutex;
use tracing::info;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

use crate::routing::domain::{CustomDomain, ProvidesCustomDomains};
use verify::{Verify, VerifyError, WithVerify};

use super::{Pem, ProvidesCertificates};

const CACHE_TTL: Duration = Duration::from_secs(9);

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Expand Down Expand Up @@ -52,15 +52,11 @@ pub trait Import: Sync + Send {
async fn import(&self) -> Result<Vec<Package>, Error>;
}

struct Cache {
updated_at: Instant,
packages: Vec<Package>,
}

pub struct CertificatesImporter {
http_client: Arc<dyn http::Client>,
exporter_url: Url,
cache: Mutex<Cache>,
poll_interval: Duration,
snapshot: ArcSwapOption<Vec<Package>>,
}

impl std::fmt::Debug for CertificatesImporter {
Expand All @@ -70,28 +66,46 @@ impl std::fmt::Debug for CertificatesImporter {
}

impl CertificatesImporter {
pub fn new(http_client: Arc<dyn http::Client>, mut exporter_url: Url) -> Self {
pub fn new(
http_client: Arc<dyn http::Client>,
mut exporter_url: Url,
poll_interval: Duration,
) -> Self {
exporter_url.set_path("");
let exporter_url = exporter_url.join("/certificates").unwrap();

Self {
http_client,
exporter_url,
cache: Mutex::new(Cache {
updated_at: Instant::now().checked_sub(CACHE_TTL * 2).unwrap(),
packages: vec![],
}),
poll_interval,
snapshot: ArcSwapOption::empty(),
}
}

async fn refresh(&self) -> Result<(), Error> {
let start = Instant::now();
let packages = self.import().await.context("unable to fetch packages")?;
info!(
"{self:?}: {} certs loaded in {}s",
packages.len(),
start.elapsed().as_secs_f64()
);

self.snapshot.store(Some(Arc::new(packages)));
Ok(())
}
}

#[async_trait]
impl ProvidesCustomDomains for CertificatesImporter {
async fn get_custom_domains(&self) -> Result<Vec<CustomDomain>, anyhow::Error> {
let domains = self
.import()
.await?
.into_iter()
let packages = self
.snapshot
.load_full()
.ok_or_else(|| anyhow!("no packages fetched yet"))?;

let domains = packages
.iter()
.map(|x| -> Result<_, anyhow::Error> {
Ok(CustomDomain {
name: FQDN::from_str(&x.name)?,
Expand All @@ -107,63 +121,79 @@ impl ProvidesCustomDomains for CertificatesImporter {
#[async_trait]
impl ProvidesCertificates for CertificatesImporter {
async fn get_certificates(&self) -> Result<Vec<Pem>, anyhow::Error> {
let certs = self
.import()
.await?
let packages = self
.snapshot
.load_full()
.ok_or_else(|| anyhow!("no packages fetched yet"))?;

let certs = packages
.as_ref()
.clone()
.into_iter()
.map(|x| Pem {
cert: x.pair.1,
key: x.pair.0,
})
.collect::<Vec<_>>();

info!(
"IssuerProvider ({}): {} certs loaded",
self.exporter_url,
certs.len()
);

Ok(certs)
}
}

#[allow(clippy::significant_drop_tightening)]
#[async_trait]
impl Import for CertificatesImporter {
async fn import(&self) -> Result<Vec<Package>, Error> {
// Return result from cache if available
let now = Instant::now();
let mut cache = self.cache.lock().await;
if cache.updated_at >= now.checked_sub(CACHE_TTL).unwrap() {
return Ok(cache.packages.clone());
}
let mut req = Request::new(Method::GET, self.exporter_url.clone());
*req.timeout_mut() = Some(Duration::from_secs(30));

let req = Request::new(Method::GET, self.exporter_url.clone());
let response = self
.http_client
.execute(req)
.await
.context("failed to make http request")?;

if response.status() != StatusCode::OK {
return Err(anyhow!(format!("request failed: {}", response.status())).into());
return Err(anyhow!("incorrect response code: {}", response.status()).into());
}

let bs = response
.bytes()
.await
.context("failed to consume response")?
.context("failed to fetch response body")?
.to_vec();

let pkgs: Vec<Package> =
serde_json::from_slice(&bs).context("failed to parse json body")?;

cache.packages.clone_from(&pkgs);
cache.updated_at = now;
Ok(pkgs)
}
}

#[async_trait]
impl Run for CertificatesImporter {
async fn run(&self, token: CancellationToken) -> Result<(), anyhow::Error> {
let mut interval = tokio::time::interval(self.poll_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
select! {
biased;

() = token.cancelled() => {
warn!("{self:?}: exiting");
return Ok(());
},

_ = interval.tick() => {
if let Err(e) = self.refresh().await {
warn!("{self:?}: unable to refresh certificates: {e:#}");
};
}
}
}
}
}

// Wraps an importer with a verifier
// The importer imports a set of packages as usual, but then passes the packages to the verifier.
// The verifier parses out the public certificate and compares the common name to the name in the package to make sure they match.
Expand Down Expand Up @@ -222,8 +252,11 @@ mod tests {
.into())
});

let importer =
CertificatesImporter::new(Arc::new(http_client), Url::from_str("http://foo")?);
let importer = CertificatesImporter::new(
Arc::new(http_client),
Url::from_str("http://foo")?,
Duration::ZERO,
);

let out = importer.import().await?;

Expand Down
34 changes: 33 additions & 1 deletion src/tls/cert/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arc_swap::ArcSwapOption;
use derive_new::new;
use fqdn::{Fqdn, FQDN};
use ic_bn_lib::http::ALPN_ACME;
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};
use rustls::{server::ClientHello, sign::CertifiedKey};

use super::Cert;
Expand All @@ -15,6 +16,25 @@ pub trait StoresCertificates<T: Clone + Send + Sync>: Send + Sync {
fn store(&self, cert_list: Vec<Cert<T>>) -> Result<(), Error>;
}

#[derive(Debug, Clone)]
pub struct Metrics {
count: IntGaugeVec,
}

impl Metrics {
pub fn new(registry: &Registry) -> Self {
Self {
count: register_int_gauge_vec_with_registry!(
format!("cert_storage_count_total"),
format!("Counts the number of certificates in the storage"),
&["wildcard"],
registry
)
.unwrap(),
}
}
}

struct StorageInner<T: Clone + Send + Sync> {
certs: BTreeMap<FQDN, Arc<Cert<T>>>,
certs_wildcard: BTreeMap<FQDN, Arc<Cert<T>>>,
Expand All @@ -25,6 +45,7 @@ struct StorageInner<T: Clone + Send + Sync> {
pub struct Storage<T: Clone + Send + Sync> {
#[new(default)]
inner: ArcSwapOption<StorageInner<T>>,
metrics: Metrics,
}

impl<T: Clone + Send + Sync> fmt::Debug for Storage<T> {
Expand Down Expand Up @@ -90,6 +111,16 @@ impl<T: Clone + Send + Sync> StoresCertificates<T> for Storage<T> {
}
}

self.metrics
.count
.with_label_values(&["no"])
.set(certs.len() as i64);

self.metrics
.count
.with_label_values(&["yes"])
.set(certs_wildcard.len() as i64);

let inner = StorageInner {
certs,
certs_wildcard,
Expand Down Expand Up @@ -124,11 +155,12 @@ impl ResolvesServerCert for StorageKey {
#[cfg(test)]
pub mod test {
use fqdn::fqdn;
use prometheus::Registry;

use super::*;

pub fn create_test_storage() -> Storage<String> {
let storage: Storage<String> = Storage::new();
let storage: Storage<String> = Storage::new(Metrics::new(&Registry::new()));

let certs = vec![
Cert {
Expand Down
Loading

0 comments on commit 26de8ca

Please sign in to comment.