From c59d9c3639cdfda96b2e7ac7af5c483e0b2eebee Mon Sep 17 00:00:00 2001 From: lee <690585471@qq.com> Date: Wed, 15 Mar 2023 19:40:40 +0800 Subject: [PATCH] fix: remove compaction retry when memory limit (#739) * support auto create config * remove compaction retry * add table name in log * modify log * continue to open shard when open table failed * fmt * romove unused lines * Update server/src/grpc/meta_event_service/mod.rs Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com> * add finsih log after open shard * clippy * return error when some tables failed to open * add cost time * log detail table error * blank * chore * format log * remove compaction retry when memory limit --------- Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com> --- analytic_engine/src/compaction/scheduler.rs | 36 +++------------------ 1 file changed, 5 insertions(+), 31 deletions(-) diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 8ca163703f..468c1dcc47 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -26,7 +26,7 @@ use snafu::{ResultExt, Snafu}; use table_engine::table::TableId; use tokio::{ sync::{ - mpsc::{self, error::SendError, Receiver, Sender}, + mpsc::{self, Receiver, Sender}, Mutex, }, time, @@ -38,9 +38,7 @@ use crate::{ PickerManager, TableCompactionRequest, WaitError, WaiterNotifier, }, instance::{ - flush_compaction::{self, TableFlushOptions}, - write_worker::CompactionNotifier, - Instance, SpaceStore, + flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore, }, table::data::TableDataRef, TableOptions, @@ -574,12 +572,11 @@ impl ScheduleWorker { None => { // Memory usage exceeds the threshold, let's put pack the // request. - debug!( - "Compaction task is ignored, because of high memory usage:{}, task:{:?}", + warn!( + "Compaction task is ignored, because of high memory usage:{}, task:{:?}, table:{}", self.memory_limit.usage.load(Ordering::Relaxed), - compaction_task, + compaction_task, table_data.name ); - self.put_back_compaction_request(compact_req).await; return; } }; @@ -596,29 +593,6 @@ impl ScheduleWorker { ); } - async fn put_back_compaction_request(&self, req: TableCompactionRequest) { - if let Err(SendError(ScheduleTask::Request(TableCompactionRequest { - compaction_notifier, - waiter, - .. - }))) = self.sender.send(ScheduleTask::Request(req)).await - { - let e = Arc::new( - flush_compaction::Other { - msg: "Failed to put back the compaction request for memory usage exceeds", - } - .build(), - ); - if let Some(notifier) = compaction_notifier { - notifier.notify_err(e.clone()); - } - - let waiter_notifier = WaiterNotifier::new(waiter); - let wait_err = WaitError::Compaction { source: e }; - waiter_notifier.notify_wait_result(Err(wait_err)); - } - } - async fn schedule(&mut self) { self.compact_tables().await; self.flush_tables().await;