diff --git a/components/future_ext/src/cancel.rs b/components/future_ext/src/cancel.rs index 7af978cd05..1eef43c6cf 100644 --- a/components/future_ext/src/cancel.rs +++ b/components/future_ext/src/cancel.rs @@ -107,29 +107,6 @@ where } } -pub struct FutureCancelledGuard { - f: F, - cancelled: bool, -} - -impl FutureCancelledGuard { - pub fn new(f: F) -> Self { - Self { f, cancelled: true } - } - - pub fn uncancelled(&mut self) { - self.cancelled = false; - } -} - -impl Drop for FutureCancelledGuard { - fn drop(&mut self) { - if self.cancelled { - (self.f)(); - } - } -} - #[cfg(test)] mod tests { use std::{ diff --git a/components/future_ext/src/lib.rs b/components/future_ext/src/lib.rs index b66acbe261..dd8c63daf8 100644 --- a/components/future_ext/src/lib.rs +++ b/components/future_ext/src/lib.rs @@ -17,5 +17,5 @@ mod cancel; mod retry; -pub use cancel::{CancellationSafeFuture, FutureCancelledGuard}; +pub use cancel::CancellationSafeFuture; pub use retry::{retry_async, RetryConfig}; diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 1fcae1b4c1..f8481f5465 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -36,8 +36,8 @@ cluster = { workspace = true } common_types = { workspace = true } datafusion = { workspace = true } df_operator = { workspace = true } -futures = { workspace = true } future_ext = { workspace = true } +futures = { workspace = true } generic_error = { workspace = true } http = "0.2" influxdb-line-protocol = "1.0" diff --git a/proxy/src/grpc/sql_query.rs b/proxy/src/grpc/sql_query.rs index b3deb82214..b95e4e47ae 100644 --- a/proxy/src/grpc/sql_query.rs +++ b/proxy/src/grpc/sql_query.rs @@ -85,7 +85,7 @@ impl Proxy { let result = match self.request_notifiers.clone() { Some(request_notifiers) => { - self.deduped_handle_sql( + self.dedup_handle_sql( ctx, schema, &req.sql, diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 2af00a4504..ae227851e9 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -82,6 +82,7 @@ use table_engine::{ PARTITION_TABLE_ENGINE_TYPE, }; use time_ext::{current_time_millis, parse_duration}; +use tokio::sync::mpsc::Sender; use tonic::{transport::Channel, IntoRequest}; use crate::{ @@ -124,7 +125,7 @@ pub struct Proxy { engine_runtimes: Arc, cluster_with_meta: bool, sub_table_access_perm: SubTableAccessPerm, - request_notifiers: Option>>>, + request_notifiers: Option>>>>, } impl Proxy { @@ -141,7 +142,7 @@ impl Proxy { engine_runtimes: Arc, cluster_with_meta: bool, sub_table_access_perm: SubTableAccessPerm, - request_notifiers: Option>>>, + request_notifiers: Option>>>>, ) -> Self { let forwarder = Arc::new(Forwarder::new( forward_config, diff --git a/proxy/src/read.rs b/proxy/src/read.rs index 08a4ff8431..75f03a74a2 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -19,7 +19,6 @@ use std::{sync::Arc, time::Instant}; use ceresdbproto::storage::{ storage_service_client::StorageServiceClient, RequestContext, SqlQueryRequest, SqlQueryResponse, }; -use future_ext::FutureCancelledGuard; use futures::FutureExt; use generic_error::BoxError; use http::StatusCode; @@ -33,11 +32,11 @@ use query_frontend::{ use router::endpoint::Endpoint; use snafu::{ensure, ResultExt}; use time_ext::InstantExt; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Sender}; use tonic::{transport::Channel, IntoRequest}; use crate::{ - dedup_requests::{RequestNotifiers, RequestResult}, + dedup_requests::{ExecutionGuard, RequestNotifiers, RequestResult}, error::{ErrNoCause, ErrWithCause, Error, Internal, Result}, forward::{ForwardRequest, ForwardResult}, metrics::GRPC_HANDLER_COUNTER_VEC, @@ -73,17 +72,17 @@ impl Proxy { )) } - pub(crate) async fn deduped_handle_sql( + pub(crate) async fn dedup_handle_sql( &self, ctx: &Context, schema: &str, sql: &str, - request_notifiers: Arc>>, + request_notifiers: Arc>>>, enable_partition_table_access: bool, ) -> Result { let (tx, mut rx) = mpsc::channel(1); let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) { - RequestResult::First => FutureCancelledGuard::new(|| { + RequestResult::First => ExecutionGuard::new(|| { request_notifiers.take_notifiers(&sql.to_string()); }), RequestResult::Wait => { @@ -98,7 +97,7 @@ impl Proxy { match resp { ForwardResult::Forwarded(resp) => { let resp = resp?; - guard.uncancelled(); + guard.cancel(); let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap(); for notifier in ¬ifiers { if let Err(e) = notifier @@ -121,7 +120,7 @@ impl Proxy { .fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access) .await?; - guard.uncancelled(); + guard.cancel(); let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap(); for notifier in ¬ifiers { if let Err(e) = notifier.send(Ok(SqlResponse::Local(result.clone()))).await { @@ -231,10 +230,10 @@ impl Proxy { Output::AffectedRows(_) => Ok(output), Output::Records(v) => { if plan_maybe_expired { - let row_nums = v + let num_rows = v .iter() .fold(0_usize, |acc, record_batch| acc + record_batch.num_rows()); - if row_nums == 0 { + if num_rows == 0 { warn!("Query time range maybe exceed TTL, sql:{sql}"); // TODO: Cannot return this error directly, empty query diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index b0e097d00a..1dfabaebe2 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -35,7 +35,6 @@ use log::{info, warn}; use macros::define_result; use notifier::notifier::RequestNotifiers; use proxy::{ - dedup_requests::RequestNotifiers, forward, hotspot::HotspotRecorder, instance::InstanceRef, diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 80cb120f54..0811f07558 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -39,7 +39,6 @@ use generic_error::BoxError; use log::{error, info}; use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult}; use proxy::{ - dedup_requests::{RequestNotifiers, RequestResult}, hotspot::{HotspotRecorder, Message}, instance::InstanceRef, }; @@ -260,7 +259,7 @@ impl RemoteEngineServiceImpl { } // We should set cancel to guard, otherwise the key will be removed twice. - guard.uncancelled(); + guard.cancel(); let notifiers = request_notifiers.take_notifiers(&request_key).unwrap(); // Do send in background to avoid blocking the rpc procedure.