Skip to content

Commit

Permalink
dedup requests in proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri authored and ShiKaiWi committed Sep 22, 2023
1 parent 41f6313 commit 82292f1
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions components/future_ext/src/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ where
}
}

pub struct FutureCancelledGuard<F: FnMut()> {
f: F,
cancelled: bool,
}

impl<F: FnMut()> FutureCancelledGuard<F> {
pub fn new(f: F) -> Self {
Self { f, cancelled: true }
}

pub fn uncancelled(&mut self) {
self.cancelled = false;
}
}

impl<F: FnMut()> Drop for FutureCancelledGuard<F> {
fn drop(&mut self) {
if self.cancelled {
(self.f)();
}
}
}

#[cfg(test)]
mod tests {
use std::{
Expand Down
2 changes: 1 addition & 1 deletion components/future_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
mod cancel;
mod retry;

pub use cancel::CancellationSafeFuture;
pub use cancel::{CancellationSafeFuture, FutureCancelledGuard};
pub use retry::{retry_async, RetryConfig};
1 change: 1 addition & 0 deletions interpreters/src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ define_result!(Error);

// TODO(yingwen): Maybe add a stream variant for streaming result
/// The interpreter output
#[derive(Clone)]
pub enum Output {
/// Affected rows number
AffectedRows(usize),
Expand Down
1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ common_types = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
futures = { workspace = true }
future_ext = { workspace = true }
generic_error = { workspace = true }
http = "0.2"
influxdb-line-protocol = "1.0"
Expand Down
123 changes: 123 additions & 0 deletions proxy/src/dedup_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, hash::Hash, sync::RwLock};

#[derive(Debug)]
struct Notifiers<T> {
notifiers: RwLock<Vec<T>>,
}

impl<T> Notifiers<T> {
pub fn new(notifier: T) -> Self {
let notifiers = vec![notifier];
Self {
notifiers: RwLock::new(notifiers),
}
}

pub fn add_notifier(&self, notifier: T) {
self.notifiers.write().unwrap().push(notifier);
}
}

#[derive(Debug)]
pub struct RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
notifiers_by_key: RwLock<HashMap<K, Notifiers<T>>>,
}

impl<K, T> Default for RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
fn default() -> Self {
Self {
notifiers_by_key: RwLock::new(HashMap::new()),
}
}
}

impl<K, T> RequestNotifiers<K, T>
where
K: PartialEq + Eq + Hash,
{
/// Insert a notifier for the given key.
pub fn insert_notifier(&self, key: K, notifier: T) -> RequestResult {
// First try to read the notifiers, if the key exists, add the notifier to the
// notifiers.
let notifiers_by_key = self.notifiers_by_key.read().unwrap();
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}
drop(notifiers_by_key);

// If the key does not exist, try to write the notifiers.
let mut notifiers_by_key = self.notifiers_by_key.write().unwrap();
// double check, if the key exists, add the notifier to the notifiers.
if let Some(notifiers) = notifiers_by_key.get(&key) {
notifiers.add_notifier(notifier);
return RequestResult::Wait;
}

//the key is not existed, insert the key and the notifier.
notifiers_by_key.insert(key, Notifiers::new(notifier));
RequestResult::First
}

/// Take the notifiers for the given key, and remove the key from the map.
pub fn take_notifiers(&self, key: &K) -> Option<Vec<T>> {
self.notifiers_by_key
.write()
.unwrap()
.remove(key)
.map(|notifiers| notifiers.notifiers.into_inner().unwrap())
}
}

pub enum RequestResult {
// The first request for this key, need to handle this request.
First,
// There are other requests for this key, just wait for the result.
Wait,
}

pub struct ExecutionGuard<F: FnMut()> {
f: F,
cancelled: bool,
}

impl<F: FnMut()> ExecutionGuard<F> {
pub fn new(f: F) -> Self {
Self {
f,
cancelled: false,
}
}

pub fn cancel(&mut self) {
self.cancelled = true;
}
}

impl<F: FnMut()> Drop for ExecutionGuard<F> {
fn drop(&mut self) {
if !self.cancelled {
(self.f)()
}
}
}
33 changes: 24 additions & 9 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,30 @@ impl Proxy {

let req_context = req.context.as_ref().unwrap();
let schema = &req_context.database;
match self
.handle_sql(
ctx,
schema,
&req.sql,
self.sub_table_access_perm.enable_others,
)
.await?
{

let result = match self.request_notifiers.clone() {
Some(request_notifiers) => {
self.deduped_handle_sql(
ctx,
schema,
&req.sql,
request_notifiers,
self.sub_table_access_perm.enable_others,
)
.await?
}
None => {
self.handle_sql(
ctx,
schema,
&req.sql,
self.sub_table_access_perm.enable_others,
)
.await?
}
};

match result {
SqlResponse::Forwarded(resp) => Ok(resp),
SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length),
}
Expand Down
6 changes: 6 additions & 0 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#![feature(trait_alias)]

pub mod context;
pub mod dedup_requests;
pub mod error;
mod error_util;
pub mod forward;
Expand Down Expand Up @@ -84,10 +85,12 @@ use time_ext::{current_time_millis, parse_duration};
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::RequestNotifiers,
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
hotspot::HotspotRecorder,
instance::InstanceRef,
read::SqlResponse,
schema_config_provider::SchemaConfigProviderRef,
};

Expand Down Expand Up @@ -121,6 +124,7 @@ pub struct Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Result<SqlResponse>>>>,
}

impl Proxy {
Expand All @@ -137,6 +141,7 @@ impl Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Result<SqlResponse>>>>,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
Expand All @@ -155,6 +160,7 @@ impl Proxy {
engine_runtimes,
cluster_with_meta,
sub_table_access_perm,
request_notifiers,
}
}

Expand Down
1 change: 1 addition & 0 deletions proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ make_auto_flush_static_metric! {
write_failed_row,
query_succeeded_row,
query_affected_row,
dedupped_stream_query,
}

pub struct GrpcHandlerCounterVec: LocalIntCounter {
Expand Down
67 changes: 66 additions & 1 deletion proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

//! Contains common methods used by the read process.

use std::time::Instant;
use std::{sync::Arc, time::Instant};

use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RequestContext, SqlQueryRequest, SqlQueryResponse,
};
use future_ext::FutureCancelledGuard;
use futures::FutureExt;
use generic_error::BoxError;
use http::StatusCode;
Expand All @@ -32,11 +33,14 @@ use query_frontend::{
use router::endpoint::Endpoint;
use snafu::{ensure, ResultExt};
use time_ext::InstantExt;
use tokio::sync::mpsc;
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::{RequestNotifiers, RequestResult},
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult},
metrics::GRPC_HANDLER_COUNTER_VEC,
Context, Proxy,
};

Expand Down Expand Up @@ -69,6 +73,67 @@ impl Proxy {
))
}

pub(crate) async fn deduped_handle_sql(
&self,
ctx: &Context,
schema: &str,
sql: &str,
request_notifiers: Arc<RequestNotifiers<String, Result<SqlResponse>>>,
enable_partition_table_access: bool,
) -> Result<SqlResponse> {
let (tx, mut rx) = mpsc::channel(1);
let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) {
RequestResult::First => FutureCancelledGuard::new(|| {
request_notifiers.take_notifiers(&sql.to_string());
}),
RequestResult::Wait => {
return rx.recv().await.unwrap();
}
};

if let Some(resp) = self
.maybe_forward_sql_query(ctx.clone(), schema, sql)
.await?
{
match resp {
ForwardResult::Forwarded(resp) => {
let resp = resp?;
guard.uncancelled();
let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap();
for notifier in &notifiers {
if let Err(e) = notifier
.send(Ok(SqlResponse::Forwarded(resp.clone())))
.await
{
error!("Failed to send handler result, err:{}.", e);
}
}
GRPC_HANDLER_COUNTER_VEC
.dedupped_stream_query
.inc_by((notifiers.len() - 1) as u64);
return Ok(SqlResponse::Forwarded(resp));
}
ForwardResult::Local => (),
}
};

let result = self
.fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access)
.await?;

guard.uncancelled();
let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap();
for notifier in &notifiers {
if let Err(e) = notifier.send(Ok(SqlResponse::Local(result.clone()))).await {
error!("Failed to send handler result, err:{}.", e);
}
}
GRPC_HANDLER_COUNTER_VEC
.dedupped_stream_query
.inc_by((notifiers.len() - 1) as u64);
Ok(SqlResponse::Local(result))
}

pub(crate) async fn fetch_sql_query_output(
&self,
ctx: &Context,
Expand Down
1 change: 1 addition & 0 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use log::{info, warn};
use macros::define_result;
use notifier::notifier::RequestNotifiers;
use proxy::{
dedup_requests::RequestNotifiers,
forward,
hotspot::HotspotRecorder,
instance::InstanceRef,
Expand Down
3 changes: 2 additions & 1 deletion server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use generic_error::BoxError;
use log::{error, info};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
use proxy::{
dedup_requests::{RequestNotifiers, RequestResult},
hotspot::{HotspotRecorder, Message},
instance::InstanceRef,
};
Expand Down Expand Up @@ -259,7 +260,7 @@ impl RemoteEngineServiceImpl {
}

// We should set cancel to guard, otherwise the key will be removed twice.
guard.cancel();
guard.uncancelled();
let notifiers = request_notifiers.take_notifiers(&request_key).unwrap();

// Do send in background to avoid blocking the rpc procedure.
Expand Down
Loading

0 comments on commit 82292f1

Please sign in to comment.