Skip to content

Commit

Permalink
chore: address conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Sep 22, 2023
1 parent 82292f1 commit fe4ad71
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 41 deletions.
23 changes: 0 additions & 23 deletions components/future_ext/src/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,29 +107,6 @@ where
}
}

pub struct FutureCancelledGuard<F: FnMut()> {
f: F,
cancelled: bool,
}

impl<F: FnMut()> FutureCancelledGuard<F> {
pub fn new(f: F) -> Self {
Self { f, cancelled: true }
}

pub fn uncancelled(&mut self) {
self.cancelled = false;
}
}

impl<F: FnMut()> Drop for FutureCancelledGuard<F> {
fn drop(&mut self) {
if self.cancelled {
(self.f)();
}
}
}

#[cfg(test)]
mod tests {
use std::{
Expand Down
2 changes: 1 addition & 1 deletion components/future_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
mod cancel;
mod retry;

pub use cancel::{CancellationSafeFuture, FutureCancelledGuard};
pub use cancel::CancellationSafeFuture;
pub use retry::{retry_async, RetryConfig};
2 changes: 1 addition & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ cluster = { workspace = true }
common_types = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
futures = { workspace = true }
future_ext = { workspace = true }
futures = { workspace = true }
generic_error = { workspace = true }
http = "0.2"
influxdb-line-protocol = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Proxy {

let result = match self.request_notifiers.clone() {
Some(request_notifiers) => {
self.deduped_handle_sql(
self.dedup_handle_sql(
ctx,
schema,
&req.sql,
Expand Down
5 changes: 3 additions & 2 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use table_engine::{
PARTITION_TABLE_ENGINE_TYPE,
};
use time_ext::{current_time_millis, parse_duration};
use tokio::sync::mpsc::Sender;
use tonic::{transport::Channel, IntoRequest};

use crate::{
Expand Down Expand Up @@ -124,7 +125,7 @@ pub struct Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Result<SqlResponse>>>>,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
}

impl Proxy {
Expand All @@ -141,7 +142,7 @@ impl Proxy {
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<Arc<RequestNotifiers<String, Result<SqlResponse>>>>,
request_notifiers: Option<Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>>,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
Expand Down
19 changes: 9 additions & 10 deletions proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::{sync::Arc, time::Instant};
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RequestContext, SqlQueryRequest, SqlQueryResponse,
};
use future_ext::FutureCancelledGuard;
use futures::FutureExt;
use generic_error::BoxError;
use http::StatusCode;
Expand All @@ -33,11 +32,11 @@ use query_frontend::{
use router::endpoint::Endpoint;
use snafu::{ensure, ResultExt};
use time_ext::InstantExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, Sender};
use tonic::{transport::Channel, IntoRequest};

use crate::{
dedup_requests::{RequestNotifiers, RequestResult},
dedup_requests::{ExecutionGuard, RequestNotifiers, RequestResult},
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult},
metrics::GRPC_HANDLER_COUNTER_VEC,
Expand Down Expand Up @@ -73,17 +72,17 @@ impl Proxy {
))
}

pub(crate) async fn deduped_handle_sql(
pub(crate) async fn dedup_handle_sql(
&self,
ctx: &Context,
schema: &str,
sql: &str,
request_notifiers: Arc<RequestNotifiers<String, Result<SqlResponse>>>,
request_notifiers: Arc<RequestNotifiers<String, Sender<Result<SqlResponse>>>>,
enable_partition_table_access: bool,
) -> Result<SqlResponse> {
let (tx, mut rx) = mpsc::channel(1);
let mut guard = match request_notifiers.insert_notifier(sql.to_string(), tx) {
RequestResult::First => FutureCancelledGuard::new(|| {
RequestResult::First => ExecutionGuard::new(|| {
request_notifiers.take_notifiers(&sql.to_string());
}),
RequestResult::Wait => {
Expand All @@ -98,7 +97,7 @@ impl Proxy {
match resp {
ForwardResult::Forwarded(resp) => {
let resp = resp?;
guard.uncancelled();
guard.cancel();
let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap();
for notifier in &notifiers {
if let Err(e) = notifier
Expand All @@ -121,7 +120,7 @@ impl Proxy {
.fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access)
.await?;

guard.uncancelled();
guard.cancel();
let notifiers = request_notifiers.take_notifiers(&sql.to_string()).unwrap();
for notifier in &notifiers {
if let Err(e) = notifier.send(Ok(SqlResponse::Local(result.clone()))).await {
Expand Down Expand Up @@ -231,10 +230,10 @@ impl Proxy {
Output::AffectedRows(_) => Ok(output),
Output::Records(v) => {
if plan_maybe_expired {
let row_nums = v
let num_rows = v
.iter()
.fold(0_usize, |acc, record_batch| acc + record_batch.num_rows());
if row_nums == 0 {
if num_rows == 0 {
warn!("Query time range maybe exceed TTL, sql:{sql}");

// TODO: Cannot return this error directly, empty query
Expand Down
1 change: 0 additions & 1 deletion server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use log::{info, warn};
use macros::define_result;
use notifier::notifier::RequestNotifiers;
use proxy::{
dedup_requests::RequestNotifiers,
forward,
hotspot::HotspotRecorder,
instance::InstanceRef,
Expand Down
3 changes: 1 addition & 2 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use generic_error::BoxError;
use log::{error, info};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
use proxy::{
dedup_requests::{RequestNotifiers, RequestResult},
hotspot::{HotspotRecorder, Message},
instance::InstanceRef,
};
Expand Down Expand Up @@ -260,7 +259,7 @@ impl RemoteEngineServiceImpl {
}

// We should set cancel to guard, otherwise the key will be removed twice.
guard.uncancelled();
guard.cancel();
let notifiers = request_notifiers.take_notifiers(&request_key).unwrap();

// Do send in background to avoid blocking the rpc procedure.
Expand Down

0 comments on commit fe4ad71

Please sign in to comment.