From cd3f97665305b76ea75414d4a96d7a4f7e8d1dfe Mon Sep 17 00:00:00 2001 From: WEI Xikai Date: Tue, 27 Dec 2022 20:39:51 +0800 Subject: [PATCH] feat: auto forward query for grpc (#511) * add basic forwarder * support tenent settings when forward * avoid build forwarder when disabled * set the local addr as config.cluster.node.addr * make clippy happy * add the new unit tests * address CR issues * fix unit test for loopback ip checking * add some debug logs * update the comment --- server/src/config.rs | 32 +- server/src/grpc/forward.rs | 513 +++++++++++++++++++++++ server/src/grpc/mod.rs | 53 ++- server/src/grpc/storage_service/mod.rs | 30 +- server/src/grpc/storage_service/query.rs | 74 +++- server/src/server.rs | 4 + 6 files changed, 682 insertions(+), 24 deletions(-) create mode 100644 server/src/grpc/forward.rs diff --git a/server/src/config.rs b/server/src/config.rs index e1ac3a3c26..554559c680 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -13,6 +13,7 @@ use serde_derive::Deserialize; use table_engine::ANALYTIC_ENGINE_TYPE; use crate::{ + grpc::forward, http::DEFAULT_MAX_BODY_SIZE, limiter::LimiterConfig, route::rule_based::{ClusterView, RuleList}, @@ -51,7 +52,7 @@ pub struct StaticRouteConfig { pub topology: StaticTopologyConfig, } -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Deserialize, Eq, Hash, PartialEq)] pub struct Endpoint { pub addr: String, pub port: u16, @@ -101,6 +102,15 @@ impl From for storage::Endpoint { } } +impl From for Endpoint { + fn from(endpoint: storage::Endpoint) -> Self { + Endpoint { + addr: endpoint.ip, + port: endpoint.port as u16, + } + } +} + #[derive(Debug, Clone, Deserialize)] pub struct ShardView { pub shard_id: ShardId, @@ -181,34 +191,37 @@ pub struct Config { pub grpc_port: u16, pub grpc_server_cq_count: usize, - // Engine related configs: + /// Engine related configs: pub runtime: RuntimeConfig, - // Log related configs: + /// Log related configs: pub log_level: String, pub enable_async_log: bool, pub async_log_channel_len: i32, - // Tracing related configs: + /// Tracing related configs: pub tracing_log_dir: String, pub tracing_log_name: String, pub tracing_level: String, - // Config of static router. + /// Config of static router. pub static_route: StaticRouteConfig, - // Analytic engine configs. + /// Analytic engine configs. pub analytic: analytic_engine::Config, - // Query engine config. + /// Query engine config. pub query: query_engine::Config, - // Deployment configs: + /// Deployment configs: pub deploy_mode: DeployMode, pub cluster: ClusterConfig, - // Config of limiter + /// Config of limiter pub limiter: LimiterConfig, + + /// Config for forwarding + pub forward: forward::Config, } impl Default for RuntimeConfig { @@ -245,6 +258,7 @@ impl Default for Config { deploy_mode: DeployMode::Standalone, cluster: ClusterConfig::default(), limiter: LimiterConfig::default(), + forward: forward::Config::default(), } } } diff --git a/server/src/grpc/forward.rs b/server/src/grpc/forward.rs new file mode 100644 index 0000000000..b78a5f1e94 --- /dev/null +++ b/server/src/grpc/forward.rs @@ -0,0 +1,513 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Forward for grpc services +use std::{ + collections::HashMap, + net::Ipv4Addr, + sync::{Arc, RwLock}, + time::Duration, +}; + +use async_trait::async_trait; +use ceresdbproto::storage::{storage_service_client::StorageServiceClient, RouteRequest}; +use log::{debug, error, warn}; +use serde_derive::Deserialize; +use snafu::{ensure, Backtrace, ResultExt, Snafu}; +use tonic::{ + metadata::errors::InvalidMetadataValue, + transport::{self, Channel}, +}; + +use crate::{config::Endpoint, consts::TENANT_HEADER, route::RouterRef}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display( + "Invalid endpoint, endpoint:{}, err:{}.\nBacktrace:\n{}", + endpoint, + source, + backtrace + ))] + InvalidEndpoint { + endpoint: String, + source: tonic::transport::Error, + backtrace: Backtrace, + }, + + #[snafu(display( + "Local ip addr should not be loopback, addr:{}.\nBacktrace:\n{}", + ip_addr, + backtrace + ))] + LoopbackLocalIpAddr { + ip_addr: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Invalid schema, schema:{}, err:{}.\nBacktrace:\n{}", + schema, + source, + backtrace + ))] + InvalidSchema { + schema: String, + source: InvalidMetadataValue, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to connect endpoint, endpoint:{}, err:{}.\nBacktrace:\n{}", + endpoint, + source, + backtrace + ))] + Connect { + endpoint: String, + source: tonic::transport::Error, + backtrace: Backtrace, + }, +} + +define_result!(Error); + +pub type ForwarderRef = Arc>; + +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct Config { + pub enable: bool, + /// Thread num for grpc polling + pub thread_num: usize, + /// -1 means unlimited + pub max_send_msg_len: i32, + /// -1 means unlimited + pub max_recv_msg_len: i32, + /// Sets an interval for HTTP2 Ping frames should be sent to keep a + /// connection alive. + pub keep_alive_interval: Duration, + /// A timeout for receiving an acknowledgement of the keep-alive ping + /// If the ping is not acknowledged within the timeout, the connection will + /// be closed + pub keep_alive_timeout: Duration, + /// default keep http2 connections alive while idle + pub keep_alive_while_idle: bool, + pub connect_timeout: Duration, + pub forward_timeout: Duration, +} + +impl Default for Config { + fn default() -> Self { + Self { + enable: false, + thread_num: 4, + // 20MB + max_send_msg_len: 20 * (1 << 20), + // 1GB + max_recv_msg_len: 1 << 30, + keep_alive_interval: Duration::from_secs(60 * 10), + keep_alive_timeout: Duration::from_secs(3), + keep_alive_while_idle: true, + connect_timeout: Duration::from_secs(3), + forward_timeout: Duration::from_secs(60), + } + } +} + +#[async_trait] +pub trait ClientBuilder { + async fn connect(&self, endpoint: &Endpoint) -> Result>; +} + +pub struct DefaultClientBuilder { + config: Config, +} + +impl DefaultClientBuilder { + #[inline] + fn make_endpoint_with_scheme(endpoint: &Endpoint) -> String { + format!("http://{}:{}", endpoint.addr, endpoint.port) + } +} + +#[async_trait] +impl ClientBuilder for DefaultClientBuilder { + async fn connect(&self, endpoint: &Endpoint) -> Result> { + let endpoint_with_scheme = Self::make_endpoint_with_scheme(endpoint); + let configured_endpoint = transport::Endpoint::from_shared(endpoint_with_scheme.clone()) + .context(InvalidEndpoint { + endpoint: &endpoint_with_scheme, + })?; + + let configured_endpoint = match self.config.keep_alive_while_idle { + true => configured_endpoint + .connect_timeout(self.config.connect_timeout) + .keep_alive_timeout(self.config.keep_alive_timeout) + .keep_alive_while_idle(true) + .http2_keep_alive_interval(self.config.keep_alive_interval), + false => configured_endpoint + .connect_timeout(self.config.connect_timeout) + .keep_alive_while_idle(false), + }; + let channel = configured_endpoint.connect().await.context(Connect { + endpoint: &endpoint_with_scheme, + })?; + + Ok(StorageServiceClient::new(channel)) + } +} + +/// Forwarder does request forwarding. +/// +/// No forward happens if the router tells the target endpoint is the same as +/// the local endpoint. +/// +/// Assuming client wants to access some table which are located on server1 (the +/// router can tell the location information). Then here is the diagram +/// describing what the forwarder does: +/// peer-to-peer procedure: client --> server1 +/// forwarding procedure: client --> server0 (forwarding server) --> server1 +pub struct Forwarder { + config: Config, + router: RouterRef, + local_endpoint: Endpoint, + client_builder: B, + clients: RwLock>>, +} + +/// The result of forwarding. +/// +/// If no forwarding happens, [`Original`] can be used. +pub enum ForwardResult { + Original, + Forwarded(std::result::Result), +} + +#[derive(Debug)] +pub struct ForwardRequest { + pub schema: String, + pub metric: String, + pub req: tonic::Request, +} + +impl Forwarder { + pub fn try_new(config: Config, router: RouterRef, local_endpoint: Endpoint) -> Result { + let client_builder = DefaultClientBuilder { + config: config.clone(), + }; + + Self::try_new_with_client_builder(config, router, local_endpoint, client_builder) + } +} + +impl Forwarder { + #[inline] + fn is_loopback_ip(ip_addr: &str) -> bool { + ip_addr + .parse::() + .map(|ip| ip.is_loopback()) + .unwrap_or(false) + } + + /// Check whether the target endpoint is the same as the local endpoint. + fn is_local_endpoint(&self, target: &Endpoint) -> bool { + if &self.local_endpoint == target { + return true; + } + + if self.local_endpoint.port != target.port { + return false; + } + + // Only need to check the remote is loopback addr. + Self::is_loopback_ip(&target.addr) + } + + /// Release the client for the given endpoint. + fn release_client(&self, endpoint: &Endpoint) -> Option> { + let mut clients = self.clients.write().unwrap(); + clients.remove(endpoint) + } +} + +impl Forwarder { + pub fn try_new_with_client_builder( + config: Config, + router: RouterRef, + local_endpoint: Endpoint, + client_builder: B, + ) -> Result { + let loopback_local_endpoint = Self::is_loopback_ip(&local_endpoint.addr); + ensure!( + !loopback_local_endpoint, + LoopbackLocalIpAddr { + ip_addr: &local_endpoint.addr, + } + ); + + Ok(Self { + config, + local_endpoint, + router, + clients: RwLock::new(HashMap::new()), + client_builder, + }) + } + + /// Forward the request according to the configured router. + /// + /// Error will be thrown if it happens in the forwarding procedure, that is + /// to say, some errors like the output from the `do_rpc` will be + /// wrapped in the [`ForwardResult::Forwarded`]. + pub async fn forward( + &self, + forward_req: ForwardRequest, + do_rpc: F, + ) -> Result> + where + F: FnOnce( + StorageServiceClient, + tonic::Request, + &Endpoint, + ) -> Box< + dyn std::future::Future> + Send + Unpin, + >, + Req: std::fmt::Debug + Clone, + { + if !self.config.enable { + return Ok(ForwardResult::Original); + } + + let ForwardRequest { + schema, + metric, + mut req, + } = forward_req; + + let route_req = RouteRequest { + metrics: vec![metric], + }; + + let endpoint = match self.router.route(&schema, route_req).await { + Ok(mut routes) => { + if routes.len() != 1 || routes[0].endpoint.is_none() { + warn!( + "Fail to forward request for multiple route results, routes result:{:?}, req:{:?}", + routes, req + ); + return Ok(ForwardResult::Original); + } + + Endpoint::from(routes.remove(0).endpoint.unwrap()) + } + Err(e) => { + error!("Fail to route request, req:{:?}, err:{}", req, e); + return Ok(ForwardResult::Original); + } + }; + + if self.is_local_endpoint(&endpoint) { + return Ok(ForwardResult::Original); + } + + // Update the request. + { + // TODO: we should use the timeout from the original request. + req.set_timeout(self.config.forward_timeout); + let metadata = req.metadata_mut(); + metadata.insert( + TENANT_HEADER, + schema.parse().context(InvalidSchema { schema })?, + ); + } + + // TODO: add metrics to record the forwarding. + debug!( + "Try to forward request to {:?}, request:{:?}", + endpoint, req, + ); + let client = self.get_or_create_client(&endpoint).await?; + match do_rpc(client, req, &endpoint).await { + Err(e) => { + // Release the grpc client for the error doesn't belong to the normal error. + self.release_client(&endpoint); + Ok(ForwardResult::Forwarded(Err(e))) + } + Ok(resp) => Ok(ForwardResult::Forwarded(Ok(resp))), + } + } + + async fn get_or_create_client( + &self, + endpoint: &Endpoint, + ) -> Result> { + { + let clients = self.clients.read().unwrap(); + if let Some(v) = clients.get(endpoint) { + return Ok(v.clone()); + } + } + + let new_client = self.client_builder.connect(endpoint).await?; + { + let mut clients = self.clients.write().unwrap(); + if let Some(v) = clients.get(endpoint) { + return Ok(v.clone()); + } + clients.insert(endpoint.clone(), new_client.clone()); + } + + Ok(new_client) + } +} + +#[cfg(test)] +mod tests { + use ceresdbproto::storage::{QueryRequest, QueryResponse, Route}; + use futures::FutureExt; + use tonic::IntoRequest; + + use super::*; + use crate::route::Router; + + #[test] + fn test_check_loopback_endpoint() { + let loopback_ips = vec!["127.0.0.1", "127.0.0.2"]; + for loopback_ip in loopback_ips { + assert!(Forwarder::::is_loopback_ip( + loopback_ip + )); + } + + let normal_ips = vec!["10.100.10.14", "192.168.1.2", "0.0.0.0"]; + for ip in normal_ips { + assert!(!Forwarder::::is_loopback_ip(ip)); + } + + let invalid_addrs = vec!["hello.world.com", "test", "localhost", ""]; + for ip in invalid_addrs { + assert!(!Forwarder::::is_loopback_ip(ip)); + } + } + + struct MockRouter { + routing_tables: HashMap, + } + + #[async_trait] + impl Router for MockRouter { + async fn route( + &self, + _schema: &str, + req: RouteRequest, + ) -> crate::route::Result> { + let endpoint = self.routing_tables.get(&req.metrics[0]); + match endpoint { + None => Ok(vec![]), + Some(v) => Ok(vec![Route { + metric: req.metrics[0].clone(), + endpoint: Some(v.clone().into()), + ext: vec![], + }]), + } + } + } + + struct MockClientBuilder; + + #[async_trait] + impl ClientBuilder for MockClientBuilder { + async fn connect(&self, _: &Endpoint) -> Result> { + let (channel, _) = Channel::balance_channel::(10); + Ok(StorageServiceClient::::new(channel)) + } + } + + #[tokio::test] + async fn test_normal_forward() { + let config = Config { + enable: true, + ..Default::default() + }; + + let mut mock_router = MockRouter { + routing_tables: HashMap::new(), + }; + let test_metric0: &str = "test_metric0"; + let test_metric1: &str = "test_metric1"; + let test_metric2: &str = "test_metric2"; + let test_metric3: &str = "test_metric3"; + let test_endpoint0 = Endpoint::new("192.168.1.12".to_string(), 8831); + let test_endpoint1 = Endpoint::new("192.168.1.2".to_string(), 8831); + let test_endpoint2 = Endpoint::new("192.168.1.2".to_string(), 8832); + let test_endpoint3 = Endpoint::new("192.168.1.1".to_string(), 8831); + mock_router + .routing_tables + .insert(test_metric0.to_string(), test_endpoint0.clone()); + mock_router + .routing_tables + .insert(test_metric1.to_string(), test_endpoint1.clone()); + mock_router + .routing_tables + .insert(test_metric2.to_string(), test_endpoint2.clone()); + mock_router + .routing_tables + .insert(test_metric3.to_string(), test_endpoint3.clone()); + let mock_router = Arc::new(mock_router); + + let local_endpoint = test_endpoint3.clone(); + let forwarder = Forwarder::try_new_with_client_builder( + config, + mock_router.clone() as _, + local_endpoint.clone(), + MockClientBuilder, + ) + .unwrap(); + + let make_forward_req = |metric: &str| { + let query_request = QueryRequest { + metrics: vec![metric.to_string()], + ql: "".to_string(), + }; + ForwardRequest { + schema: "public".to_string(), + metric: metric.to_string(), + req: query_request.into_request(), + } + }; + + let do_rpc = |_client, req: tonic::Request, endpoint: &Endpoint| { + let tenant = req.metadata().get(TENANT_HEADER).unwrap().to_str().unwrap(); + assert_eq!(tenant, "public"); + let req = req.into_inner(); + let expect_endpoint = mock_router.routing_tables.get(&req.metrics[0]).unwrap(); + assert_eq!(expect_endpoint, endpoint); + + let resp = QueryResponse::default(); + Box::new(async move { Ok(resp) }.boxed()) as _ + }; + + for test_metric in [test_metric0, test_metric1, test_metric2, test_metric3] { + let endpoint = mock_router.routing_tables.get(test_metric).unwrap(); + let forward_req = make_forward_req(test_metric); + let res: Result> = + forwarder.forward(forward_req, do_rpc).await; + let forward_res = res.expect("should succeed in forwarding"); + if endpoint == &local_endpoint { + assert!(forwarder.is_local_endpoint(endpoint)); + assert!( + matches!(forward_res, ForwardResult::Original), + "endpoint is:{:?}", + endpoint + ); + } else { + assert!(!forwarder.is_local_endpoint(endpoint)); + assert!( + matches!(forward_res, ForwardResult::Forwarded(_)), + "endpoint is:{:?}", + endpoint + ); + } + } + } +} diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index ed839c4f74..2cf342263e 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -4,6 +4,7 @@ use std::{ net::{AddrParseError, SocketAddr}, + str::FromStr, stringify, sync::Arc, }; @@ -19,6 +20,7 @@ use common_types::{ }; use common_util::{ define_result, + error::GenericError, runtime::{JoinHandle, Runtime}, }; use futures::FutureExt; @@ -30,12 +32,17 @@ use tokio::sync::oneshot::{self, Sender}; use tonic::transport::Server; use crate::{ - grpc::{meta_event_service::MetaServiceImpl, storage_service::StorageServiceImpl}, + config::Endpoint, + grpc::{ + forward::Forwarder, meta_event_service::MetaServiceImpl, + storage_service::StorageServiceImpl, + }, instance::InstanceRef, route::RouterRef, schema_config_provider::{self, SchemaConfigProviderRef}, }; +pub mod forward; mod meta_event_service; mod metrics; mod storage_service; @@ -71,6 +78,15 @@ pub enum Error { #[snafu(display("Missing runtimes.\nBacktrace:\n{}", backtrace))] MissingRuntimes { backtrace: Backtrace }, + #[snafu(display( + "Missing local endpoint when forwarder enabled.\nBacktrace:\n{}", + backtrace + ))] + MissingLocalEndpoint { backtrace: Backtrace }, + + #[snafu(display("Invalid local endpoint when forwarder enabled, err:{}", source,))] + InvalidLocalEndpoint { source: GenericError }, + #[snafu(display("Missing instance.\nBacktrace:\n{}", backtrace))] MissingInstance { backtrace: Backtrace }, @@ -92,9 +108,12 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Fail to build table schema for metric: {}, err:{}", metric, source))] + #[snafu(display("Fail to build table schema for metric:{}, err:{}", metric, source))] BuildTableSchema { metric: String, source: SchemaError }, + #[snafu(display("Fail to build forwarder, err:{}", source))] + BuildForwarder { source: forward::Error }, + #[snafu(display( "Fail to build column schema from column: {}, err:{}", column_name, @@ -104,6 +123,7 @@ pub enum Error { column_name: String, source: column_schema::Error, }, + #[snafu(display("Invalid column: {} schema, err:{}", column_name, source))] InvalidColumnSchema { column_name: String, @@ -173,22 +193,26 @@ impl RpcServices { pub struct Builder { endpoint: String, + local_endpoint: Option, runtimes: Option>, instance: Option>, router: Option, cluster: Option, schema_config_provider: Option, + forward_config: Option, } impl Builder { pub fn new() -> Self { Self { endpoint: "0.0.0.0:8381".to_string(), + local_endpoint: None, runtimes: None, instance: None, router: None, cluster: None, schema_config_provider: None, + forward_config: None, } } @@ -197,6 +221,12 @@ impl Builder { self } + pub fn local_endpoint(mut self, endpoint: String) -> Self { + self.local_endpoint = Some(endpoint); + + self + } + pub fn runtimes(mut self, runtimes: Arc) -> Self { self.runtimes = Some(runtimes); self @@ -222,6 +252,11 @@ impl Builder { self.schema_config_provider = Some(provider); self } + + pub fn forward_config(mut self, config: forward::Config) -> Self { + self.forward_config = Some(config); + self + } } impl Builder { @@ -242,12 +277,26 @@ impl Builder { MetaEventServiceServer::new(meta_service) }); + let forward_config = self.forward_config.unwrap_or_default(); + let forwarder = if forward_config.enable { + let local_endpoint = + Endpoint::from_str(&self.local_endpoint.context(MissingLocalEndpoint)?) + .context(InvalidLocalEndpoint)?; + let forwarder = Arc::new( + Forwarder::try_new(forward_config, router.clone(), local_endpoint) + .context(BuildForwarder)?, + ); + Some(forwarder) + } else { + None + }; let bg_runtime = runtimes.bg_runtime.clone(); let storage_service = StorageServiceImpl { router, instance, runtimes, schema_config_provider, + forwarder, }; let rpc_server = StorageServiceServer::new(storage_service); diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 4813d102b2..7ce10266b0 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -39,6 +39,7 @@ use tonic::metadata::{KeyAndValueRef, MetadataMap}; use crate::{ consts, grpc::{ + forward::ForwarderRef, metrics::GRPC_HANDLER_DURATION_HISTOGRAM_VEC, storage_service::error::{ErrNoCause, ErrWithCause, Result}, }, @@ -98,6 +99,7 @@ pub struct HandlerContext<'a, Q> { catalog: String, schema: String, schema_config: Option<&'a SchemaConfig>, + forwarder: Option, } impl<'a, Q> HandlerContext<'a, Q> { @@ -106,6 +108,7 @@ impl<'a, Q> HandlerContext<'a, Q> { router: Arc, instance: InstanceRef, schema_config_provider: &'a SchemaConfigProviderRef, + forwarder: Option, ) -> Result { let default_catalog = instance.catalog_manager.default_catalog_name(); let default_schema = instance.catalog_manager.default_schema_name(); @@ -147,6 +150,7 @@ impl<'a, Q> HandlerContext<'a, Q> { catalog, schema, schema_config, + forwarder, }) } @@ -166,6 +170,7 @@ pub struct StorageServiceImpl { pub instance: InstanceRef, pub runtimes: Arc, pub schema_config_provider: SchemaConfigProviderRef, + pub forwarder: Option, } impl Clone for StorageServiceImpl { @@ -175,6 +180,7 @@ impl Clone for StorageServiceImpl { instance: self.instance.clone(), runtimes: self.runtimes.clone(), schema_config_provider: self.schema_config_provider.clone(), + forwarder: self.forwarder.clone(), } } } @@ -191,6 +197,7 @@ macro_rules! handle_request { let router = self.router.clone(); let header = RequestHeader::from(request.metadata()); let instance = self.instance.clone(); + let forwarder = self.forwarder.clone(); // The future spawned by tokio cannot be executed by other executor/runtime, so @@ -204,7 +211,7 @@ macro_rules! handle_request { // we need to pass the result via channel let join_handle = runtime.spawn(async move { let handler_ctx = - HandlerContext::new(header, router, instance, &schema_config_provider) + HandlerContext::new(header, router, instance, &schema_config_provider, forwarder) .map_err(|e| Box::new(e) as _) .context(ErrWithCause { code: StatusCode::BAD_REQUEST, @@ -274,12 +281,18 @@ impl StorageServiceImpl { let instance = self.instance.clone(); let schema_config_provider = self.schema_config_provider.clone(); - let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider) - .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid header", - })?; + let handler_ctx = HandlerContext::new( + header, + router, + instance, + &schema_config_provider, + self.forwarder.clone(), + ) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: "invalid header", + })?; let mut total_success = 0; let mut resp = WriteResponse::default(); @@ -332,10 +345,11 @@ impl StorageServiceImpl { let header = RequestHeader::from(request.metadata()); let instance = self.instance.clone(); let schema_config_provider = self.schema_config_provider.clone(); + let forwarder = self.forwarder.clone(); let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); let _: JoinHandle> = self.runtimes.read_runtime.spawn(async move { - let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider) + let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider, forwarder) .map_err(|e| Box::new(e) as _) .context(ErrWithCause { code: StatusCode::BAD_REQUEST, diff --git a/server/src/grpc/storage_service/query.rs b/server/src/grpc/storage_service/query.rs index 687fbe07c8..aa165483cc 100644 --- a/server/src/grpc/storage_service/query.rs +++ b/server/src/grpc/storage_service/query.rs @@ -6,25 +6,33 @@ use std::time::Instant; use ceresdbproto::{ common::ResponseHeader, - storage::{query_response, QueryRequest, QueryResponse}, + storage::{ + query_response, storage_service_client::StorageServiceClient, QueryRequest, QueryResponse, + }, }; use common_types::{record_batch::RecordBatch, request_id::RequestId}; use common_util::time::InstantExt; +use futures::FutureExt; use http::StatusCode; use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; -use log::info; +use log::{error, info, warn}; use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec}; use snafu::{ensure, ResultExt}; use sql::{ frontend::{Context as SqlContext, Frontend}, provider::CatalogMetaProvider, }; +use tonic::{transport::Channel, IntoRequest}; use crate::{ avro_util, - grpc::storage_service::{ - error::{ErrNoCause, ErrWithCause, Result}, - HandlerContext, + config::Endpoint, + grpc::{ + forward::{ForwardRequest, ForwardResult}, + storage_service::{ + error::{ErrNoCause, ErrWithCause, Result}, + HandlerContext, + }, }, }; @@ -43,10 +51,66 @@ fn empty_ok_resp() -> QueryResponse { } } +async fn maybe_forward_query( + ctx: &HandlerContext<'_, Q>, + req: &QueryRequest, +) -> Option> { + let forwarder = ctx.forwarder.as_ref()?; + + if req.metrics.len() != 1 { + warn!( + "Unable to forward query without exactly one metric, req:{:?}", + req + ); + + return None; + } + + let forward_req = ForwardRequest { + schema: ctx.schema.clone(), + metric: req.metrics[0].clone(), + req: req.clone().into_request(), + }; + let do_query = |mut client: StorageServiceClient, + request: tonic::Request, + _: &Endpoint| { + let query = async move { + client + .query(request) + .await + .map(|resp| resp.into_inner()) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Forwarded query failed".to_string(), + }) + } + .boxed(); + + Box::new(query) as _ + }; + + match forwarder.forward(forward_req, do_query).await { + Ok(forward_res) => match forward_res { + ForwardResult::Forwarded(v) => Some(v), + ForwardResult::Original => None, + }, + Err(e) => { + error!("Failed to forward req but the error is ignored, err:{}", e); + None + } + } +} + pub async fn handle_query( ctx: &HandlerContext<'_, Q>, req: QueryRequest, ) -> Result { + let req = match maybe_forward_query(ctx, &req).await { + Some(resp) => return resp, + None => req, + }; + let output_result = fetch_query_output(ctx, &req).await?; if let Some(output) = output_result { convert_output(&output) diff --git a/server/src/server.rs b/server/src/server.rs index 473ca53c43..c42339f0b5 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -319,11 +319,15 @@ impl Builder { .context(MissingSchemaConfigProvider)?; let rpc_services = grpc::Builder::new() .endpoint(Endpoint::new(self.config.bind_addr, self.config.grpc_port).to_string()) + .local_endpoint( + Endpoint::new(self.config.cluster.node.addr, self.config.grpc_port).to_string(), + ) .runtimes(engine_runtimes) .instance(instance.clone()) .router(router) .cluster(self.cluster.clone()) .schema_config_provider(provider) + .forward_config(self.config.forward) .build() .context(BuildGrpcService)?;