diff --git a/server/src/grpc/storage_service/prom_query.rs b/server/src/grpc/storage_service/prom_query.rs index 843e7a96d1..ab0fb7b441 100644 --- a/server/src/grpc/storage_service/prom_query.rs +++ b/server/src/grpc/storage_service/prom_query.rs @@ -35,9 +35,12 @@ use crate::grpc::storage_service::{ }; fn is_table_not_found_error(e: &FrontendError) -> bool { - matches!(&e, FrontendError::CreatePlan { source } - if matches!(source, sql::planner::Error::BuildPromPlanError { source } - if matches!(source, sql::promql::Error::TableNotFound { .. }))) + matches!(&e, + FrontendError::CreatePlan { + source, + .. } + if matches!(source, sql::planner::Error::BuildPromPlanError { source } + if matches!(source, sql::promql::Error::TableNotFound { .. }))) } pub async fn handle_query( diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs index c3887d43f4..bd71731c14 100644 --- a/sql/src/frontend.rs +++ b/sql/src/frontend.rs @@ -162,6 +162,8 @@ impl Frontend

{ planner .write_req_to_plan(schema_config, write_table) - .context(CreatePlan) + .context(CreatePlan { + plan_type: "internal write", + }) } } diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs index bdad80c5a5..762017100e 100644 --- a/sql/src/influxql/mod.rs +++ b/sql/src/influxql/mod.rs @@ -1,3 +1,5 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + pub mod planner; mod stmt_rewriter; pub(crate) mod util; @@ -26,6 +28,12 @@ pub mod error { #[snafu(display("Unimplemented influxql select fields no cause, msg: {}", msg))] RewriteFieldsNoCause { msg: String }, + + #[snafu(display("Failed to find table with cause, source:{}", source))] + FindTableWithCause { source: GenericError }, + + #[snafu(display("Failed to find table no cause, msg: {}", msg))] + FindTableNoCause { msg: String }, } define_result!(Error); diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs index 7521e18131..7ddc1510ed 100644 --- a/sql/src/influxql/planner.rs +++ b/sql/src/influxql/planner.rs @@ -1,8 +1,14 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
+ +use common_util::error::BoxError; use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; +use snafu::{ResultExt}; +use table_engine::table::TableRef; + -use super::stmt_rewriter::StmtRewriter; use crate::{influxql::error::*, plan::Plan, provider::MetaProvider}; +#[allow(dead_code)] pub(crate) struct Planner<'a, P: MetaProvider> { sql_planner: crate::planner::PlannerDelegate<'a, P>, } @@ -27,11 +33,21 @@ impl<'a, P: MetaProvider> Planner<'a, P> { InfluxqlStatement::Explain(_) => todo!(), } } +} + +pub trait MeasurementProvider { + fn measurement(&self, measurement_name: &str) -> Result>; +} + +pub(crate) struct MeasurementProviderImpl<'a, P: MetaProvider>( + crate::planner::PlannerDelegate<'a, P>, +); - fn rewrite_stmt(&self, stmt: InfluxqlStatement) -> Result { - let mut stmt = stmt; - let stmt_rewriter = StmtRewriter::new(&self.sql_planner); - // stmt_rewriter.rewrite_from(&mut stmt)?; - todo!() +impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> { + fn measurement(&self, measurement_name: &str) -> Result> { + self.0 + .find_table(measurement_name) + .box_err() + .context(FindTableWithCause) } } diff --git a/sql/src/influxql/stmt_rewriter.rs b/sql/src/influxql/stmt_rewriter.rs index 710436c3bb..a635f5fa47 100644 --- a/sql/src/influxql/stmt_rewriter.rs +++ b/sql/src/influxql/stmt_rewriter.rs @@ -1,4 +1,6 @@ -use std::{collections::HashSet, ops::ControlFlow}; +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
+ +use std::{collections::BTreeSet, ops::ControlFlow}; use common_util::error::BoxError; use influxdb_influxql_parser::{ @@ -13,20 +15,32 @@ use influxdb_influxql_parser::{ use itertools::{Either, Itertools}; use snafu::{ensure, OptionExt, ResultExt}; -use super::util; -use crate::{influxql::error::*, provider::MetaProvider}; +use super::{planner::MeasurementProvider, util}; +use crate::influxql::error::*; -pub(crate) struct StmtRewriter<'a, P: MetaProvider> { - sql_planner: &'a crate::planner::PlannerDelegate<'a, P>, +/// Rewriter for the influxql statement +/// +/// It will rewrite statement before converting it to sql statement. +// Partial copy from influxdb_iox. +pub(crate) struct StmtRewriter<'a> { + measurement_provider: &'a dyn MeasurementProvider, } -// Partial copy from influxdb_iox. -impl<'a, P: MetaProvider> StmtRewriter<'a, P> { - pub fn new(sql_planner: &'a crate::planner::PlannerDelegate<'a, P>) -> Self { - Self { sql_planner } +impl<'a> StmtRewriter<'a> { + #[allow(dead_code)] + pub fn new(measurement_provider: &'a dyn MeasurementProvider) -> Self { + Self { + measurement_provider, + } } - pub fn rewrite_from(&self, stmt: &mut SelectStatement) -> Result<()> { + #[allow(dead_code)] + pub fn rewrite(&self, stmt: &mut SelectStatement) -> Result<()> { + self.rewrite_from(stmt)?; + self.rewrite_field_list(stmt) + } + + fn rewrite_from(&self, stmt: &mut SelectStatement) -> Result<()> { let mut new_from = Vec::new(); for ms in stmt.from.iter() { match ms { @@ -35,11 +49,13 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> { name: MeasurementName::Name(name), .. 
} => { - let table = self.sql_planner.find_table(name).box_err().context( - RewriteFromWithCause { + let table = self + .measurement_provider + .measurement(name) + .box_err() + .context(RewriteFromWithCause { msg: format!("measurement not found, measurement:{name}"), - }, - )?; + })?; if table.is_some() { new_from.push(ms.clone()) } @@ -96,7 +112,7 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> { MeasurementName::Name(name) => { // Get schema, and split columns to tags and fields. let (tags, fields) = self.tags_and_fields_in_measurement(name.as_str())?; - let mut group_by_tags = HashSet::new(); + let mut group_by_tags = BTreeSet::new(); maybe_rewrite_group_by(&tags, &mut group_by_tags, stmt)?; maybe_rewrite_projection(&tags, &fields, &group_by_tags, stmt)?; @@ -117,14 +133,13 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> { } } - // TODO: just support from one table now. fn tags_and_fields_in_measurement( &self, measurement_name: &str, ) -> Result<(Vec, Vec)> { let measurement = self - .sql_planner - .find_table(measurement_name) + .measurement_provider + .measurement(measurement_name) .box_err() .context(RewriteFieldsWithCause { msg: format!("failed to find measurement, measurement:{measurement_name}"), @@ -135,22 +150,39 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> { // Get schema and split to tags and fields. 
let schema = measurement.schema(); - let tags_and_fields: (Vec, Vec) = - schema.columns().iter().partition_map(|column| { - if column.is_tag { - Either::Left(column.name.clone()) + let tsid_idx_opt = schema.index_of_tsid(); + let timestamp_key_idx = schema.timestamp_index(); + let tags_and_fields: (Vec, Vec) = schema + .columns() + .iter() + .enumerate() + .filter_map(|(col_idx, col)| { + let is_tsid_col = match tsid_idx_opt { + Some(idx) => col_idx == idx, + None => false, + }; + let is_timestamp_key_col = col_idx == timestamp_key_idx; + + if !is_tsid_col && !is_timestamp_key_col { + Some(col) + } else { + None + } + }) + .partition_map(|col| { + if col.is_tag { + Either::Left(col.name.clone()) } else { - Either::Right(column.name.clone()) + Either::Right(col.name.clone()) } }); - Ok(tags_and_fields) } } fn maybe_rewrite_group_by( tags: &[String], - group_by_tags: &mut HashSet, + group_by_tags: &mut BTreeSet, stmt: &mut SelectStatement, ) -> Result<()> { if let Some(group_by) = &stmt.group_by { @@ -164,7 +196,7 @@ fn maybe_rewrite_group_by( } Dimension::Tag(tag) => { - if tags.contains(&tag.to_string()) { + if !tags.contains(&tag.to_string()) { return RewriteFieldsNoCause { msg: format!("group by tag not exist, tag:{tag}, exist tags:{tags:?}"), } @@ -207,7 +239,7 @@ fn maybe_rewrite_group_by( fn maybe_rewrite_projection( tags: &[String], fields: &[String], - groub_by_tags: &HashSet, + groub_by_tags: &BTreeSet, stmt: &mut SelectStatement, ) -> Result<()> { let mut new_fields = Vec::new(); @@ -348,3 +380,81 @@ fn maybe_rewrite_projection( Ok(()) } + +#[cfg(test)] +mod test { + use datafusion::sql::TableReference; + use influxdb_influxql_parser::{ + parse_statements, select::SelectStatement, statement::Statement, + }; + + use super::StmtRewriter; + use crate::{ + influxql::planner::MeasurementProvider, provider::MetaProvider, tests::MockMetaProvider, + }; + + impl MeasurementProvider for MockMetaProvider { + fn measurement( + &self, + measurement_name: &str, + ) -> 
crate::influxql::error::Result> { + let table_ref = TableReference::Bare { + table: std::borrow::Cow::Borrowed(measurement_name), + }; + Ok(self.table(table_ref).unwrap()) + } + } + + #[test] + fn test_wildcard_and_regex_in_projection() { + let namespace = MockMetaProvider::default(); + + let mut stmt = parse_select("SELECT * FROM influxql_test"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!( + "SELECT col1, col2, col3 FROM influxql_test", + stmt.to_string() + ); + + let mut stmt = parse_select("SELECT *::tag FROM influxql_test"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!("SELECT col1, col2 FROM influxql_test", stmt.to_string()); + + let mut stmt = parse_select("SELECT *::field FROM influxql_test"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!("SELECT col3 FROM influxql_test", stmt.to_string()); + } + + #[test] + fn test_wildcard_and_regex_in_group_by() { + let namespace = MockMetaProvider::default(); + + let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY *"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!( + "SELECT col3 FROM influxql_test GROUP BY col1, col2", + stmt.to_string() + ); + + let mut stmt = parse_select("SELECT * FROM influxql_test GROUP BY col1"); + rewrite_statement(&namespace, &mut stmt); + assert_eq!( + "SELECT col2, col3 FROM influxql_test GROUP BY col1", + stmt.to_string() + ); + } + + pub fn rewrite_statement(provider: &dyn MeasurementProvider, stmt: &mut SelectStatement) { + let rewriter = StmtRewriter::new(provider); + rewriter.rewrite(stmt).unwrap(); + } + + /// Returns the InfluxQL [`SelectStatement`] for the specified SQL, `s`. 
+ pub fn parse_select(s: &str) -> SelectStatement { + let statements = parse_statements(s).unwrap(); + match statements.first() { + Some(Statement::Select(sel)) => *sel.clone(), + _ => panic!("expected SELECT statement"), + } + } +} diff --git a/sql/src/influxql/util.rs b/sql/src/influxql/util.rs index 3b2f1119c8..19e66dfe02 100644 --- a/sql/src/influxql/util.rs +++ b/sql/src/influxql/util.rs @@ -1,3 +1,5 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + use influxdb_influxql_parser::string::Regex; // copy from influxdb_iox. diff --git a/sql/src/tests.rs b/sql/src/tests.rs index ecaf19d67c..9304807492 100644 --- a/sql/src/tests.rs +++ b/sql/src/tests.rs @@ -3,7 +3,12 @@ use std::sync::Arc; use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; -use common_types::tests::{build_default_value_schema, build_schema}; +use common_types::{ + column_schema, + datum::DatumKind, + schema::{Builder, Schema, TSID_COLUMN}, + tests::{build_default_value_schema, build_schema}, +}; use datafusion::catalog::TableReference; use df_operator::{scalar::ScalarUdf, udaf::AggregateUdf}; use table_engine::{ @@ -46,6 +51,12 @@ impl Default for MockMetaProvider { build_schema(), ANALYTIC_ENGINE_TYPE.to_string(), )), + Arc::new(MemoryTable::new( + "influxql_test".to_string(), + TableId::from(144), + build_influxql_test_schema(), + ANALYTIC_ENGINE_TYPE.to_string(), + )), ], } } @@ -79,3 +90,42 @@ impl MetaProvider for MockMetaProvider { todo!() } } + +fn build_influxql_test_schema() -> Schema { + Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_key_column( + column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col1".to_string(), 
DatumKind::String) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col2".to_string(), DatumKind::String) + .is_tag(true) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("col3".to_string(), DatumKind::Int64) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .build() + .expect("should succeed to build schema") +}