Skip to content

Commit

Permalink
[refactor]: use an event listener to replace the Shutdown struct.
Browse files Browse the repository at this point in the history
  • Loading branch information
Phoenix500526 committed Feb 10, 2023
1 parent 66039fb commit f672853
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 74 deletions.
3 changes: 0 additions & 3 deletions curp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,5 @@ pub(crate) mod conflict_checked_mpmc;
/// Protobuf generated types that are used in RPC
mod rpc;

/// Shutdown related
mod shutdown;

#[cfg(test)]
pub(crate) mod test_utils;
6 changes: 3 additions & 3 deletions curp/src/server/bg_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
};

use clippy_utilities::NumericCast;
use event_listener::Event;
use futures::{future::Either, pin_mut, stream::FuturesUnordered, Stream, StreamExt};
use itertools::Itertools;
use madsim::rand::{thread_rng, Rng};
Expand Down Expand Up @@ -35,7 +36,6 @@ use crate::{
},
ServerRole, State,
},
shutdown::Shutdown,
TxFilter,
};

Expand All @@ -53,7 +53,7 @@ pub(super) async fn run_bg_tasks<
cmd_exe_tx: ExeTx,
cmd_exe_rx: CmdExeReceiver<C>,
cmd_as_rx: CmdAsReceiver<C>,
mut shutdown: Shutdown,
shutdown_trigger: Arc<Event>,
timeout: Arc<ServerTimeout>,
tx_filter: Option<Box<dyn TxFilter>>,
) {
Expand Down Expand Up @@ -104,7 +104,7 @@ pub(super) async fn run_bg_tasks<
})
.collect();

shutdown.recv().await;
shutdown_trigger.listen().await;
bg_tick_handle.abort();
bg_ae_handle.abort();
bg_apply_handle.abort();
Expand Down
11 changes: 5 additions & 6 deletions curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::{
cmd_worker::{cmd_exe_channel, CmdExeSender},
spec_pool::SpecPoolRef,
},
shutdown::Shutdown,
TxFilter,
};

Expand Down Expand Up @@ -253,7 +252,7 @@ pub struct Protocol<C: Command + 'static> {
/// Cmd watch board for tracking the cmd sync results
cmd_board: CmdBoardRef<C>,
/// Stop channel sender
stop_tx: broadcast::Sender<()>,
shutdown_trigger: Arc<Event>,
/// The channel to send cmds to background exe tasks
cmd_exe_tx: CmdExeSender<C>,
/// The curp server timeout
Expand Down Expand Up @@ -324,7 +323,7 @@ impl<C: 'static + Command> Protocol<C> {
tx_filter: Option<Box<dyn TxFilter>>,
) -> Self {
let (sync_tx, sync_rx) = flume::unbounded();
let (stop_tx, stop_rx) = broadcast::channel(1);
let shutdown_trigger = Arc::new(Event::new());
let (exe_tx, exe_rx, as_rx) = cmd_exe_channel();

let state = State::new(
Expand All @@ -350,7 +349,7 @@ impl<C: 'static + Command> Protocol<C> {
exe_tx.clone(),
exe_rx,
as_rx,
Shutdown::new(stop_rx.resubscribe()),
Arc::clone(&shutdown_trigger),
Arc::clone(&timeout),
tx_filter,
));
Expand All @@ -362,7 +361,7 @@ impl<C: 'static + Command> Protocol<C> {
spec,
sync_tx,
cmd_board,
stop_tx,
shutdown_trigger,
cmd_exe_tx: exe_tx,
timeout,
}
Expand Down Expand Up @@ -686,6 +685,6 @@ impl<C: 'static + Command> Drop for Protocol<C> {
#[inline]
fn drop(&mut self) {
// TODO: async drop is still not supported by Rust(should wait for bg tasks to be stopped?), or we should create an async `stop` function for Protocol
let _ = self.stop_tx.send(()).ok();
self.shutdown_trigger.notify(usize::MAX);
}
}
62 changes: 0 additions & 62 deletions curp/src/shutdown.rs

This file was deleted.

0 comments on commit f672853

Please sign in to comment.