diff --git a/proxy/src/grpc/sql_query.rs b/proxy/src/grpc/sql_query.rs index 9f22572838..12de38fa6d 100644 --- a/proxy/src/grpc/sql_query.rs +++ b/proxy/src/grpc/sql_query.rs @@ -55,10 +55,10 @@ impl Proxy { GRPC_HANDLER_COUNTER_VEC.incoming_query.inc(); self.hotspot_recorder.inc_sql_query_reqs(&req).await; - match self.handle_sql_query_internal(ctx, req).await { + match self.handle_sql_query_internal(&ctx, req).await { Err(e) => { GRPC_HANDLER_COUNTER_VEC.query_failed.inc(); - error!("Failed to handle sql query, err:{e}"); + error!("Failed to handle sql query, ctx:{ctx:?}, err:{e}"); SqlQueryResponse { header: Some(error::build_err_header(e)), ..Default::default() @@ -73,7 +73,7 @@ impl Proxy { async fn handle_sql_query_internal( &self, - ctx: Context, + ctx: &Context, req: SqlQueryRequest, ) -> Result { if req.context.is_none() { @@ -86,7 +86,7 @@ impl Proxy { let req_context = req.context.as_ref().unwrap(); let schema = &req_context.database; - match self.handle_sql(ctx, schema, &req.sql).await? { + match self.handle_sql(ctx, schema, &req.sql, false).await? { SqlResponse::Forwarded(resp) => Ok(resp), SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length), } @@ -148,7 +148,7 @@ impl Proxy { let resp_compress_min_length = self.resp_compress_min_length; let output = self .as_ref() - .fetch_sql_query_output(ctx, &schema, &req.sql) + .fetch_sql_query_output(&ctx, &schema, &req.sql, false) .await?; runtime.spawn(async move { match output { diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs index 6864db4dd5..92fd3ca9ab 100644 --- a/proxy/src/http/prom.rs +++ b/proxy/src/http/prom.rs @@ -85,12 +85,11 @@ impl Proxy { }), table_requests: write_table_requests, }; - let ctx = ProxyContext { - runtime: self.engine_runtimes.write_runtime.clone(), - timeout: ctx.timeout, - enable_partition_table_access: false, - forwarded_from: None, - }; + let ctx = ProxyContext::new( + self.engine_runtimes.write_runtime.clone(), + ctx.timeout, + None, + ); match self.handle_write_internal(ctx, table_request).await { Ok(result) => { diff --git a/proxy/src/http/sql.rs b/proxy/src/http/sql.rs index 9ddfabcdf9..2ce31a9895 100644 --- a/proxy/src/http/sql.rs +++ b/proxy/src/http/sql.rs @@ -26,6 +26,7 @@ use common_types::{ use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; +use log::error; use query_engine::{ executor::{Executor as QueryExecutor, RecordBatchVec}, physical_planner::PhysicalPlanner, @@ -49,14 +50,16 @@ impl Proxy { ctx: &RequestContext, req: Request, ) -> Result { - let context = Context { - timeout: ctx.timeout, - runtime: self.engine_runtimes.read_runtime.clone(), - enable_partition_table_access: true, - forwarded_from: None, - }; + let schema = &ctx.schema; + let ctx = Context::new(self.engine_runtimes.read_runtime.clone(), ctx.timeout, None); - match self.handle_sql(context, &ctx.schema, &req.query).await? { + match self + .handle_sql(&ctx, schema, &req.query, true) + .await + .map_err(|e| { + error!("Handle sql query failed, ctx:{ctx:?}, req:{req:?}, err:{e}"); + e + })? { SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp), SqlResponse::Local(output) => Ok(output), } diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index 4bcdd8d788..988f8e8e3b 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -80,12 +80,11 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context { - timeout: ctx.timeout, - runtime: self.engine_runtimes.write_runtime.clone(), - enable_partition_table_access: false, - forwarded_from: None, - }; + let proxy_context = Context::new( + self.engine_runtimes.write_runtime.clone(), + ctx.timeout, + None, + ); match self .handle_write_internal(proxy_context, table_request) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8a44d097a5..d65a7681aa 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -39,6 +39,7 @@ mod write; pub const FORWARDED_FROM: &str = "forwarded-from"; use std::{ + fmt, ops::Bound, sync::Arc, time::{Duration, Instant}, @@ -556,8 +557,33 @@ impl Proxy { #[derive(Clone)] pub struct Context { - pub timeout: Option, + pub request_id: RequestId, pub runtime: Arc, - pub enable_partition_table_access: bool, + pub timeout: Option, pub forwarded_from: Option, } + +impl Context { + pub fn new( + runtime: Arc, + timeout: Option, + forwarded_from: Option, + ) -> Self { + Self { + runtime, + timeout, + forwarded_from, + request_id: RequestId::next_id(), + } + } +} + +impl fmt::Debug for Context { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Context") + .field("id", &self.request_id) + .field("timeout", &self.timeout) + .field("forwarded_from", &self.forwarded_from) + .finish() + } +} diff --git a/proxy/src/opentsdb/mod.rs b/proxy/src/opentsdb/mod.rs index 14849f7758..e865e012a0 100644 --- a/proxy/src/opentsdb/mod.rs +++ b/proxy/src/opentsdb/mod.rs @@ -56,12 +56,11 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context { - timeout: ctx.timeout, - runtime: self.engine_runtimes.write_runtime.clone(), - enable_partition_table_access: false, - forwarded_from: None, - }; + let proxy_context = Context::new( + self.engine_runtimes.write_runtime.clone(), + ctx.timeout, + None, + ); match self .handle_write_internal(proxy_context, table_request) diff --git a/proxy/src/read.rs b/proxy/src/read.rs index c848609749..fccbc1e69b 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -50,9 +50,10 @@ pub enum SqlResponse { impl Proxy { pub(crate) async fn handle_sql( &self, - ctx: Context, + ctx: &Context, schema: &str, sql: &str, + enable_partition_table_access: bool, ) -> Result { if let Some(resp) = self .maybe_forward_sql_query(ctx.clone(), schema, sql) @@ -65,22 +66,24 @@ impl Proxy { }; Ok(SqlResponse::Local( - self.fetch_sql_query_output(ctx, schema, sql).await?, + self.fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access) + .await?, )) } pub(crate) async fn fetch_sql_query_output( &self, - ctx: Context, + ctx: &Context, schema: &str, sql: &str, + enable_partition_table_access: bool, ) -> Result { let request_id = RequestId::next_id(); let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); let catalog = self.instance.catalog_manager.default_catalog_name(); - info!("Handle sql query, request_id:{request_id}, deadline:{deadline:?}, schema:{schema}, sql:{sql}"); + info!("Handle sql query, schema:{schema}, sql:{sql}, ctx;{ctx:?}"); let instance = &self.instance; // TODO(yingwen): Privilege check, cannot access data of other tenant @@ -155,7 +158,7 @@ impl Proxy { } } - let output = if ctx.enable_partition_table_access { + let output = if enable_partition_table_access { self.execute_plan_involving_partition_table(request_id, catalog, schema, plan, deadline) .await } else { diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index d50a80a477..b59b69e8cd 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -146,15 +146,12 @@ impl StorageService for StorageS ) -> Result, tonic::Status> { let begin_instant = Instant::now(); let proxy = self.proxy.clone(); - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new( + self.runtimes.read_runtime.clone(), + self.timeout, + get_forwarded_from(&req), + ); + let stream = Self::stream_sql_query_internal(ctx, proxy, req).await; GRPC_HANDLER_DURATION_HISTOGRAM_VEC @@ -165,21 +162,23 @@ impl StorageService for StorageS } } +fn get_forwarded_from(req: &tonic::Request) -> Option { + req.metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()) +} + // TODO: Use macros to simplify duplicate code impl StorageServiceImpl { async fn route_internal( &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new( + self.runtimes.read_runtime.clone(), + self.timeout, + get_forwarded_from(&req), + ); let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -206,15 +205,12 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.write_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new( + self.runtimes.read_runtime.clone(), + self.timeout, + get_forwarded_from(&req), + ); + let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -250,22 +246,17 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; - let req = req.into_inner(); + let ctx = Context::new( + self.runtimes.read_runtime.clone(), + self.timeout, + get_forwarded_from(&req), + ); let proxy = self.proxy.clone(); let join_handle = self .runtimes .read_runtime - .spawn(async move { proxy.handle_sql_query(ctx, req).await }); + .spawn(async move { proxy.handle_sql_query(ctx, req.into_inner()).await }); let resp = match join_handle.await { Ok(v) => v, @@ -318,15 +309,12 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new( + self.runtimes.read_runtime.clone(), + self.timeout, + get_forwarded_from(&req), + ); + let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -361,20 +349,15 @@ impl StorageServiceImpl { &self, req: tonic::Request>, ) -> Result, tonic::Status> { - let mut total_success = 0; - - let ctx = Context { - runtime: self.runtimes.write_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new( + self.runtimes.read_runtime.clone(), + self.timeout, + get_forwarded_from(&req), + ); let mut stream = req.into_inner(); let proxy = self.proxy.clone(); + let mut total_success = 0; let join_handle = self.runtimes.write_runtime.spawn(async move { let mut resp = WriteResponse::default(); let mut has_err = false;