Skip to content

Commit

Permalink
draft.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Mar 2, 2023
1 parent ce9fa87 commit f8fa257
Show file tree
Hide file tree
Showing 9 changed files with 718 additions and 9 deletions.
83 changes: 83 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ regex = "1"
snafu = { workspace = true }
sqlparser = { workspace = true }
table_engine = { workspace = true }
influxdb_influxql_parser = {git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser"}
regex-syntax = "0.6.28"

[dev-dependencies]
common_types = { workspace = true, features = ["test"] }
Expand Down
50 changes: 46 additions & 4 deletions sql/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{sync::Arc, time::Instant};

use ceresdbproto::prometheus::Expr as PromExpr;
use common_types::request_id::RequestId;
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table;

Expand All @@ -28,14 +29,24 @@ pub enum Error {
},

// TODO(yingwen): Should we store stmt here?
#[snafu(display("Failed to create plan, err:{}", source))]
CreatePlan { source: crate::planner::Error },
#[snafu(display("Failed to create plan, plan_type:{}, err:{}", plan_type, source))]
CreatePlan {
plan_type: String,
source: crate::planner::Error,
},

#[snafu(display("Invalid prom request, err:{}", source))]
InvalidPromRequest { source: crate::promql::Error },

#[snafu(display("Expr is not found in prom request.\nBacktrace:\n{}", backtrace))]
ExprNotFoundInPromRequest { backtrace: Backtrace },

// invalid sql is quite common, so we don't provide a backtrace now.
#[snafu(display("invalid influxql, influxql:{}, err:{}", influxql, parse_err))]
InvalidInfluxql {
influxql: String,
parse_err: influxdb_influxql_parser::common::ParseError,
},
}

define_result!(Error);
Expand Down Expand Up @@ -89,14 +100,31 @@ impl<P> Frontend<P> {
let expr = expr.context(ExprNotFoundInPromRequest)?;
Expr::try_from(expr).context(InvalidPromRequest)
}

/// Parse the sql and returns the statements
pub fn parse_influxql(
&self,
_ctx: &mut Context,
influxql: &str,
) -> Result<Vec<InfluxqlStatement>> {
match influxdb_influxql_parser::parse_statements(influxql) {
Ok(stmts) => Ok(stmts),
Err(e) => Err(Error::InvalidInfluxql {
influxql: influxql.to_string(),
parse_err: e,
}),
}
}
}

impl<P: MetaProvider> Frontend<P> {
/// Create logical plan for the statement
pub fn statement_to_plan(&self, ctx: &mut Context, stmt: Statement) -> Result<Plan> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);

planner.statement_to_plan(stmt).context(CreatePlan)
planner
.statement_to_plan(stmt)
.context(CreatePlan { plan_type: "sql" })
}

pub fn promql_expr_to_plan(
Expand All @@ -106,6 +134,20 @@ impl<P: MetaProvider> Frontend<P> {
) -> Result<(Plan, Arc<ColumnNames>)> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);

planner.promql_expr_to_plan(expr).context(CreatePlan)
planner.promql_expr_to_plan(expr).context(CreatePlan {
plan_type: "promql",
})
}

pub fn influxql_stmt_to_plan(
&self,
ctx: &mut Context,
stmt: InfluxqlStatement,
) -> Result<Plan> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);

planner.influxql_stmt_to_plan(stmt).context(CreatePlan {
plan_type: "influxql",
})
}
}
21 changes: 21 additions & 0 deletions sql/src/influxql/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pub mod planner;
mod stmt_rewriter;
pub(crate) mod util;
pub mod error {
use snafu::Snafu;
use common_util::error::GenericError;

#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display("Unimplemented influxql statement, msg: {}", msg))]
Unimplemented { msg: String },

#[snafu(display("Unimplemented influxql statement, source: {}", source))]
RewriteStmtWithCause {
source: GenericError,
}
}

define_result!(Error);
}
33 changes: 33 additions & 0 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;

use crate::{influxql::error::*, plan::Plan, provider::MetaProvider};

pub(crate) struct Planner<'a, P: MetaProvider> {
sql_planner: crate::planner::PlannerDelegate<'a, P>,
}

impl<'a, P: MetaProvider> Planner<'a, P> {
pub fn new(sql_planner: crate::planner::PlannerDelegate<'a, P>) -> Self {
Self { sql_planner }
}

pub fn statement_to_plan(&self, stmt: InfluxqlStatement) -> Result<Plan> {
match stmt {
InfluxqlStatement::Select(_) => todo!(),
InfluxqlStatement::CreateDatabase(_) => todo!(),
InfluxqlStatement::ShowDatabases(_) => todo!(),
InfluxqlStatement::ShowRetentionPolicies(_) => todo!(),
InfluxqlStatement::ShowTagKeys(_) => todo!(),
InfluxqlStatement::ShowTagValues(_) => todo!(),
InfluxqlStatement::ShowFieldKeys(_) => todo!(),
InfluxqlStatement::ShowMeasurements(_) => todo!(),
InfluxqlStatement::Delete(_) => todo!(),
InfluxqlStatement::DropMeasurement(_) => todo!(),
InfluxqlStatement::Explain(_) => todo!(),
}
}

fn rewrite_stmt(&self, stmt: InfluxqlStatement) -> Result<InfluxqlStatement> {
todo!()
}
}
Loading

0 comments on commit f8fa257

Please sign in to comment.