From df38feede7b8761d19b7841eb13e02a4d4c78cda Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 6 Mar 2023 07:45:43 +0800 Subject: [PATCH] implement influxql stmt rewriter. --- Cargo.lock | 1 + Cargo.toml | 1 + sql/Cargo.toml | 1 + sql/src/influxql/mod.rs | 17 +- sql/src/influxql/planner.rs | 4 + sql/src/influxql/stmt_rewriter.rs | 585 +++++++++++++----------------- 6 files changed, 272 insertions(+), 337 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7fce58d64e..3186c91edd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6093,6 +6093,7 @@ dependencies = [ "df_operator", "hashbrown 0.12.3", "influxdb_influxql_parser", + "itertools", "log", "paste 1.0.8", "regex", diff --git a/Cargo.toml b/Cargo.toml index 67cd049886..d66df17d6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,7 @@ tokio = { version = "1.25", features = ["full"] } wal = { path = "wal" } message_queue = { path = "components/message_queue" } zstd = { version = "0.12", default-features = false } +itertools = "0.10.5" [workspace.dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" diff --git a/sql/Cargo.toml b/sql/Cargo.toml index 8f8d7e1dfb..54726d8ed4 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -34,6 +34,7 @@ sqlparser = { workspace = true } table_engine = { workspace = true } influxdb_influxql_parser = {git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser"} regex-syntax = "0.6.28" +itertools = { workspace = true } [dev-dependencies] common_types = { workspace = true, features = ["test"] } diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs index 20d7fbdbb4..bdad80c5a5 100644 --- a/sql/src/influxql/mod.rs +++ b/sql/src/influxql/mod.rs @@ -11,8 +11,21 @@ pub mod error { #[snafu(display("Unimplemented influxql statement, msg: {}", msg))] Unimplemented { msg: String }, - #[snafu(display("Unimplemented influxql statement, source: {}", source))] - RewriteStmtWithCause { source: GenericError }, + #[snafu(display( + "Rewrite influxql from clause with cause, msg:{}, source:{}", + msg, + source + ))] + RewriteFromWithCause { msg: String, source: GenericError }, + + #[snafu(display("Rewrite influxql from clause no cause, msg:{}", msg))] + RewriteFromNoCause { msg: String }, + + #[snafu(display("Unimplemented influxql select fields with cause, source: {}", source))] + RewriteFieldsWithCause { msg: String, source: GenericError }, + + #[snafu(display("Unimplemented influxql select fields no cause, msg: {}", msg))] + RewriteFieldsNoCause { msg: String }, } define_result!(Error); diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs index 339d7a84cc..7521e18131 100644 --- a/sql/src/influxql/planner.rs +++ b/sql/src/influxql/planner.rs @@ -1,5 +1,6 @@ use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; +use super::stmt_rewriter::StmtRewriter; use crate::{influxql::error::*, plan::Plan, provider::MetaProvider}; pub(crate) struct Planner<'a, P: MetaProvider> { @@ -28,6 +29,9 @@ impl<'a, P: MetaProvider> Planner<'a, P> { } fn rewrite_stmt(&self, stmt: InfluxqlStatement) -> Result { + let mut stmt = stmt; + let stmt_rewriter = StmtRewriter::new(&self.sql_planner); + // stmt_rewriter.rewrite_from(&mut stmt)?; todo!() } } diff --git a/sql/src/influxql/stmt_rewriter.rs b/sql/src/influxql/stmt_rewriter.rs index f46abca340..710436c3bb 100644 --- a/sql/src/influxql/stmt_rewriter.rs +++ b/sql/src/influxql/stmt_rewriter.rs @@ -1,11 +1,17 @@ +use std::{collections::HashSet, ops::ControlFlow}; + use common_util::error::BoxError; use influxdb_influxql_parser::{ - common::{MeasurementName, QualifiedMeasurementName}, - expression::Expr, + common::{MeasurementName, QualifiedMeasurementName, ZeroOrMore}, + expression::{walk, Expr, WildcardType}, + identifier::Identifier, literal::Literal, - select::{Dimension, FromMeasurementClause, MeasurementSelection, SelectStatement}, + select::{ + Dimension, Field, FieldList, FromMeasurementClause, MeasurementSelection, SelectStatement, + }, }; -use snafu::ResultExt; +use itertools::{Either, Itertools}; +use snafu::{ensure, OptionExt, ResultExt}; use super::util; use crate::{influxql::error::*, provider::MetaProvider}; @@ -29,11 +35,11 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> { name: MeasurementName::Name(name), .. } => { - let table = self - .sql_planner - .find_table(name) - .box_err() - .context(RewriteStmtWithCause)?; + let table = self.sql_planner.find_table(name).box_err().context( + RewriteFromWithCause { + msg: format!("measurement not found, measurement:{name}"), + }, + )?; if table.is_some() { new_from.push(ms.clone()) } @@ -44,392 +50,301 @@ impl<'a, P: MetaProvider> StmtRewriter<'a, P> { } => { // TODO: need to support get all tables first. return Unimplemented { - msg: "regex in from clause", + msg: "rewrite from regex", } .fail(); } }, MeasurementSelection::Subquery(_) => { return Unimplemented { - msg: "subquery in from clause", + msg: "rewrite from subquery", } .fail(); } } } + + // TODO: support from multiple tables. + ensure!( + new_from.len() == 1, + Unimplemented { + msg: "rewrite from multiple measurements" + } + ); + stmt.from = FromMeasurementClause::new(new_from); Ok(()) } - /// Rewrite the projection list and GROUP BY of the specified `SELECT` - /// statement. - /// - /// Wildcards and regular expressions in the `SELECT` projection list and - /// `GROUP BY` are expanded. Any fields with no type specifier are - /// rewritten with the appropriate type, if they exist in the underlying - /// schema. - /// - /// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1185). + /// Rewrite the projection list and GROUP BY of the specified `SELECT`. + // TODO: should support from multiple measurements. + // TODO: support rewrite fields in subquery. fn rewrite_field_list(&self, stmt: &mut SelectStatement) -> Result<()> { - // TODO: support rewrite fields in subquery. - - // Attempt to rewrite all variable references in the fields with their types, if - // one hasn't been specified. - if let ControlFlow::Break(e) = stmt.fields.iter_mut().try_for_each(|f| { - walk_expr_mut::(&mut f.expr, &mut |e| { - if matches!(e, Expr::VarRef { .. }) { - let new_type = match evaluate_type(s, e.borrow(), &stmt.from) { - Err(e) => ControlFlow::Break(e)?, - Ok(v) => v, - }; - - if let Expr::VarRef { data_type, .. } = e { - *data_type = new_type; - } - } - ControlFlow::Continue(()) - }) - }) { - return Err(e); - } + ensure!( + stmt.from.len() == 1, + Unimplemented { + msg: "rewrite field list from multiple measurements" + } + ); - let (has_field_wildcard, has_group_by_wildcard) = has_wildcards(stmt); - if (has_field_wildcard, has_group_by_wildcard) == (false, false) { - return Ok(()); - } + match &stmt.from[0] { + MeasurementSelection::Name(qualified_name) => { + let QualifiedMeasurementName { name, .. } = qualified_name; - let (field_set, mut tag_set) = from_field_and_dimensions(s, &stmt.from)?; + match name { + MeasurementName::Name(name) => { + // Get schema, and split columns to tags and fields. + let (tags, fields) = self.tags_and_fields_in_measurement(name.as_str())?; + let mut group_by_tags = HashSet::new(); + maybe_rewrite_group_by(&tags, &mut group_by_tags, stmt)?; + maybe_rewrite_projection(&tags, &fields, &group_by_tags, stmt)?; - if !has_group_by_wildcard { - if let Some(group_by) = &stmt.group_by { - // Remove any explicitly listed tags in the GROUP BY clause, so they are not - // expanded in the wildcard specified in the SELECT projection - // list - group_by.iter().for_each(|dim| { - if let Dimension::Tag(ident) = dim { - tag_set.remove(ident.as_str()); + Ok(()) } - }); + + MeasurementName::Regex(_) => RewriteFieldsNoCause { + msg: "rewrite field list should not encounter regex in from clause", + } + .fail(), + } } - } - #[derive(PartialEq, PartialOrd, Eq, Ord)] - struct VarRef { - name: String, - data_type: VarRefDataType, + MeasurementSelection::Subquery(_) => Unimplemented { + msg: "rewrite field list from subquery", + } + .fail(), } + } - let fields = if !field_set.is_empty() { - let fields_iter = field_set.iter().map(|(k, v)| VarRef { - name: k.clone(), - data_type: *v, + // TODO: just support from one table now. + fn tags_and_fields_in_measurement( + &self, + measurement_name: &str, + ) -> Result<(Vec, Vec)> { + let measurement = self + .sql_planner + .find_table(measurement_name) + .box_err() + .context(RewriteFieldsWithCause { + msg: format!("failed to find measurement, measurement:{measurement_name}"), + })? + .context(RewriteFieldsNoCause { + msg: format!("measurement not found, measurement:{measurement_name}"), + })?; + + // Get schema and split to tags and fields. + let schema = measurement.schema(); + let tags_and_fields: (Vec, Vec) = + schema.columns().iter().partition_map(|column| { + if column.is_tag { + Either::Left(column.name.clone()) + } else { + Either::Right(column.name.clone()) + } }); - if !has_group_by_wildcard { - fields_iter - .chain(tag_set.iter().map(|tag| VarRef { - name: tag.clone(), - data_type: VarRefDataType::Tag, - })) - .sorted() - .collect::>() - } else { - fields_iter.sorted().collect::>() - } - } else { - vec![] - }; - - if has_field_wildcard { - let mut new_fields = Vec::new(); - - for f in stmt.fields.iter() { - let add_field = |f: &VarRef| { - new_fields.push(Field { - expr: Expr::VarRef { - name: f.name.clone().into(), - data_type: Some(f.data_type), - }, - alias: None, - }) - }; - - match &f.expr { - Expr::Wildcard(wct) => { - let filter: fn(&&VarRef) -> bool = match wct { - None => |_| true, - Some(WildcardType::Tag) => |v| v.data_type.is_tag_type(), - Some(WildcardType::Field) => |v| v.data_type.is_field_type(), - }; + Ok(tags_and_fields) + } +} - fields.iter().filter(filter).for_each(add_field); +fn maybe_rewrite_group_by( + tags: &[String], + group_by_tags: &mut HashSet, + stmt: &mut SelectStatement, +) -> Result<()> { + if let Some(group_by) = &stmt.group_by { + for dimension in group_by.iter() { + match dimension { + Dimension::Time { .. } => { + return Unimplemented { + msg: "group by time interval", } + .fail(); + } - Expr::Literal(Literal::Regex(re)) => { - let re = util::parse_regex(re)?; - fields - .iter() - .filter(|v| re.is_match(v.name.as_str())) - .for_each(add_field); + Dimension::Tag(tag) => { + if tags.contains(&tag.to_string()) { + return RewriteFieldsNoCause { + msg: format!("group by tag not exist, tag:{tag}, exist tags:{tags:?}"), + } + .fail(); } + let _ = group_by_tags.insert(tag.to_string()); + } - Expr::Call { name, args } => { - let mut name = name; - let mut args = args; - - // Search for the call with a wildcard by continuously descending until - // we no longer have a call. - while let Some(Expr::Call { - name: inner_name, - args: inner_args, - }) = args.first() - { - name = inner_name; - args = inner_args; + Dimension::Regex(re) => { + let re = util::parse_regex(re) + .box_err() + .context(RewriteFieldsWithCause { + msg: format!("group by invalid regex, regex:{re}"), + })?; + let match_tags = tags.iter().filter_map(|tag| { + if re.is_match(tag.as_str()) { + Some(tag.clone()) + } else { + None } + }); + group_by_tags.extend(match_tags); + } - // Add additional types for certain functions. - match name.to_lowercase().as_str() { - "count" | "first" | "last" | "distinct" | "elapsed" | "mode" - | "sample" => { - supported_types - .extend([VarRefDataType::String, VarRefDataType::Boolean]); - } - "min" | "max" => { - supported_types.insert(VarRefDataType::Boolean); - } - "holt_winters" | "holt_winters_with_fit" => { - supported_types.remove(&VarRefDataType::Unsigned); - } - _ => {} - } + Dimension::Wildcard => group_by_tags.extend(tags.iter().cloned()), + } + } - let add_field = |v: &VarRef| { - let mut args = args.clone(); - args[0] = Expr::VarRef { - name: v.name.clone().into(), - data_type: Some(v.data_type), - }; - new_fields.push(Field { - expr: Expr::Call { - name: name.clone(), - args, - }, - alias: Some(format!("{}_{}", field_name(f), v.name).into()), - }) - }; - - match args.first() { - Some(Expr::Wildcard(Some(WildcardType::Tag))) => { - return Err(DataFusionError::External( - format!("unable to use tag as wildcard in {name}()").into(), - )); - } - Some(Expr::Wildcard(_)) => { - fields - .iter() - .filter(|v| supported_types.contains(&v.data_type)) - .for_each(add_field); - } - Some(Expr::Literal(Literal::Regex(re))) => { - let re = util::parse_regex(re)?; - fields - .iter() - .filter(|v| { - supported_types.contains(&v.data_type) - && re.is_match(v.name.as_str()) - }) - .for_each(add_field); - } - _ => { - new_fields.push(f.clone()); - continue; - } - } - } + stmt.group_by = Some(ZeroOrMore::new( + group_by_tags + .iter() + .map(|tag| Dimension::Tag(Identifier::new(tag.clone()))) + .collect::>(), + )); + } - Expr::Binary { .. } => { - let has_wildcard = walk_expr(&f.expr, &mut |e| { - match e { - Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => { - return ControlFlow::Break(()) - } - _ => {} - } - ControlFlow::Continue(()) - }) - .is_break(); - - if has_wildcard { - return Err(DataFusionError::External( - "unsupported expression: contains a wildcard or regular expression" - .into(), - )); - } + Ok(()) +} - new_fields.push(f.clone()); - } +fn maybe_rewrite_projection( + tags: &[String], + fields: &[String], + groub_by_tags: &HashSet, + stmt: &mut SelectStatement, +) -> Result<()> { + let mut new_fields = Vec::new(); + + enum AddFieldType { + Tag, + Field, + Both, + } - _ => new_fields.push(f.clone()), + let add_fields = |filter: &dyn Fn(&String) -> bool, + add_field_type: AddFieldType, + new_fields: &mut Vec| { + if matches!(&add_field_type, AddFieldType::Tag | AddFieldType::Both) { + let tag_fields = tags.iter().filter_map(|tag| { + if !groub_by_tags.contains(tag.as_str()) && filter(tag) { + Some(Field { + expr: Expr::VarRef { + name: tag.clone().into(), + data_type: None, + }, + alias: None, + }) + } else { + None } - } + }); + new_fields.extend(tag_fields); + } - stmt.fields = FieldList::new(new_fields); + if matches!(&add_field_type, AddFieldType::Field | AddFieldType::Both) { + let normal_fields = fields.iter().filter_map(|field| { + if filter(field) { + Some(Field { + expr: Expr::VarRef { + name: field.clone().into(), + data_type: None, + }, + alias: None, + }) + } else { + None + } + }); + new_fields.extend(normal_fields); } + }; - // group by 展开 - if has_group_by_wildcard { - let group_by_tags = if has_group_by_wildcard { - tag_set.into_iter().sorted().collect::>() - } else { - vec![] - }; - - if let Some(group_by) = &stmt.group_by { - let mut new_dimensions = Vec::new(); - - for dim in group_by.iter() { - let add_dim = |dim: &String| { - new_dimensions.push(Dimension::Tag(Identifier::new(dim.clone()))) - }; - - match dim { - Dimension::Wildcard => { - group_by_tags.iter().for_each(add_dim); - } - Dimension::Regex(re) => { - let re = util::parse_regex(re)?; + for f in stmt.fields.iter() { + match &f.expr { + Expr::Wildcard(wct) => { + let filter = |_: &String| -> bool { true }; - group_by_tags - .iter() - .filter(|dim| re.is_match(dim.as_str())) - .for_each(add_dim); - } - _ => new_dimensions.push(dim.clone()), + match wct { + Some(WildcardType::Tag) => { + add_fields(&filter, AddFieldType::Tag, &mut new_fields); + } + Some(WildcardType::Field) => { + add_fields(&filter, AddFieldType::Field, &mut new_fields); + } + None => { + add_fields(&filter, AddFieldType::Both, &mut new_fields); } } - stmt.group_by = Some(GroupByClause::new(new_dimensions)); } - } - Ok(()) - } + Expr::Literal(Literal::Regex(re)) => { + let re = util::parse_regex(re) + .box_err() + .context(RewriteFieldsWithCause { + msg: format!("rewrite field list encounter invalid regex, regex:{re}"), + })?; - /// Determine the merged fields and tags of the `FROM` clause. - fn from_field_and_dimensions( - s: &dyn SchemaProvider, - from: &FromMeasurementClause, - ) -> Result<(FieldTypeMap, TagSet)> { - let mut fs = FieldTypeMap::new(); - let mut ts = TagSet::new(); + let filter = |v: &String| -> bool { re.is_match(v.as_str()) }; - for ms in from.deref() { - match ms { - MeasurementSelection::Name(QualifiedMeasurementName { - name: MeasurementName::Name(name), - .. - }) => { - let (field_set, tag_set) = match field_and_dimensions(s, name.as_str())? { - Some(res) => res, - None => continue, - }; - - // Merge field_set with existing - for (name, ft) in &field_set { - match fs.get(name) { - Some(existing_type) => { - if ft < existing_type { - fs.insert(name.to_string(), *ft); - } - } - None => { - fs.insert(name.to_string(), *ft); - } - }; - } + add_fields(&filter, AddFieldType::Both, &mut new_fields); + } + + Expr::Call { args, .. } => { + let mut args = args; - ts.extend(tag_set); + // Search for the call with a wildcard by continuously descending until + // we no longer have a call. + while let Some(Expr::Call { + args: inner_args, .. + }) = args.first() + { + args = inner_args; } - MeasurementSelection::Subquery(select) => { - for f in select.fields.iter() { - let dt = match evaluate_type(s, &f.expr, &select.from)? { - Some(dt) => dt, - None => continue, - }; - - let name = field_name(f); - - match fs.get(name.as_str()) { - Some(existing_type) => { - if dt < *existing_type { - fs.insert(name, dt); - } - } - None => { - fs.insert(name, dt); - } + + match args.first() { + Some(Expr::Wildcard(Some(WildcardType::Tag))) => { + return RewriteFieldsNoCause { + msg: "tags can't be placed in a call", } + .fail(); } - - if let Some(group_by) = &select.group_by { - // Merge the dimensions from the subquery - ts.extend(group_by.iter().filter_map(|d| match d { - Dimension::Tag(ident) => Some(ident.to_string()), - _ => None, - })); + Some(Expr::Wildcard(_)) | Some(Expr::Literal(Literal::Regex(_))) => { + return Unimplemented { + msg: "wildcard or regex in call", + } + .fail(); + } + _ => { + new_fields.push(f.clone()); + continue; } - } - _ => { - // Unreachable, as the from clause should be normalised at this point. - return Err(DataFusionError::Internal( - "Unexpected MeasurementSelection in from".to_string(), - )); } } - } - Ok((fs, ts)) - } - /// Returns a tuple indicating whether the specifies `SELECT` statement - /// has any wildcards or regular expressions in the projection list - /// and `GROUP BY` clause respectively. - fn has_wildcards(stmt: &SelectStatement) -> (bool, bool) { - use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor}; - - struct HasWildcardsVisitor(bool, bool); - - impl Visitor for HasWildcardsVisitor { - type Error = DataFusionError; - - fn pre_visit_expr(self, n: &Expr) -> Result> { - Ok( - if matches!(n, Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_))) { - Recursion::Stop(Self(true, self.1)) - } else { - Recursion::Continue(self) - }, - ) - } + Expr::Binary { .. } => { + let has_wildcard = walk::walk_expr(&f.expr, &mut |e| { + match e { + Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => { + return ControlFlow::Break(()) + } + _ => {} + } + ControlFlow::Continue(()) + }) + .is_break(); - fn pre_visit_select_from_clause( - self, - _n: &FromMeasurementClause, - ) -> Result> { - // Don't traverse FROM and potential subqueries - Ok(Recursion::Stop(self)) - } + if has_wildcard { + return RewriteFieldsNoCause { + msg: "wildcard or regex should be encountered in binary expression", + } + .fail(); + } - fn pre_visit_select_dimension(self, n: &Dimension) -> Result> { - Ok(if matches!(n, Dimension::Wildcard | Dimension::Regex(_)) { - Recursion::Stop(Self(self.0, true)) - } else { - Recursion::Continue(self) - }) + new_fields.push(f.clone()); } - } - let res = Visitable::accept(stmt, HasWildcardsVisitor(false, false)).unwrap(); - (res.0, res.1) + _ => new_fields.push(f.clone()), + } } + + stmt.fields = FieldList::new(new_fields); + + Ok(()) }