Skip to content

Commit

Permalink
feat: support the simplest influxql raw query (#710)
Browse files Browse the repository at this point in the history
* add influxql statement converter(to sql statement).

* complete the influxql planner.

* add influxql http interface.

* address CR.
  • Loading branch information
Rachelint authored Mar 9, 2023
1 parent 11f10c2 commit 15f066f
Show file tree
Hide file tree
Showing 16 changed files with 675 additions and 136 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum Error {
#[snafu(display("Failed to parse sql, err:{}", source))]
ParseSql { source: sql::frontend::Error },

#[snafu(display("Failed to parse influxql, err:{}", source))]
ParseInfluxql { source: sql::frontend::Error },

#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
CreatePlan {
query: String,
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
pub mod admin;
pub mod error;
pub mod prom;
pub mod sql;
pub mod query;

mod prelude {
pub use catalog::manager::Manager as CatalogManager;
Expand Down
4 changes: 3 additions & 1 deletion server/src/handlers/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use warp::reject;

use super::query::QueryRequest;
use crate::{
context::RequestContext, handlers, instance::InstanceRef,
schema_config_provider::SchemaConfigProviderRef,
Expand Down Expand Up @@ -291,7 +292,8 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {
TIMESTAMP_COLUMN
);

let result = handlers::sql::handle_sql(ctx, self.instance.clone(), sql.into())
let request = QueryRequest::Sql(sql.into());
let result = handlers::query::handle_query(ctx, self.instance.clone(), request)
.await
.map_err(Box::new)
.context(SqlHandle)?;
Expand Down
119 changes: 82 additions & 37 deletions server/src/handlers/sql.rs → server/src/handlers/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use sql::{
};

use crate::handlers::{
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
error::{
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
},
prelude::*,
};

Expand Down Expand Up @@ -104,18 +106,33 @@ impl From<Bytes> for Request {
}
}

pub async fn handle_sql<Q: QueryExecutor + 'static>(
#[derive(Debug)]
pub enum QueryRequest {
Sql(Request),
// TODO: influxql include more parameters, we should add it in later.
Influxql(Request),
}
impl QueryRequest {
fn query(&self) -> &str {
match self {
QueryRequest::Sql(request) => request.query.as_str(),
QueryRequest::Influxql(request) => request.query.as_str(),
}
}
}

pub async fn handle_query<Q: QueryExecutor + 'static>(
ctx: &RequestContext,
instance: InstanceRef<Q>,
request: Request,
query_request: QueryRequest,
) -> Result<Output> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);

info!(
"sql handler try to process request, request_id:{}, request:{:?}",
request_id, request
"Query handler try to process request, request_id:{}, request:{:?}",
request_id, query_request
);

// TODO(yingwen): Privilege check, cannot access data of other tenant
Expand All @@ -127,38 +144,66 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
function_registry: &*instance.function_registry,
};
let frontend = Frontend::new(provider);

let mut sql_ctx = SqlContext::new(request_id, deadline);
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.context(ParseSql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: request.query,
let plan = match &query_request {
QueryRequest::Sql(request) => {
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.context(ParseSql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: &request.query,
}
);

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
);

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
let plan = frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?;
QueryRequest::Influxql(request) => {
let mut stmts = frontend
.parse_influxql(&mut sql_ctx, &request.query)
.context(ParseInfluxql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: &request.query,
}
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
};

instance.limiter.try_limit(&plan).context(QueryBlock {
query: &request.query,
query: query_request.query(),
})?;

// Execute in interpreter
Expand All @@ -177,7 +222,7 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
interpreter_factory
.create(interpreter_ctx, plan)
.context(InterpreterExec {
query: &request.query,
query: query_request.query(),
})?;

let output = if let Some(deadline) = deadline {
Expand All @@ -187,24 +232,24 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
)
.await
.context(QueryTimeout {
query: &request.query,
query: query_request.query(),
})
.and_then(|v| {
v.context(InterpreterExec {
query: &request.query,
query: query_request.query(),
})
})?
} else {
interpreter.execute().await.context(InterpreterExec {
query: &request.query,
query: query_request.query(),
})?
};

info!(
"sql handler finished, request_id:{}, cost:{}ms, request:{:?}",
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
request_id,
begin_instant.saturating_elapsed().as_millis(),
request
query_request
);

Ok(output)
Expand Down
46 changes: 43 additions & 3 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use common_util::error::BoxError;
use handlers::query::QueryRequest;
use log::{error, info};
use logger::RuntimeLevel;
use profile::Profiler;
Expand All @@ -30,7 +31,7 @@ use crate::{
consts,
context::RequestContext,
error_util,
handlers::{self, prom::CeresDBStorage, sql::Request},
handlers::{self, prom::CeresDBStorage, query::Request},
instance::InstanceRef,
metrics,
schema_config_provider::SchemaConfigProviderRef,
Expand Down Expand Up @@ -126,6 +127,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
self.home()
.or(self.metrics())
.or(self.sql())
.or(self.influxql())
.or(self.heap_profile())
.or(self.admin_block())
.or(self.flush_memtable())
Expand Down Expand Up @@ -181,9 +183,47 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.and(self.with_context())
.and(self.with_instance())
.and_then(|req, ctx, instance| async move {
let result = handlers::sql::handle_sql(&ctx, instance, req)
let req = QueryRequest::Sql(req);
let result = handlers::query::handle_query(&ctx, instance, req)
.await
.map(handlers::sql::convert_output)
.map(handlers::query::convert_output)
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
Box::new(e)
})
.context(HandleRequest);
match result {
Ok(res) => Ok(reply::json(&res)),
Err(e) => Err(reject::custom(e)),
}
})
}

// POST /influxql
// this request type is not what influxdb API expected, the one in influxdb:
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint
fn influxql(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
// accept json or plain text
let extract_request = warp::body::json()
.or(warp::body::bytes().map(Request::from))
.unify();

warp::path!("influxql")
.and(warp::post())
.and(warp::body::content_length_limit(self.config.max_body_size))
.and(extract_request)
.and(self.with_context())
.and(self.with_instance())
.and_then(|req, ctx, instance| async move {
let req = QueryRequest::Influxql(req);
let result = handlers::query::handle_query(&ctx, instance, req)
.await
// TODO: the sql's `convert_output` function may be not suitable to influxql.
// We should implement influxql's related function in later.
.map(handlers::query::convert_output)
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
Expand Down
9 changes: 6 additions & 3 deletions server/src/mysql/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use table_engine::engine::EngineRuntimes;

use crate::{
context::RequestContext,
handlers::{self, sql::Request},
handlers::{
self,
query::{QueryRequest, Request},
},
instance::Instance,
mysql::{
error::{CreateContext, HandleSql, Result},
Expand Down Expand Up @@ -109,9 +112,9 @@ where
{
async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result<Output> {
let ctx = self.create_ctx()?;

let req = Request::from(sql.to_string());
handlers::sql::handle_sql(&ctx, self.instance.clone(), req)
let req = QueryRequest::Sql(req);
handlers::query::handle_query(&ctx, self.instance.clone(), req)
.await
.map_err(|e| {
error!("Mysql service Failed to handle sql, err: {}", e);
Expand Down
1 change: 1 addition & 0 deletions sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ df_operator = { workspace = true }
hashbrown = { version = "0.12", features = ["raw"] }
influxdb_influxql_parser = { git = "https://github.com/Rachelint/influxdb_iox.git", branch = "influxql-parser" }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
paste = { workspace = true }
regex = "1"
Expand Down
11 changes: 9 additions & 2 deletions sql/src/influxql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
//! Influxql processing

pub mod planner;
pub(crate) mod stmt_rewriter;
pub(crate) mod select;
pub(crate) mod util;

pub mod error {
use common_util::error::GenericError;
use snafu::{Backtrace, Snafu};
Expand Down Expand Up @@ -32,7 +33,13 @@ pub mod error {
backtrace
))]
RewriteNoCause { msg: String, backtrace: Backtrace },
}

#[snafu(display(
"Failed to convert to sql statement, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
Convert { msg: String, backtrace: Backtrace },
}
define_result!(Error);
}
Loading

0 comments on commit 15f066f

Please sign in to comment.