diff --git a/Cargo.toml b/Cargo.toml index cebc081..3608ebf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,12 @@ futures = "0.3.4" reqwest = { version = "0.10.1", features = ["json"] } serde = { version = "1.0.104", features = ["derive"], optional = true } serde_json = { version = "1.0.46", optional = true } +regex = "1.3.4" +lazy_static = "1.4.0" + +# This is a temporary work around to fix a Failure-derive compilation error +# Should be removed when https://github.com/Empty2k12/influxdb-rust/issues/48 is being done +quote = "=1.0.2" [features] use-serde = ["serde", "serde_json"] diff --git a/src/query/line_proto_term.rs b/src/query/line_proto_term.rs new file mode 100644 index 0000000..f9e4d1a --- /dev/null +++ b/src/query/line_proto_term.rs @@ -0,0 +1,88 @@ +/// InfluxDB Line Protocol escaping helper module. +/// https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/ +use crate::Type; +use lazy_static::lazy_static; +use regex::Regex; + +lazy_static! { + pub static ref COMMAS_SPACES: Regex = Regex::new("[, ]").unwrap(); + pub static ref COMMAS_SPACES_EQUALS: Regex = Regex::new("[, =]").unwrap(); + pub static ref QUOTES_SLASHES: Regex = Regex::new(r#"["\\]"#).unwrap(); +} + +pub enum LineProtoTerm<'a> { + Measurement(&'a str), // escape commas, spaces + TagKey(&'a str), // escape commas, equals, spaces + TagValue(&'a str), // escape commas, equals, spaces + FieldKey(&'a str), // escape commas, equals, spaces + FieldValue(&'a Type), // escape quotes, backslashes + quote +} + +impl LineProtoTerm<'_> { + pub fn escape(self) -> String { + use LineProtoTerm::*; + match self { + Measurement(x) => Self::escape_any(x, &*COMMAS_SPACES), + TagKey(x) | TagValue(x) | FieldKey(x) => Self::escape_any(x, &*COMMAS_SPACES_EQUALS), + FieldValue(x) => Self::escape_field_value(x), + } + } + + fn escape_field_value(v: &Type) -> String { + use Type::*; + match v { + Boolean(v) => { + if *v { + "true" + } else { + "false" + } + } + .to_string(), + Float(v) => v.to_string(), + SignedInteger(v) => format!("{}i", v), + UnsignedInteger(v) => format!("{}i", v), + Text(v) => format!("\"{}\"", Self::escape_any(v, &*QUOTES_SLASHES)), + } + } + + fn escape_any(s: &str, re: &Regex) -> String { + re.replace_all(s, r#"\$0"#).to_string() + } +} + +#[cfg(test)] +mod test { + use crate::query::line_proto_term::LineProtoTerm::*; + use crate::Type; + + #[test] + fn test() { + assert_eq!(Measurement(r#"wea", ther"#).escape(), r#"wea"\,\ ther"#); + assert_eq!(TagKey(r#"locat\ ,=ion"#).escape(), r#"locat\\ \,\=ion"#); + + assert_eq!(FieldValue(&Type::Boolean(true)).escape(), r#"true"#); + assert_eq!(FieldValue(&Type::Boolean(false)).escape(), r#"false"#); + + assert_eq!(FieldValue(&Type::Float(0.0)).escape(), r#"0"#); + assert_eq!(FieldValue(&Type::Float(-0.1)).escape(), r#"-0.1"#); + + assert_eq!(FieldValue(&Type::SignedInteger(0)).escape(), r#"0i"#); + assert_eq!(FieldValue(&Type::SignedInteger(83)).escape(), r#"83i"#); + + assert_eq!(FieldValue(&Type::Text("".into())).escape(), r#""""#); + assert_eq!(FieldValue(&Type::Text("0".into())).escape(), r#""0""#); + assert_eq!(FieldValue(&Type::Text("\"".into())).escape(), r#""\"""#); + assert_eq!( + FieldValue(&Type::Text(r#"locat"\ ,=ion"#.into())).escape(), + r#""locat\"\\ ,=ion""# + ); + } + + #[test] + fn test_empty_tag_value() { + // InfluxDB doesn't support empty tag values. But that's a job + // of a calling site to validate an entire write request. + assert_eq!(TagValue("").escape(), ""); + } +} diff --git a/src/query/mod.rs b/src/query/mod.rs index ae6a64d..3dede5a 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -29,6 +29,7 @@ use std::convert::TryInto; #[cfg(feature = "chrono_timestamps")] pub mod consts; +mod line_proto_term; pub mod read_query; pub mod write_query; use std::fmt; diff --git a/src/query/write_query.rs b/src/query/write_query.rs index f664832..74bef11 100644 --- a/src/query/write_query.rs +++ b/src/query/write_query.rs @@ -2,6 +2,7 @@ //! //! Can only be instantiated by using Query::write_query +use crate::query::line_proto_term::LineProtoTerm; use crate::query::{QueryType, ValidQuery}; use crate::{Error, Query, Timestamp}; use std::fmt::{Display, Formatter}; @@ -9,18 +10,18 @@ use std::fmt::{Display, Formatter}; // todo: batch write queries pub trait WriteField { - fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>); + fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>); } impl> WriteField for T { - fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) { + fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) { let val: Type = self.into(); - fields.push((tag, val.to_string())); + fields.push((tag, val)); } } impl> WriteField for Option { - fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) { + fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) { if let Some(val) = self { val.add_to_fields(tag, fields); } @@ -29,7 +30,7 @@ impl> WriteField for Option { /// Internal Representation of a Write query that has not yet been built pub struct WriteQuery { - fields: Vec<(String, String)>, + fields: Vec<(String, Type)>, tags: Vec<(String, String)>, measurement: String, timestamp: Timestamp, @@ -121,7 +122,7 @@ impl Display for Type { Float(x) => write!(f, "{}", x), SignedInteger(x) => write!(f, "{}", x), UnsignedInteger(x) => write!(f, "{}", x), - Text(text) => write!(f, "\"{text}\"", text = text), + Text(text) => write!(f, "{text}", text = text), } } } @@ -159,22 +160,35 @@ impl Query for WriteQuery { let mut tags = self .tags .iter() - .map(|(tag, value)| format!("{tag}={value}", tag = tag, value = value)) + .map(|(tag, value)| { + format!( + "{tag}={value}", + tag = LineProtoTerm::TagKey(tag).escape(), + value = LineProtoTerm::TagValue(value).escape(), + ) + }) .collect::>() .join(","); + if !tags.is_empty() { tags.insert_str(0, ","); } let fields = self .fields .iter() - .map(|(field, value)| format!("{field}={value}", field = field, value = value)) + .map(|(field, value)| { + format!( + "{field}={value}", + field = LineProtoTerm::FieldKey(field).escape(), + value = LineProtoTerm::FieldValue(value).escape(), + ) + }) .collect::>() .join(","); Ok(ValidQuery(format!( "{measurement}{tags} {fields}{time}", - measurement = self.measurement, + measurement = LineProtoTerm::Measurement(&self.measurement).escape(), tags = tags, fields = fields, time = match self.timestamp { @@ -207,7 +221,7 @@ mod tests { .build(); assert!(query.is_ok(), "Query was empty"); - assert_eq!(query.unwrap(), "weather temperature=82 11"); + assert_eq!(query.unwrap(), "weather temperature=82i 11"); } #[test] @@ -220,7 +234,7 @@ mod tests { assert!(query.is_ok(), "Query was empty"); assert_eq!( query.unwrap(), - "weather temperature=82,wind_strength=3.7 11" + "weather temperature=82i,wind_strength=3.7 11" ); } @@ -232,7 +246,7 @@ mod tests { .build(); assert!(query.is_ok(), "Query was empty"); - assert_eq!(query.unwrap(), "weather temperature=82 11"); + assert_eq!(query.unwrap(), "weather temperature=82i 11"); } #[test] @@ -255,7 +269,7 @@ mod tests { assert!(query.is_ok(), "Query was empty"); assert_eq!( query.unwrap(), - "weather,location=\"us-midwest\",season=\"summer\" temperature=82 11" + "weather,location=us-midwest,season=summer temperature=82i 11" ); } @@ -270,4 +284,21 @@ mod tests { assert_eq!(query.get_type(), QueryType::WriteQuery); } + + #[test] + fn test_escaping() { + let query = Query::write_query(Timestamp::Hours(11), "wea, ther=") + .add_field("temperature", 82) + .add_field("\"temp=era,t ure\"", r#"too"\\hot"#) + .add_field("float", 82.0) + .add_tag("location", "us-midwest") + .add_tag("loc, =\"ation", "us, \"mid=west\"") + .build(); + + assert!(query.is_ok(), "Query was empty"); + assert_eq!( + query.unwrap().get(), + r#"wea\,\ ther=,location=us-midwest,loc\,\ \="ation=us\,\ "mid\=west" temperature=82i,"temp\=era\,t\ ure"="too\"\\\\hot",float=82 11"# + ); + } }