Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
feat: add more metrics (#130)
Browse files Browse the repository at this point in the history
* feat: add more metrics

* Adds richer set of metrics and metric handling
* Add stupid GC function for tiles cache.

Issue: #71, #68, #35
Closes #120
  • Loading branch information
jrconlin authored Jun 7, 2021
1 parent d4f5aa2 commit 786fe72
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 25 deletions.
12 changes: 11 additions & 1 deletion src/adm/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use url::Url;
use super::{tiles::AdmTile, AdmAdvertiserFilterSettings, Tile, DEFAULT};
use crate::{
error::{HandlerError, HandlerErrorKind, HandlerResult},
metrics::Metrics,
tags::Tags,
web::middleware::sentry as l_sentry,
};
Expand Down Expand Up @@ -208,7 +209,12 @@ impl AdmFilter {
///
/// - Returns None for tiles that shouldn't be shown to the client
/// - Modifies tiles for output to the client (adding additional fields, etc.)
pub fn filter_and_process(&self, mut tile: AdmTile, tags: &mut Tags) -> Option<Tile> {
pub fn filter_and_process(
&self,
mut tile: AdmTile,
tags: &mut Tags,
metrics: &Metrics,
) -> Option<Tile> {
// Use strict matching for now, eventually, we may want to use backwards expanding domain
// searches, (.e.g "xyz.example.com" would match "example.com")
match self.filter_set.get(&tile.name.to_lowercase()) {
Expand Down Expand Up @@ -238,16 +244,19 @@ impl AdmFilter {
};
if let Err(e) = self.check_advertiser(adv_filter, &mut tile, tags) {
trace!("Rejecting tile: bad adv");
metrics.incr_with_tags("filter.adm.err.invalid_advertiser", Some(tags));
self.report(&e, tags);
return None;
}
if let Err(e) = self.check_click(click_filter, &mut tile, tags) {
trace!("Rejecting tile: bad click");
metrics.incr_with_tags("filter.adm.err.invalid_click", Some(tags));
self.report(&e, tags);
return None;
}
if let Err(e) = self.check_impression(impression_filter, &mut tile, tags) {
trace!("Rejecting tile: bad imp");
metrics.incr_with_tags("filter.adm.err.invalid_impression", Some(tags));
self.report(&e, tags);
return None;
}
Expand All @@ -260,6 +269,7 @@ impl AdmFilter {
))
}
None => {
metrics.incr_with_tags("filter.adm.err.unexpected_advertiser", Some(tags));
self.report(
&HandlerErrorKind::UnexpectedAdvertiser(tile.name).into(),
tags,
Expand Down
5 changes: 4 additions & 1 deletion src/adm/tiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use url::Url;
use crate::{
adm::DEFAULT,
error::{HandlerError, HandlerErrorKind, HandlerResult},
metrics::Metrics,
server::{location::LocationResult, ServerState},
settings::Settings,
tags::Tags,
Expand Down Expand Up @@ -115,6 +116,7 @@ pub async fn get_tiles(
form_factor: FormFactor,
state: &ServerState,
tags: &mut Tags,
metrics: &Metrics,
headers: Option<&HeaderMap>,
) -> Result<TileResponse, HandlerError> {
// XXX: Assumes adm_endpoint_url includes
Expand Down Expand Up @@ -154,6 +156,7 @@ pub async fn get_tiles(
trace!("Getting fake response: {:?}", &test_response);
AdmTileResponse::fake_response(&state.settings, test_response)?
} else {
// TODO: Add timeout
reqwest_client
.get(adm_url)
.send()
Expand All @@ -177,7 +180,7 @@ pub async fn get_tiles(
let tiles = response
.tiles
.into_iter()
.filter_map(|tile| state.filter.filter_and_process(tile, tags))
.filter_map(|tile| state.filter.filter_and_process(tile, tags, metrics))
.take(settings.adm_max_tiles as usize)
.collect();
Ok(TileResponse { tiles })
Expand Down
4 changes: 2 additions & 2 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ impl Metrics {
}

/// Increment a given metric with optional [crate::tags::Tags]
pub fn incr_with_tags(&self, label: &str, tags: Option<Tags>) {
pub fn incr_with_tags(&self, label: &str, tags: Option<&Tags>) {
if let Some(client) = self.client.as_ref() {
let mut tagged = client.incr_with_tags(label);
let mut mtags = self.tags.clone().unwrap_or_default();
if let Some(tags) = tags {
mtags.extend(tags);
mtags.extend(tags.clone());
}
for key in mtags.tags.keys().clone() {
if let Some(val) = mtags.tags.get(key) {
Expand Down
47 changes: 40 additions & 7 deletions src/server/cache.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
//! Tile cache manager
use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration};
use std::{
collections::HashMap,
fmt::Debug,
ops::Deref,
sync::Arc,
time::{Duration, SystemTime},
};

use cadence::Counted;
use tokio::sync::RwLock;

use crate::{
adm,
metrics::Metrics,
server::location::LocationResult,
server::ServerState,
tags::Tags,
Expand All @@ -32,6 +38,7 @@ pub struct AudienceKey {
#[derive(Debug)]
pub struct Tiles {
pub json: String,
pub ttl: SystemTime,
}

/// The simple tile Cache
Expand Down Expand Up @@ -71,14 +78,28 @@ async fn tile_cache_updater(state: &ServerState) {
tiles_cache,
reqwest_client,
adm_endpoint_url,
metrics,
settings,
..
} = state;

trace!("tile_cache_updater running...");
let keys: Vec<_> = tiles_cache.read().await.keys().cloned().collect();
let mut cache_size = 0;
let mut cache_count: i64 = 0;
for key in keys {
// proactively remove expired tiles from the cache, since we only
// write new ones (or ones which return a value)
// TODO: This could possibly be rewritten as a one liner by someone more clever.
{
let mut tiles = tiles_cache.write().await;
if let Some(tile) = tiles.get(&key) {
if tile.ttl <= SystemTime::now() {
tiles.remove(&key);
}
}
}
let mut tags = Tags::default();
let metrics = Metrics::from(state);
let result = adm::get_tiles(
reqwest_client,
adm_endpoint_url,
Expand All @@ -92,6 +113,7 @@ async fn tile_cache_updater(state: &ServerState) {
key.form_factor,
state,
&mut tags,
&metrics,
None,
)
.await;
Expand All @@ -103,10 +125,12 @@ async fn tile_cache_updater(state: &ServerState) {
Ok(tiles) => tiles,
Err(e) => {
error!("tile_cache_updater: response error {}", e);
metrics.incr_with_tags("tile_cache_updater.error").send();
metrics.incr_with_tags("tile_cache_updater.error", Some(&tags));
continue;
}
};
cache_size += tiles.len();
cache_count += 1;
// XXX: not a great comparison (comparing json Strings)..
let new_tiles = {
tiles_cache
Expand All @@ -117,14 +141,23 @@ async fn tile_cache_updater(state: &ServerState) {
};
if new_tiles {
trace!("tile_cache_updater updating: {:?}", &key);
tiles_cache.write().await.insert(key, Tiles { json: tiles });
metrics.incr_with_tags("tile_cache_updater.update").send();
tiles_cache.write().await.insert(
key,
Tiles {
json: tiles,
ttl: SystemTime::now() + Duration::from_secs(settings.tiles_ttl as u64),
},
);
metrics.incr_with_tags("tile_cache_updater.update", Some(&tags));
}
}
Err(e) => {
error!("tile_cache_updater error: {}", e);
metrics.incr_with_tags("tile_cache_updater.error").send();
metrics.incr_with_tags("tile_cache_updater.error", Some(&tags));
}
}
}
let metrics = Metrics::from(state);
metrics.count("tile_cache_updater.size", cache_size as i64);
metrics.count("tile_cache_updater.count", cache_count);
}
35 changes: 27 additions & 8 deletions src/server/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use maxminddb::{self, geoip2::City, MaxMindDBError};
use serde::{self, Serialize};

use crate::error::{HandlerErrorKind, HandlerResult};
use crate::metrics::Metrics;
use crate::settings::Settings;

const GOOG_LOC_HEADER: &str = "x-client-geo-location";
Expand Down Expand Up @@ -44,29 +45,31 @@ impl From<&Settings> for LocationResult {
/// Read the [RequestHead] from either [HttpRequest] and [ServiceRequest]
/// and pull the user location
impl LocationResult {
pub fn from_header(head: &RequestHead, settings: &Settings) -> Self {
pub fn from_header(head: &RequestHead, settings: &Settings, metrics: &Metrics) -> Self {
let headers = head.headers();
if let Some(ref loc_header) = settings.location_test_header {
if let Some(header) = headers.get(loc_header) {
trace!("Using test header");
return Self::from_headervalue(header, settings);
return Self::from_headervalue(header, settings, metrics);
}
}
if let Some(header) = headers.get(GOOG_LOC_HEADER) {
trace!("Found Google Header");
return Self::from_headervalue(header, settings);
return Self::from_headervalue(header, settings, metrics);
}
Self::from(settings)
}

/// Read a [HeaderValue] to see if there's anything we can use to derive the location
fn from_headervalue(header: &HeaderValue, settings: &Settings) -> Self {
fn from_headervalue(header: &HeaderValue, settings: &Settings, metrics: &Metrics) -> Self {
let loc_string = header.to_str().unwrap_or("");
let mut loc = Self::from(settings);
let mut parts = loc_string.split(',');
if let Some(country) = parts.next().map(|country| country.trim().to_owned()) {
if !country.is_empty() {
loc.country = Some(country)
} else {
metrics.incr("location.unknown.country");
}
}
if let Some(subdivision) = parts.next().map(|subdivision| {
Expand All @@ -82,6 +85,8 @@ impl LocationResult {
}) {
if !subdivision.is_empty() {
loc.subdivision = Some(subdivision)
} else {
metrics.incr("location.unknown.subdivision");
}
}
loc
Expand Down Expand Up @@ -255,6 +260,7 @@ impl Location {
&self,
ip_addr: IpAddr,
preferred_languages: &[String],
metrics: &Metrics,
) -> HandlerResult<Option<LocationResult>> {
if self.iploc.is_none() {
return Ok(None);
Expand Down Expand Up @@ -302,9 +308,13 @@ impl Location {
Ok(location) => {
if let Some(names) = location.city.and_then(|c| c.names) {
result.city = get_preferred_language_element(&preferred_languages, &names)
} else {
metrics.incr("location.unknown.city");
};
if let Some(names) = location.country.and_then(|c| c.names) {
result.country = get_preferred_language_element(&preferred_languages, &names)
} else {
metrics.incr("location.unknown.country");
};
if let Some(divs) = location.subdivisions {
if let Some(subdivision) = divs.get(0) {
Expand All @@ -313,6 +323,8 @@ impl Location {
get_preferred_language_element(&preferred_languages, names);
}
}
} else {
metrics.incr("location.unknown.subdivision")
}
if let Some(location) = location.location {
result.dma = location.metro_code;
Expand Down Expand Up @@ -396,9 +408,13 @@ mod test {
..Default::default()
};
let location = Location::from(&settings);
let metrics = Metrics::noop();
if location.is_available() {
// TODO: either mock maxminddb::Reader or pass it in as a wrapped impl
let result = location.mmdb_locate(test_ip, &langs).await?.unwrap();
let result = location
.mmdb_locate(test_ip, &langs, &metrics)
.await?
.unwrap();
assert_eq!(result.city, Some("Milton".to_owned()));
assert_eq!(result.subdivision, Some("Washington".to_owned()));
assert_eq!(result.country, Some("United States".to_owned()));
Expand All @@ -417,8 +433,9 @@ mod test {
..Default::default()
};
let location = Location::from(&settings);
let metrics = Metrics::noop();
if location.is_available() {
let result = location.mmdb_locate(test_ip, &langs).await?;
let result = location.mmdb_locate(test_ip, &langs, &metrics).await?;
assert!(result.is_none());
} else {
println!("⚠Location Database not found, cannot test location, skipping");
Expand All @@ -433,6 +450,7 @@ mod test {
location_test_header: Some(test_header.to_string()),
..Default::default()
};
let metrics = Metrics::noop();

let mut test_head = RequestHead::default();
let hv = "US, USCA";
Expand All @@ -441,7 +459,7 @@ mod test {
HeaderValue::from_static(&hv),
);

let loc = LocationResult::from_header(&test_head, &settings);
let loc = LocationResult::from_header(&test_head, &settings, &metrics);
assert!(loc.region() == *"CA");
assert!(loc.country() == *"US");
Ok(())
Expand All @@ -455,6 +473,7 @@ mod test {
adm_endpoint_url: "http://localhost:8080".to_owned(),
..Default::default()
};
let metrics = Metrics::noop();
assert!(settings.verify_settings().is_err());
settings.fallback_location = "Us, Oklahoma".to_owned();
assert!(settings.verify_settings().is_err());
Expand All @@ -469,7 +488,7 @@ mod test {
HeaderName::from_static(GOOG_LOC_HEADER),
HeaderValue::from_static(&hv),
);
let loc = LocationResult::from_header(&test_head, &settings);
let loc = LocationResult::from_header(&test_head, &settings, &metrics);
assert!(loc.region() == *"OK");
assert!(loc.country() == *"US");
Ok(())
Expand Down
16 changes: 15 additions & 1 deletion src/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,21 @@ impl From<HttpRequest> for Tags {
}
}

/// Tags are extra data to be recorded in metric and logging calls.
/// Convenience function to bulk load `extra`
impl Tags {
pub fn from_extra(map: Vec<(&'static str, String)>) -> Self {
let mut extra = HashMap::new();
for (key, val) in map {
extra.insert(key.to_owned(), val);
}
Self {
tags: HashMap::new(),
extra,
}
}
}

// Tags are extra data to be recorded in metric and logging calls.
/// If additional tags are required or desired, you will need to add them to the
/// mutable extensions, e.g.
/// ```compile_fail
Expand Down
Loading

0 comments on commit 786fe72

Please sign in to comment.