Skip to content

Commit

Permalink
refactor: add request id in context
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Aug 15, 2023
1 parent b59e07e commit 492aa28
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 116 deletions.
31 changes: 14 additions & 17 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
GRPC_HANDLER_COUNTER_VEC.incoming_query.inc();

self.hotspot_recorder.inc_sql_query_reqs(&req).await;
match self.handle_sql_query_internal(ctx, req).await {
match self.handle_sql_query_internal(&ctx, &req).await {
Err(e) => {
error!("Failed to handle sql query, ctx:{ctx:?}, req:{req:?}, err:{e}");

GRPC_HANDLER_COUNTER_VEC.query_failed.inc();
error!("Failed to handle sql query, err:{e}");
SqlQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
Expand All @@ -73,8 +74,8 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

async fn handle_sql_query_internal(
&self,
ctx: Context,
req: SqlQueryRequest,
ctx: &Context,
req: &SqlQueryRequest,
) -> Result<SqlQueryResponse> {
if req.context.is_none() {
return ErrNoCause {
Expand All @@ -86,7 +87,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

let req_context = req.context.as_ref().unwrap();
let schema = &req_context.database;
match self.handle_sql(ctx, schema, &req.sql).await? {
match self.handle_sql(ctx, schema, &req.sql, false).await? {
SqlResponse::Forwarded(resp) => Ok(resp),
SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length),
}
Expand All @@ -99,7 +100,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
) -> BoxStream<'static, SqlQueryResponse> {
GRPC_HANDLER_COUNTER_VEC.stream_query.inc();
self.hotspot_recorder.inc_sql_query_reqs(&req).await;
match self.clone().handle_stream_query_internal(ctx, req).await {
match self.clone().handle_stream_query_internal(&ctx, &req).await {
Err(e) => stream::once(async {
error!("Failed to handle stream sql query, err:{e}");
GRPC_HANDLER_COUNTER_VEC.stream_query_failed.inc();
Expand All @@ -118,8 +119,8 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

async fn handle_stream_query_internal(
self: Arc<Self>,
ctx: Context,
req: SqlQueryRequest,
ctx: &Context,
req: &SqlQueryRequest,
) -> Result<BoxStream<'static, SqlQueryResponse>> {
if req.context.is_none() {
return ErrNoCause {
Expand All @@ -130,12 +131,8 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}

let req_context = req.context.as_ref().unwrap();
let schema = req_context.database.clone();
let req = match self
.clone()
.maybe_forward_stream_sql_query(ctx.clone(), &req)
.await
{
let schema = &req_context.database;
let req = match self.clone().maybe_forward_stream_sql_query(ctx, req).await {
Some(resp) => match resp {
ForwardResult::Forwarded(resp) => return resp,
ForwardResult::Local => req,
Expand All @@ -148,7 +145,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
let resp_compress_min_length = self.resp_compress_min_length;
let output = self
.as_ref()
.fetch_sql_query_output(ctx, &schema, &req.sql)
.fetch_sql_query_output(ctx, schema, &req.sql, false)
.await?;
runtime.spawn(async move {
match output {
Expand Down Expand Up @@ -189,7 +186,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

async fn maybe_forward_stream_sql_query(
self: Arc<Self>,
ctx: Context,
ctx: &Context,
req: &SqlQueryRequest,
) -> Option<ForwardResult<BoxStream<'static, SqlQueryResponse>, Error>> {
if req.tables.len() != 1 {
Expand All @@ -203,7 +200,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
schema: req_ctx.database.clone(),
table: req.tables[0].clone(),
req: req.clone().into_request(),
forwarded_from: ctx.forwarded_from,
forwarded_from: ctx.forwarded_from.clone(),
};
let do_query = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<SqlQueryRequest>,
Expand Down
11 changes: 5 additions & 6 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let ctx = ProxyContext {
runtime: self.engine_runtimes.write_runtime.clone(),
timeout: ctx.timeout,
enable_partition_table_access: false,
forwarded_from: None,
};
let ctx = ProxyContext::new(
self.engine_runtimes.write_runtime.clone(),
ctx.timeout,
None,
);

match self.handle_write_internal(ctx, table_request).await {
Ok(result) => {
Expand Down
17 changes: 10 additions & 7 deletions proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_types::{
use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::error;
use query_engine::{
executor::{Executor as QueryExecutor, RecordBatchVec},
physical_planner::PhysicalPlanner,
Expand All @@ -49,14 +50,16 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
ctx: &RequestContext,
req: Request,
) -> Result<Output> {
let context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.read_runtime.clone(),
enable_partition_table_access: true,
forwarded_from: None,
};
let schema = &ctx.schema;
let ctx = Context::new(self.engine_runtimes.read_runtime.clone(), ctx.timeout, None);

match self.handle_sql(context, &ctx.schema, &req.query).await? {
match self
.handle_sql(&ctx, schema, &req.query, true)
.await
.map_err(|e| {
error!("Handle sql query failed, ctx:{ctx:?}, req:{req:?}, err:{e}");
e
})? {
SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp),
SqlResponse::Local(output) => Ok(output),
}
Expand Down
11 changes: 5 additions & 6 deletions proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
forwarded_from: None,
};
let proxy_context = Context::new(
self.engine_runtimes.write_runtime.clone(),
ctx.timeout,
None,
);

match self
.handle_write_internal(proxy_context, table_request)
Expand Down
30 changes: 28 additions & 2 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod write;
pub const FORWARDED_FROM: &str = "forwarded-from";

use std::{
fmt,
ops::Bound,
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -556,8 +557,33 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

#[derive(Clone)]
pub struct Context {
pub timeout: Option<Duration>,
pub request_id: RequestId,
pub runtime: Arc<Runtime>,
pub enable_partition_table_access: bool,
pub timeout: Option<Duration>,
pub forwarded_from: Option<String>,
}

impl Context {
pub fn new(
runtime: Arc<Runtime>,
timeout: Option<Duration>,
forwarded_from: Option<String>,
) -> Self {
Self {
runtime,
timeout,
forwarded_from,
request_id: RequestId::next_id(),
}
}
}

impl fmt::Debug for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Context")
.field("id", &self.request_id)
.field("timeout", &self.timeout)
.field("forwarded_from", &self.forwarded_from)
.finish()
}
}
11 changes: 5 additions & 6 deletions proxy/src/opentsdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
forwarded_from: None,
};
let proxy_context = Context::new(
self.engine_runtimes.write_runtime.clone(),
ctx.timeout,
None,
);

match self
.handle_write_internal(proxy_context, table_request)
Expand Down
28 changes: 14 additions & 14 deletions proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ pub enum SqlResponse {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub(crate) async fn handle_sql(
&self,
ctx: Context,
ctx: &Context,
schema: &str,
sql: &str,
enable_partition_table_access: bool,
) -> Result<SqlResponse> {
if let Some(resp) = self
.maybe_forward_sql_query(ctx.clone(), schema, sql)
Expand All @@ -65,22 +66,24 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
};

Ok(SqlResponse::Local(
self.fetch_sql_query_output(ctx, schema, sql).await?,
self.fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access)
.await?,
))
}

pub(crate) async fn fetch_sql_query_output(
&self,
ctx: Context,
ctx: &Context,
schema: &str,
sql: &str,
enable_partition_table_access: bool,
) -> Result<Output> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);
let catalog = self.instance.catalog_manager.default_catalog_name();

info!("Handle sql query, request_id:{request_id}, deadline:{deadline:?}, schema:{schema}, sql:{sql}");
info!("Handle sql query begin, catalog:{catalog}, schema:{schema}, ctx;{ctx:?}, sql:{sql}");

let instance = &self.instance;
// TODO(yingwen): Privilege check, cannot access data of other tenant
Expand Down Expand Up @@ -108,7 +111,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
!stmts.is_empty(),
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!("No valid query statement provided, sql:{sql}",),
msg: "No valid query statement provided",
}
);

Expand All @@ -118,11 +121,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
stmts.len() == 1,
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, sql:{}",
stmts.len(),
sql
),
msg: "Only support execute one statement",
}
);

Expand All @@ -142,7 +141,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to create plan, query:{sql}"),
msg: "Failed to create plan",
})?;

let mut plan_maybe_expired = false;
Expand All @@ -155,7 +154,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}
}

let output = if ctx.enable_partition_table_access {
let output = if enable_partition_table_access {
self.execute_plan_involving_partition_table(request_id, catalog, schema, plan, deadline)
.await
} else {
Expand All @@ -164,11 +163,12 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
};
let output = output.box_err().with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to execute plan, sql:{sql}"),
msg: "Failed to execute plan",
})?;

let cost = begin_instant.saturating_elapsed();
info!("Handle sql query success, catalog:{catalog}, schema:{schema}, request_id:{request_id}, cost:{cost:?}, sql:{sql:?}");

info!("Handle sql query success, catalog:{catalog}, schema:{schema}, cost:{cost:?}, ctx:{ctx:?}, sql:{sql:?}");

match &output {
Output::AffectedRows(_) => Ok(output),
Expand Down
Loading

0 comments on commit 492aa28

Please sign in to comment.