Skip to content

Commit

Permalink
address CR.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Mar 8, 2023
1 parent a528599 commit 2c415b5
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 84 deletions.
23 changes: 6 additions & 17 deletions sql/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ pub enum Error {
},

// TODO(yingwen): Should we store stmt here?
#[snafu(display("Failed to create plan, plan_type:{}, err:{}", plan_type, source))]
CreatePlan {
plan_type: String,
source: crate::planner::Error,
},
#[snafu(display("Failed to create plan, err:{}", source))]
CreatePlan { source: crate::planner::Error },

#[snafu(display("Invalid prom request, err:{}", source))]
InvalidPromRequest { source: crate::promql::Error },
Expand Down Expand Up @@ -123,9 +120,7 @@ impl<P: MetaProvider> Frontend<P> {
pub fn statement_to_plan(&self, ctx: &mut Context, stmt: Statement) -> Result<Plan> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);

planner
.statement_to_plan(stmt)
.context(CreatePlan { plan_type: "sql" })
planner.statement_to_plan(stmt).context(CreatePlan)
}

pub fn promql_expr_to_plan(
Expand All @@ -135,9 +130,7 @@ impl<P: MetaProvider> Frontend<P> {
) -> Result<(Plan, Arc<ColumnNames>)> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);

planner.promql_expr_to_plan(expr).context(CreatePlan {
plan_type: "promql",
})
planner.promql_expr_to_plan(expr).context(CreatePlan)
}

pub fn influxql_stmt_to_plan(
Expand All @@ -147,9 +140,7 @@ impl<P: MetaProvider> Frontend<P> {
) -> Result<Plan> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);

planner.influxql_stmt_to_plan(stmt).context(CreatePlan {
plan_type: "influxql",
})
planner.influxql_stmt_to_plan(stmt).context(CreatePlan)
}

pub fn write_req_to_plan(
Expand All @@ -162,8 +153,6 @@ impl<P: MetaProvider> Frontend<P> {

planner
.write_req_to_plan(schema_config, write_table)
.context(CreatePlan {
plan_type: "internal write",
})
.context(CreatePlan)
}
}
35 changes: 13 additions & 22 deletions sql/src/influxql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,37 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Influxql processing

pub mod planner;
mod stmt_rewriter;
pub(crate) mod stmt_rewriter;
pub(crate) mod util;
pub mod error {
use common_util::error::GenericError;
use snafu::Snafu;
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display("Unimplemented influxql statement, msg: {}", msg))]
Unimplemented { msg: String },

#[snafu(display(
"Failed to rewrite influxql from statement with cause, msg:{}, source:{}",
"Unimplemented influxql statement, msg: {}.\nBacktrace:{}",
msg,
source
backtrace
))]
RewriteFromWithCause { msg: String, source: GenericError },

#[snafu(display("Failed to rewrite influxql from statement no cause, msg:{}", msg))]
RewriteFromNoCause { msg: String },
Unimplemented { msg: String, backtrace: Backtrace },

#[snafu(display(
"Failed to rewrite influxql projection statement with cause, msg:{}, source: {}",
"Failed to rewrite influxql from statement with cause, msg:{}, source:{}",
msg,
source
))]
RewriteFieldsWithCause { msg: String, source: GenericError },
RewriteWithCause { msg: String, source: GenericError },

#[snafu(display(
"Failed to rewrite influxql projection statement no cause, msg: {}",
msg
"Failed to rewrite influxql from statement no cause, msg:{}.\nBacktrace:{}",
msg,
backtrace
))]
RewriteFieldsNoCause { msg: String },

#[snafu(display("Failed to find table with case, source:{}", source))]
FindTableWithCause { source: GenericError },

#[snafu(display("Failed to find table no case, msg: {}", msg))]
FindTableNoCause { msg: String },
RewriteNoCause { msg: String, backtrace: Backtrace },
}

define_result!(Error);
Expand Down
6 changes: 5 additions & 1 deletion sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Influxql planner.

use common_util::error::BoxError;
use influxdb_influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::ResultExt;
Expand Down Expand Up @@ -47,6 +49,8 @@ impl<'a, P: MetaProvider> MeasurementProvider for MeasurementProviderImpl<'a, P>
self.0
.find_table(measurement_name)
.box_err()
.context(FindTableWithCause)
.context(RewriteWithCause {
msg: format!("failed to find measurement, measurement:{measurement_name}"),
})
}
}
57 changes: 28 additions & 29 deletions sql/src/influxql/stmt_rewriter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Influxql statement rewriter

use std::{collections::BTreeSet, ops::ControlFlow};

use common_util::error::BoxError;
Expand Down Expand Up @@ -49,16 +51,12 @@ impl<'a> StmtRewriter<'a> {
name: MeasurementName::Name(name),
..
} => {
let table = self
.measurement_provider
.measurement(name)
.box_err()
.context(RewriteFromWithCause {
let _ = self.measurement_provider.measurement(name)?.context(
RewriteNoCause {
msg: format!("measurement not found, measurement:{name}"),
})?;
if table.is_some() {
new_from.push(ms.clone())
}
},
)?;
new_from.push(ms.clone());
}
QualifiedMeasurementName {
name: MeasurementName::Regex(_),
Expand Down Expand Up @@ -111,15 +109,20 @@ impl<'a> StmtRewriter<'a> {
match name {
MeasurementName::Name(name) => {
// Get schema, and split columns to tags and fields.
let (tags, fields) = self.tags_and_fields_in_measurement(name.as_str())?;
let (tags, fields) = self
.tags_and_fields_in_measurement(name.as_str())
.box_err()
.context(RewriteWithCause {
msg: "rewrite field list fail to find measurement",
})?;
let mut group_by_tags = BTreeSet::new();
maybe_rewrite_group_by(&tags, &mut group_by_tags, stmt)?;
maybe_rewrite_projection(&tags, &fields, &group_by_tags, stmt)?;

Ok(())
}

MeasurementName::Regex(_) => RewriteFieldsNoCause {
MeasurementName::Regex(_) => RewriteNoCause {
msg: "rewrite field list should not encounter regex in from clause",
}
.fail(),
Expand All @@ -141,10 +144,10 @@ impl<'a> StmtRewriter<'a> {
.measurement_provider
.measurement(measurement_name)
.box_err()
.context(RewriteFieldsWithCause {
.context(RewriteWithCause {
msg: format!("failed to find measurement, measurement:{measurement_name}"),
})?
.context(RewriteFieldsNoCause {
.context(RewriteNoCause {
msg: format!("measurement not found, measurement:{measurement_name}"),
})?;

Expand Down Expand Up @@ -197,20 +200,18 @@ fn maybe_rewrite_group_by(

Dimension::Tag(tag) => {
if !tags.contains(&tag.to_string()) {
return RewriteFieldsNoCause {
msg: format!("group by tag not exist, tag:{tag}, exist tags:{tags:?}"),
return RewriteNoCause {
msg: format!("rewrite group by encounter tag not exist, tag:{tag}, exist tags:{tags:?}"),
}
.fail();
}
let _ = group_by_tags.insert(tag.to_string());
}

Dimension::Regex(re) => {
let re = util::parse_regex(re)
.box_err()
.context(RewriteFieldsWithCause {
msg: format!("group by invalid regex, regex:{re}"),
})?;
let re = util::parse_regex(re).box_err().context(RewriteWithCause {
msg: format!("rewrite group by encounter invalid regex, regex:{re}"),
})?;
let match_tags = tags.iter().filter_map(|tag| {
if re.is_match(tag.as_str()) {
Some(tag.clone())
Expand Down Expand Up @@ -307,11 +308,9 @@ fn maybe_rewrite_projection(
}

Expr::Literal(Literal::Regex(re)) => {
let re = util::parse_regex(re)
.box_err()
.context(RewriteFieldsWithCause {
msg: format!("rewrite field list encounter invalid regex, regex:{re}"),
})?;
let re = util::parse_regex(re).box_err().context(RewriteWithCause {
msg: format!("rewrite projection encounter invalid regex, regex:{re}"),
})?;

let filter = |v: &String| -> bool { re.is_match(v.as_str()) };

Expand All @@ -332,8 +331,8 @@ fn maybe_rewrite_projection(

match args.first() {
Some(Expr::Wildcard(Some(WildcardType::Tag))) => {
return RewriteFieldsNoCause {
msg: "tags can't be placed in a call",
return RewriteNoCause {
msg: "rewrite projection found tags placed in a call",
}
.fail();
}
Expand Down Expand Up @@ -363,8 +362,8 @@ fn maybe_rewrite_projection(
.is_break();

if has_wildcard {
return RewriteFieldsNoCause {
msg: "wildcard or regex should be encountered in binary expression",
return RewriteNoCause {
msg: "rewrite projection encounter wildcard or regex in binary expression",
}
.fail();
}
Expand Down
59 changes: 44 additions & 15 deletions sql/src/influxql/util.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Some utils used process influxql

use influxdb_influxql_parser::string::Regex;

// copy from influxdb_iox.
/// Removes all `/` patterns that the rust regex library would reject
/// and rewrites them to their unescaped form.
///
/// For example, `\:` is rewritten to `:` as `\:` is not a valid
/// escape sequence in the `regexp` crate but is valid in golang's
/// regexp implementation.
///
/// This is done for compatibility purposes so that the regular
/// expression matching in Rust more closely follows the matching in
/// golang, used by the influx storage rpc.
///
/// See <https://github.com/rust-lang/regex/issues/501> for more details
// Copy from influxdb_iox:
// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L147
pub fn clean_non_meta_escapes(pattern: &str) -> String {
if pattern.is_empty() {
return pattern.to_string();
Expand Down Expand Up @@ -64,6 +55,8 @@ pub fn clean_non_meta_escapes(pattern: &str) -> String {
new_pattern
}

// Copy from influxdb_iox:
// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L123
fn is_valid_character_after_escape(c: char) -> bool {
// same list as https://docs.rs/regex-syntax/0.6.25/src/regex_syntax/ast/parse.rs.html#1445-1538
match c {
Expand All @@ -76,9 +69,45 @@ fn is_valid_character_after_escape(c: char) -> bool {
}
}

/// Sanitize an InfluxQL regular expression and create a compiled
/// [`regex::Regex`].
// Copy from influxdb_iox:
// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/iox_query/src/plan/influxql/util.rs#L48
pub fn parse_regex(re: &Regex) -> std::result::Result<regex::Regex, regex::Error> {
let pattern = clean_non_meta_escapes(re.as_str());
regex::Regex::new(&pattern)
}

mod test {
// Copy from influxdb_iox:
// https://github.com/influxdata/influxdb_iox/blob/e7369449f8975f6f86bc665ea3e1f556c2777145/query_functions/src/regex.rs#L357
#[test]
fn test_clean_non_meta_escapes() {
let cases = vec![
("", ""),
(r#"\"#, r#"\"#),
(r#"\\"#, r#"\\"#),
// : is not a special meta character
(r#"\:"#, r#":"#),
// . is a special meta character
(r#"\."#, r#"\."#),
(r#"foo\"#, r#"foo\"#),
(r#"foo\\"#, r#"foo\\"#),
(r#"foo\:"#, r#"foo:"#),
(r#"foo\xff"#, r#"foo\xff"#),
(r#"fo\\o"#, r#"fo\\o"#),
(r#"fo\:o"#, r#"fo:o"#),
(r#"fo\:o\x123"#, r#"fo:o\x123"#),
(r#"fo\:o\x123\:"#, r#"fo:o\x123:"#),
(r#"foo\\\:bar"#, r#"foo\\:bar"#),
(r#"foo\\\:bar\\\:"#, r#"foo\\:bar\\:"#),
("foo", "foo"),
];

for (pattern, expected) in cases {
let cleaned_pattern = crate::influxql::util::clean_non_meta_escapes(pattern);
assert_eq!(
cleaned_pattern, expected,
"Expected '{pattern}' to be cleaned to '{expected}', got '{cleaned_pattern}'"
);
}
}
}

0 comments on commit 2c415b5

Please sign in to comment.