Skip to content

Commit

Permalink
add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Mar 7, 2023
1 parent 273fd88 commit cc210ad
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 42 deletions.
9 changes: 6 additions & 3 deletions server/src/grpc/storage_service/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ use crate::grpc::storage_service::{
};

fn is_table_not_found_error(e: &FrontendError) -> bool {
matches!(&e, FrontendError::CreatePlan { source }
if matches!(source, sql::planner::Error::BuildPromPlanError { source }
if matches!(source, sql::promql::Error::TableNotFound { .. })))
matches!(&e,
FrontendError::CreatePlan {
source,
.. }
if matches!(source, sql::planner::Error::BuildPromPlanError { source }
if matches!(source, sql::promql::Error::TableNotFound { .. })))
}

pub async fn handle_query<Q>(
Expand Down
4 changes: 3 additions & 1 deletion sql/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ impl<P: MetaProvider> Frontend<P> {

planner
.write_req_to_plan(schema_config, write_table)
.context(CreatePlan)
.context(CreatePlan {
plan_type: "internal write",
})
}
}
23 changes: 19 additions & 4 deletions sql/src/influxql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

pub mod planner;
mod stmt_rewriter;
pub(crate) mod util;
Expand All @@ -12,20 +14,33 @@ pub mod error {
Unimplemented { msg: String },

#[snafu(display(
"Rewrite influxql from clause with cause, msg:{}, source:{}",
"Failed to rewrite influxql from statement with cause, msg:{}, source:{}",
msg,
source
))]
RewriteFromWithCause { msg: String, source: GenericError },

#[snafu(display("Rewrite influxql from clause no cause, msg:{}", msg))]
#[snafu(display("Failed to rewrite influxql from statement no cause, msg:{}", msg))]
RewriteFromNoCause { msg: String },

#[snafu(display("Unimplemented influxql select fields with cause, source: {}", source))]
#[snafu(display(
"Failed to rewrite influxql projection statement with cause, msg:{}, source: {}",
msg,
source
))]
RewriteFieldsWithCause { msg: String, source: GenericError },

#[snafu(display("Unimplemented influxql select fields no cause, msg: {}", msg))]
#[snafu(display(
"Failed to rewrite influxql projection statement no cause, msg: {}",
msg
))]
RewriteFieldsNoCause { msg: String },

#[snafu(display("Failed to find table with case, source:{}", source))]
FindTableWithCause { source: GenericError },

#[snafu(display("Failed to find table no case, msg: {}", msg))]
FindTableNoCause { msg: String },
}

define_result!(Error);
Expand Down
27 changes: 21 additions & 6 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use common_util::error::BoxError;
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::ResultExt;
use table_engine::table::TableRef;

use super::stmt_rewriter::StmtRewriter;
use crate::{influxql::error::*, plan::Plan, provider::MetaProvider};

#[allow(dead_code)]
pub(crate) struct Planner<'a, P: MetaProvider> {
sql_planner: crate::planner::PlannerDelegate<'a, P>,
}
Expand All @@ -27,11 +32,21 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
InfluxqlStatement::Explain(_) => todo!(),
}
}
}

pub trait MeasurementProvider {
fn measurement(&self, measurement_name: &str) -> Result<Option<TableRef>>;
}

pub(crate) struct MeasurementProviderImpl<'a, P: MetaProvider>(
crate::planner::PlannerDelegate<'a, P>,
);

fn rewrite_stmt(&self, stmt: InfluxqlStatement) -> Result<InfluxqlStatement> {
let mut stmt = stmt;
let stmt_rewriter = StmtRewriter::new(&self.sql_planner);
// stmt_rewriter.rewrite_from(&mut stmt)?;
todo!()
impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> {
fn measurement(&self, measurement_name: &str) -> Result<Option<TableRef>> {
self.0
.find_table(measurement_name)
.box_err()
.context(FindTableWithCause)
}
}
164 changes: 137 additions & 27 deletions sql/src/influxql/stmt_rewriter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{collections::HashSet, ops::ControlFlow};
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{collections::BTreeSet, ops::ControlFlow};

use common_util::error::BoxError;
use influxdb_influxql_parser::{
Expand All @@ -13,20 +15,32 @@ use influxdb_influxql_parser::{
use itertools::{Either, Itertools};
use snafu::{ensure, OptionExt, ResultExt};

use super::util;
use crate::{influxql::error::*, provider::MetaProvider};
use super::{planner::MeasurementProvider, util};
use crate::influxql::error::*;

pub(crate) struct StmtRewriter<'a, P: MetaProvider> {
sql_planner: &'a crate::planner::PlannerDelegate<'a, P>,
/// Rewriter for the influxql statement
///
/// It will rewrite statement before converting it to sql statement.
// Partial copy from influxdb_iox.
pub(crate) struct StmtRewriter<'a> {
measurement_provider: &'a dyn MeasurementProvider,
}

// Partial copy from influxdb_iox.
impl<'a, P: MetaProvider> StmtRewriter<'a, P> {
pub fn new(sql_planner: &'a crate::planner::PlannerDelegate<'a, P>) -> Self {
Self { sql_planner }
impl<'a> StmtRewriter<'a> {
#[allow(dead_code)]
pub fn new(measurement_provider: &'a dyn MeasurementProvider) -> Self {
Self {
measurement_provider,
}
}

pub fn rewrite_from(&self, stmt: &mut SelectStatement) -> Result<()> {
#[allow(dead_code)]
pub fn rewrite(&self, stmt: &mut SelectStatement) -> Result<()> {
self.rewrite_from(stmt)?;
self.rewrite_field_list(stmt)
}

fn rewrite_from(&self, stmt: &mut SelectStatement) -> Result<()> {
let mut new_from = Vec::new();
for ms in stmt.from.iter() {
match ms {
Expand All @@ -35,11 +49,13 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> {
name: MeasurementName::Name(name),
..
} => {
let table = self.sql_planner.find_table(name).box_err().context(
RewriteFromWithCause {
let table = self
.measurement_provider
.measurement(name)
.box_err()
.context(RewriteFromWithCause {
msg: format!("measurement not found, measurement:{name}"),
},
)?;
})?;
if table.is_some() {
new_from.push(ms.clone())
}
Expand Down Expand Up @@ -96,7 +112,7 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> {
MeasurementName::Name(name) => {
// Get schema, and split columns to tags and fields.
let (tags, fields) = self.tags_and_fields_in_measurement(name.as_str())?;
let mut group_by_tags = HashSet::new();
let mut group_by_tags = BTreeSet::new();
maybe_rewrite_group_by(&tags, &mut group_by_tags, stmt)?;
maybe_rewrite_projection(&tags, &fields, &group_by_tags, stmt)?;

Expand All @@ -117,14 +133,13 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> {
}
}

// TODO: just support from one table now.
fn tags_and_fields_in_measurement(
&self,
measurement_name: &str,
) -> Result<(Vec<String>, Vec<String>)> {
let measurement = self
.sql_planner
.find_table(measurement_name)
.measurement_provider
.measurement(measurement_name)
.box_err()
.context(RewriteFieldsWithCause {
msg: format!("failed to find measurement, measurement:{measurement_name}"),
Expand All @@ -135,22 +150,39 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> {

// Get schema and split to tags and fields.
let schema = measurement.schema();
let tags_and_fields: (Vec<String>, Vec<String>) =
schema.columns().iter().partition_map(|column| {
if column.is_tag {
Either::Left(column.name.clone())
let tsid_idx_opt = schema.index_of_tsid();
let timestamp_key_idx = schema.timestamp_index();
let tags_and_fields: (Vec<String>, Vec<String>) = schema
.columns()
.iter()
.enumerate()
.filter_map(|(col_idx, col)| {
let is_tsid_col = match tsid_idx_opt {
Some(idx) => col_idx == idx,
None => false,
};
let is_timestamp_key_col = col_idx == timestamp_key_idx;

if !is_tsid_col && !is_timestamp_key_col {
Some(col)
} else {
None
}
})
.partition_map(|col| {
if col.is_tag {
Either::Left(col.name.clone())
} else {
Either::Right(column.name.clone())
Either::Right(col.name.clone())
}
});

Ok(tags_and_fields)
}
}

fn maybe_rewrite_group_by(
tags: &[String],
group_by_tags: &mut HashSet<String>,
group_by_tags: &mut BTreeSet<String>,
stmt: &mut SelectStatement,
) -> Result<()> {
if let Some(group_by) = &stmt.group_by {
Expand All @@ -164,7 +196,7 @@ fn maybe_rewrite_group_by(
}

Dimension::Tag(tag) => {
if tags.contains(&tag.to_string()) {
if !tags.contains(&tag.to_string()) {
return RewriteFieldsNoCause {
msg: format!("group by tag not exist, tag:{tag}, exist tags:{tags:?}"),
}
Expand Down Expand Up @@ -207,7 +239,7 @@ fn maybe_rewrite_group_by(
fn maybe_rewrite_projection(
tags: &[String],
fields: &[String],
groub_by_tags: &HashSet<String>,
groub_by_tags: &BTreeSet<String>,
stmt: &mut SelectStatement,
) -> Result<()> {
let mut new_fields = Vec::new();
Expand Down Expand Up @@ -348,3 +380,81 @@ fn maybe_rewrite_projection(

Ok(())
}

#[cfg(test)]
mod test {
use datafusion::sql::TableReference;
use influxdb_influxql_parser::{
parse_statements, select::SelectStatement, statement::Statement,
};

use super::StmtRewriter;
use crate::{
influxql::planner::MeasurementProvider, provider::MetaProvider, tests::MockMetaProvider,
};

impl MeasurementProvider for MockMetaProvider {
fn measurement(
&self,
measurement_name: &str,
) -> crate::influxql::error::Result<Option<table_engine::table::TableRef>> {
let table_ref = TableReference::Bare {
table: std::borrow::Cow::Borrowed(measurement_name),
};
Ok(self.table(table_ref).unwrap())
}
}

#[test]
fn test_wildcard_and_regex_in_projection() {
let namespace = MockMetaProvider::default();

let mut stmt = parse_select("SELECT * FROM influxql_test");
rewrite_statement(&namespace, &mut stmt);
assert_eq!(
"SELECT col1, col2, col3 FROM influxql_test",
stmt.to_string()
);

let mut stmt = parse_select("SELECT *::tag FROM influxql_test");
rewrite_statement(&namespace, &mut stmt);
assert_eq!("SELECT col1, col2 FROM influxql_test", stmt.to_string());

let mut stmt = parse_select("SELECT *::field FROM influxql_test");
rewrite_statement(&namespace, &mut stmt);
assert_eq!("SELECT col3 FROM influxql_test", stmt.to_string());
}

#[test]
fn test_wildcard_and_regex_in_group_by() {
let namespace = MockMetaProvider::default();

let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY *");
rewrite_statement(&namespace, &mut stmt);
assert_eq!(
"SELECT col3 FROM influxql_test GROUP BY col1, col2",
stmt.to_string()
);

let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY col1");
rewrite_statement(&namespace, &mut stmt);
assert_eq!(
"SELECT col2, col3 FROM influxql_test GROUP BY col1",
stmt.to_string()
);
}

pub fn rewrite_statement(provider: &dyn MeasurementProvider, stmt: &mut SelectStatement) {
let rewriter = StmtRewriter::new(provider);
rewriter.rewrite(stmt).unwrap();
}

/// Returns the InfluxQL [`SelectStatement`] for the specified SQL, `s`.
pub fn parse_select(s: &str) -> SelectStatement {
let statements = parse_statements(s).unwrap();
match statements.first() {
Some(Statement::Select(sel)) => *sel.clone(),
_ => panic!("expected SELECT statement"),
}
}
}
2 changes: 2 additions & 0 deletions sql/src/influxql/util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use influxdb_influxql_parser::string::Regex;

// copy from influxdb_iox.
Expand Down
Loading

0 comments on commit cc210ad

Please sign in to comment.