diff --git a/Cargo.lock b/Cargo.lock
index bf372496b1..7fce58d64e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1176,6 +1176,28 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "chrono-tz"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa48fa079165080f11d7753fd0bc175b7d391f276b965fe4b55bfad67856e463"
+dependencies = [
+ "chrono",
+ "chrono-tz-build",
+ "phf",
+]
+
+[[package]]
+name = "chrono-tz-build"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9998fb9f7e9b2111641485bf8beb32f92945f97f92a3d061f744cfef335f751"
+dependencies = [
+ "parse-zoneinfo",
+ "phf",
+ "phf_codegen",
+]
+
 [[package]]
 name = "clang-sys"
 version = "1.3.3"
@@ -2993,6 +3015,18 @@ dependencies = [
  "str_stack",
 ]
 
+[[package]]
+name = "influxdb_influxql_parser"
+version = "0.1.0"
+source = "git+https://github.com/Rachelint/influxdb_iox.git?branch=influxql-parser#77e24e992a90dd08a6d71366de53fac721e491fb"
+dependencies = [
+ "chrono",
+ "chrono-tz",
+ "nom 7.1.1",
+ "num-traits",
+ "once_cell",
+]
+
 [[package]]
 name = "instant"
 version = "0.1.12"
@@ -4383,6 +4417,15 @@ dependencies = [
  "thrift 0.13.0",
 ]
 
+[[package]]
+name = "parse-zoneinfo"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c705f256449c60da65e11ff6626e0c16a0a0b96aaa348de61376b249bc340f41"
+dependencies = [
+ "regex",
+]
+
 [[package]]
 name = "paste"
 version = "0.1.18"
@@ -4448,6 +4491,44 @@ dependencies = [
  "indexmap",
 ]
 
+[[package]]
+name = "phf"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
+dependencies = [
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_codegen"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a56ac890c5e3ca598bbdeaa99964edb5b0258a583a9eb6ef4e89fc85d9224770"
+dependencies = [
+ "phf_generator",
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_generator"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1181c94580fa345f50f19d738aaa39c0ed30a600d95cb2d3e23f94266f14fbf"
+dependencies = [
+ "phf_shared",
+ "rand 0.8.5",
+]
+
+[[package]]
+name = "phf_shared"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
+dependencies = [
+ "siphasher",
+]
+
 [[package]]
 name = "pin-project"
 version = "1.0.11"
@@ -6011,9 +6092,11 @@ dependencies = [
  "datafusion-proto",
  "df_operator",
  "hashbrown 0.12.3",
+ "influxdb_influxql_parser",
  "log",
  "paste 1.0.8",
  "regex",
+ "regex-syntax",
  "snafu 0.6.10",
  "sqlparser",
  "table_engine",
diff --git a/sql/Cargo.toml b/sql/Cargo.toml
index 59ab5566aa..8f8d7e1dfb 100644
--- a/sql/Cargo.toml
+++ b/sql/Cargo.toml
@@ -32,6 +32,8 @@ regex = "1"
 snafu = { workspace = true }
 sqlparser = { workspace = true }
 table_engine = { workspace = true }
+influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" }
+regex-syntax = "0.6.28"
 
 [dev-dependencies]
 common_types = { workspace = true, features = ["test"] }
diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs
index e46fc92da2..4bdc9d9356 100644
--- a/sql/src/frontend.rs
+++ b/sql/src/frontend.rs
@@ -6,6 +6,7 @@ use std::{sync::Arc, time::Instant};
 
 use ceresdbproto::prometheus::Expr as PromExpr;
 use common_types::request_id::RequestId;
+use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::table;
 
@@ -28,14 +29,24 @@ pub enum Error {
     },
 
     // TODO(yingwen): Should we store stmt here?
-    #[snafu(display("Failed to create plan, err:{}", source))]
-    CreatePlan { source: crate::planner::Error },
+    #[snafu(display("Failed to create plan, plan_type:{}, err:{}", plan_type, source))]
+    CreatePlan {
+        plan_type: String,
+        source: crate::planner::Error,
+    },
 
     #[snafu(display("Invalid prom request, err:{}", source))]
     InvalidPromRequest { source: crate::promql::Error },
 
     #[snafu(display("Expr is not found in prom request.\nBacktrace:\n{}", backtrace))]
     ExprNotFoundInPromRequest { backtrace: Backtrace },
+
+    // Invalid influxql is quite common, so we don't provide a backtrace now.
+    #[snafu(display("Invalid influxql, influxql:{}, err:{}", influxql, parse_err))]
+    InvalidInfluxql {
+        influxql: String,
+        parse_err: influxdb_influxql_parser::common::ParseError,
+    },
 }
 
 define_result!(Error);
@@ -89,6 +100,21 @@ impl<P> Frontend<P> {
         let expr = expr.context(ExprNotFoundInPromRequest)?;
         Expr::try_from(expr).context(InvalidPromRequest)
     }
+
+    /// Parse the influxql and return the statements
+    pub fn parse_influxql(
+        &self,
+        _ctx: &mut Context,
+        influxql: &str,
+    ) -> Result<Vec<InfluxqlStatement>> {
+        match influxdb_influxql_parser::parse_statements(influxql) {
+            Ok(stmts) => Ok(stmts),
+            Err(e) => Err(Error::InvalidInfluxql {
+                influxql: influxql.to_string(),
+                parse_err: e,
+            }),
+        }
+    }
 }
 
 impl<P: MetaProvider> Frontend<P> {
@@ -96,7 +122,9 @@ impl<P: MetaProvider> Frontend<P> {
     pub fn statement_to_plan(&self, ctx: &mut Context, stmt: Statement) -> Result<Plan> {
         let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
 
-        planner.statement_to_plan(stmt).context(CreatePlan)
+        planner
+            .statement_to_plan(stmt)
+            .context(CreatePlan { plan_type: "sql" })
     }
 
     pub fn promql_expr_to_plan(
@@ -106,6 +134,20 @@ impl<P: MetaProvider> Frontend<P> {
     ) -> Result<(Plan, Arc<ColumnNames>)> {
         let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
 
-        planner.promql_expr_to_plan(expr).context(CreatePlan)
+        planner.promql_expr_to_plan(expr).context(CreatePlan {
+            plan_type: "promql",
+        })
+    }
+
+    pub fn influxql_stmt_to_plan(
+        &self,
+        ctx: &mut Context,
+        stmt: InfluxqlStatement,
+    ) -> Result<Plan> {
+        let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
+
+        planner.influxql_stmt_to_plan(stmt).context(CreatePlan {
+            plan_type: "influxql",
+        })
     }
 }
diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs
new file mode 100644
index 0000000000..b0c869330c
--- /dev/null
+++ b/sql/src/influxql/mod.rs
@@ -0,0 +1,21 @@
+pub mod planner;
+mod stmt_rewriter;
+pub(crate) mod util;
+pub mod error {
+    use common_util::error::GenericError;
+    use snafu::Snafu;
+
+    #[derive(Debug, Snafu)]
+    #[snafu(visibility = "pub")]
+    pub enum Error {
+        #[snafu(display("Unimplemented influxql statement, msg: {}", msg))]
+        Unimplemented { msg: String },
+
+        #[snafu(display("Failed to rewrite influxql statement, source: {}", source))]
+        RewriteStmtWithCause {
+            source: GenericError,
+        },
+    }
+
+    define_result!(Error);
+}
diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs
new file mode 100644
index 0000000000..339d7a84cc
--- /dev/null
+++ b/sql/src/influxql/planner.rs
@@ -0,0 +1,33 @@
+use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
+
+use crate::{influxql::error::*, plan::Plan, provider::MetaProvider};
+
+pub(crate) struct Planner<'a, P: MetaProvider> {
+    sql_planner: crate::planner::PlannerDelegate<'a, P>,
+}
+
+impl<'a, P: MetaProvider> Planner<'a, P> {
+    pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self {
+        Self { sql_planner }
+    }
+
+    pub fn statement_to_plan(&self, stmt: InfluxqlStatement) -> Result<Plan> {
+        match stmt {
+            InfluxqlStatement::Select(_) => todo!(),
+            InfluxqlStatement::CreateDatabase(_) => todo!(),
+            InfluxqlStatement::ShowDatabases(_) => todo!(),
+            InfluxqlStatement::ShowRetentionPolicies(_) => todo!(),
+            InfluxqlStatement::ShowTagKeys(_) => todo!(),
+            InfluxqlStatement::ShowTagValues(_) => todo!(),
+            InfluxqlStatement::ShowFieldKeys(_) => todo!(),
+            InfluxqlStatement::ShowMeasurements(_) => todo!(),
+            InfluxqlStatement::Delete(_) => todo!(),
+            InfluxqlStatement::DropMeasurement(_) => todo!(),
+            InfluxqlStatement::Explain(_) => todo!(),
+        }
+    }
+
+    fn rewrite_stmt(&self, stmt: InfluxqlStatement) -> Result<InfluxqlStatement> {
+        todo!()
+    }
+}
diff --git a/sql/src/influxql/stmt_rewriter.rs b/sql/src/influxql/stmt_rewriter.rs
new file mode 100644
index 0000000000..312801cfc9
--- /dev/null
+++ b/sql/src/influxql/stmt_rewriter.rs
@@ -0,0 +1,430 @@
+use std::{
+    borrow::Borrow,
+    collections::HashSet,
+    ops::{ControlFlow, Deref},
+};
+
+use common_util::error::BoxError;
+use influxdb_influxql_parser::{
+    common::{MeasurementName, QualifiedMeasurementName},
+    expression::Expr,
+    literal::Literal,
+    select::{Dimension, FromMeasurementClause, MeasurementSelection, SelectStatement},
+};
+use snafu::ResultExt;
+
+use super::util;
+use crate::{influxql::error::*, provider::MetaProvider};
+
+pub(crate) struct StmtRewriter<'a, P: MetaProvider> {
+    sql_planner: &'a crate::planner::PlannerDelegate<'a, P>,
+}
+
+// Partial copy from influxdb_iox.
+impl<'a, P: MetaProvider> StmtRewriter<'a, P> {
+    pub fn new(sql_planner: &'a crate::planner::PlannerDelegate<'a, P>) -> Self {
+        Self { sql_planner }
+    }
+
+    pub fn rewrite_from(&self, stmt: &mut SelectStatement) -> Result<()> {
+        let mut new_from = Vec::new();
+        for ms in stmt.from.iter() {
+            match ms {
+                MeasurementSelection::Name(qmn) => match qmn {
+                    QualifiedMeasurementName {
+                        name: MeasurementName::Name(name),
+                        ..
+                    } => {
+                        let table = self
+                            .sql_planner
+                            .find_table(name)
+                            .box_err()
+                            .context(RewriteStmtWithCause)?;
+                        if table.is_some() {
+                            new_from.push(ms.clone())
+                        }
+                    }
+                    QualifiedMeasurementName {
+                        name: MeasurementName::Regex(_),
+                        ..
+                    } => {
+                        // TODO: we need to support fetching all tables first.
+                        return Unimplemented {
+                            msg: "regex in from clause",
+                        }
+                        .fail();
+                    }
+                },
+                MeasurementSelection::Subquery(_) => {
+                    return Unimplemented {
+                        msg: "subquery in from clause",
+                    }
+                    .fail();
+                }
+            }
+        }
+        stmt.from = FromMeasurementClause::new(new_from);
+
+        Ok(())
+    }
+
+    /// Determine the merged fields and tags of the `FROM` clause.
+    fn from_field_and_dimensions(
+        s: &dyn SchemaProvider,
+        from: &FromMeasurementClause,
+    ) -> Result<(FieldTypeMap, TagSet)> {
+        let mut fs = FieldTypeMap::new();
+        let mut ts = TagSet::new();
+
+        for ms in from.deref() {
+            match ms {
+                MeasurementSelection::Name(QualifiedMeasurementName {
+                    name: MeasurementName::Name(name),
+                    ..
+                }) => {
+                    let (field_set, tag_set) = match field_and_dimensions(s, name.as_str())? {
+                        Some(res) => res,
+                        None => continue,
+                    };
+
+                    // Merge field_set with existing
+                    for (name, ft) in &field_set {
+                        match fs.get(name) {
+                            Some(existing_type) => {
+                                if ft < existing_type {
+                                    fs.insert(name.to_string(), *ft);
+                                }
+                            }
+                            None => {
+                                fs.insert(name.to_string(), *ft);
+                            }
+                        };
+                    }
+
+                    ts.extend(tag_set);
+                }
+                MeasurementSelection::Subquery(select) => {
+                    for f in select.fields.iter() {
+                        let dt = match evaluate_type(s, &f.expr, &select.from)? {
+                            Some(dt) => dt,
+                            None => continue,
+                        };
+
+                        let name = field_name(f);
+
+                        match fs.get(name.as_str()) {
+                            Some(existing_type) => {
+                                if dt < *existing_type {
+                                    fs.insert(name, dt);
+                                }
+                            }
+                            None => {
+                                fs.insert(name, dt);
+                            }
+                        }
+                    }
+
+                    if let Some(group_by) = &select.group_by {
+                        // Merge the dimensions from the subquery
+                        ts.extend(group_by.iter().filter_map(|d| match d {
+                            Dimension::Tag(ident) => Some(ident.to_string()),
+                            _ => None,
+                        }));
+                    }
+                }
+                _ => {
+                    // Unreachable, as the from clause should be normalised at this point.
+                    return Err(DataFusionError::Internal(
+                        "Unexpected MeasurementSelection in from".to_string(),
+                    ));
+                }
+            }
+        }
+        Ok((fs, ts))
+    }
+
+    /// Returns a tuple indicating whether the specified `SELECT` statement
+    /// has any wildcards or regular expressions in the projection list
+    /// and `GROUP BY` clause respectively.
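+    ///
+    /// For example, `SELECT * FROM cpu GROUP BY /host/` has a wildcard in the
+    /// projection list and a regex in the `GROUP BY` clause, so the result
+    /// here is `(true, true)`.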
+    fn has_wildcards(stmt: &SelectStatement) -> (bool, bool) {
+        use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor};
+
+        struct HasWildcardsVisitor(bool, bool);
+
+        impl Visitor for HasWildcardsVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit_expr(self, n: &Expr) -> Result<Recursion<Self>> {
+                Ok(
+                    if matches!(n, Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_))) {
+                        Recursion::Stop(Self(true, self.1))
+                    } else {
+                        Recursion::Continue(self)
+                    },
+                )
+            }
+
+            fn pre_visit_select_from_clause(
+                self,
+                _n: &FromMeasurementClause,
+            ) -> Result<Recursion<Self>> {
+                // Don't traverse FROM and potential subqueries
+                Ok(Recursion::Stop(self))
+            }
+
+            fn pre_visit_select_dimension(self, n: &Dimension) -> Result<Recursion<Self>> {
+                Ok(if matches!(n, Dimension::Wildcard | Dimension::Regex(_)) {
+                    Recursion::Stop(Self(self.0, true))
+                } else {
+                    Recursion::Continue(self)
+                })
+            }
+        }
+
+        let res = Visitable::accept(stmt, HasWildcardsVisitor(false, false)).unwrap();
+        (res.0, res.1)
+    }
+
+    /// Rewrite the projection list and GROUP BY of the specified `SELECT` statement.
+    ///
+    /// Wildcards and regular expressions in the `SELECT` projection list and `GROUP BY`
+    /// are expanded. Any fields with no type specifier are rewritten with the appropriate
+    /// type, if they exist in the underlying schema.
+    ///
+    /// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1185).
+    fn rewrite_field_list(s: &dyn SchemaProvider, stmt: &mut SelectStatement) -> Result<()> {
+        // Iterate through the `FROM` clause and rewrite any subqueries first.
+        for ms in stmt.from.iter_mut() {
+            if let MeasurementSelection::Subquery(subquery) = ms {
+                Self::rewrite_field_list(s, subquery)?;
+            }
+        }
+
+        // Attempt to rewrite all variable references in the fields with their types, if one
+        // hasn't been specified.
+        if let ControlFlow::Break(e) = stmt.fields.iter_mut().try_for_each(|f| {
+            walk_expr_mut::<DataFusionError>(&mut f.expr, &mut |e| {
+                if matches!(e, Expr::VarRef { .. }) {
+                    let new_type = match evaluate_type(s, e.borrow(), &stmt.from) {
+                        Err(e) => ControlFlow::Break(e)?,
+                        Ok(v) => v,
+                    };
+
+                    if let Expr::VarRef { data_type, .. } = e {
+                        *data_type = new_type;
+                    }
+                }
+                ControlFlow::Continue(())
+            })
+        }) {
+            return Err(e);
+        }
+
+        let (has_field_wildcard, has_group_by_wildcard) = Self::has_wildcards(stmt);
+        if (has_field_wildcard, has_group_by_wildcard) == (false, false) {
+            return Ok(());
+        }
+
+        let (field_set, mut tag_set) = Self::from_field_and_dimensions(s, &stmt.from)?;
+
+        if !has_group_by_wildcard {
+            if let Some(group_by) = &stmt.group_by {
+                // Remove any explicitly listed tags in the GROUP BY clause, so they are not
+                // expanded in the wildcard specified in the SELECT projection list
+                group_by.iter().for_each(|dim| {
+                    if let Dimension::Tag(ident) = dim {
+                        tag_set.remove(ident.as_str());
+                    }
+                });
+            }
+        }
+
+        #[derive(PartialEq, PartialOrd, Eq, Ord)]
+        struct VarRef {
+            name: String,
+            data_type: VarRefDataType,
+        }
+
+        let fields = if !field_set.is_empty() {
+            let fields_iter = field_set.iter().map(|(k, v)| VarRef {
+                name: k.clone(),
+                data_type: *v,
+            });
+
+            if !has_group_by_wildcard {
+                fields_iter
+                    .chain(tag_set.iter().map(|tag| VarRef {
+                        name: tag.clone(),
+                        data_type: VarRefDataType::Tag,
+                    }))
+                    .sorted()
+                    .collect::<Vec<_>>()
+            } else {
+                fields_iter.sorted().collect::<Vec<_>>()
+            }
+        } else {
+            vec![]
+        };
+
+        // Expand wildcards in the projection list.
+        if has_field_wildcard {
+            let mut new_fields = Vec::new();
+
+            for f in stmt.fields.iter() {
+                let add_field = |f: &VarRef| {
+                    new_fields.push(Field {
+                        expr: Expr::VarRef {
+                            name: f.name.clone().into(),
+                            data_type: Some(f.data_type),
+                        },
+                        alias: None,
+                    })
+                };
+
+                match &f.expr {
+                    Expr::Wildcard(wct) => {
+                        let filter: fn(&&VarRef) -> bool = match wct {
+                            None => |_| true,
+                            Some(WildcardType::Tag) => |v| v.data_type.is_tag_type(),
+                            Some(WildcardType::Field) => |v| v.data_type.is_field_type(),
+                        };
+
+                        fields.iter().filter(filter).for_each(add_field);
+                    }
+
+                    Expr::Literal(Literal::Regex(re)) => {
+                        let re = util::parse_regex(re)?;
+                        fields
+                            .iter()
+                            .filter(|v| re.is_match(v.name.as_str()))
+                            .for_each(add_field);
+                    }
+
+                    Expr::Call { name, args } => {
+                        let mut name = name;
+                        let mut args = args;
+
+                        // Search for the call with a wildcard by continuously descending until
+                        // we no longer have a call.
+                        while let Some(Expr::Call {
+                            name: inner_name,
+                            args: inner_args,
+                        }) = args.first()
+                        {
+                            name = inner_name;
+                            args = inner_args;
+                        }
+
+                        // Default to the numeric types accepted by most aggregate
+                        // functions (default set as in the influxdb_iox original).
+                        let mut supported_types = HashSet::from([
+                            VarRefDataType::Float,
+                            VarRefDataType::Integer,
+                            VarRefDataType::Unsigned,
+                        ]);
+
+                        // Add additional types for certain functions.
+                        match name.to_lowercase().as_str() {
+                            "count" | "first" | "last" | "distinct" | "elapsed" | "mode"
+                            | "sample" => {
+                                supported_types
+                                    .extend([VarRefDataType::String, VarRefDataType::Boolean]);
+                            }
+                            "min" | "max" => {
+                                supported_types.insert(VarRefDataType::Boolean);
+                            }
+                            "holt_winters" | "holt_winters_with_fit" => {
+                                supported_types.remove(&VarRefDataType::Unsigned);
+                            }
+                            _ => {}
+                        }
+
+                        let add_field = |v: &VarRef| {
+                            let mut args = args.clone();
+                            args[0] = Expr::VarRef {
+                                name: v.name.clone().into(),
+                                data_type: Some(v.data_type),
+                            };
+                            new_fields.push(Field {
+                                expr: Expr::Call {
+                                    name: name.clone(),
+                                    args,
+                                },
+                                alias: Some(format!("{}_{}", field_name(f), v.name).into()),
+                            })
+                        };
+
+                        match args.first() {
+                            Some(Expr::Wildcard(Some(WildcardType::Tag))) => {
+                                return Err(DataFusionError::External(
+                                    format!("unable to use tag as wildcard in {name}()").into(),
+                                ));
+                            }
+                            Some(Expr::Wildcard(_)) => {
+                                fields
+                                    .iter()
+                                    .filter(|v| supported_types.contains(&v.data_type))
+                                    .for_each(add_field);
+                            }
+                            Some(Expr::Literal(Literal::Regex(re))) => {
+                                let re = util::parse_regex(re)?;
+                                fields
+                                    .iter()
+                                    .filter(|v| {
+                                        supported_types.contains(&v.data_type)
+                                            && re.is_match(v.name.as_str())
+                                    })
+                                    .for_each(add_field);
+                            }
+                            _ => {
+                                new_fields.push(f.clone());
+                                continue;
+                            }
+                        }
+                    }
+
+                    Expr::Binary { .. } => {
+                        let has_wildcard = walk_expr(&f.expr, &mut |e| {
+                            match e {
+                                Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => {
+                                    return ControlFlow::Break(())
+                                }
+                                _ => {}
+                            }
+                            ControlFlow::Continue(())
+                        })
+                        .is_break();
+
+                        if has_wildcard {
+                            return Err(DataFusionError::External(
+                                "unsupported expression: contains a wildcard or regular expression"
+                                    .into(),
+                            ));
+                        }
+
+                        new_fields.push(f.clone());
+                    }
+
+                    _ => new_fields.push(f.clone()),
+                }
+            }
+
+            stmt.fields = FieldList::new(new_fields);
+        }
+
+        // Expand wildcards in the GROUP BY clause.
+        if has_group_by_wildcard {
+            let group_by_tags = tag_set.into_iter().sorted().collect::<Vec<_>>();
+
+            if let Some(group_by) = &stmt.group_by {
+                let mut new_dimensions = Vec::new();
+
+                for dim in group_by.iter() {
+                    let add_dim = |dim: &String| {
+                        new_dimensions.push(Dimension::Tag(Identifier::new(dim.clone())))
+                    };
+
+                    match dim {
+                        Dimension::Wildcard => {
+                            group_by_tags.iter().for_each(add_dim);
+                        }
+                        Dimension::Regex(re) => {
+                            let re = util::parse_regex(re)?;
+
+                            group_by_tags
+                                .iter()
+                                .filter(|dim| re.is_match(dim.as_str()))
+                                .for_each(add_dim);
+                        }
+                        _ => new_dimensions.push(dim.clone()),
+                    }
+                }
+                stmt.group_by = Some(GroupByClause::new(new_dimensions));
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/sql/src/influxql/util.rs b/sql/src/influxql/util.rs
new file mode 100644
index 0000000000..a232c96047
--- /dev/null
+++ b/sql/src/influxql/util.rs
@@ -0,0 +1,82 @@
+use influxdb_influxql_parser::string::Regex;
+
+// Copied from influxdb_iox.
+/// Removes all `\` patterns that the rust regex library would reject
+/// and rewrites them to their unescaped form.
+///
+/// For example, `\:` is rewritten to `:` as `\:` is not a valid
+/// escape sequence in the `regexp` crate but is valid in golang's
+/// regexp implementation.
+///
+/// This is done for compatibility purposes so that the regular
+/// expression matching in Rust more closely follows the matching in
+/// golang, used by the influx storage rpc.
+///
+/// See the influxdb_iox source for more details.
+pub fn clean_non_meta_escapes(pattern: &str) -> String {
+    if pattern.is_empty() {
+        return pattern.to_string();
+    }
+
+    #[derive(Debug, Copy, Clone)]
+    enum SlashState {
+        No,
+        Single,
+        Double,
+    }
+
+    let mut next_state = SlashState::No;
+
+    let next_chars = pattern
+        .chars()
+        .map(Some)
+        .skip(1)
+        .chain(std::iter::once(None));
+
+    // Emit each char based on the previous slash state
+    let new_pattern: String = pattern
+        .chars()
+        .zip(next_chars)
+        .filter_map(|(c, next_char)| {
+            let cur_state = next_state;
+            next_state = match (c, cur_state) {
+                ('\\', SlashState::No) => SlashState::Single,
+                ('\\', SlashState::Single) => SlashState::Double,
+                ('\\', SlashState::Double) => SlashState::Single,
+                _ => SlashState::No,
+            };
+
+            // Decide to emit `c` or not
+            match (cur_state, c, next_char) {
+                (SlashState::No, '\\', Some(next_char))
+                | (SlashState::Double, '\\', Some(next_char))
+                    if !is_valid_character_after_escape(next_char) =>
+                {
+                    None
+                }
+                _ => Some(c),
+            }
+        })
+        .collect();
+
+    new_pattern
+}
+
+fn is_valid_character_after_escape(c: char) -> bool {
+    // same list as https://docs.rs/regex-syntax/0.6.25/src/regex_syntax/ast/parse.rs.html#1445-1538
+    match c {
+        '0'..='7' => true,
+        '8'..='9' => true,
+        'x' | 'u' | 'U' => true,
+        'p' | 'P' => true,
+        'd' | 's' | 'w' | 'D' | 'S' | 'W' => true,
+        _ => regex_syntax::is_meta_character(c),
+    }
+}
+
+/// Sanitize an InfluxQL regular expression and create a compiled [`regex::Regex`].
+pub fn parse_regex(re: &Regex) -> std::result::Result<regex::Regex, regex::Error> {
+    let pattern = clean_non_meta_escapes(re.as_str());
+    regex::Regex::new(&pattern)
+}
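+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Illustrative sanity checks (not from the influxdb_iox original): `\:` is
+    // not a valid escape in the `regex` crate, so its backslash is dropped,
+    // while `\d` is a valid escape and is preserved.
+    #[test]
+    fn test_clean_non_meta_escapes() {
+        assert_eq!(clean_non_meta_escapes(r"a\:b"), "a:b");
+        assert_eq!(clean_non_meta_escapes(r"\d+"), r"\d+");
+        assert_eq!(clean_non_meta_escapes(""), "");
+    }
+}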
diff --git a/sql/src/lib.rs b/sql/src/lib.rs
index 1b811fab51..e6a8707fcb 100644
--- a/sql/src/lib.rs
+++ b/sql/src/lib.rs
@@ -10,6 +10,7 @@ extern crate common_util;
 pub mod ast;
 pub mod container;
 pub mod frontend;
+pub mod influxql;
 pub mod parser;
 pub(crate) mod partition;
 pub mod plan;
diff --git a/sql/src/planner.rs b/sql/src/planner.rs
index 6af112fb15..911546bf50 100644
--- a/sql/src/planner.rs
+++ b/sql/src/planner.rs
@@ -34,6 +34,7 @@ use datafusion::{
         ResolvedTableReference,
     },
 };
+use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
 use log::{debug, trace};
 use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
 use sqlparser::ast::{
@@ -57,7 +58,6 @@ use crate::{
     promql::{ColumnNames, Expr as PromExpr},
     provider::{ContextProviderAdapter, MetaProvider},
 };
-
 // We do not carry backtrace in sql error because it is mainly used in server
 // handler and the error is usually caused by invalid/unsupported sql, which
 // should be easy to find out the reason.
@@ -247,6 +247,11 @@ pub enum Error {
 
     #[snafu(display("Unsupported partition method, msg:{}", msg,))]
     UnsupportedPartition { msg: String },
+
+    #[snafu(display("Failed to build influxql plan, err:{}", source))]
+    BuildInfluxqlPlan {
+        source: crate::influxql::error::Error,
+    },
 }
 
 define_result!(Error);
@@ -318,20 +323,30 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
         expr.to_plan(planner.meta_provider, self.read_parallelism)
             .context(BuildPromPlanError)
     }
+
+    pub fn influxql_stmt_to_plan(&self, statement: InfluxqlStatement) -> Result<Plan> {
+        let adapter = ContextProviderAdapter::new(self.provider, self.read_parallelism);
+        let planner = PlannerDelegate::new(adapter);
+
+        let influxql_planner = crate::influxql::planner::Planner::new(planner);
+        influxql_planner
+            .statement_to_plan(statement)
+            .context(BuildInfluxqlPlan)
+    }
 }
 
 /// A planner wraps the datafusion's logical planner, and delegate sql like
 /// select/explain to datafusion's planner.
-struct PlannerDelegate<'a, P: MetaProvider> {
+pub(crate) struct PlannerDelegate<'a, P: MetaProvider> {
     meta_provider: ContextProviderAdapter<'a, P>,
 }
 
 impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
-    fn new(meta_provider: ContextProviderAdapter<'a, P>) -> Self {
+    pub(crate) fn new(meta_provider: ContextProviderAdapter<'a, P>) -> Self {
         Self { meta_provider }
     }
 
-    fn sql_statement_to_plan(self, sql_stmt: SqlStatement) -> Result<Plan> {
+    pub(crate) fn sql_statement_to_plan(self, sql_stmt: SqlStatement) -> Result<Plan> {
         match sql_stmt {
             // Query statement use datafusion planner
             SqlStatement::Explain { .. } | SqlStatement::Query(_) => {
@@ -774,7 +789,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
         Ok(Plan::Show(ShowPlan::ShowDatabase))
     }
 
-    fn find_table(&self, table_name: &str) -> Result<Option<TableRef>> {
+    pub(crate) fn find_table(&self, table_name: &str) -> Result<Option<TableRef>> {
         let table_ref = get_table_ref(table_name);
         self.meta_provider
             .table(table_ref)