From aee1984c15ff8e43992ce80fc4f3f1b95220f4fc Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Thu, 7 Jul 2022 16:55:04 +0200 Subject: [PATCH] enhancement(vrl): Add `geoip` enrichment table (#13338) * Inital impl * Add tests * Add docs * Rename to path Signed-off-by: Kruno Tomola Fabro * Remove examples Signed-off-by: Kruno Tomola Fabro * Small fixes Signed-off-by: Kruno Tomola Fabro * Apply review comments Signed-off-by: Kruno Tomola Fabro * Add benches Signed-off-by: Kruno Tomola Fabro * Bump Signed-off-by: Kruno Tomola Fabro * Bump Signed-off-by: Kruno Tomola Fabro --- Cargo.toml | 7 +- ...nt_tables_file.rs => enrichment_tables.rs} | 91 +++- src/enrichment_tables/geoip.rs | 429 ++++++++++++++++++ src/enrichment_tables/mod.rs | 3 + website/cue/reference/configuration.cue | 57 ++- website/cue/reference/remap/functions.cue | 33 +- 6 files changed, 609 insertions(+), 11 deletions(-) rename benches/{enrichment_tables_file.rs => enrichment_tables.rs} (69%) create mode 100644 src/enrichment_tables/geoip.rs diff --git a/Cargo.toml b/Cargo.toml index 5db0ad77c9c50..b06c4980f862b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -428,8 +428,9 @@ protobuf-build = ["dep:tonic-build", "dep:prost-build"] gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"] # Enrichment Tables -enrichment-tables = ["enrichment-tables-file"] +enrichment-tables = ["enrichment-tables-file","enrichment-tables-geoip"] enrichment-tables-file = [ "dep:csv", "dep:seahash", "dep:hash_hasher" ] +enrichment-tables-geoip = ["dep:maxminddb"] # Sources sources = ["sources-logs", "sources-metrics"] @@ -809,7 +810,7 @@ remap-benches = ["transforms-remap"] transform-benches = ["transforms-filter", "transforms-dedupe", "transforms-reduce", "transforms-route"] codecs-benches = [] loki-benches = ["sinks-loki"] -enrichment-tables-benches = ["enrichment-tables-file"] +enrichment-tables-benches = ["enrichment-tables-file","enrichment-tables-geoip"] [[bench]] name = "default" @@ -828,7 +829,7 @@ harness = false required-features = ["remap-benches"] [[bench]] -name = "enrichment_tables_file" +name = "enrichment_tables" harness = false required-features = ["enrichment-tables-benches"] diff --git a/benches/enrichment_tables_file.rs b/benches/enrichment_tables.rs similarity index 69% rename from benches/enrichment_tables_file.rs rename to benches/enrichment_tables.rs index d2a93e6c84445..73038ff913e6f 100644 --- a/benches/enrichment_tables_file.rs +++ b/benches/enrichment_tables.rs @@ -4,12 +4,16 @@ use chrono::prelude::*; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use enrichment::Case; use value::Value; -use vector::enrichment_tables::{file::File, Condition, Table}; +use vector::enrichment_tables::{ + file::File, + geoip::{Geoip, GeoipConfig}, + Condition, Table, +}; criterion_group!( name = benches; config = Criterion::default().noise_threshold(0.02).sample_size(10); - targets = benchmark_enrichment_tables_file + targets = benchmark_enrichment_tables_file, benchmark_enrichment_tables_geoip ); criterion_main!(benches); @@ -221,3 +225,86 @@ fn benchmark_enrichment_tables_file(c: &mut Criterion) { }, ); } + +fn benchmark_enrichment_tables_geoip(c: &mut Criterion) { + let mut group = c.benchmark_group("enrichment_tables_geoip"); + let build = |path: &str| { + Geoip::new(GeoipConfig { + path: path.to_string(), + locale: "en".to_string(), + }) + .unwrap() + }; + + group.bench_function("enrichment_tables/geoip_isp", |b| { + let table = build("tests/data/GeoIP2-ISP-Test.mmdb"); + let ip = "208.192.1.2"; + let mut expected = BTreeMap::::new(); + expected.insert("autonomous_system_number".to_string(), 701i64.into()); + expected.insert( + "autonomous_system_organization".to_string(), + "MCI Communications Services, Inc. d/b/a Verizon Business".into(), + ); + expected.insert("isp".to_string(), "Verizon Business".into()); + expected.insert("organization".to_string(), "Verizon Business".into()); + + b.iter_batched( + || (&table, ip, &expected), + |(table, ip, expected)| { + assert_eq!( + Ok(expected), + table + .find_table_row( + Case::Insensitive, + &[Condition::Equals { + field: "ip", + value: ip.into(), + }], + None, + None, + ) + .as_ref() + ) + }, + BatchSize::SmallInput, + ); + }); + + group.bench_function("enrichment_tables/geoip_city", |b| { + let table = build("tests/data/GeoIP2-City-Test.mmdb"); + let ip = "67.43.156.9"; + let mut expected = BTreeMap::::new(); + expected.insert("city_name".to_string(), Value::Null); + expected.insert("country_code".to_string(), "BT".into()); + expected.insert("country_name".to_string(), "Bhutan".into()); + expected.insert("continent_code".to_string(), "AS".into()); + expected.insert("region_code".to_string(), Value::Null); + expected.insert("region_name".to_string(), Value::Null); + expected.insert("timezone".to_string(), "Asia/Thimphu".into()); + expected.insert("latitude".to_string(), Value::from(27.5)); + expected.insert("longitude".to_string(), Value::from(90.5)); + expected.insert("postal_code".to_string(), Value::Null); + expected.insert("metro_code".to_string(), Value::Null); + + b.iter_batched( + || (&table, ip, &expected), + |(table, ip, expected)| { + assert_eq!( + Ok(expected), + table + .find_table_row( + Case::Insensitive, + &[Condition::Equals { + field: "ip", + value: ip.into(), + }], + None, + None, + ) + .as_ref() + ) + }, + BatchSize::SmallInput, + ); + }); +} diff --git a/src/enrichment_tables/geoip.rs b/src/enrichment_tables/geoip.rs new file mode 100644 index 0000000000000..d1b51270a274f --- /dev/null +++ b/src/enrichment_tables/geoip.rs @@ -0,0 +1,429 @@ +use std::{collections::BTreeMap, fs, net::IpAddr, sync::Arc, time::SystemTime}; + +use enrichment::{Case, Condition, IndexHandle, Table}; +use maxminddb::{ + geoip2::{City, Isp}, + MaxMindDBError, Reader, +}; +use serde::{Deserialize, Serialize}; +use value::Value; + +use crate::config::{EnrichmentTableConfig, EnrichmentTableDescription, GenerateConfig}; + +// MaxMind GeoIP database files have a type field we can use to recognize specific +// products. If we encounter one of these two types, we look for ASN/ISP information; +// otherwise we expect to be working with a City database. +const ASN_DATABASE_TYPE: &str = "GeoLite2-ASN"; +const ISP_DATABASE_TYPE: &str = "GeoIP2-ISP"; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct GeoipConfig { + pub path: String, + #[serde(default = "default_locale")] + pub locale: String, +} + +// valid locales are: “de”, "en", “es”, “fr”, “ja”, “pt-BR”, “ru”, and “zh-CN” +// +// https://dev.maxmind.com/geoip/docs/databases/city-and-country?lang=en +// +// TODO try to determine the system locale and use that as default if it matches a valid locale? +fn default_locale() -> String { + "en".to_string() +} + +impl GenerateConfig for GeoipConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + path: "/path/to/GeoLite2-City.mmdb".to_string(), + locale: default_locale(), + }) + .unwrap() + } +} + +inventory::submit! { + EnrichmentTableDescription::new::("geoip") +} + +#[async_trait::async_trait] +#[typetag::serde(name = "geoip")] +impl EnrichmentTableConfig for GeoipConfig { + async fn build( + &self, + _: &crate::config::GlobalOptions, + ) -> crate::Result> { + Ok(Box::new(Geoip::new(self.clone())?)) + } +} + +#[derive(Clone)] +pub struct Geoip { + config: GeoipConfig, + dbreader: Arc>>, + last_modified: SystemTime, +} + +impl Geoip { + pub fn new(config: GeoipConfig) -> crate::Result { + let table = Geoip { + last_modified: fs::metadata(&config.path)?.modified()?, + dbreader: Arc::new(Reader::open_readfile(config.path.clone())?), + config, + }; + + // Check if we can read database with dummy Ip. + let ip = IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)); + let result = if table.has_isp_db() { + table.dbreader.lookup::(ip).map(|_| ()) + } else { + table.dbreader.lookup::(ip).map(|_| ()) + }; + + match result { + Ok(_) | Err(MaxMindDBError::AddressNotFoundError(_)) => (), + Err(error) => return Err(error.into()), + } + + Ok(table) + } + + fn has_isp_db(&self) -> bool { + self.dbreader.metadata.database_type == ASN_DATABASE_TYPE + || self.dbreader.metadata.database_type == ISP_DATABASE_TYPE + } + + fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option> { + let mut map = BTreeMap::new(); + let mut add_field = |key: &str, value: Option| { + if select + .map(|fields| fields.iter().any(|field| field == key)) + .unwrap_or(true) + { + map.insert(key.to_string(), value.unwrap_or(Value::Null)); + } + }; + + if self.has_isp_db() { + let data = self.dbreader.lookup::(ip).ok()?; + + add_field( + "autonomous_system_number", + data.autonomous_system_number.map(Into::into), + ); + add_field( + "autonomous_system_organization", + data.autonomous_system_organization.map(Into::into), + ); + + add_field("isp", data.isp.map(Into::into)); + + add_field("organization", data.organization.map(Into::into)); + } else { + let data = self.dbreader.lookup::(ip).ok()?; + + add_field( + "city_name", + data.city + .as_ref() + .and_then(|c| c.names.as_ref()) + .and_then(|names| names.get(&*self.config.locale)) + .map(|&name| name.into()), + ); + + add_field( + "continent_code", + data.continent.and_then(|c| c.code).map(Into::into), + ); + + let country = data.country.as_ref(); + add_field( + "country_code", + country.and_then(|country| country.iso_code).map(Into::into), + ); + add_field( + "country_name", + country + .and_then(|country| { + country + .names + .as_ref() + .and_then(|names| names.get(&*self.config.locale)) + }) + .map(|&name| name.into()), + ); + + let location = data.location.as_ref(); + add_field( + "timezone", + location + .and_then(|location| location.time_zone) + .map(Into::into), + ); + add_field( + "latitude", + location + .and_then(|location| location.latitude) + .map(Into::into), + ); + add_field( + "longitude", + location + .and_then(|location| location.longitude) + .map(Into::into), + ); + add_field( + "metro_code", + location + .and_then(|location| location.metro_code) + .map(Into::into), + ); + + // last subdivision is most specific per https://github.com/maxmind/GeoIP2-java/blob/39385c6ce645374039450f57208b886cf87ade47/src/main/java/com/maxmind/geoip2/model/AbstractCityResponse.java#L96-L107 + let subdivision = data.subdivisions.as_ref().and_then(|s| s.last()); + add_field( + "region_name", + subdivision + .and_then(|subdivision| { + subdivision + .names + .as_ref() + .and_then(|names| names.get(&*self.config.locale)) + }) + .map(|&name| name.into()), + ); + add_field( + "region_code", + subdivision + .and_then(|subdivision| subdivision.iso_code) + .map(Into::into), + ); + + add_field( + "postal_code", + data.postal.and_then(|p| p.code).map(Into::into), + ); + } + + Some(map) + } +} + +impl Table for Geoip { + /// Search the enrichment table data with the given condition. + /// All conditions must match (AND). + /// + /// # Errors + /// Errors if no rows, or more than 1 row is found. + fn find_table_row<'a>( + &self, + case: Case, + condition: &'a [Condition<'a>], + select: Option<&[String]>, + index: Option, + ) -> Result, String> { + let mut rows = self.find_table_rows(case, condition, select, index)?; + + match rows.pop() { + Some(row) if rows.is_empty() => Ok(row), + Some(_) => Err("More than 1 row found".to_string()), + None => Err("IP not found".to_string()), + } + } + + /// Search the enrichment table data with the given condition. + /// All conditions must match (AND). + /// Can return multiple matched records + fn find_table_rows<'a>( + &self, + _: Case, + condition: &'a [Condition<'a>], + select: Option<&[String]>, + _: Option, + ) -> Result>, String> { + match condition.get(0) { + Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()), + Some(Condition::Equals { value, .. }) => { + let ip = value + .to_string_lossy() + .parse::() + .map_err(|_| "Invalid IP address".to_string())?; + Ok(self + .lookup(ip, select) + .map(|values| vec![values]) + .unwrap_or_default()) + } + Some(_) => Err("Only equality condition is allowed".to_string()), + None => Err("IP condition must be specified".to_string()), + } + } + + /// Hints to the enrichment table what data is going to be searched to allow it to index the + /// data in advance. + /// + /// # Errors + /// Errors if the fields are not in the table. + fn add_index(&mut self, _: Case, fields: &[&str]) -> Result { + match fields.len() { + 0 => Err("IP field is required".to_string()), + 1 => Ok(IndexHandle(0)), + _ => Err("Only one field is allowed".to_string()), + } + } + + /// Returns a list of the field names that are in each index + fn index_fields(&self) -> Vec<(Case, Vec)> { + Vec::new() + } + + /// Returns true if the underlying data has changed and the table needs reloading. + fn needs_reload(&self) -> bool { + matches!(fs::metadata(&self.config.path) + .and_then(|metadata| metadata.modified()), + Ok(modified) if modified > self.last_modified) + } +} + +impl std::fmt::Debug for Geoip { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Geoip {} database {})", + self.config.locale, self.config.path + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn city_lookup() { + let values = find("2.125.160.216", "tests/data/GeoIP2-City-Test.mmdb").unwrap(); + + let mut expected = BTreeMap::::new(); + expected.insert("city_name".to_string(), "Boxford".into()); + expected.insert("country_code".to_string(), "GB".into()); + expected.insert("continent_code".to_string(), "EU".into()); + expected.insert("country_name".to_string(), "United Kingdom".into()); + expected.insert("region_code".to_string(), "WBK".into()); + expected.insert("region_name".to_string(), "West Berkshire".into()); + expected.insert("timezone".to_string(), "Europe/London".into()); + expected.insert("latitude".to_string(), Value::from(51.75)); + expected.insert("longitude".to_string(), Value::from(-1.25)); + expected.insert("postal_code".to_string(), "OX1".into()); + expected.insert("metro_code".to_string(), Value::Null); + + assert_eq!(values, expected); + } + + #[test] + fn city_partial_lookup() { + let values = find_select( + "2.125.160.216", + "tests/data/GeoIP2-City-Test.mmdb", + Some(&["latitude".to_string(), "longitude".to_string()]), + ) + .unwrap(); + + let mut expected = BTreeMap::::new(); + expected.insert("latitude".to_string(), Value::from(51.75)); + expected.insert("longitude".to_string(), Value::from(-1.25)); + + assert_eq!(values, expected); + } + + #[test] + fn city_lookup_partial_results() { + let values = find("67.43.156.9", "tests/data/GeoIP2-City-Test.mmdb").unwrap(); + + let mut expected = BTreeMap::::new(); + expected.insert("city_name".to_string(), Value::Null); + expected.insert("country_code".to_string(), "BT".into()); + expected.insert("country_name".to_string(), "Bhutan".into()); + expected.insert("continent_code".to_string(), "AS".into()); + expected.insert("region_code".to_string(), Value::Null); + expected.insert("region_name".to_string(), Value::Null); + expected.insert("timezone".to_string(), "Asia/Thimphu".into()); + expected.insert("latitude".to_string(), Value::from(27.5)); + expected.insert("longitude".to_string(), Value::from(90.5)); + expected.insert("postal_code".to_string(), Value::Null); + expected.insert("metro_code".to_string(), Value::Null); + + assert_eq!(values, expected); + } + + #[test] + fn city_lookup_no_results() { + let values = find("10.1.12.1", "tests/data/GeoIP2-City-Test.mmdb"); + + assert!(values.is_none()); + } + + #[test] + fn isp_lookup() { + let values = find("208.192.1.2", "tests/data/GeoIP2-ISP-Test.mmdb").unwrap(); + + let mut expected = BTreeMap::::new(); + expected.insert("autonomous_system_number".to_string(), 701i64.into()); + expected.insert( + "autonomous_system_organization".to_string(), + "MCI Communications Services, Inc. d/b/a Verizon Business".into(), + ); + expected.insert("isp".to_string(), "Verizon Business".into()); + expected.insert("organization".to_string(), "Verizon Business".into()); + + assert_eq!(values, expected); + } + + #[test] + fn isp_lookup_partial_results() { + let values = find("2600:7000::1", "tests/data/GeoLite2-ASN-Test.mmdb").unwrap(); + + let mut expected = BTreeMap::::new(); + expected.insert("autonomous_system_number".to_string(), 6939i64.into()); + expected.insert( + "autonomous_system_organization".to_string(), + "Hurricane Electric, Inc.".into(), + ); + expected.insert("isp".to_string(), Value::Null); + expected.insert("organization".to_string(), Value::Null); + + assert_eq!(values, expected); + } + + #[test] + fn isp_lookup_no_results() { + let values = find("10.1.12.1", "tests/data/GeoLite2-ASN-Test.mmdb"); + + assert!(values.is_none()); + } + + fn find(ip: &str, database: &str) -> Option> { + find_select(ip, database, None) + } + + fn find_select( + ip: &str, + database: &str, + select: Option<&[String]>, + ) -> Option> { + Geoip::new(GeoipConfig { + path: database.to_string(), + locale: default_locale(), + }) + .unwrap() + .find_table_rows( + Case::Insensitive, + &[Condition::Equals { + field: "ip", + value: ip.into(), + }], + select, + None, + ) + .unwrap() + .pop() + } +} diff --git a/src/enrichment_tables/mod.rs b/src/enrichment_tables/mod.rs index 7a21b417ee3d8..4ad76216e61fd 100644 --- a/src/enrichment_tables/mod.rs +++ b/src/enrichment_tables/mod.rs @@ -2,3 +2,6 @@ pub use enrichment::{Condition, IndexHandle, Table}; #[cfg(feature = "enrichment-tables-file")] pub mod file; + +#[cfg(feature = "enrichment-tables-geoip")] +pub mod geoip; diff --git a/website/cue/reference/configuration.cue b/website/cue/reference/configuration.cue index 01848d8d98d13..cacf7be33404d 100644 --- a/website/cue/reference/configuration.cue +++ b/website/cue/reference/configuration.cue @@ -41,8 +41,10 @@ configuration: { common: false description: """ Configuration options for an [enrichment table](\(urls.enrichment_tables_concept)) to be used in a - [`remap`](\(urls.vector_remap_transform)) transform. Currently, only [CSV](\(urls.csv)) files are - supported. + [`remap`](\(urls.vector_remap_transform)) transform. Currently supported are: + + * [CSV](\(urls.csv)) files + * [MaxMind](\(urls.maxmind)) databases For the lookup in the enrichment tables to be as performant as possible, the data is indexed according to the fields that are used in the search. Note that indices can only be created for fields for which an @@ -134,6 +136,57 @@ configuration: { } } } + type: object: options: { + geoip: { + required: true + description: """ + Configuration options for [MaxMind](\(urls.maxmind)) databases. + + The following [MaxMind](\(urls.maxmind)) databases are currently supported: + + * [GeoLite2-ASN.mmdb](\(urls.maxmind_geolite2_asn)) (free) — Determine the + autonomous system number and organization associated with an IP address. + * [GeoLite2-City.mmdb](\(urls.maxmind_geolite2_city)) (free) — Determine the + country, subdivisions, city, and postal code associated with IPv4 and IPv6 + addresses worldwide. + * [GeoIP2-City.mmdb](\(urls.maxmind_geoip2_city)) (paid) — Determine the country, + subdivisions, city, and postal code associated with IPv4 and IPv6 + addresses worldwide. + * [GeoIP2-ISP.mmdb](\(urls.maxmind_geoip2_isp)) (paid) — Determine the Internet + Service Provider (ISP), organization name, and autonomous system organization + and number associated with an IP address. + + The database file should be in the [MaxMind DB file format](\(urls.maxmind_db_file_format)). + + This enrichment table only supports lookup with IP address. + """ + type: object: options: { + path: { + description: """ + Path to the [MaxMind GeoIP2](\(urls.maxmind_geoip2)) or [GeoLite2 binary city + database](\(urls.maxmind_geolite2_city)) file (`GeoLite2-City.mmdb`). Other + databases, such as the country database, are not supported. + """ + required: true + type: string: { + examples: ["/path/to/GeoLite2-City.mmdb", "/path/to/GeoLite2-ISP.mmdb"] + } + } + locale: { + description: """ + The locale to use to lookup the country name and region name for the city database. + See [Locations Files](https://dev.maxmind.com/geoip/docs/databases/city-and-country?lang=en) + """ + required: false + common: false + type: string: { + default: "en" + examples: ["de", "en", "es", "fr", "ja", "pt-BR", "ru", "zh-CN"] + } + } + } + } + } } log_schema: { diff --git a/website/cue/reference/remap/functions.cue b/website/cue/reference/remap/functions.cue index 24623dcef3a18..d491c9873e1ca 100644 --- a/website/cue/reference/remap/functions.cue +++ b/website/cue/reference/remap/functions.cue @@ -58,10 +58,10 @@ remap: { // Reusable text _enrichment_table_explainer: """ - This condition needs to be a VRL object in which the key-value pairs indicate a field to - search mapped to a value to search in that field. This function returns the rows that match - the provided condition(s). _All_ fields need to match for rows to be returned; if any fields - don't match, no rows are returned. + For `file` enrichment tables this condition needs to be a VRL object in which + the key-value pairs indicate a field to search mapped to a value to search in that field. + This function returns the rows that match the provided condition(s). _All_ fields need to + match for rows to be returned; if any fields don't match, no rows are returned. There are currently two forms of search criteria: @@ -77,6 +77,31 @@ remap: { match criteria. We recommend using date ranges as the _only_ criteria when the enrichment data set is very small. + For `geoip` enrichment tables this condition needs to be a VRL object with a single key-value pair + whose value needs to be a valid IP address. Example: `{"ip": .ip }`. If a return field is expected + and without a value, `null` will be used. This table can return following fields: + + * ISP databases: + + * `autonomous_system_number` + * `autonomous_system_organization` + * `isp` + * `organization` + + * City databases: + + * `city_name` + * `continent_code` + * `country_code` + * `country_name` + * `region_code` + * `region_name` + * `metro_code` + * `latitude` + * `longitude` + * `postal_code` + * `timezone` + To use this function, you need to update your Vector configuration to include an [`enrichment_tables`](\(urls.vector_configuration_global)/#enrichment_tables)