Skip to content

Commit

Permalink
Batch write support (#87)
Browse files Browse the repository at this point in the history
* (feat) add support for batching writes

* (refactor) hold precision in WriteQuery type

* (test) fix test for new query type enum

* (test) add test for batch query

* (chore) stick to influxdb 1.8 for integration test

* (fix) fix lint warnings
  • Loading branch information
sunng87 authored Mar 6, 2021
1 parent 7931c89 commit 1950876
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 64 deletions.
34 changes: 17 additions & 17 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ jobs:
runs-on: ubuntu-latest
services:
influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 8086:8086
authed_influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 9086:8086
env:
Expand All @@ -68,17 +68,17 @@ jobs:
- uses: actions/checkout@v1
- uses: dtolnay/rust-toolchain@stable
- run: cargo test --package influxdb --package influxdb_derive --all-features --no-fail-fast

coverage:
name: Code Coverage (stable/ubuntu-20.04)
runs-on: ubuntu-20.04
services:
influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 8086:8086
authed_influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 9086:8086
env:
Expand All @@ -87,19 +87,19 @@ jobs:
INFLUXDB_ADMIN_PASSWORD: password
INFLUXDB_USER: nopriv_user
INFLUXDB_USER_PASSWORD: password

steps:
- uses: actions/checkout@v2
- uses: dtolnay/rust-toolchain@stable

- name: Get Rust Version
id: rust-version
run: echo "::set-output name=VERSION::$(cargo -V | head -n1 | awk '{print $2}')"

- name: Get Tarpaulin Version
id: tarpaulin-version
run: echo "::set-output name=VERSION::$(wget -qO- 'https://api.github.com/repos/xd009642/tarpaulin/releases/latest' | jq -r '.tag_name')"

- uses: actions/cache@v2
with:
path: |
Expand All @@ -108,12 +108,12 @@ jobs:
~/.cargo/registry
target
key: ${{ runner.os }}-cargo-${{ steps.rust-version.outputs.VERSION }}-tarpaulin-${{ steps.tarpaulin-version.outputs.VERSION }} }}

- name: Install Tarpaulin
run: |
ls -lh ~/.cargo/bin
test -e ~/.cargo/bin/cargo-tarpaulin || cargo install cargo-tarpaulin --version ${{ steps.tarpaulin-version.outputs.VERSION }}
- name: Run Tarpaulin coverage tests
run: |
cargo tarpaulin -v \
Expand All @@ -127,14 +127,14 @@ jobs:
env:
RUST_BACKTRACE: 1
RUST_LOG: info

- uses: actions/upload-artifact@v2
with:
name: tarpaulin-report
path: |
tarpaulin-report.json
tarpaulin-report.html
pages:
runs-on: ubuntu-20.04
needs:
Expand All @@ -144,19 +144,19 @@ jobs:
- uses: actions/checkout@v2
with:
ref: gh-pages

- uses: actions/download-artifact@v2
with:
name: tarpaulin-report

- run: |
coverage=$(jq '.files | { covered: map(.covered) | add, coverable: map(.coverable) | add } | .covered / .coverable * 10000 | round | . / 100' tarpaulin-report.json)
color=$([[ $coverage < 80 ]] && printf yellow || printf brightgreen)
wget -qO coverage.svg "https://img.shields.io/badge/coverage-$coverage%25-$color"
git add coverage.svg tarpaulin-report.html
git status
- uses: stefanzweifel/git-auto-commit-action@v4
with:
commit_message: "GitHub Pages for ${{ github.sha }}"
11 changes: 5 additions & 6 deletions influxdb/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use futures::prelude::*;
use surf::{self, Client as SurfClient, StatusCode};

use crate::query::QueryTypes;
use crate::query::QueryType;
use crate::Error;
use crate::Query;
use std::collections::HashMap;
Expand Down Expand Up @@ -159,14 +159,13 @@ impl Client {
pub async fn query<'q, Q>(&self, q: &'q Q) -> Result<String, Error>
where
Q: Query,
&'q Q: Into<QueryTypes<'q>>,
{
let query = q.build().map_err(|err| Error::InvalidQueryError {
error: err.to_string(),
})?;

let request_builder = match q.into() {
QueryTypes::Read(_) => {
let request_builder = match q.get_type() {
QueryType::ReadQuery => {
let read_query = query.get();
let url = &format!("{}/query", &self.url);
let mut parameters = self.parameters.as_ref().clone();
Expand All @@ -178,10 +177,10 @@ impl Client {
self.client.post(url).query(&parameters)
}
}
QueryTypes::Write(write_query) => {
QueryType::WriteQuery(precision) => {
let url = &format!("{}/write", &self.url);
let mut parameters = self.parameters.as_ref().clone();
parameters.insert("precision", write_query.get_precision());
parameters.insert("precision", precision);

self.client.post(url).body(query.get()).query(&parameters)
}
Expand Down
8 changes: 4 additions & 4 deletions influxdb/src/integrations/serde_integration/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ where
Name,
Columns,
Values,
};
}

struct SeriesVisitor<T> {
_inner_type: PhantomData<T>,
};
}

impl<'de, T> Visitor<'de> for SeriesVisitor<T>
where
Expand Down Expand Up @@ -115,12 +115,12 @@ where
Tags,
Columns,
Values,
};
}

struct SeriesVisitor<TAG, T> {
_tag_type: PhantomData<TAG>,
_value_type: PhantomData<T>,
};
}

impl<'de, TAG, T> Visitor<'de> for SeriesVisitor<TAG, T>
where
Expand Down
2 changes: 1 addition & 1 deletion influxdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub use error::Error;
pub use query::{
read_query::ReadQuery,
write_query::{Type, WriteQuery},
InfluxDbWriteable, Query, QueryType, QueryTypes, Timestamp, ValidQuery,
InfluxDbWriteable, Query, QueryType, Timestamp, ValidQuery,
};

#[cfg(feature = "use-serde")]
Expand Down
27 changes: 5 additions & 22 deletions influxdb/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ impl fmt::Display for Timestamp {
}
}

impl Into<DateTime<Utc>> for Timestamp {
fn into(self) -> DateTime<Utc> {
match self {
impl From<Timestamp> for DateTime<Utc> {
fn from(ts: Timestamp) -> DateTime<Utc> {
match ts {
Timestamp::Hours(h) => {
let nanos =
h * MINUTES_PER_HOUR * SECONDS_PER_MINUTE * MILLIS_PER_SECOND * NANOS_PER_MILLI;
Expand Down Expand Up @@ -93,24 +93,6 @@ where
}
}

/// Internal enum used to represent either type of query.
pub enum QueryTypes<'a> {
Read(&'a ReadQuery),
Write(&'a WriteQuery),
}

impl<'a> From<&'a ReadQuery> for QueryTypes<'a> {
fn from(query: &'a ReadQuery) -> Self {
Self::Read(query)
}
}

impl<'a> From<&'a WriteQuery> for QueryTypes<'a> {
fn from(query: &'a WriteQuery) -> Self {
Self::Write(query)
}
}

pub trait Query {
/// Builds valid InfluxSQL which can be run against the Database.
/// In case no fields have been specified, it will return an error,
Expand Down Expand Up @@ -192,7 +174,8 @@ impl PartialEq<&str> for ValidQuery {
#[derive(PartialEq, Debug)]
pub enum QueryType {
ReadQuery,
WriteQuery,
/// write query with precision
WriteQuery(String),
}

#[cfg(test)]
Expand Down
47 changes: 45 additions & 2 deletions influxdb/src/query/write_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,29 @@ impl Query for WriteQuery {
}

fn get_type(&self) -> QueryType {
QueryType::WriteQuery
QueryType::WriteQuery(self.get_precision())
}
}

impl Query for Vec<WriteQuery> {
fn build(&self) -> Result<ValidQuery, Error> {
let mut qlines = Vec::new();

for q in self {
let valid_query = q.build()?;
qlines.push(valid_query.0);
}

Ok(ValidQuery(qlines.join("\n")))
}

fn get_type(&self) -> QueryType {
QueryType::WriteQuery(
self.get(0)
.map(|q| q.get_precision())
// use "ms" as placeholder if query is empty
.unwrap_or_else(|| "ms".to_owned()),
)
}
}

Expand Down Expand Up @@ -296,7 +318,7 @@ mod tests {
.add_tag("location", "us-midwest")
.add_tag("season", "summer");

assert_eq!(query.get_type(), QueryType::WriteQuery);
assert_eq!(query.get_type(), QueryType::WriteQuery("h".to_owned()));
}

#[test]
Expand All @@ -318,4 +340,25 @@ mod tests {
r#"wea\,\ ther=,location=us-midwest,loc\,\ \="ation=us\,\ \"mid\=west temperature=82i,"temp\=era\,t\ ure"="too\"\\\\hot",float=82 11"#
);
}

#[test]
fn test_batch() {
let q0 = Timestamp::Hours(11)
.into_query("weather")
.add_field("temperature", 82)
.add_tag("location", "us-midwest");

let q1 = Timestamp::Hours(12)
.into_query("weather")
.add_field("temperature", 65)
.add_tag("location", "us-midwest");

let query = vec![q0, q1].build();

assert_eq!(
query.unwrap().get(),
r#"weather,location=us-midwest temperature=82i 11
weather,location=us-midwest temperature=65i 12"#
);
}
}
24 changes: 12 additions & 12 deletions influxdb/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ async fn test_connection_error() {
assert_result_err(&read_result);
match read_result {
Err(Error::ConnectionError { .. }) => {}
_ => panic!(format!(
_ => panic!(
"Should cause a ConnectionError: {}",
read_result.unwrap_err()
)),
),
}
}

Expand Down Expand Up @@ -139,21 +139,21 @@ async fn test_wrong_authed_write_and_read() {
assert_result_err(&write_result);
match write_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
write_result.unwrap_err()
)),
),
}

let read_query = Query::raw_read_query("SELECT * FROM weather");
let read_result = client.query(&read_query).await;
assert_result_err(&read_result);
match read_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
read_result.unwrap_err()
)),
),
}

let client = Client::new("http://localhost:9086", TEST_NAME)
Expand All @@ -163,10 +163,10 @@ async fn test_wrong_authed_write_and_read() {
assert_result_err(&read_result);
match read_result {
Err(Error::AuthenticationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthenticationError: {}",
read_result.unwrap_err()
)),
),
}
},
|| async move {
Expand Down Expand Up @@ -207,21 +207,21 @@ async fn test_non_authed_write_and_read() {
assert_result_err(&write_result);
match write_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
write_result.unwrap_err()
)),
),
}

let read_query = Query::raw_read_query("SELECT * FROM weather");
let read_result = non_authed_client.query(&read_query).await;
assert_result_err(&read_result);
match read_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
read_result.unwrap_err()
)),
),
}
},
|| async move {
Expand Down

0 comments on commit 1950876

Please sign in to comment.