Skip to content

Commit

Permalink
refactor: add request id in context
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Aug 15, 2023
1 parent b59e07e commit d533d98
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 95 deletions.
10 changes: 5 additions & 5 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
GRPC_HANDLER_COUNTER_VEC.incoming_query.inc();

self.hotspot_recorder.inc_sql_query_reqs(&req).await;
match self.handle_sql_query_internal(ctx, req).await {
match self.handle_sql_query_internal(&ctx, req).await {
Err(e) => {
GRPC_HANDLER_COUNTER_VEC.query_failed.inc();
error!("Failed to handle sql query, err:{e}");
error!("Failed to handle sql query, ctx:{ctx:?}, err:{e}");
SqlQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
Expand All @@ -73,7 +73,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

async fn handle_sql_query_internal(
&self,
ctx: Context,
ctx: &Context,
req: SqlQueryRequest,
) -> Result<SqlQueryResponse> {
if req.context.is_none() {
Expand All @@ -86,7 +86,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

let req_context = req.context.as_ref().unwrap();
let schema = &req_context.database;
match self.handle_sql(ctx, schema, &req.sql).await? {
match self.handle_sql(ctx, schema, &req.sql, false).await? {
SqlResponse::Forwarded(resp) => Ok(resp),
SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length),
}
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
let resp_compress_min_length = self.resp_compress_min_length;
let output = self
.as_ref()
.fetch_sql_query_output(ctx, &schema, &req.sql)
.fetch_sql_query_output(&ctx, &schema, &req.sql, false)
.await?;
runtime.spawn(async move {
match output {
Expand Down
11 changes: 5 additions & 6 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let ctx = ProxyContext {
runtime: self.engine_runtimes.write_runtime.clone(),
timeout: ctx.timeout,
enable_partition_table_access: false,
forwarded_from: None,
};
let ctx = ProxyContext::new(
self.engine_runtimes.write_runtime.clone(),
ctx.timeout,
None,
);

match self.handle_write_internal(ctx, table_request).await {
Ok(result) => {
Expand Down
17 changes: 10 additions & 7 deletions proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_types::{
use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::error;
use query_engine::{
executor::{Executor as QueryExecutor, RecordBatchVec},
physical_planner::PhysicalPlanner,
Expand All @@ -49,14 +50,16 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
ctx: &RequestContext,
req: Request,
) -> Result<Output> {
let context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.read_runtime.clone(),
enable_partition_table_access: true,
forwarded_from: None,
};
let schema = &ctx.schema;
let ctx = Context::new(self.engine_runtimes.read_runtime.clone(), ctx.timeout, None);

match self.handle_sql(context, &ctx.schema, &req.query).await? {
match self
.handle_sql(&ctx, schema, &req.query, true)
.await
.map_err(|e| {
error!("Handle sql query failed, ctx:{ctx:?}, req:{req:?}, err:{e}");
e
})? {
SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp),
SqlResponse::Local(output) => Ok(output),
}
Expand Down
11 changes: 5 additions & 6 deletions proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
forwarded_from: None,
};
let proxy_context = Context::new(
self.engine_runtimes.write_runtime.clone(),
ctx.timeout,
None,
);

match self
.handle_write_internal(proxy_context, table_request)
Expand Down
30 changes: 28 additions & 2 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod write;
pub const FORWARDED_FROM: &str = "forwarded-from";

use std::{
fmt,
ops::Bound,
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -556,8 +557,33 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

#[derive(Clone)]
pub struct Context {
pub timeout: Option<Duration>,
pub request_id: RequestId,
pub runtime: Arc<Runtime>,
pub enable_partition_table_access: bool,
pub timeout: Option<Duration>,
pub forwarded_from: Option<String>,
}

impl Context {
pub fn new(
runtime: Arc<Runtime>,
timeout: Option<Duration>,
forwarded_from: Option<String>,
) -> Self {
Self {
runtime,
timeout,
forwarded_from,
request_id: RequestId::next_id(),
}
}
}

impl fmt::Debug for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Context")
.field("id", &self.request_id)
.field("timeout", &self.timeout)
.field("forwarded_from", &self.forwarded_from)
.finish()
}
}
11 changes: 5 additions & 6 deletions proxy/src/opentsdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
forwarded_from: None,
};
let proxy_context = Context::new(
self.engine_runtimes.write_runtime.clone(),
ctx.timeout,
None,
);

match self
.handle_write_internal(proxy_context, table_request)
Expand Down
13 changes: 8 additions & 5 deletions proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ pub enum SqlResponse {
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub(crate) async fn handle_sql(
&self,
ctx: Context,
ctx: &Context,
schema: &str,
sql: &str,
enable_partition_table_access: bool,
) -> Result<SqlResponse> {
if let Some(resp) = self
.maybe_forward_sql_query(ctx.clone(), schema, sql)
Expand All @@ -65,22 +66,24 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
};

Ok(SqlResponse::Local(
self.fetch_sql_query_output(ctx, schema, sql).await?,
self.fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access)
.await?,
))
}

pub(crate) async fn fetch_sql_query_output(
&self,
ctx: Context,
ctx: &Context,
schema: &str,
sql: &str,
enable_partition_table_access: bool,
) -> Result<Output> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);
let catalog = self.instance.catalog_manager.default_catalog_name();

info!("Handle sql query, request_id:{request_id}, deadline:{deadline:?}, schema:{schema}, sql:{sql}");
info!("Handle sql query, schema:{schema}, sql:{sql}, ctx;{ctx:?}");

let instance = &self.instance;
// TODO(yingwen): Privilege check, cannot access data of other tenant
Expand Down Expand Up @@ -155,7 +158,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}
}

let output = if ctx.enable_partition_table_access {
let output = if enable_partition_table_access {
self.execute_plan_involving_partition_table(request_id, catalog, schema, plan, deadline)
.await
} else {
Expand Down
99 changes: 41 additions & 58 deletions server/src/grpc/storage_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,12 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> StorageService for StorageS
) -> Result<tonic::Response<Self::StreamSqlQueryStream>, tonic::Status> {
let begin_instant = Instant::now();
let proxy = self.proxy.clone();
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
forwarded_from: req
.metadata()
.get(FORWARDED_FROM)
.map(|value| value.to_str().unwrap().to_string()),
};
let ctx = Context::new(
self.runtimes.read_runtime.clone(),
self.timeout,
get_forwarded_from(&req),
);

let stream = Self::stream_sql_query_internal(ctx, proxy, req).await;

GRPC_HANDLER_DURATION_HISTOGRAM_VEC
Expand All @@ -165,21 +162,23 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> StorageService for StorageS
}
}

fn get_forwarded_from<T>(req: &tonic::Request<T>) -> Option<String> {
req.metadata()
.get(FORWARDED_FROM)
.map(|value| value.to_str().unwrap().to_string())
}

// TODO: Use macros to simplify duplicate code
impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> StorageServiceImpl<Q, P> {
async fn route_internal(
&self,
req: tonic::Request<RouteRequest>,
) -> Result<tonic::Response<RouteResponse>, tonic::Status> {
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
forwarded_from: req
.metadata()
.get(FORWARDED_FROM)
.map(|value| value.to_str().unwrap().to_string()),
};
let ctx = Context::new(
self.runtimes.read_runtime.clone(),
self.timeout,
get_forwarded_from(&req),
);
let req = req.into_inner();
let proxy = self.proxy.clone();

Expand All @@ -206,15 +205,12 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> StorageServiceImpl<Q, P> {
&self,
req: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let ctx = Context {
runtime: self.runtimes.write_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
forwarded_from: req
.metadata()
.get(FORWARDED_FROM)
.map(|value| value.to_str().unwrap().to_string()),
};
let ctx = Context::new(
self.runtimes.read_runtime.clone(),
self.timeout,
get_forwarded_from(&req),
);

let req = req.into_inner();
let proxy = self.proxy.clone();

Expand Down Expand Up @@ -250,22 +246,17 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> StorageServiceImpl<Q, P> {
&self,
req: tonic::Request<SqlQueryRequest>,
) -> Result<tonic::Response<SqlQueryResponse>, tonic::Status> {
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
forwarded_from: req
.metadata()
.get(FORWARDED_FROM)
.map(|value| value.to_str().unwrap().to_string()),
};
let req = req.into_inner();
let ctx = Context::new(
self.runtimes.read_runtime.clone(),
self.timeout,
get_forwarded_from(&req),
);
let proxy = self.proxy.clone();

let join_handle = self
.runtimes
.read_runtime
.spawn(async move { proxy.handle_sql_query(ctx, req).await });
.spawn(async move { proxy.handle_sql_query(ctx, req.into_inner()).await });

let resp = match join_handle.await {
Ok(v) => v,
Expand Down Expand Up @@ -318,15 +309,12 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> StorageServiceImpl<Q, P> {
&self,
req: tonic::Request<PrometheusQueryRequest>,
) -> Result<tonic::Response<PrometheusQueryResponse>, tonic::Status> {
let ctx = Context {
runtime: self.runtimes.read_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
forwarded_from: req
.metadata()
.get(FORWARDED_FROM)
.map(|value| value.to_str().unwrap().to_string()),
};
let ctx = Context::new(
self.runtimes.read_runtime.clone(),
self.timeout,
get_forwarded_from(&req),
);

let req = req.into_inner();
let proxy = self.proxy.clone();

Expand Down Expand Up @@ -361,20 +349,15 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> StorageServiceImpl<Q, P> {
&self,
req: tonic::Request<tonic::Streaming<WriteRequest>>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let mut total_success = 0;

let ctx = Context {
runtime: self.runtimes.write_runtime.clone(),
timeout: self.timeout,
enable_partition_table_access: false,
forwarded_from: req
.metadata()
.get(FORWARDED_FROM)
.map(|value| value.to_str().unwrap().to_string()),
};
let ctx = Context::new(
self.runtimes.read_runtime.clone(),
self.timeout,
get_forwarded_from(&req),
);
let mut stream = req.into_inner();
let proxy = self.proxy.clone();

let mut total_success = 0;
let join_handle = self.runtimes.write_runtime.spawn(async move {
let mut resp = WriteResponse::default();
let mut has_err = false;
Expand Down

0 comments on commit d533d98

Please sign in to comment.