feat: support integration tests for influxql (apache#719)
* feat: support integration tests for influxql

* update the testcases for influxql

* update cargo.lock

* revert issue-302 test case

* Update Cargo.toml

Co-authored-by: Jiacai Liu <dev@liujiacai.net>

* address CR

* fix wrong usage of sqlness arg

---------

Co-authored-by: Jiacai Liu <dev@liujiacai.net>
ShiKaiWi and jiacai2050 authored Mar 10, 2023
1 parent 7640eec commit 3899404
Showing 5 changed files with 180 additions and 8 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion integration_tests/Makefile
@@ -4,7 +4,8 @@ ROOT = $(shell pwd)
DATA_DIR = /tmp/ceresdb

export CERESDB_BINARY_PATH ?= $(ROOT)/../target/$(MODE)/ceresdb-server
export CERESDB_SERVER_ENDPOINT ?= 127.0.0.1:8831
export CERESDB_SERVER_GRPC_ENDPOINT ?= 127.0.0.1:8831
export CERESDB_SERVER_HTTP_ENDPOINT ?= 127.0.0.1:5440
export CERESDB_CLUSTER_SERVER_ENDPOINT ?= 127.0.0.1:8832
export CERESDB_TEST_CASE_PATH ?= $(ROOT)/cases/env
export CERESDB_TEST_BINARY ?= $(ROOT)/../target/$(MODE)/ceresdb-test
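
These exports are how the Rust test harness discovers the server. A minimal sketch of reading them (the variable names come from the Makefile above; the fallback defaults are assumptions for a local run, not part of this commit):

use std::env;

// Sketch only: reads the endpoints exported by the Makefile above.
fn grpc_endpoint() -> String {
    env::var("CERESDB_SERVER_GRPC_ENDPOINT").unwrap_or_else(|_| "127.0.0.1:8831".to_string())
}

fn http_endpoint() -> String {
    env::var("CERESDB_SERVER_HTTP_ENDPOINT").unwrap_or_else(|_| "127.0.0.1:5440".to_string())
}

fn main() {
    println!("grpc: {}, http: {}", grpc_endpoint(), http_endpoint());
}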
40 changes: 40 additions & 0 deletions integration_tests/cases/env/local/influxql/basic.result
@@ -0,0 +1,40 @@
DROP TABLE IF EXISTS `h2o_feet`;

affected_rows: 0

CREATE TABLE `h2o_feet` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
`water_level` double NOT NULL,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);

affected_rows: 0

INSERT INTO h2o_feet(time, level_description, location, water_level)
VALUES
(1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
(1439827200000, "below 3 feet", "santa_monica", 2.064),
(1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
(1439827560000, "below 3 feet", "santa_monica", 2.116),
(1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
(1439827620000, "below 3 feet", "santa_monica", 2.028);

affected_rows: 6

-- SQLNESS ARG protocol=influxql
SELECT * FROM "h2o_feet";

{"rows":[{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.12},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":8.005},{"level_description":"between 6 and 9 feet","location":"coyote_creek","water_level":7.887},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}

-- SQLNESS ARG protocol=influxql
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';

{"rows":[{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.116},{"level_description":"below 3 feet","location":"santa_monica","water_level":2.028}]}

DROP TABLE IF EXISTS `h2o_feet`;

affected_rows: 0
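
The influxql responses above are recorded verbatim as single-line JSON bodies. A minimal sketch of deserializing one such body (the struct and field names are assumptions matching the recorded output above; serde and serde_json are assumed to be available as dependencies):

use serde::Deserialize;

// Assumed response shape, mirroring the recorded output above; not part of this commit.
#[derive(Debug, Deserialize)]
struct Row {
    level_description: String,
    location: String,
    water_level: f64,
}

#[derive(Debug, Deserialize)]
struct InfluxQLResponse {
    rows: Vec<Row>,
}

fn main() -> Result<(), serde_json::Error> {
    let body = r#"{"rows":[{"level_description":"below 3 feet","location":"santa_monica","water_level":2.064}]}"#;
    let resp: InfluxQLResponse = serde_json::from_str(body)?;
    assert_eq!(resp.rows.len(), 1);
    println!("{resp:?}");
    Ok(())
}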

35 changes: 35 additions & 0 deletions integration_tests/cases/env/local/influxql/basic.sql
@@ -0,0 +1,35 @@
DROP TABLE IF EXISTS `h2o_feet`;

CREATE TABLE `h2o_feet` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
`water_level` double NOT NULL,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);

-- Insert Records:
-- ("2015-08-18T00:00:00Z", "between 6 and 9 feet", "coyote_creek", 8.12),
-- ("2015-08-18T00:00:00Z", "below 3 feet", "santa_monica", 2.064),
-- ("2015-08-18T00:06:00Z", "between 6 and 9 feet", "coyote_creek", 8.005),
-- ("2015-08-18T00:06:00Z", "below 3 feet", "santa_monica", 2.116),
-- ("2015-08-18T00:12:00Z", "between 6 and 9 feet", "coyote_creek", 7.887),
-- ("2015-08-18T00:12:00Z", "below 3 feet", "santa_monica", 2.028);
INSERT INTO h2o_feet(time, level_description, location, water_level)
VALUES
(1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
(1439827200000, "below 3 feet", "santa_monica", 2.064),
(1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
(1439827560000, "below 3 feet", "santa_monica", 2.116),
(1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
(1439827620000, "below 3 feet", "santa_monica", 2.028);


-- SQLNESS ARG protocol=influxql
SELECT * FROM "h2o_feet";

-- SQLNESS ARG protocol=influxql
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';

DROP TABLE IF EXISTS `h2o_feet`;
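
Each `-- SQLNESS ARG protocol=influxql` comment above is surfaced by sqlness to the harness as a key/value pair on the query context. A self-contained sketch of that lookup (mirroring the ProtocolParser added in database.rs below; the names are reused here purely for illustration):

use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Protocol {
    Sql,
    InfluxQL,
}

// Missing or "sql" falls back to SQL; "influxql" selects the HTTP influxql path; anything else errors.
fn parse_protocol(ctx: &HashMap<String, String>) -> Result<Protocol, String> {
    match ctx.get("protocol").map(String::as_str) {
        None | Some("sql") => Ok(Protocol::Sql),
        Some("influxql") => Ok(Protocol::InfluxQL),
        Some(other) => Err(format!("unknown protocol:{other}")),
    }
}

fn main() {
    let mut ctx = HashMap::new();
    ctx.insert("protocol".to_string(), "influxql".to_string());
    assert_eq!(parse_protocol(&ctx), Ok(Protocol::InfluxQL));
}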
107 changes: 100 additions & 7 deletions integration_tests/src/database.rs
@@ -2,6 +2,7 @@

use std::{
borrow::Cow,
collections::HashMap,
env,
fmt::Display,
fs::File,
@@ -16,6 +17,8 @@ use ceresdb_client::{
model::sql_query::{display::CsvFormatter, Request},
RpcContext,
};
use reqwest::ClientBuilder;
use serde::Serialize;
use sql::{
ast::{Statement, TableName},
parser::Parser,
@@ -24,7 +27,8 @@ use sqlness::{Database, QueryContext};
use sqlparser::ast::{SetExpr, Statement as SqlStatement, TableFactor};

const BINARY_PATH_ENV: &str = "CERESDB_BINARY_PATH";
const SERVER_ENDPOINT_ENV: &str = "CERESDB_SERVER_ENDPOINT";
const SERVER_GRPC_ENDPOINT_ENV: &str = "CERESDB_SERVER_GRPC_ENDPOINT";
const SERVER_HTTP_ENDPOINT_ENV: &str = "CERESDB_SERVER_HTTP_ENDPOINT";
const CLUSTER_SERVER_ENDPOINT_ENV: &str = "CERESDB_CLUSTER_SERVER_ENDPOINT";
const CERESDB_STDOUT_FILE: &str = "CERESDB_STDOUT_FILE";
const CERESDB_STDERR_FILE: &str = "CERESDB_STDERR_FILE";
@@ -35,15 +39,82 @@ pub enum DeployMode {
Cluster,
}

// Used to access CeresDB via the http service.
#[derive(Clone)]
struct HttpClient {
client: reqwest::Client,
endpoint: String,
}

#[derive(Clone, Serialize)]
struct InfluxQLRequest {
query: String,
}

impl HttpClient {
fn new(endpoint: String) -> Self {
let client = ClientBuilder::new()
.build()
.expect("should succeed to build http client");
Self { client, endpoint }
}
}

pub struct CeresDB {
server_process: Option<Child>,
db_client: Arc<dyn DbClient>,
// FIXME: Currently, the new protocol is not supported by the dbclient but is exposed by the http
// service. Remove this client once the dbclient supports the new protocol.
http_client: Option<HttpClient>,
}

#[derive(Debug, Clone, Copy)]
enum Protocol {
Sql,
InfluxQL,
}

impl TryFrom<&str> for Protocol {
type Error = String;

fn try_from(s: &str) -> Result<Self, Self::Error> {
let protocol = match s {
"influxql" => Protocol::InfluxQL,
"sql" => Protocol::Sql,
_ => return Err(format!("unknown protocol:{s}")),
};

Ok(protocol)
}
}

struct ProtocolParser;

impl ProtocolParser {
fn parse_from_ctx(&self, ctx: &HashMap<String, String>) -> Result<Protocol, String> {
ctx.get("protocol")
.map(|s| Protocol::try_from(s.as_str()))
.unwrap_or(Ok(Protocol::Sql))
}
}

#[async_trait]
impl Database for CeresDB {
async fn query(&self, _context: QueryContext, query: String) -> Box<dyn Display> {
Self::execute(query, self.db_client.clone()).await
async fn query(&self, context: QueryContext, query: String) -> Box<dyn Display> {
let protocol = ProtocolParser
.parse_from_ctx(&context.context)
.expect("parse protocol");

match protocol {
Protocol::Sql => Self::execute_sql(query, self.db_client.clone()).await,
Protocol::InfluxQL => {
let http_client = self
.http_client
.clone()
.expect("http client is not initialized for execute influxql");
Self::execute_influxql(query, http_client).await
}
}
}
}

@@ -63,26 +134,31 @@ impl CeresDB {
let server_process = Self::start_standalone(stdout, stderr, bin, config);
// Wait for a while
std::thread::sleep(std::time::Duration::from_secs(5));
let endpoint = env::var(SERVER_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_ENDPOINT_ENV:?}")
let endpoint = env::var(SERVER_GRPC_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_GRPC_ENDPOINT_ENV:?}")
});
let db_client = Builder::new(endpoint, Mode::Proxy).build();
let http_endpoint = env::var(SERVER_HTTP_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_HTTP_ENDPOINT_ENV:?}")
});
CeresDB {
server_process: Some(server_process),
db_client,
http_client: Some(HttpClient::new(http_endpoint)),
}
}
DeployMode::Cluster => {
Self::start_cluster(stdout, stderr);
// Wait for a while
std::thread::sleep(std::time::Duration::from_secs(10));
let endpoint = env::var(CLUSTER_SERVER_ENDPOINT_ENV).unwrap_or_else(|_| {
panic!("Cannot read server endpoint from env {SERVER_ENDPOINT_ENV:?}")
panic!("Cannot read server endpoint from env {CLUSTER_SERVER_ENDPOINT_ENV:?}")
});
let db_client = Builder::new(endpoint, Mode::Proxy).build();
CeresDB {
server_process: None,
db_client,
http_client: None,
}
}
}
@@ -101,7 +177,24 @@ impl CeresDB {
}
}

async fn execute(query: String, client: Arc<dyn DbClient>) -> Box<dyn Display> {
async fn execute_influxql(query: String, http_client: HttpClient) -> Box<dyn Display> {
let url = format!("http://{}/influxql", http_client.endpoint);
let query_request = InfluxQLRequest { query };
let resp = http_client
.client
.post(url)
.json(&query_request)
.send()
.await
.unwrap();
let query_res = match resp.text().await {
Ok(text) => text,
Err(e) => format!("Failed to do influxql query, err:{e:?}"),
};
Box::new(query_res)
}

async fn execute_sql(query: String, client: Arc<dyn DbClient>) -> Box<dyn Display> {
let query_ctx = RpcContext {
database: Some("public".to_string()),
timeout: None,
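
For reference, the HTTP call issued by execute_influxql above can be reproduced with a standalone sketch like the following (not part of this commit; it assumes reqwest with the json feature, serde, and tokio as dependencies, and reuses the /influxql path from the code above plus the default HTTP endpoint from the Makefile):

use serde::Serialize;

#[derive(Serialize)]
struct InfluxQLRequest {
    query: String,
}

// Standalone sketch: posts an InfluxQL query to the server's HTTP service and prints the raw body.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let endpoint = std::env::var("CERESDB_SERVER_HTTP_ENDPOINT")
        .unwrap_or_else(|_| "127.0.0.1:5440".to_string());
    let url = format!("http://{endpoint}/influxql");

    let body = InfluxQLRequest {
        query: r#"SELECT * FROM "h2o_feet""#.to_string(),
    };

    let resp = reqwest::Client::new().post(url).json(&body).send().await?;
    println!("{}", resp.text().await?);
    Ok(())
}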
