diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 8ca163703f..468c1dcc47 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -26,7 +26,7 @@ use snafu::{ResultExt, Snafu}; use table_engine::table::TableId; use tokio::{ sync::{ - mpsc::{self, error::SendError, Receiver, Sender}, + mpsc::{self, Receiver, Sender}, Mutex, }, time, @@ -38,9 +38,7 @@ use crate::{ PickerManager, TableCompactionRequest, WaitError, WaiterNotifier, }, instance::{ - flush_compaction::{self, TableFlushOptions}, - write_worker::CompactionNotifier, - Instance, SpaceStore, + flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore, }, table::data::TableDataRef, TableOptions, @@ -574,12 +572,11 @@ impl ScheduleWorker { None => { // Memory usage exceeds the threshold, let's put pack the // request. - debug!( - "Compaction task is ignored, because of high memory usage:{}, task:{:?}", + warn!( + "Compaction task is ignored, because of high memory usage:{}, task:{:?}, table:{}", self.memory_limit.usage.load(Ordering::Relaxed), - compaction_task, + compaction_task, table_data.name ); - self.put_back_compaction_request(compact_req).await; return; } }; @@ -596,29 +593,6 @@ impl ScheduleWorker { ); } - async fn put_back_compaction_request(&self, req: TableCompactionRequest) { - if let Err(SendError(ScheduleTask::Request(TableCompactionRequest { - compaction_notifier, - waiter, - .. - }))) = self.sender.send(ScheduleTask::Request(req)).await - { - let e = Arc::new( - flush_compaction::Other { - msg: "Failed to put back the compaction request for memory usage exceeds", - } - .build(), - ); - if let Some(notifier) = compaction_notifier { - notifier.notify_err(e.clone()); - } - - let waiter_notifier = WaiterNotifier::new(waiter); - let wait_err = WaitError::Compaction { source: e }; - waiter_notifier.notify_wait_result(Err(wait_err)); - } - } - async fn schedule(&mut self) { self.compact_tables().await; self.flush_tables().await;