Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dedup requests in proxy #1125

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions interpreters/src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ define_result!(Error);

// TODO(yingwen): Maybe add a stream variant for streaming result
/// The interpreter output
#[derive(Clone)]
pub enum Output {
/// Affected rows number
AffectedRows(usize),
Expand Down
123 changes: 123 additions & 0 deletions proxy/src/dedup_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, hash::Hash, sync::RwLock};

#[derive(Debug)]
struct Notifiers<T> {
notifiers: RwLock<Vec<T>>,
}

impl<T> Notifiers<T> {
pub fn new(notifier: T) -> Self {
let notifiers = vec![notifier];
Self {
notifiers: RwLock::new(notifiers),
}
}

pub fn add_notifier(&self, notifier: T) {
self.notifiers.write().unwrap().push(notifier);
}
}

#[derive(Debug)]
pub struct RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
notifiers_by_key: RwLock<HashMap<K, Notifiers<T>>>,
}

impl<K, T> Default for RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
fn default() -> Self {
Self {
notifiers_by_key: RwLock::new(HashMap::new()),
}
}
}

impl<K, T> RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
/// Insert a notifier for the given key.
pub fn insert_notifier(&self, key: K, notifier: T) -> RequestResult {
// First try to read the notifiers, if the key exists, add the notifier to the
// notifiers.
let notifiers_by_key = self.notifiers_by_key.read().unwrap();
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}
drop(notifiers_by_key);

// If the key does not exist, try to write the notifiers.
let mut notifiers_by_key = self.notifiers_by_key.write().unwrap();
// double check, if the key exists, add the notifier to the notifiers.
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}

//the key is not existed, insert the key and the notifier.
notifiers_by_key.insert(key, Notifiers::new(notifier));
RequestResult::First
}

/// Take the notifiers for the given key, and remove the key from the map.
pub fn take_notifiers(&self, key: &K) -> Option<Vec<T>> {
self.notifiers_by_key
.write()
.unwrap()
.remove(key)
.map(|notifiers| notifiers.notifiers.into_inner().unwrap())
}
}

pub enum RequestResult {
// The first request for this key, need to handle this request.
First,
// There are other requests for this key, just wait for the result.
Wait,
}

pub struct ExecutionGuard<F: FnMut()> {
f: F,
cancelled: bool,
}

impl<F: FnMut()> ExecutionGuard<F> {
pub fn new(f: F) -> Self {
Self {
f,
cancelled: false,
}
}

pub fn cancel(&mut self) {
self.cancelled = true;
}
}

impl<F: FnMut()> Drop for ExecutionGuard<F> {
fn drop(&mut self) {
if !self.cancelled {
(self.f)()
}
}
}
33 changes: 24 additions & 9 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,30 @@ impl Proxy {

let req_context = req.context.as_ref().unwrap();
let schema = &req_context.database;
match self
.handle_sql(
ctx,
schema,
&req.sql,
self.sub_table_access_perm.enable_others,
)
.await?
{

let result = match self.request_notifiers.clone() {
Some(request_notifiers) => {
self.dedup_handle_sql(
ctx,
schema,
&req.sql,
request_notifiers,
self.sub_table_access_perm.enable_others,
)
.await?
}
None => {
self.handle_sql(
ctx,
schema,
&req.sql,
self.sub_table_access_perm.enable_others,
)
.await?
}
};

match result {
SqlResponse::Forwarded(resp) => Ok(resp),
SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length),
}
Expand Down
7 changes: 7 additions & 0 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#![feature(trait_alias)]

pub mod context;
pub mod dedup_requests;
pub mod error;
mod error_util;
pub mod forward;
Expand Down Expand Up @@ -81,13 +82,16 @@ use table_engine::{
PARTITION_TABLE_ENGINE_TYPE,
};
use time_ext::{current_time_millis, parse_duration};
use tokio::sync::mpsc::Sender;
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::RequestNotifiers,
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
hotspot::HotspotRecorder,
instance::InstanceRef,
read::SqlResponse,
schema_config_provider::SchemaConfigProviderRef,
};

Expand Down Expand Up @@ -121,6 +125,7 @@ pub struct Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
}

impl Proxy {
Expand All @@ -137,6 +142,7 @@ impl Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
Expand All @@ -155,6 +161,7 @@ impl Proxy {
engine_runtimes,
cluster_with_meta,
sub_table_access_perm,
request_notifiers,
}
}

Expand Down
1 change: 1 addition & 0 deletions proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ make_auto_flush_static_metric! {
write_failed_row,
query_succeeded_row,
query_affected_row,
dedupped_stream_query,
}

pub struct GrpcHandlerCounterVec: LocalIntCounter {
Expand Down
70 changes: 67 additions & 3 deletions proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Contains common methods used by the read process.

use std::time::Instant;
use std::{sync::Arc, time::Instant};

use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RequestContext, SqlQueryRequest, SqlQueryResponse,
Expand All @@ -32,11 +32,14 @@ use query_frontend::{
use router::endpoint::Endpoint;
use snafu::{ensure, ResultExt};
use time_ext::InstantExt;
use tokio::sync::mpsc::{self, Sender};
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::{ExecutionGuard, RequestNotifiers, RequestResult},
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult},
metrics::GRPC_HANDLER_COUNTER_VEC,
Context, Proxy,
};

Expand Down Expand Up @@ -69,6 +72,67 @@ impl Proxy {
))
}

pub(crate) async fn dedup_handle_sql(
&self,
ctx: &Context,
schema: &str,
sql: &str,
request_notifiers: Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>,
enable_partition_table_access: bool,
) -> Result<SqlResponse> {
let (tx, mut rx) = mpsc::channel(1);
let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) {
RequestResult::First => ExecutionGuard::new(|| {
request_notifiers.take_notifiers(&sql.to_string());
}),
RequestResult::Wait => {
return rx.recv().await.unwrap();
}
};

if let Some(resp) = self
.maybe_forward_sql_query(ctx.clone(), schema, sql)
.await?
{
match resp {
ForwardResult::Forwarded(resp) => {
let resp = resp?;
guard.cancel();
let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap();
for notifier in &notifiers {
if let Err(e) = notifier
.send(Ok(SqlResponse::Forwarded(resp.clone())))
.await
{
error!("Failed to send handler result, err:{}.", e);
}
}
GRPC_HANDLER_COUNTER_VEC
.dedupped_stream_query
.inc_by((notifiers.len() - 1) as u64);
return Ok(SqlResponse::Forwarded(resp));
}
ForwardResult::Local => (),
}
};

let result = self
.fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access)
.await?;

guard.cancel();
let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap();
for notifier in &notifiers {
if let Err(e) = notifier.send(Ok(SqlResponse::Local(result.clone()))).await {
error!("Failed to send handler result, err:{}.", e);
}
}
GRPC_HANDLER_COUNTER_VEC
.dedupped_stream_query
.inc_by((notifiers.len() - 1) as u64);
Ok(SqlResponse::Local(result))
}

pub(crate) async fn fetch_sql_query_output(
&self,
ctx: &Context,
Expand Down Expand Up @@ -166,10 +230,10 @@ impl Proxy {
Output::AffectedRows(_) => Ok(output),
Output::Records(v) => {
if plan_maybe_expired {
let row_nums = v
let num_rows = v
.iter()
.fold(0_usize, |acc, record_batch| acc + record_batch.num_rows());
if row_nums == 0 {
if num_rows == 0 {
warn!("Query time range maybe exceed TTL, sql:{sql}");

// TODO: Cannot return this error directly, empty query
Expand Down
24 changes: 21 additions & 3 deletions server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,24 @@ impl From<&StaticTopologyConfig> for ClusterView {
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct QueryDedupConfig {
pub enable: bool,
pub notify_timeout: ReadableDuration,
pub notify_queue_cap: usize,
}

impl Default for QueryDedupConfig {
fn default() -> Self {
Self {
enable: false,
notify_timeout: ReadableDuration::secs(1),
notify_queue_cap: 1000,
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct ServerConfig {
Expand Down Expand Up @@ -135,8 +153,8 @@ pub struct ServerConfig {
/// Config of remote engine client
pub remote_client: remote_engine_client::Config,

/// Whether to deduplicate requests
pub enable_query_dedup: bool,
/// Config of dedup query
pub query_dedup: QueryDedupConfig,

/// Whether enable to access partition table
pub sub_table_access_perm: SubTableAccessPerm,
Expand All @@ -160,7 +178,7 @@ impl Default for ServerConfig {
route_cache: router::RouteCacheConfig::default(),
hotspot: hotspot::Config::default(),
remote_client: remote_engine_client::Config::default(),
enable_query_dedup: false,
query_dedup: QueryDedupConfig::default(),
sub_table_access_perm: SubTableAccessPerm::default(),
}
}
Expand Down
Loading
Loading