Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support query limit by rule #494

Merged
merged 3 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions server/src/grpc/storage_service/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,19 @@ where
};
Error::ErrWithCause {
code,
msg: "failed to create plan".to_string(),
msg: "Failed to create plan".to_string(),
source: Box::new(e),
}
})?;

if ctx.instance.limiter.should_limit(&plan) {
ErrNoCause {
code: StatusCode::TOO_MANY_REQUESTS,
msg: "query limited by reject list",
}
.fail()?;
}
ctx.instance
.limiter
.try_limit(&plan)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::FORBIDDEN,
msg: "Query is blocked",
})?;

// Execute in interpreter
let interpreter_ctx = InterpreterContext::builder(request_id)
Expand All @@ -117,7 +118,7 @@ where
.map_err(|e| Box::new(e) as _)
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "failed to execute interpreter",
msg: "Failed to execute interpreter",
})?;

let resp = convert_output(output, column_name)
Expand Down
15 changes: 8 additions & 7 deletions server/src/grpc/storage_service/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,14 @@ pub async fn fetch_query_output<Q: QueryExecutor + 'static>(
msg: format!("Failed to create plan, query:{}", req.ql),
})?;

if ctx.instance.limiter.should_limit(&plan) {
ErrNoCause {
code: StatusCode::TOO_MANY_REQUESTS,
msg: "query limited by block list",
}
.fail()?;
}
ctx.instance
.limiter
.try_limit(&plan)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::FORBIDDEN,
msg: "Query is blocked",
})?;

// Execute in interpreter
let interpreter_ctx = InterpreterContext::builder(request_id)
Expand Down
30 changes: 16 additions & 14 deletions server/src/grpc/storage_service/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ pub(crate) async fn handle_write<Q: QueryExecutor + 'static>(
);
let plan = Plan::Insert(insert_plan);

if ctx.instance.limiter.should_limit(&plan) {
ErrNoCause {
code: StatusCode::TOO_MANY_REQUESTS,
msg: "Insert limited by reject list",
}
.fail()?;
}
ctx.instance
.limiter
.try_limit(&plan)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::FORBIDDEN,
msg: "Insert is blocked",
})?;

let interpreter_ctx = InterpreterContext::builder(request_id)
// Use current ctx's catalog and tenant as default catalog and tenant
Expand Down Expand Up @@ -207,13 +208,14 @@ async fn create_table<Q: QueryExecutor + 'static>(

let instance = &ctx.instance;

if instance.limiter.should_limit(&plan) {
ErrNoCause {
code: StatusCode::TOO_MANY_REQUESTS,
msg: "Create table limited by reject list",
}
.fail()?;
}
instance
.limiter
.try_limit(&plan)
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::FORBIDDEN,
msg: "Create table is blocked",
})?;

let interpreter_ctx = InterpreterContext::builder(request_id)
// Use current ctx's catalog and tenant as default catalog and tenant
Expand Down
49 changes: 21 additions & 28 deletions server/src/handlers/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::collections::BTreeSet;

use crate::handlers::prelude::*;
use crate::{handlers::prelude::*, limiter::BlockRule};

#[derive(Debug, Deserialize)]
pub enum Operation {
Expand All @@ -12,60 +12,53 @@ pub enum Operation {
}

#[derive(Debug, Deserialize)]
pub struct RejectRequest {
pub struct BlockRequest {
operation: Operation,
write_block_list: Vec<String>,
read_block_list: Vec<String>,
block_rules: Vec<BlockRule>,
}

#[derive(Serialize)]
pub struct RejectResponse {
pub struct BlockResponse {
write_block_list: BTreeSet<String>,
read_block_list: BTreeSet<String>,
block_rules: BTreeSet<BlockRule>,
}

pub async fn handle_block<Q: QueryExecutor + 'static>(
_ctx: RequestContext,
instance: InstanceRef<Q>,
request: RejectRequest,
) -> Result<RejectResponse> {
request: BlockRequest,
) -> Result<BlockResponse> {
let limiter = &instance.limiter;
match request.operation {
Operation::Add => {
instance
.limiter
.add_write_block_list(request.write_block_list);
instance
.limiter
.add_read_block_list(request.read_block_list);
limiter.add_write_block_list(request.write_block_list);
limiter.add_read_block_list(request.read_block_list);
limiter.add_block_rules(request.block_rules);
}
Operation::Set => {
instance
.limiter
.set_write_block_list(request.write_block_list);
instance
.limiter
.set_read_block_list(request.read_block_list);
limiter.set_write_block_list(request.write_block_list);
limiter.set_read_block_list(request.read_block_list);
limiter.set_block_rules(request.block_rules);
}
Operation::Remove => {
instance
.limiter
.remove_write_block_list(request.write_block_list);
instance
.limiter
.remove_read_block_list(request.read_block_list);
limiter.remove_write_block_list(request.write_block_list);
limiter.remove_read_block_list(request.read_block_list);
limiter.remove_block_rules(&request.block_rules);
}
}

Ok(RejectResponse {
write_block_list: instance
.limiter
Ok(BlockResponse {
write_block_list: limiter
.get_write_block_list()
.into_iter()
.collect::<BTreeSet<_>>(),
read_block_list: instance
.limiter
read_block_list: limiter
.get_read_block_list()
.into_iter()
.collect::<BTreeSet<_>>(),
block_rules: limiter.get_block_rules().into_iter().collect(),
})
}
9 changes: 7 additions & 2 deletions server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//! Error of handlers

use snafu::{Backtrace, Snafu};

use crate::limiter;
// TODO(yingwen): Avoid printing huge sql string
// TODO(yingwen): Maybe add an error type to sql sub mod

Expand Down Expand Up @@ -48,8 +50,11 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Query limited by block list, query:{}", query))]
QueryBlock { query: String },
#[snafu(display("Query limited by block list, query:{}, err:{}", query, source))]
QueryBlock {
query: String,
source: limiter::Error,
},
}

define_result!(Error);
12 changes: 5 additions & 7 deletions server/src/handlers/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use serde::{
ser::{SerializeMap, SerializeSeq},
Serialize,
};
use snafu::ensure;
use snafu::{ensure, ResultExt};
use sql::{
frontend::{Context as SqlContext, Frontend},
provider::CatalogMetaProvider,
};

use crate::handlers::{
error::{ArrowToString, CreatePlan, Error::QueryBlock, InterpreterExec, ParseSql, TooMuchStmt},
error::{ArrowToString, CreatePlan, InterpreterExec, ParseSql, QueryBlock, TooMuchStmt},
prelude::*,
};

Expand Down Expand Up @@ -157,11 +157,9 @@ pub async fn handle_sql<Q: QueryExecutor + 'static>(
query: &request.query,
})?;

if instance.limiter.should_limit(&plan) {
return Err(QueryBlock {
query: request.query.to_owned(),
});
}
instance.limiter.try_limit(&plan).context(QueryBlock {
query: &request.query,
})?;

// Execute in interpreter
let interpreter_ctx = InterpreterContext::builder(request_id)
Expand Down
Loading