Skip to content

Commit

Permalink
[refactor]: use an event listener to replace the Shutdown struct.
Browse files Browse the repository at this point in the history
  • Loading branch information
Phoenix500526 committed Feb 9, 2023
1 parent a2dd732 commit 8195de6
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 77 deletions.
3 changes: 0 additions & 3 deletions curp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,5 @@ pub(crate) mod conflict_checked_mpmc;
/// Protobuf generated types that are used in RPC
mod rpc;

/// Shutdown related
mod shutdown;

#[cfg(test)]
pub(crate) mod test_utils;
6 changes: 3 additions & 3 deletions curp/src/server/bg_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use clippy_utilities::NumericCast;
use event_listener::Event;
use futures::{pin_mut, stream::FuturesUnordered, Stream, StreamExt};
use itertools::Itertools;
use madsim::rand::{thread_rng, Rng};
Expand Down Expand Up @@ -38,7 +39,6 @@ use crate::{
},
ServerRole, State,
},
shutdown::Shutdown,
};

/// Run background tasks
Expand All @@ -55,7 +55,7 @@ pub(super) async fn run_bg_tasks<
cmd_exe_tx: ExeTx,
cmd_exe_rx: CmdExeReceiver<C>,
cmd_as_rx: CmdAsReceiver<C>,
mut shutdown: Shutdown,
shutdown_trigger: Arc<Event>,
timeout: Arc<ServerTimeout>,
#[cfg(test)] reachable: Arc<AtomicBool>,
) {
Expand Down Expand Up @@ -114,7 +114,7 @@ pub(super) async fn run_bg_tasks<
})
.collect();

shutdown.recv().await;
shutdown_trigger.listen().await;
bg_ae_handle.abort();
bg_election_handle.abort();
bg_apply_handle.abort();
Expand Down
17 changes: 8 additions & 9 deletions curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::{
cmd_worker::{cmd_exe_channel, CmdExeSender},
spec_pool::SpecPoolRef,
},
shutdown::Shutdown,
};

/// Background tasks of Curp protocol
Expand Down Expand Up @@ -224,7 +223,7 @@ pub struct Protocol<C: Command + 'static> {
/// Cmd watch board for tracking the cmd sync results
cmd_board: CmdBoardRef<C>,
/// Stop channel sender
stop_tx: broadcast::Sender<()>,
shutdown_trigger: Arc<Event>,
/// The channel to send cmds to background exe tasks
cmd_exe_tx: CmdExeSender<C>,
/// The curp server timeout
Expand Down Expand Up @@ -294,7 +293,7 @@ impl<C: 'static + Command> Protocol<C> {
timeout: Arc<ServerTimeout>,
) -> Self {
let (sync_tx, sync_rx) = flume::unbounded();
let (stop_tx, stop_rx) = broadcast::channel(1);
let shutdown_trigger = Arc::new(Event::new());
let (exe_tx, exe_rx, as_rx) = cmd_exe_channel();

let state = State::new(
Expand All @@ -320,7 +319,7 @@ impl<C: 'static + Command> Protocol<C> {
exe_tx.clone(),
exe_rx,
as_rx,
Shutdown::new(stop_rx.resubscribe()),
Arc::clone(&shutdown_trigger),
Arc::clone(&timeout),
#[cfg(test)]
Arc::new(AtomicBool::new(true)),
Expand All @@ -334,7 +333,7 @@ impl<C: 'static + Command> Protocol<C> {
spec,
sync_tx,
cmd_board,
stop_tx,
shutdown_trigger,
cmd_exe_tx: exe_tx,
timeout,
}
Expand All @@ -350,7 +349,7 @@ impl<C: 'static + Command> Protocol<C> {
reachable: Arc<AtomicBool>,
) -> Self {
let (sync_tx, sync_rx) = flume::unbounded();
let (stop_tx, stop_rx) = broadcast::channel(1);
let shutdown_trigger = Arc::new(Event::new());
let (exe_tx, exe_rx, as_rx) = cmd_exe_channel();

let state = State::new(
Expand All @@ -376,7 +375,7 @@ impl<C: 'static + Command> Protocol<C> {
exe_tx.clone(),
exe_rx,
as_rx,
Shutdown::new(stop_rx.resubscribe()),
Arc::clone(&shutdown_trigger),
Arc::clone(&timeout),
reachable,
));
Expand All @@ -389,7 +388,7 @@ impl<C: 'static + Command> Protocol<C> {
spec,
sync_tx,
cmd_board,
stop_tx,
shutdown_trigger,
cmd_exe_tx: exe_tx,
timeout,
}
Expand Down Expand Up @@ -710,7 +709,7 @@ impl<C: 'static + Command> Drop for Protocol<C> {
#[inline]
fn drop(&mut self) {
// TODO: async drop is still not supported by Rust(should wait for bg tasks to be stopped?), or we should create an async `stop` function for Protocol
let _ = self.stop_tx.send(()).ok();
self.shutdown_trigger.notify(usize::MAX);
}
}

Expand Down
62 changes: 0 additions & 62 deletions curp/src/shutdown.rs

This file was deleted.

0 comments on commit 8195de6

Please sign in to comment.