diff --git a/Cargo.lock b/Cargo.lock index 5b43da76ff..85fd1cc92d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6026,6 +6026,7 @@ dependencies = [ "hashbrown 0.12.3", "influxdb_influxql_parser", "itertools", + "lazy_static", "log", "paste 1.0.8", "regex", diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 4a3d41a5ec..1b0362623e 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -14,6 +14,9 @@ pub enum Error { #[snafu(display("Failed to parse sql, err:{}", source))] ParseSql { source: sql::frontend::Error }, + #[snafu(display("Failed to parse influxql, err:{}", source))] + ParseInfluxql { source: sql::frontend::Error }, + #[snafu(display("Failed to create plan, query:{}, err:{}", query, source))] CreatePlan { query: String, diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index f447f92f2a..229f00c2c0 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -5,7 +5,7 @@ pub mod admin; pub mod error; pub mod prom; -pub mod sql; +pub mod query; mod prelude { pub use catalog::manager::Manager as CatalogManager; diff --git a/server/src/handlers/prom.rs b/server/src/handlers/prom.rs index b6e859abff..f3ac08c635 100644 --- a/server/src/handlers/prom.rs +++ b/server/src/handlers/prom.rs @@ -25,6 +25,7 @@ use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec}; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use warp::reject; +use super::query::QueryRequest; use crate::{ context::RequestContext, handlers, instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef, @@ -291,7 +292,8 @@ impl RemoteStorage for CeresDBStorage { TIMESTAMP_COLUMN ); - let result = handlers::sql::handle_sql(ctx, self.instance.clone(), sql.into()) + let request = QueryRequest::Sql(sql.into()); + let result = handlers::query::handle_query(ctx, self.instance.clone(), request) .await .map_err(Box::new) .context(SqlHandle)?; diff --git a/server/src/handlers/sql.rs b/server/src/handlers/query.rs similarity index 67% rename from server/src/handlers/sql.rs rename to server/src/handlers/query.rs index 57e740996e..cfdb7a644f 100644 --- a/server/src/handlers/sql.rs +++ b/server/src/handlers/query.rs @@ -24,7 +24,9 @@ use sql::{ }; use crate::handlers::{ - error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt}, + error::{ + CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt, + }, prelude::*, }; @@ -104,18 +106,33 @@ impl From for Request { } } -pub async fn handle_sql( +#[derive(Debug)] +pub enum QueryRequest { + Sql(Request), + // TODO: influxql include more parameters, we should add it in later. + Influxql(Request), +} +impl QueryRequest { + fn query(&self) -> &str { + match self { + QueryRequest::Sql(request) => request.query.as_str(), + QueryRequest::Influxql(request) => request.query.as_str(), + } + } +} + +pub async fn handle_query( ctx: &RequestContext, instance: InstanceRef, - request: Request, + query_request: QueryRequest, ) -> Result { let request_id = RequestId::next_id(); let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); info!( - "sql handler try to process request, request_id:{}, request:{:?}", - request_id, request + "Query handler try to process request, request_id:{}, request:{:?}", + request_id, query_request ); // TODO(yingwen): Privilege check, cannot access data of other tenant @@ -127,38 +144,66 @@ pub async fn handle_sql( function_registry: &*instance.function_registry, }; let frontend = Frontend::new(provider); - let mut sql_ctx = SqlContext::new(request_id, deadline); - // Parse sql, frontend error of invalid sql already contains sql - // TODO(yingwen): Maybe move sql from frontend error to outer error - let mut stmts = frontend - .parse_sql(&mut sql_ctx, &request.query) - .context(ParseSql)?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - // TODO(yingwen): For simplicity, we only support executing one statement now - // TODO(yingwen): INSERT/UPDATE/DELETE can be batched - ensure!( - stmts.len() == 1, - TooMuchStmt { - len: stmts.len(), - query: request.query, + let plan = match &query_request { + QueryRequest::Sql(request) => { + // Parse sql, frontend error of invalid sql already contains sql + // TODO(yingwen): Maybe move sql from frontend error to outer error + let mut stmts = frontend + .parse_sql(&mut sql_ctx, &request.query) + .context(ParseSql)?; + + if stmts.is_empty() { + return Ok(Output::AffectedRows(0)); + } + + // TODO(yingwen): For simplicity, we only support executing one statement now + // TODO(yingwen): INSERT/UPDATE/DELETE can be batched + ensure!( + stmts.len() == 1, + TooMuchStmt { + len: stmts.len(), + query: &request.query, + } + ); + + // Create logical plan + // Note: Remember to store sql in error when creating logical plan + frontend + .statement_to_plan(&mut sql_ctx, stmts.remove(0)) + .context(CreatePlan { + query: &request.query, + })? } - ); - // Create logical plan - // Note: Remember to store sql in error when creating logical plan - let plan = frontend - .statement_to_plan(&mut sql_ctx, stmts.remove(0)) - .context(CreatePlan { - query: &request.query, - })?; + QueryRequest::Influxql(request) => { + let mut stmts = frontend + .parse_influxql(&mut sql_ctx, &request.query) + .context(ParseInfluxql)?; + + if stmts.is_empty() { + return Ok(Output::AffectedRows(0)); + } + + ensure!( + stmts.len() == 1, + TooMuchStmt { + len: stmts.len(), + query: &request.query, + } + ); + + frontend + .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0)) + .context(CreatePlan { + query: &request.query, + })? + } + }; instance.limiter.try_limit(&plan).context(QueryBlock { - query: &request.query, + query: query_request.query(), })?; // Execute in interpreter @@ -177,7 +222,7 @@ pub async fn handle_sql( interpreter_factory .create(interpreter_ctx, plan) .context(InterpreterExec { - query: &request.query, + query: query_request.query(), })?; let output = if let Some(deadline) = deadline { @@ -187,24 +232,24 @@ pub async fn handle_sql( ) .await .context(QueryTimeout { - query: &request.query, + query: query_request.query(), }) .and_then(|v| { v.context(InterpreterExec { - query: &request.query, + query: query_request.query(), }) })? } else { interpreter.execute().await.context(InterpreterExec { - query: &request.query, + query: query_request.query(), })? }; info!( - "sql handler finished, request_id:{}, cost:{}ms, request:{:?}", + "Query handler finished, request_id:{}, cost:{}ms, request:{:?}", request_id, begin_instant.saturating_elapsed().as_millis(), - request + query_request ); Ok(output) diff --git a/server/src/http.rs b/server/src/http.rs index b11594303c..c5201fd25b 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -8,6 +8,7 @@ use std::{ }; use common_util::error::BoxError; +use handlers::query::QueryRequest; use log::{error, info}; use logger::RuntimeLevel; use profile::Profiler; @@ -30,7 +31,7 @@ use crate::{ consts, context::RequestContext, error_util, - handlers::{self, prom::CeresDBStorage, sql::Request}, + handlers::{self, prom::CeresDBStorage, query::Request}, instance::InstanceRef, metrics, schema_config_provider::SchemaConfigProviderRef, @@ -126,6 +127,7 @@ impl Service { self.home() .or(self.metrics()) .or(self.sql()) + .or(self.influxql()) .or(self.heap_profile()) .or(self.admin_block()) .or(self.flush_memtable()) @@ -181,9 +183,47 @@ impl Service { .and(self.with_context()) .and(self.with_instance()) .and_then(|req, ctx, instance| async move { - let result = handlers::sql::handle_sql(&ctx, instance, req) + let req = QueryRequest::Sql(req); + let result = handlers::query::handle_query(&ctx, instance, req) .await - .map(handlers::sql::convert_output) + .map(handlers::query::convert_output) + .map_err(|e| { + // TODO(yingwen): Maybe truncate and print the sql + error!("Http service Failed to handle sql, err:{}", e); + Box::new(e) + }) + .context(HandleRequest); + match result { + Ok(res) => Ok(reply::json(&res)), + Err(e) => Err(reject::custom(e)), + } + }) + } + + // POST /influxql + // this request type is not what influxdb API expected, the one in influxdb: + // https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint + fn influxql( + &self, + ) -> impl Filter + Clone { + // accept json or plain text + let extract_request = warp::body::json() + .or(warp::body::bytes().map(Request::from)) + .unify(); + + warp::path!("influxql") + .and(warp::post()) + .and(warp::body::content_length_limit(self.config.max_body_size)) + .and(extract_request) + .and(self.with_context()) + .and(self.with_instance()) + .and_then(|req, ctx, instance| async move { + let req = QueryRequest::Influxql(req); + let result = handlers::query::handle_query(&ctx, instance, req) + .await + // TODO: the sql's `convert_output` function may be not suitable to influxql. + // We should implement influxql's related function in later. + .map(handlers::query::convert_output) .map_err(|e| { // TODO(yingwen): Maybe truncate and print the sql error!("Http service Failed to handle sql, err:{}", e); diff --git a/server/src/mysql/worker.rs b/server/src/mysql/worker.rs index f6fd2d720c..82f7c81eae 100644 --- a/server/src/mysql/worker.rs +++ b/server/src/mysql/worker.rs @@ -11,7 +11,10 @@ use table_engine::engine::EngineRuntimes; use crate::{ context::RequestContext, - handlers::{self, sql::Request}, + handlers::{ + self, + query::{QueryRequest, Request}, + }, instance::Instance, mysql::{ error::{CreateContext, HandleSql, Result}, @@ -109,9 +112,9 @@ where { async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result { let ctx = self.create_ctx()?; - let req = Request::from(sql.to_string()); - handlers::sql::handle_sql(&ctx, self.instance.clone(), req) + let req = QueryRequest::Sql(req); + handlers::query::handle_query(&ctx, self.instance.clone(), req) .await .map_err(|e| { error!("Mysql service Failed to handle sql, err: {}", e); diff --git a/sql/Cargo.toml b/sql/Cargo.toml index 993e74381b..126309dfa5 100644 --- a/sql/Cargo.toml +++ b/sql/Cargo.toml @@ -29,6 +29,7 @@ df_operator = { workspace = true } hashbrown = { version = "0.12", features = ["raw"] } influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" } itertools = { workspace = true } +lazy_static = { workspace = true } log = { workspace = true } paste = { workspace = true } regex = "1" diff --git a/sql/src/influxql/mod.rs b/sql/src/influxql/mod.rs index fbe9dc51cf..033a5dd5e5 100644 --- a/sql/src/influxql/mod.rs +++ b/sql/src/influxql/mod.rs @@ -3,8 +3,9 @@ //! Influxql processing pub mod planner; -pub(crate) mod stmt_rewriter; +pub(crate) mod select; pub(crate) mod util; + pub mod error { use common_util::error::GenericError; use snafu::{Backtrace, Snafu}; @@ -32,7 +33,13 @@ pub mod error { backtrace ))] RewriteNoCause { msg: String, backtrace: Backtrace }, - } + #[snafu(display( + "Failed to convert to sql statement, msg:{}.\nBacktrace:{}", + msg, + backtrace + ))] + Convert { msg: String, backtrace: Backtrace }, + } define_result!(Error); } diff --git a/sql/src/influxql/planner.rs b/sql/src/influxql/planner.rs index 9913ed78c6..00b09736fb 100644 --- a/sql/src/influxql/planner.rs +++ b/sql/src/influxql/planner.rs @@ -1,13 +1,21 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -//! Influxql planner. +//! Influxql planner -use common_util::error::BoxError; -use influxdb_influxql_parser::statement::Statement as InfluxqlStatement; +use common_util::error::{BoxError, GenericResult}; +use influxdb_influxql_parser::{ + select::SelectStatement, statement::Statement as InfluxqlStatement, +}; use snafu::ResultExt; +use sqlparser::ast::Statement as SqlStatement; use table_engine::table::TableRef; -use crate::{influxql::error::*, plan::Plan, provider::MetaProvider}; +use crate::{ + influxql::select::{converter::Converter, rewriter::Rewriter}, + plan::Plan, + planner::{BuildInfluxqlPlan, Result}, + provider::MetaProvider, +}; #[allow(dead_code)] pub(crate) struct Planner<'a, P: MetaProvider> { @@ -19,9 +27,14 @@ impl<'a, P: MetaProvider> Planner<'a, P> { Self { sql_planner } } - pub fn statement_to_plan(&self, stmt: InfluxqlStatement) -> Result { + /// Build sql logical plan from [InfluxqlStatement]. + /// + /// NOTICE: when building plan from influxql select statement, + /// the [InfluxqlStatement] will be converted to [SqlStatement] first, + /// and build plan then. + pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result { match stmt { - InfluxqlStatement::Select(_) => todo!(), + InfluxqlStatement::Select(stmt) => self.select_to_plan(*stmt), InfluxqlStatement::CreateDatabase(_) => todo!(), InfluxqlStatement::ShowDatabases(_) => todo!(), InfluxqlStatement::ShowRetentionPolicies(_) => todo!(), @@ -34,23 +47,37 @@ impl<'a, P: MetaProvider> Planner<'a, P> { InfluxqlStatement::Explain(_) => todo!(), } } + + pub fn select_to_plan(self, stmt: SelectStatement) -> Result { + let mut stmt = stmt; + let provider_impl = MeasurementProviderImpl(&self.sql_planner); + let rewriter = Rewriter::new(&provider_impl); + rewriter + .rewrite(&mut stmt) + .box_err() + .context(BuildInfluxqlPlan)?; + + let sql_stmt = SqlStatement::Query(Box::new( + Converter::convert(stmt) + .box_err() + .context(BuildInfluxqlPlan)?, + )); + + self.sql_planner + .sql_statement_to_plan(sql_stmt) + .box_err() + .context(BuildInfluxqlPlan) + } } pub trait MeasurementProvider { - fn measurement(&self, measurement_name: &str) -> Result>; + fn measurement(&self, measurement_name: &str) -> GenericResult>; } -pub(crate) struct MeasurementProviderImpl<'a, P: MetaProvider>( - crate::planner::PlannerDelegate<'a, P>, -); +struct MeasurementProviderImpl<'a, P: MetaProvider>(&'a crate::planner::PlannerDelegate<'a, P>); impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P> { - fn measurement(&self, measurement_name: &str) -> Result> { - self.0 - .find_table(measurement_name) - .box_err() - .context(RewriteWithCause { - msg: format!("failed to find measurement, measurement:{measurement_name}"), - }) + fn measurement(&self, measurement_name: &str) -> GenericResult> { + self.0.find_table(measurement_name).box_err() } } diff --git a/sql/src/influxql/select/converter.rs b/sql/src/influxql/select/converter.rs new file mode 100644 index 0000000000..c8ee6a7920 --- /dev/null +++ b/sql/src/influxql/select/converter.rs @@ -0,0 +1,386 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Influxql select statement converter(to sql statement) + +use influxdb_influxql_parser::{ + common::{MeasurementName, QualifiedMeasurementName}, + expression::{ + BinaryOperator as InfluxqlBinaryOperator, ConditionalExpression, ConditionalOperator, + Expr as InfluxqlExpr, + }, + literal::Literal, + select::{Dimension, MeasurementSelection, SelectStatement}, +}; +use snafu::ensure; +use sqlparser::ast::{ + BinaryOperator, Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Offset, + OffsetRows, Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, Value, +}; + +use crate::influxql::{error::*, util}; + +/// Influxql select statement converter +// Derived from influxdb_iox: +// https://github.com/influxdata/influxdb_iox/blob/ff11fe465d02faf6c4dd3017df8750b38d4afd2b/iox_query/src/plan/influxql/planner.rs +#[allow(dead_code)] +pub struct Converter; + +impl Converter { + #[allow(dead_code)] + pub fn convert(stmt: SelectStatement) -> Result { + // Fields in `Query` needed to be converted. + // - limit + // - order by + // - limit + // - offset + // - select body + let limit = stmt.limit.map(|limit| { + let limit_n = *limit; + Expr::Value(Value::Number(limit_n.to_string(), false)) + }); + + let offset = stmt.offset.map(|offset| { + let offset_n = *offset; + let offset_val = Expr::Value(Value::Number(offset_n.to_string(), false)); + + Offset { + value: offset_val, + rows: OffsetRows::None, + } + }); + + // For select body: + // - projection + // - from + // - selection + // - group_by + let projection_exprs = stmt + .fields + .iter() + .map(|field| expr_to_sql_expr(ExprScope::Projection, &field.expr)) + .collect::>>()?; + let projection = stmt + .fields + .iter() + .zip(projection_exprs.into_iter()) + .map(|(field, expr)| match &field.alias { + Some(alias) => SelectItem::ExprWithAlias { + expr, + alias: Ident::new(alias.to_string()), + }, + None => SelectItem::UnnamedExpr(expr), + }) + .collect(); + + ensure!( + stmt.from.len() == 1, + Unimplemented { + msg: "select from multiple measurements", + } + ); + let measurement_name = match &stmt.from[0] { + MeasurementSelection::Name(QualifiedMeasurementName { name, .. }) => match name { + MeasurementName::Name(name) => name.to_string(), + MeasurementName::Regex(re) => { + return Convert { + msg: format!("convert from to sql statement encounter regex, regex:{re}"), + } + .fail() + } + }, + MeasurementSelection::Subquery(_) => { + return Unimplemented { + msg: "select from subquery", + } + .fail() + } + }; + let table_factor = TableFactor::Table { + name: ObjectName(vec![Ident::with_quote('`', measurement_name)]), + alias: None, + args: None, + with_hints: Vec::default(), + }; + let from = vec![TableWithJoins { + relation: table_factor, + joins: Vec::default(), + }]; + + let selection = match stmt.condition { + Some(condition) => Some(conditional_to_sql_expr(&condition)?), + None => None, + }; + + let group_by = match stmt.group_by { + Some(keys) => keys + .iter() + .map(|key| match key { + Dimension::Time { .. } => Unimplemented { + msg: "group by time interval", + } + .fail(), + Dimension::Tag(tag) => Ok(Expr::Identifier(Ident::new(tag.to_string()))), + Dimension::Regex(re) => Convert { + msg: format!( + "convert group by to sql statement encounter regex, regex:{re}" + ), + } + .fail(), + Dimension::Wildcard => Convert { + msg: "convert group by to sql statement encounter wildcard", + } + .fail(), + }) + .collect::>>()?, + None => Vec::default(), + }; + + let body = Select { + distinct: false, + top: None, + projection, + into: None, + from, + lateral_views: Vec::default(), + selection, + group_by, + cluster_by: Vec::default(), + sort_by: Vec::default(), + having: None, + qualify: None, + distribute_by: Vec::default(), + }; + + Ok(Query { + with: None, + body: Box::new(SetExpr::Select(Box::new(body))), + order_by: Vec::default(), + limit, + offset, + fetch: None, + locks: Vec::default(), + }) + } +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +enum ExprScope { + Projection, + Where, +} + +/// Map an InfluxQL [`InfluxqlExpr`] to a sql [`Expr`]. +fn expr_to_sql_expr(scope: ExprScope, iql: &InfluxqlExpr) -> Result { + match iql { + // rewriter is expected to expand wildcard expressions + InfluxqlExpr::Wildcard(_) => Convert { + msg: "unexpected wildcard in projection", + } + .fail(), + + InfluxqlExpr::VarRef { + name, + data_type: opt_dst_type, + } => { + if let Some(dst_type) = opt_dst_type { + return Unimplemented { + msg: format!("cast to dst type, column:{name}, dst type:{dst_type}"), + } + .fail(); + }; + + Ok(Expr::Identifier(Ident::new(name.to_string()))) + } + + InfluxqlExpr::BindParameter(_) => Unimplemented { + msg: "bind parameter", + } + .fail(), + + InfluxqlExpr::Literal(val) => Ok(match val { + Literal::Integer(v) => Expr::Value(Value::Number(v.to_string(), false)), + Literal::Unsigned(v) => Expr::Value(Value::Number(v.to_string(), false)), + Literal::Float(v) => Expr::Value(Value::Number(v.to_string(), false)), + Literal::String(v) => Expr::Value(Value::SingleQuotedString(v.clone())), + Literal::Timestamp(v) => Expr::Value(Value::SingleQuotedString(v.to_rfc3339())), + Literal::Duration(_) => { + return Unimplemented { + msg: "duration literal", + } + .fail() + } + Literal::Regex(re) => match scope { + // a regular expression in a projection list is unexpected, + // as it should have been expanded by the rewriter. + ExprScope::Projection => { + return Convert { + msg: format!( + "convert projection to sql statement encounter regex, regex:{re}" + ), + } + .fail() + } + ExprScope::Where => { + return Unimplemented { + msg: "regex in where clause", + } + .fail() + } + }, + Literal::Boolean(v) => Expr::Value(Value::Boolean(*v)), + }), + + InfluxqlExpr::Distinct(_) => Unimplemented { msg: "DISTINCT" }.fail(), + + InfluxqlExpr::Call { name, args } => call_to_sql_expr(scope, name, args), + + InfluxqlExpr::Binary { lhs, op, rhs } => binary_expr_to_sql_expr(scope, lhs, op, rhs), + + InfluxqlExpr::Nested(e) => expr_to_sql_expr(scope, e), + } +} + +fn call_to_sql_expr(scope: ExprScope, name: &str, args: &[InfluxqlExpr]) -> Result { + if util::is_scalar_math_function(name) { + let name = ObjectName(vec![Ident::new(name.to_string())]); + // TODO: Support `FunctionArg::Named`. + let args = args + .iter() + .map(|arg| { + let sql_expr_res = expr_to_sql_expr(scope, arg); + sql_expr_res.map(|sql_expr| FunctionArg::Unnamed(FunctionArgExpr::Expr(sql_expr))) + }) + .collect::>>()?; + + return Ok(Expr::Function(Function { + name, + args, + over: None, + distinct: false, + special: false, + })); + } + + match scope { + ExprScope::Projection => Unimplemented { + msg: "aggregate and selector functions in projection list", + } + .fail(), + + ExprScope::Where => { + if name.eq_ignore_ascii_case("now") { + Unimplemented { + msg: "now() in where clause", + } + .fail() + } else { + Convert { + msg: format!("invalid function call in condition: {name}"), + } + .fail() + } + } + } +} + +fn binary_expr_to_sql_expr( + scope: ExprScope, + lhs: &InfluxqlExpr, + op: &InfluxqlBinaryOperator, + rhs: &InfluxqlExpr, +) -> Result { + let left = Box::new(expr_to_sql_expr(scope, lhs)?); + let right = Box::new(expr_to_sql_expr(scope, rhs)?); + + let op = match op { + InfluxqlBinaryOperator::Add => BinaryOperator::Plus, + InfluxqlBinaryOperator::Sub => BinaryOperator::Minus, + InfluxqlBinaryOperator::Mul => BinaryOperator::Multiply, + InfluxqlBinaryOperator::Div => BinaryOperator::Divide, + InfluxqlBinaryOperator::Mod => BinaryOperator::Modulo, + InfluxqlBinaryOperator::BitwiseAnd => BinaryOperator::BitwiseAnd, + InfluxqlBinaryOperator::BitwiseOr => BinaryOperator::BitwiseOr, + InfluxqlBinaryOperator::BitwiseXor => BinaryOperator::BitwiseXor, + }; + + Ok(Expr::BinaryOp { left, op, right }) +} + +/// Map an InfluxQL [`ConditionalExpression`] to a sql [`Expr`]. +fn conditional_to_sql_expr(iql: &ConditionalExpression) -> Result { + match iql { + ConditionalExpression::Expr(expr) => expr_to_sql_expr(ExprScope::Where, expr), + ConditionalExpression::Binary { lhs, op, rhs } => { + let op = conditional_op_to_operator(*op)?; + let (lhs, rhs) = (conditional_to_sql_expr(lhs)?, conditional_to_sql_expr(rhs)?); + + Ok(Expr::BinaryOp { + left: Box::new(lhs), + op, + right: Box::new(rhs), + }) + } + ConditionalExpression::Grouped(e) => conditional_to_sql_expr(e), + } +} + +fn conditional_op_to_operator(op: ConditionalOperator) -> Result { + match op { + ConditionalOperator::Eq => Ok(BinaryOperator::Eq), + ConditionalOperator::NotEq => Ok(BinaryOperator::NotEq), + ConditionalOperator::EqRegex => Unimplemented { + msg: "eq regex in where clause", + } + .fail(), + ConditionalOperator::NotEqRegex => Unimplemented { + msg: "not eq regex in where clause", + } + .fail(), + ConditionalOperator::Lt => Ok(BinaryOperator::Lt), + ConditionalOperator::LtEq => Ok(BinaryOperator::LtEq), + ConditionalOperator::Gt => Ok(BinaryOperator::Gt), + ConditionalOperator::GtEq => Ok(BinaryOperator::GtEq), + ConditionalOperator::And => Ok(BinaryOperator::And), + ConditionalOperator::Or => Ok(BinaryOperator::Or), + // NOTE: This is not supported by InfluxQL SELECT expressions, so it is unexpected + ConditionalOperator::In => Convert { + msg: "unexpected binary operator: IN", + } + .fail(), + } +} + +#[cfg(test)] +mod test { + use sqlparser::ast::Statement as SqlStatement; + + use crate::{ + ast::Statement, influxql::select::converter::Converter, parser::Parser, tests::parse_select, + }; + + #[test] + fn test_basic_convert() { + // Common parts between influxql and sql, include: + // - limit + // - offset + // - projection + // - from(single table) + // - selection + // - group_by + let stmt = parse_select( + "SELECT a, sin(b), c + FROM influxql_test WHERE a < 4 and b > 4.5 GROUP BY c LIMIT 1 OFFSET 0", + ); + let converted_sql_stmt = Statement::Standard(Box::new(SqlStatement::Query(Box::new( + Converter::convert(stmt).unwrap(), + )))); + + let sql_stmts = Parser::parse_sql( + "SELECT a, sin(b), c + FROM influxql_test WHERE a < 4 and b > 4.5 GROUP BY c LIMIT 1 OFFSET 0", + ) + .unwrap(); + let expected_sql_stmt = sql_stmts.first().unwrap(); + assert_eq!(expected_sql_stmt, &converted_sql_stmt); + } +} diff --git a/sql/src/influxql/select/mod.rs b/sql/src/influxql/select/mod.rs new file mode 100644 index 0000000000..0cf2aefd0e --- /dev/null +++ b/sql/src/influxql/select/mod.rs @@ -0,0 +1,6 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Convert influxql to sql at statement level + +pub(crate) mod converter; +pub(crate) mod rewriter; diff --git a/sql/src/influxql/stmt_rewriter.rs b/sql/src/influxql/select/rewriter.rs similarity index 84% rename from sql/src/influxql/stmt_rewriter.rs rename to sql/src/influxql/select/rewriter.rs index 9c8251f278..e44d73b6b9 100644 --- a/sql/src/influxql/stmt_rewriter.rs +++ b/sql/src/influxql/select/rewriter.rs @@ -1,6 +1,6 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -//! Influxql statement rewriter +//! Influxql select statement rewriter use std::{collections::BTreeSet, ops::ControlFlow}; @@ -17,26 +17,24 @@ use influxdb_influxql_parser::{ use itertools::{Either, Itertools}; use snafu::{ensure, OptionExt, ResultExt}; -use super::{planner::MeasurementProvider, util}; -use crate::influxql::error::*; +use crate::influxql::{error::*, planner::MeasurementProvider, util}; /// Rewriter for the influxql statement /// /// It will rewrite statement before converting it to sql statement. -// Partial copy from influxdb_iox. -pub(crate) struct StmtRewriter<'a> { +// Derived from influxdb_iox: +// https://github.com/influxdata/influxdb_iox/blob/ff11fe465d02faf6c4dd3017df8750b38d4afd2b/iox_query/src/plan/influxql/rewriter.rs +pub(crate) struct Rewriter<'a> { measurement_provider: &'a dyn MeasurementProvider, } -impl<'a> StmtRewriter<'a> { - #[allow(dead_code)] +impl<'a> Rewriter<'a> { pub fn new(measurement_provider: &'a dyn MeasurementProvider) -> Self { Self { measurement_provider, } } - #[allow(dead_code)] pub fn rewrite(&self, stmt: &mut SelectStatement) -> Result<()> { self.rewrite_from(stmt)?; self.rewrite_field_list(stmt) @@ -46,29 +44,39 @@ impl<'a> StmtRewriter<'a> { let mut new_from = Vec::new(); for ms in stmt.from.iter() { match ms { - MeasurementSelection::Name(qmn) => match qmn { - QualifiedMeasurementName { - name: MeasurementName::Name(name), - .. - } => { - let _ = self.measurement_provider.measurement(name)?.context( - RewriteNoCause { - msg: format!("measurement not found, measurement:{name}"), - }, - )?; - new_from.push(ms.clone()); - } - QualifiedMeasurementName { - name: MeasurementName::Regex(_), - .. - } => { - // TODO: need to support get all tables first. - return Unimplemented { - msg: "rewrite from regex", + MeasurementSelection::Name(qmn) => { + match qmn { + QualifiedMeasurementName { + name: MeasurementName::Name(name), + .. + } => { + let _ = self + .measurement_provider + .measurement(name) + .context(RewriteWithCause { + msg: format!( + "rewrite from failed to find measurement, measurement:{name}", + ), + })? + .context(RewriteNoCause { + msg: format!( + "rewrite from found measurement not found, measurement:{name}" + ), + })?; + new_from.push(ms.clone()); + } + QualifiedMeasurementName { + name: MeasurementName::Regex(_), + .. + } => { + // TODO: need to support get all tables first. + return Unimplemented { + msg: "rewrite from regex", + } + .fail(); } - .fail(); } - }, + } MeasurementSelection::Subquery(_) => { return Unimplemented { msg: "rewrite from subquery", @@ -143,7 +151,6 @@ impl<'a> StmtRewriter<'a> { let measurement = self .measurement_provider .measurement(measurement_name) - .box_err() .context(RewriteWithCause { msg: format!("failed to find measurement, measurement:{measurement_name}"), })? @@ -382,27 +389,7 @@ fn maybe_rewrite_projection( #[cfg(test)] mod test { - use datafusion::sql::TableReference; - use influxdb_influxql_parser::{ - parse_statements, select::SelectStatement, statement::Statement, - }; - - use super::StmtRewriter; - use crate::{ - influxql::planner::MeasurementProvider, provider::MetaProvider, tests::MockMetaProvider, - }; - - impl MeasurementProvider for MockMetaProvider { - fn measurement( - &self, - measurement_name: &str, - ) -> crate::influxql::error::Result> { - let table_ref = TableReference::Bare { - table: std::borrow::Cow::Borrowed(measurement_name), - }; - Ok(self.table(table_ref).unwrap()) - } - } + use crate::tests::{parse_select, rewrite_statement, MockMetaProvider}; #[test] fn test_wildcard_and_regex_in_projection() { @@ -442,18 +429,4 @@ mod test { stmt.to_string() ); } - - pub fn rewrite_statement(provider: &dyn MeasurementProvider, stmt: &mut SelectStatement) { - let rewriter = StmtRewriter::new(provider); - rewriter.rewrite(stmt).unwrap(); - } - - /// Returns the InfluxQL [`SelectStatement`] for the specified SQL, `s`. - pub fn parse_select(s: &str) -> SelectStatement { - let statements = parse_statements(s).unwrap(); - match statements.first() { - Some(Statement::Select(sel)) => *sel.clone(), - _ => panic!("expected SELECT statement"), - } - } } diff --git a/sql/src/influxql/util.rs b/sql/src/influxql/util.rs index b143691332..7cc7b855f7 100644 --- a/sql/src/influxql/util.rs +++ b/sql/src/influxql/util.rs @@ -2,7 +2,10 @@ //! Some utils used process influxql +use std::collections::HashSet; + use influxdb_influxql_parser::string::Regex; +use lazy_static::lazy_static; // Copy from influxdb_iox: // https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L147 @@ -76,6 +79,20 @@ pub fn parse_regex(re: &Regex) -> std::result::Result = HashSet::from([ + "abs", "sin", "cos", "tan", "asin", "acos", "atan", "atan2", "exp", "log", "ln", "log2", + "log10", "sqrt", "pow", "floor", "ceil", "round", + ]); +} + +/// Returns `true` if `name` is a mathematical scalar function +/// supported by InfluxQL. +pub(crate) fn is_scalar_math_function(name: &str) -> bool { + SCALAR_MATH_FUNCTIONS.contains(name) +} + mod test { // Copy from influxdb_iox: // https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L357 diff --git a/sql/src/planner.rs b/sql/src/planner.rs index 298c3ed617..822f71516f 100644 --- a/sql/src/planner.rs +++ b/sql/src/planner.rs @@ -252,10 +252,9 @@ pub enum Error { #[snafu(display("Failed to build plan, msg:{}", msg))] InvalidWriteEntry { msg: String }, + #[snafu(display("Failed to build influxql plan, err:{}", source))] - BuildInfluxqlPlan { - source: crate::influxql::error::Error, - }, + BuildInfluxqlPlan { source: GenericError }, } define_result!(Error); @@ -333,9 +332,7 @@ impl<'a, P: MetaProvider> Planner<'a, P> { let planner = PlannerDelegate::new(adapter); let influxql_planner = crate::influxql::planner::Planner::new(planner); - influxql_planner - .statement_to_plan(statement) - .context(BuildInfluxqlPlan) + influxql_planner.statement_to_plan(statement) } pub fn write_req_to_plan( diff --git a/sql/src/tests.rs b/sql/src/tests.rs index 9304807492..cc593e5802 100644 --- a/sql/src/tests.rs +++ b/sql/src/tests.rs @@ -9,15 +9,20 @@ use common_types::{ schema::{Builder, Schema, TSID_COLUMN}, tests::{build_default_value_schema, build_schema}, }; +use common_util::error::GenericResult; use datafusion::catalog::TableReference; use df_operator::{scalar::ScalarUdf, udaf::AggregateUdf}; +use influxdb_influxql_parser::{parse_statements, select::SelectStatement, statement::Statement}; use table_engine::{ memory::MemoryTable, table::{Table, TableId, TableRef}, ANALYTIC_ENGINE_TYPE, }; -use crate::provider::MetaProvider; +use crate::{ + influxql::{planner::MeasurementProvider, select::rewriter::Rewriter}, + provider::MetaProvider, +}; pub struct MockMetaProvider { tables: Vec>, @@ -91,6 +96,32 @@ impl MetaProvider for MockMetaProvider { } } +impl MeasurementProvider for MockMetaProvider { + fn measurement( + &self, + measurement_name: &str, + ) -> GenericResult> { + let table_ref = TableReference::Bare { + table: std::borrow::Cow::Borrowed(measurement_name), + }; + Ok(self.table(table_ref).unwrap()) + } +} + +pub fn rewrite_statement(provider: &dyn MeasurementProvider, stmt: &mut SelectStatement) { + let rewriter = Rewriter::new(provider); + rewriter.rewrite(stmt).unwrap(); +} + +/// Returns the InfluxQL [`SelectStatement`] for the specified SQL, `s`. +pub fn parse_select(s: &str) -> SelectStatement { + let statements = parse_statements(s).unwrap(); + match statements.first() { + Some(Statement::Select(sel)) => *sel.clone(), + _ => panic!("expected SELECT statement"), + } +} + fn build_influxql_test_schema() -> Schema { Builder::new() .auto_increment_column_id(true)