From 786fe729fecf2d1e1d9d72b1d719270831dc3134 Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Mon, 7 Jun 2021 16:32:38 -0700 Subject: [PATCH] feat: add more metrics (#130) * feat: add more metrics * Adds richer set of metrics and metric handling * Add stupid GC function for tiles cache. Issue: #71, #68, #35 Closes #120 --- src/adm/filter.rs | 12 ++++++++++- src/adm/tiles.rs | 5 ++++- src/metrics.rs | 4 ++-- src/server/cache.rs | 47 +++++++++++++++++++++++++++++++++++------- src/server/location.rs | 35 ++++++++++++++++++++++++------- src/tags.rs | 16 +++++++++++++- src/web/handlers.rs | 18 +++++++++++----- 7 files changed, 112 insertions(+), 25 deletions(-) diff --git a/src/adm/filter.rs b/src/adm/filter.rs index e7c874d5..ba7be6be 100644 --- a/src/adm/filter.rs +++ b/src/adm/filter.rs @@ -10,6 +10,7 @@ use url::Url; use super::{tiles::AdmTile, AdmAdvertiserFilterSettings, Tile, DEFAULT}; use crate::{ error::{HandlerError, HandlerErrorKind, HandlerResult}, + metrics::Metrics, tags::Tags, web::middleware::sentry as l_sentry, }; @@ -208,7 +209,12 @@ impl AdmFilter { /// /// - Returns None for tiles that shouldn't be shown to the client /// - Modifies tiles for output to the client (adding additional fields, etc.) - pub fn filter_and_process(&self, mut tile: AdmTile, tags: &mut Tags) -> Option { + pub fn filter_and_process( + &self, + mut tile: AdmTile, + tags: &mut Tags, + metrics: &Metrics, + ) -> Option { // Use strict matching for now, eventually, we may want to use backwards expanding domain // searches, (.e.g "xyz.example.com" would match "example.com") match self.filter_set.get(&tile.name.to_lowercase()) { @@ -238,16 +244,19 @@ impl AdmFilter { }; if let Err(e) = self.check_advertiser(adv_filter, &mut tile, tags) { trace!("Rejecting tile: bad adv"); + metrics.incr_with_tags("filter.adm.err.invalid_advertiser", Some(tags)); self.report(&e, tags); return None; } if let Err(e) = self.check_click(click_filter, &mut tile, tags) { trace!("Rejecting tile: bad click"); + metrics.incr_with_tags("filter.adm.err.invalid_click", Some(tags)); self.report(&e, tags); return None; } if let Err(e) = self.check_impression(impression_filter, &mut tile, tags) { trace!("Rejecting tile: bad imp"); + metrics.incr_with_tags("filter.adm.err.invalid_impression", Some(tags)); self.report(&e, tags); return None; } @@ -260,6 +269,7 @@ impl AdmFilter { )) } None => { + metrics.incr_with_tags("filter.adm.err.unexpected_advertiser", Some(tags)); self.report( &HandlerErrorKind::UnexpectedAdvertiser(tile.name).into(), tags, diff --git a/src/adm/tiles.rs b/src/adm/tiles.rs index 84e6c672..530747b9 100644 --- a/src/adm/tiles.rs +++ b/src/adm/tiles.rs @@ -7,6 +7,7 @@ use url::Url; use crate::{ adm::DEFAULT, error::{HandlerError, HandlerErrorKind, HandlerResult}, + metrics::Metrics, server::{location::LocationResult, ServerState}, settings::Settings, tags::Tags, @@ -115,6 +116,7 @@ pub async fn get_tiles( form_factor: FormFactor, state: &ServerState, tags: &mut Tags, + metrics: &Metrics, headers: Option<&HeaderMap>, ) -> Result { // XXX: Assumes adm_endpoint_url includes @@ -154,6 +156,7 @@ pub async fn get_tiles( trace!("Getting fake response: {:?}", &test_response); AdmTileResponse::fake_response(&state.settings, test_response)? } else { + // TODO: Add timeout reqwest_client .get(adm_url) .send() @@ -177,7 +180,7 @@ pub async fn get_tiles( let tiles = response .tiles .into_iter() - .filter_map(|tile| state.filter.filter_and_process(tile, tags)) + .filter_map(|tile| state.filter.filter_and_process(tile, tags, metrics)) .take(settings.adm_max_tiles as usize) .collect(); Ok(TileResponse { tiles }) diff --git a/src/metrics.rs b/src/metrics.rs index 967f4dc7..672d19de 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -135,12 +135,12 @@ impl Metrics { } /// Increment a given metric with optional [crate::tags::Tags] - pub fn incr_with_tags(&self, label: &str, tags: Option) { + pub fn incr_with_tags(&self, label: &str, tags: Option<&Tags>) { if let Some(client) = self.client.as_ref() { let mut tagged = client.incr_with_tags(label); let mut mtags = self.tags.clone().unwrap_or_default(); if let Some(tags) = tags { - mtags.extend(tags); + mtags.extend(tags.clone()); } for key in mtags.tags.keys().clone() { if let Some(val) = mtags.tags.get(key) { diff --git a/src/server/cache.rs b/src/server/cache.rs index 298254ea..38c2dc1a 100644 --- a/src/server/cache.rs +++ b/src/server/cache.rs @@ -1,11 +1,17 @@ //! Tile cache manager -use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + fmt::Debug, + ops::Deref, + sync::Arc, + time::{Duration, SystemTime}, +}; -use cadence::Counted; use tokio::sync::RwLock; use crate::{ adm, + metrics::Metrics, server::location::LocationResult, server::ServerState, tags::Tags, @@ -32,6 +38,7 @@ pub struct AudienceKey { #[derive(Debug)] pub struct Tiles { pub json: String, + pub ttl: SystemTime, } /// The simple tile Cache @@ -71,14 +78,28 @@ async fn tile_cache_updater(state: &ServerState) { tiles_cache, reqwest_client, adm_endpoint_url, - metrics, + settings, .. } = state; trace!("tile_cache_updater running..."); let keys: Vec<_> = tiles_cache.read().await.keys().cloned().collect(); + let mut cache_size = 0; + let mut cache_count: i64 = 0; for key in keys { + // proactively remove expired tiles from the cache, since we only + // write new ones (or ones which return a value) + // TODO: This could possibly be rewritten as a one liner by someone more clever. + { + let mut tiles = tiles_cache.write().await; + if let Some(tile) = tiles.get(&key) { + if tile.ttl <= SystemTime::now() { + tiles.remove(&key); + } + } + } let mut tags = Tags::default(); + let metrics = Metrics::from(state); let result = adm::get_tiles( reqwest_client, adm_endpoint_url, @@ -92,6 +113,7 @@ async fn tile_cache_updater(state: &ServerState) { key.form_factor, state, &mut tags, + &metrics, None, ) .await; @@ -103,10 +125,12 @@ async fn tile_cache_updater(state: &ServerState) { Ok(tiles) => tiles, Err(e) => { error!("tile_cache_updater: response error {}", e); - metrics.incr_with_tags("tile_cache_updater.error").send(); + metrics.incr_with_tags("tile_cache_updater.error", Some(&tags)); continue; } }; + cache_size += tiles.len(); + cache_count += 1; // XXX: not a great comparison (comparing json Strings).. let new_tiles = { tiles_cache @@ -117,14 +141,23 @@ async fn tile_cache_updater(state: &ServerState) { }; if new_tiles { trace!("tile_cache_updater updating: {:?}", &key); - tiles_cache.write().await.insert(key, Tiles { json: tiles }); - metrics.incr_with_tags("tile_cache_updater.update").send(); + tiles_cache.write().await.insert( + key, + Tiles { + json: tiles, + ttl: SystemTime::now() + Duration::from_secs(settings.tiles_ttl as u64), + }, + ); + metrics.incr_with_tags("tile_cache_updater.update", Some(&tags)); } } Err(e) => { error!("tile_cache_updater error: {}", e); - metrics.incr_with_tags("tile_cache_updater.error").send(); + metrics.incr_with_tags("tile_cache_updater.error", Some(&tags)); } } } + let metrics = Metrics::from(state); + metrics.count("tile_cache_updater.size", cache_size as i64); + metrics.count("tile_cache_updater.count", cache_count); } diff --git a/src/server/location.rs b/src/server/location.rs index 99d14735..3e096bf6 100644 --- a/src/server/location.rs +++ b/src/server/location.rs @@ -12,6 +12,7 @@ use maxminddb::{self, geoip2::City, MaxMindDBError}; use serde::{self, Serialize}; use crate::error::{HandlerErrorKind, HandlerResult}; +use crate::metrics::Metrics; use crate::settings::Settings; const GOOG_LOC_HEADER: &str = "x-client-geo-location"; @@ -44,29 +45,31 @@ impl From<&Settings> for LocationResult { /// Read the [RequestHead] from either [HttpRequest] and [ServiceRequest] /// and pull the user location impl LocationResult { - pub fn from_header(head: &RequestHead, settings: &Settings) -> Self { + pub fn from_header(head: &RequestHead, settings: &Settings, metrics: &Metrics) -> Self { let headers = head.headers(); if let Some(ref loc_header) = settings.location_test_header { if let Some(header) = headers.get(loc_header) { trace!("Using test header"); - return Self::from_headervalue(header, settings); + return Self::from_headervalue(header, settings, metrics); } } if let Some(header) = headers.get(GOOG_LOC_HEADER) { trace!("Found Google Header"); - return Self::from_headervalue(header, settings); + return Self::from_headervalue(header, settings, metrics); } Self::from(settings) } /// Read a [HeaderValue] to see if there's anything we can use to derive the location - fn from_headervalue(header: &HeaderValue, settings: &Settings) -> Self { + fn from_headervalue(header: &HeaderValue, settings: &Settings, metrics: &Metrics) -> Self { let loc_string = header.to_str().unwrap_or(""); let mut loc = Self::from(settings); let mut parts = loc_string.split(','); if let Some(country) = parts.next().map(|country| country.trim().to_owned()) { if !country.is_empty() { loc.country = Some(country) + } else { + metrics.incr("location.unknown.country"); } } if let Some(subdivision) = parts.next().map(|subdivision| { @@ -82,6 +85,8 @@ impl LocationResult { }) { if !subdivision.is_empty() { loc.subdivision = Some(subdivision) + } else { + metrics.incr("location.unknown.subdivision"); } } loc @@ -255,6 +260,7 @@ impl Location { &self, ip_addr: IpAddr, preferred_languages: &[String], + metrics: &Metrics, ) -> HandlerResult> { if self.iploc.is_none() { return Ok(None); @@ -302,9 +308,13 @@ impl Location { Ok(location) => { if let Some(names) = location.city.and_then(|c| c.names) { result.city = get_preferred_language_element(&preferred_languages, &names) + } else { + metrics.incr("location.unknown.city"); }; if let Some(names) = location.country.and_then(|c| c.names) { result.country = get_preferred_language_element(&preferred_languages, &names) + } else { + metrics.incr("location.unknown.country"); }; if let Some(divs) = location.subdivisions { if let Some(subdivision) = divs.get(0) { @@ -313,6 +323,8 @@ impl Location { get_preferred_language_element(&preferred_languages, names); } } + } else { + metrics.incr("location.unknown.subdivision") } if let Some(location) = location.location { result.dma = location.metro_code; @@ -396,9 +408,13 @@ mod test { ..Default::default() }; let location = Location::from(&settings); + let metrics = Metrics::noop(); if location.is_available() { // TODO: either mock maxminddb::Reader or pass it in as a wrapped impl - let result = location.mmdb_locate(test_ip, &langs).await?.unwrap(); + let result = location + .mmdb_locate(test_ip, &langs, &metrics) + .await? + .unwrap(); assert_eq!(result.city, Some("Milton".to_owned())); assert_eq!(result.subdivision, Some("Washington".to_owned())); assert_eq!(result.country, Some("United States".to_owned())); @@ -417,8 +433,9 @@ mod test { ..Default::default() }; let location = Location::from(&settings); + let metrics = Metrics::noop(); if location.is_available() { - let result = location.mmdb_locate(test_ip, &langs).await?; + let result = location.mmdb_locate(test_ip, &langs, &metrics).await?; assert!(result.is_none()); } else { println!("⚠Location Database not found, cannot test location, skipping"); @@ -433,6 +450,7 @@ mod test { location_test_header: Some(test_header.to_string()), ..Default::default() }; + let metrics = Metrics::noop(); let mut test_head = RequestHead::default(); let hv = "US, USCA"; @@ -441,7 +459,7 @@ mod test { HeaderValue::from_static(&hv), ); - let loc = LocationResult::from_header(&test_head, &settings); + let loc = LocationResult::from_header(&test_head, &settings, &metrics); assert!(loc.region() == *"CA"); assert!(loc.country() == *"US"); Ok(()) @@ -455,6 +473,7 @@ mod test { adm_endpoint_url: "http://localhost:8080".to_owned(), ..Default::default() }; + let metrics = Metrics::noop(); assert!(settings.verify_settings().is_err()); settings.fallback_location = "Us, Oklahoma".to_owned(); assert!(settings.verify_settings().is_err()); @@ -469,7 +488,7 @@ mod test { HeaderName::from_static(GOOG_LOC_HEADER), HeaderValue::from_static(&hv), ); - let loc = LocationResult::from_header(&test_head, &settings); + let loc = LocationResult::from_header(&test_head, &settings, &metrics); assert!(loc.region() == *"OK"); assert!(loc.country() == *"US"); Ok(()) diff --git a/src/tags.rs b/src/tags.rs index b09e6155..cec2459c 100644 --- a/src/tags.rs +++ b/src/tags.rs @@ -139,7 +139,21 @@ impl From for Tags { } } -/// Tags are extra data to be recorded in metric and logging calls. +/// Convenience function to bulk load `extra` +impl Tags { + pub fn from_extra(map: Vec<(&'static str, String)>) -> Self { + let mut extra = HashMap::new(); + for (key, val) in map { + extra.insert(key.to_owned(), val); + } + Self { + tags: HashMap::new(), + extra, + } + } +} + +// Tags are extra data to be recorded in metric and logging calls. /// If additional tags are required or desired, you will need to add them to the /// mutable extensions, e.g. /// ```compile_fail diff --git a/src/web/handlers.rs b/src/web/handlers.rs index 82ecf2ed..324b14dc 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -1,4 +1,6 @@ //! API Handlers +use std::time::{Duration, SystemTime}; + use actix_web::{web, HttpRequest, HttpResponse}; use super::user_agent; @@ -22,8 +24,10 @@ pub async fn get_tiles( request: HttpRequest, ) -> Result { trace!("get_tiles"); + metrics.incr("tiles.update"); let cinfo = request.connection_info(); + let settings = &state.settings; let ip_addr_str = cinfo.remote_addr().unwrap_or({ let default = state .adm_country_ip_map @@ -47,11 +51,11 @@ pub async fn get_tiles( }; state .mmdb - .mmdb_locate(addr, &["en".to_owned()]) + .mmdb_locate(addr, &["en".to_owned()], &metrics) .await? - .unwrap_or_else(|| LocationResult::from_header(header, &state.settings)) + .unwrap_or_else(|| LocationResult::from_header(header, settings, &metrics)) } else { - LocationResult::from_header(header, &state.settings) + LocationResult::from_header(header, settings, &metrics) }; let mut tags = Tags::default(); @@ -69,7 +73,7 @@ pub async fn get_tiles( form_factor, os_family, }; - if !state.settings.test_mode { + if !settings.test_mode { if let Some(tiles) = state.tiles_cache.read().await.get(&audience_key) { trace!("get_tiles: cache hit: {:?}", audience_key); metrics.incr("tiles_cache.hit"); @@ -86,8 +90,9 @@ pub async fn get_tiles( form_factor, &state, &mut tags, + &metrics, // be aggressive about not passing headers unless we absolutely need to - if state.settings.test_mode { + if settings.test_mode { Some(request.head().headers()) } else { None @@ -98,6 +103,7 @@ pub async fn get_tiles( Ok(response) => { // adM sometimes returns an invalid response. We don't want to cache that. if response.tiles.is_empty() { + metrics.incr_with_tags("tiles.empty", Some(&tags)); return Ok(HttpResponse::NoContent().finish()); }; let tiles = serde_json::to_string(&response).map_err(|e| { @@ -109,6 +115,7 @@ pub async fn get_tiles( audience_key, cache::Tiles { json: tiles.clone(), + ttl: SystemTime::now() + Duration::from_secs(settings.tiles_ttl as u64), }, ); tiles @@ -116,6 +123,7 @@ pub async fn get_tiles( Err(e) => match e.kind() { HandlerErrorKind::BadAdmResponse(es) => { warn!("Bad response from ADM: {:?}", e); + metrics.incr_with_tags("tiles.invalid", Some(&tags)); // Report directly to sentry // (This is starting to become a pattern. 🤔) let mut tags = Tags::from(request.head());