Skip to content

Commit

Permalink
Fix Encoding/Escaping according to the InfluxDb Line-Protocol (#55)
Browse files Browse the repository at this point in the history
* Fix encoding/escaping according to the influxdb line protocol.

Co-authored-by: Gero Gerke <hello@gerogerke.de>
  • Loading branch information
Empty2k12 and Empty2k12 authored Mar 5, 2020
1 parent 789b388 commit 90f5f95
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 13 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ futures = "0.3.4"
reqwest = { version = "0.10.1", features = ["json"] }
serde = { version = "1.0.104", features = ["derive"], optional = true }
serde_json = { version = "1.0.46", optional = true }
regex = "1.3.4"
lazy_static = "1.4.0"

# This is a temporary work around to fix a Failure-derive compilation error
# Should be removed when https://github.com/Empty2k12/influxdb-rust/issues/48 is being done
quote = "=1.0.2"

[features]
use-serde = ["serde", "serde_json"]
Expand Down
88 changes: 88 additions & 0 deletions src/query/line_proto_term.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/// InfluxDB Line Protocol escaping helper module.
/// https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
use crate::Type;
use lazy_static::lazy_static;
use regex::Regex;

lazy_static! {
pub static ref COMMAS_SPACES: Regex = Regex::new("[, ]").unwrap();
pub static ref COMMAS_SPACES_EQUALS: Regex = Regex::new("[, =]").unwrap();
pub static ref QUOTES_SLASHES: Regex = Regex::new(r#"["\\]"#).unwrap();
}

pub enum LineProtoTerm<'a> {
Measurement(&'a str), // escape commas, spaces
TagKey(&'a str), // escape commas, equals, spaces
TagValue(&'a str), // escape commas, equals, spaces
FieldKey(&'a str), // escape commas, equals, spaces
FieldValue(&'a Type), // escape quotes, backslashes + quote
}

impl LineProtoTerm<'_> {
pub fn escape(self) -> String {
use LineProtoTerm::*;
match self {
Measurement(x) => Self::escape_any(x, &*COMMAS_SPACES),
TagKey(x) | TagValue(x) | FieldKey(x) => Self::escape_any(x, &*COMMAS_SPACES_EQUALS),
FieldValue(x) => Self::escape_field_value(x),
}
}

fn escape_field_value(v: &Type) -> String {
use Type::*;
match v {
Boolean(v) => {
if *v {
"true"
} else {
"false"
}
}
.to_string(),
Float(v) => v.to_string(),
SignedInteger(v) => format!("{}i", v),
UnsignedInteger(v) => format!("{}i", v),
Text(v) => format!("\"{}\"", Self::escape_any(v, &*QUOTES_SLASHES)),
}
}

fn escape_any(s: &str, re: &Regex) -> String {
re.replace_all(s, r#"\$0"#).to_string()
}
}

#[cfg(test)]
mod test {
use crate::query::line_proto_term::LineProtoTerm::*;
use crate::Type;

#[test]
fn test() {
assert_eq!(Measurement(r#"wea", ther"#).escape(), r#"wea"\,\ ther"#);
assert_eq!(TagKey(r#"locat\ ,=ion"#).escape(), r#"locat\\ \,\=ion"#);

assert_eq!(FieldValue(&Type::Boolean(true)).escape(), r#"true"#);
assert_eq!(FieldValue(&Type::Boolean(false)).escape(), r#"false"#);

assert_eq!(FieldValue(&Type::Float(0.0)).escape(), r#"0"#);
assert_eq!(FieldValue(&Type::Float(-0.1)).escape(), r#"-0.1"#);

assert_eq!(FieldValue(&Type::SignedInteger(0)).escape(), r#"0i"#);
assert_eq!(FieldValue(&Type::SignedInteger(83)).escape(), r#"83i"#);

assert_eq!(FieldValue(&Type::Text("".into())).escape(), r#""""#);
assert_eq!(FieldValue(&Type::Text("0".into())).escape(), r#""0""#);
assert_eq!(FieldValue(&Type::Text("\"".into())).escape(), r#""\"""#);
assert_eq!(
FieldValue(&Type::Text(r#"locat"\ ,=ion"#.into())).escape(),
r#""locat\"\\ ,=ion""#
);
}

#[test]
fn test_empty_tag_value() {
// InfluxDB doesn't support empty tag values. But that's a job
// of a calling site to validate an entire write request.
assert_eq!(TagValue("").escape(), "");
}
}
1 change: 1 addition & 0 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::convert::TryInto;

#[cfg(feature = "chrono_timestamps")]
pub mod consts;
mod line_proto_term;
pub mod read_query;
pub mod write_query;
use std::fmt;
Expand Down
57 changes: 44 additions & 13 deletions src/query/write_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@
//!
//! Can only be instantiated by using Query::write_query
use crate::query::line_proto_term::LineProtoTerm;
use crate::query::{QueryType, ValidQuery};
use crate::{Error, Query, Timestamp};
use std::fmt::{Display, Formatter};

// todo: batch write queries

pub trait WriteField {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>);
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>);
}

impl<T: Into<Type>> WriteField for T {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) {
let val: Type = self.into();
fields.push((tag, val.to_string()));
fields.push((tag, val));
}
}

impl<T: Into<Type>> WriteField for Option<T> {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) {
if let Some(val) = self {
val.add_to_fields(tag, fields);
}
Expand All @@ -29,7 +30,7 @@ impl<T: Into<Type>> WriteField for Option<T> {

/// Internal Representation of a Write query that has not yet been built
pub struct WriteQuery {
fields: Vec<(String, String)>,
fields: Vec<(String, Type)>,
tags: Vec<(String, String)>,
measurement: String,
timestamp: Timestamp,
Expand Down Expand Up @@ -121,7 +122,7 @@ impl Display for Type {
Float(x) => write!(f, "{}", x),
SignedInteger(x) => write!(f, "{}", x),
UnsignedInteger(x) => write!(f, "{}", x),
Text(text) => write!(f, "\"{text}\"", text = text),
Text(text) => write!(f, "{text}", text = text),
}
}
}
Expand Down Expand Up @@ -159,22 +160,35 @@ impl Query for WriteQuery {
let mut tags = self
.tags
.iter()
.map(|(tag, value)| format!("{tag}={value}", tag = tag, value = value))
.map(|(tag, value)| {
format!(
"{tag}={value}",
tag = LineProtoTerm::TagKey(tag).escape(),
value = LineProtoTerm::TagValue(value).escape(),
)
})
.collect::<Vec<String>>()
.join(",");

if !tags.is_empty() {
tags.insert_str(0, ",");
}
let fields = self
.fields
.iter()
.map(|(field, value)| format!("{field}={value}", field = field, value = value))
.map(|(field, value)| {
format!(
"{field}={value}",
field = LineProtoTerm::FieldKey(field).escape(),
value = LineProtoTerm::FieldValue(value).escape(),
)
})
.collect::<Vec<String>>()
.join(",");

Ok(ValidQuery(format!(
"{measurement}{tags} {fields}{time}",
measurement = self.measurement,
measurement = LineProtoTerm::Measurement(&self.measurement).escape(),
tags = tags,
fields = fields,
time = match self.timestamp {
Expand Down Expand Up @@ -207,7 +221,7 @@ mod tests {
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(query.unwrap(), "weather temperature=82 11");
assert_eq!(query.unwrap(), "weather temperature=82i 11");
}

#[test]
Expand All @@ -220,7 +234,7 @@ mod tests {
assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap(),
"weather temperature=82,wind_strength=3.7 11"
"weather temperature=82i,wind_strength=3.7 11"
);
}

Expand All @@ -232,7 +246,7 @@ mod tests {
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(query.unwrap(), "weather temperature=82 11");
assert_eq!(query.unwrap(), "weather temperature=82i 11");
}

#[test]
Expand All @@ -255,7 +269,7 @@ mod tests {
assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap(),
"weather,location=\"us-midwest\",season=\"summer\" temperature=82 11"
"weather,location=us-midwest,season=summer temperature=82i 11"
);
}

Expand All @@ -270,4 +284,21 @@ mod tests {

assert_eq!(query.get_type(), QueryType::WriteQuery);
}

#[test]
fn test_escaping() {
let query = Query::write_query(Timestamp::Hours(11), "wea, ther=")
.add_field("temperature", 82)
.add_field("\"temp=era,t ure\"", r#"too"\\hot"#)
.add_field("float", 82.0)
.add_tag("location", "us-midwest")
.add_tag("loc, =\"ation", "us, \"mid=west\"")
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap().get(),
r#"wea\,\ ther=,location=us-midwest,loc\,\ \="ation=us\,\ "mid\=west" temperature=82i,"temp\=era\,t\ ure"="too\"\\\\hot",float=82 11"#
);
}
}

0 comments on commit 90f5f95

Please sign in to comment.