Skip to content

Commit

Permalink
[refactor]: typos, function or variable name, and comments
Browse files Browse the repository at this point in the history
modify some comment messages to keep the code style consistent
rename some function and variable names.
fix some typos
format code style
  • Loading branch information
Phoenix500526 committed Feb 10, 2023
1 parent c1b4311 commit 66039fb
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 53 deletions.
4 changes: 2 additions & 2 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct Client<C: Command> {
/// Curp client timeout settings
timeout: ClientTimeout,
/// To keep Command type
phatom: PhantomData<C>,
phantom: PhantomData<C>,
}

/// State of a client
Expand Down Expand Up @@ -88,7 +88,7 @@ where
state: RwLock::new(State::new()),
connects: rpc::connect(addrs, None).await,
timeout,
phatom: PhantomData,
phantom: PhantomData,
}
}

Expand Down
2 changes: 1 addition & 1 deletion curp/src/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::{error, info};

use crate::cmd::ConflictCheck;

/// Call `DoneNotifier::done` when the process of the msg has finished
/// Call `DoneNotifier::notify` when the process of the msg has finished
pub(crate) struct DoneNotifier {
/// Notifier
notifier: flume::Sender<u64>,
Expand Down
16 changes: 8 additions & 8 deletions curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,28 @@ pub(crate) trait ConnectInterface: Send + Sync + 'static {
&self,
) -> Result<ProtocolClient<tonic::transport::Channel>, tonic::transport::Error>;

/// send "propose" request
/// Send `ProposeRequest`
async fn propose(
&self,
request: ProposeRequest,
timeout: Duration,
) -> Result<tonic::Response<ProposeResponse>, ProposeError>;

/// send "wait synced" request
/// Send `WaitSyncedRequest`
async fn wait_synced(
&self,
request: WaitSyncedRequest,
timeout: Duration,
) -> Result<tonic::Response<WaitSyncedResponse>, ProposeError>;

/// Send `AppendEntries` request
/// Send `AppendEntriesRequest`
async fn append_entries(
&self,
request: AppendEntriesRequest,
timeout: Duration,
) -> Result<tonic::Response<AppendEntriesResponse>, ProposeError>;

/// Send `Vote` request
/// Send `VoteRequest`
async fn vote(
&self,
request: VoteRequest,
Expand Down Expand Up @@ -147,7 +147,7 @@ impl ConnectInterface for Connect {
Ok(client)
}

/// send "propose" request
/// Send `ProposeRequest`
#[instrument(skip(self), name = "client propose")]
async fn propose(
&self,
Expand All @@ -163,7 +163,7 @@ impl ConnectInterface for Connect {
client.propose(req).await.map_err(Into::into)
}

/// send "wait synced" request
/// Send `WaitSyncedRequest`
#[instrument(skip(self), name = "client propose")]
async fn wait_synced(
&self,
Expand All @@ -179,7 +179,7 @@ impl ConnectInterface for Connect {
client.wait_synced(req).await.map_err(Into::into)
}

/// Send `AppendEntries` request
/// Send `AppendEntriesRequest`
async fn append_entries(
&self,
request: AppendEntriesRequest,
Expand All @@ -193,7 +193,7 @@ impl ConnectInterface for Connect {
client.append_entries(req).await.map_err(Into::into)
}

/// Send `Vote` request
/// Send `VoteRequest`
async fn vote(
&self,
request: VoteRequest,
Expand Down
22 changes: 11 additions & 11 deletions curp/src/server/bg_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(super) async fn run_bg_tasks<
ExeTx: CmdExeSenderInterface<C>,
>(
state: StateRef<C, ExeTx>,
sync_chan: flume::Receiver<SyncMessage<C>>,
sync_rx: flume::Receiver<SyncMessage<C>>,
cmd_executor: CE,
cmd_exe_tx: ExeTx,
cmd_exe_rx: CmdExeReceiver<C>,
Expand All @@ -64,7 +64,7 @@ pub(super) async fn run_bg_tasks<
let connects = rpc::connect(others, tx_filter).await;

// notify when a broadcast of append_entries is needed immediately
let (ae_trigger, ae_trigger_rx) = mpsc::unbounded_channel::<usize>();
let (ae_tx, ae_rx) = mpsc::unbounded_channel::<usize>();

let bg_tick_handle = tokio::spawn(bg_tick(
connects.clone(),
Expand All @@ -74,12 +74,12 @@ pub(super) async fn run_bg_tasks<
let bg_ae_handle = tokio::spawn(bg_append_entries(
connects.clone(),
Arc::clone(&state),
ae_trigger_rx,
ae_rx,
Arc::clone(&timeout),
));
let bg_apply_handle = tokio::spawn(bg_apply(Arc::clone(&state), cmd_exe_tx));
let bg_get_sync_cmds_handle =
tokio::spawn(bg_get_sync_cmds(Arc::clone(&state), sync_chan, ae_trigger));
tokio::spawn(bg_get_sync_cmds(Arc::clone(&state), sync_rx, ae_tx));

// spawn cmd execute worker
let cmd_executor = Arc::new(cmd_executor);
Expand Down Expand Up @@ -199,11 +199,11 @@ async fn bg_tick<C: Command + 'static, Conn: ConnectInterface, ExeTx: CmdExeSend
/// Fetch commands need to be synced and add them to the log
async fn bg_get_sync_cmds<C: Command + 'static, ExeTx: CmdExeSenderInterface<C>>(
state: StateRef<C, ExeTx>,
sync_chan: flume::Receiver<SyncMessage<C>>,
ae_trigger: mpsc::UnboundedSender<usize>,
sync_rx: flume::Receiver<SyncMessage<C>>,
ae_tx: mpsc::UnboundedSender<usize>,
) {
loop {
let (term, cmd) = match sync_chan.recv_async().await {
let (term, cmd) = match sync_rx.recv_async().await {
Ok(msg) => msg.inner(),
Err(_) => {
return;
Expand All @@ -212,8 +212,8 @@ async fn bg_get_sync_cmds<C: Command + 'static, ExeTx: CmdExeSenderInterface<C>>

state.map_write(|mut state_w| {
state_w.log.push(LogEntry::new(term, &[cmd]));
if let Err(e) = ae_trigger.send(state_w.last_log_index()) {
error!("ae_trigger failed: {}", e);
if let Err(e) = ae_tx.send(state_w.last_log_index()) {
error!("ae_tx failed: {}", e);
}

debug!(
Expand All @@ -232,10 +232,10 @@ async fn bg_append_entries<
>(
connects: Vec<Arc<Conn>>,
state: StateRef<C, ExeTx>,
mut ae_trigger_rx: mpsc::UnboundedReceiver<usize>,
mut ae_rx: mpsc::UnboundedReceiver<usize>,
timeout: Arc<ServerTimeout>,
) {
while let Some(i) = ae_trigger_rx.recv().await {
while let Some(i) = ae_rx.recv().await {
let req = {
let state_r = state.read();
if !state_r.is_leader() {
Expand Down
18 changes: 9 additions & 9 deletions curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,12 @@ pub struct Protocol<C: Command + 'static> {
/// The speculative cmd pool, shared with executor
spec: SpecPoolRef<C>,
/// The channel to send synced command to background sync task
sync_chan: flume::Sender<SyncMessage<C>>,
sync_tx: flume::Sender<SyncMessage<C>>,
// TODO: clean up the board when the size is too large
/// Cmd watch board for tracking the cmd sync results
cmd_board: CmdBoardRef<C>,
/// Stop channel sender
stop_ch_tx: broadcast::Sender<()>,
stop_tx: broadcast::Sender<()>,
/// The channel to send cmds to background exe tasks
cmd_exe_tx: CmdExeSender<C>,
/// The curp server timeout
Expand Down Expand Up @@ -324,7 +324,7 @@ impl<C: 'static + Command> Protocol<C> {
tx_filter: Option<Box<dyn TxFilter>>,
) -> Self {
let (sync_tx, sync_rx) = flume::unbounded();
let (stop_ch_tx, stop_ch_rx) = broadcast::channel(1);
let (stop_tx, stop_rx) = broadcast::channel(1);
let (exe_tx, exe_rx, as_rx) = cmd_exe_channel();

let state = State::new(
Expand All @@ -350,7 +350,7 @@ impl<C: 'static + Command> Protocol<C> {
exe_tx.clone(),
exe_rx,
as_rx,
Shutdown::new(stop_ch_rx.resubscribe()),
Shutdown::new(stop_rx.resubscribe()),
Arc::clone(&timeout),
tx_filter,
));
Expand All @@ -360,9 +360,9 @@ impl<C: 'static + Command> Protocol<C> {
Self {
state,
spec,
sync_chan: sync_tx,
sync_tx,
cmd_board,
stop_ch_tx,
stop_tx,
cmd_exe_tx: exe_tx,
timeout,
}
Expand All @@ -377,7 +377,7 @@ impl<C: 'static + Command> Protocol<C> {
"shouldn't insert needs_exe twice"
);
}
if let Err(e) = self.sync_chan.send(SyncMessage::new(term, cmd)) {
if let Err(e) = self.sync_tx.send(SyncMessage::new(term, cmd)) {
error!("send channel error, {e}");
}
}
Expand Down Expand Up @@ -537,7 +537,7 @@ impl<C: 'static + Command> Protocol<C> {
let req = request.into_inner();
let state = self.state.upgradable_read();

debug!("{} receives append_entries from {}: term({}), commit({}), prev_log_index({}), prev_log_term({}), {} entries",
debug!("{} receives append_entries from {}: term({}), commit({}), prev_log_index({}), prev_log_term({}), {} entries",
state.id(), req.leader_id, req.term, req.leader_commit, req.prev_log_index, req.prev_log_term, req.entries.len());

// calibrate term
Expand Down Expand Up @@ -686,6 +686,6 @@ impl<C: 'static + Command> Drop for Protocol<C> {
#[inline]
fn drop(&mut self) {
// TODO: async drop is still not supported by Rust(should wait for bg tasks to be stopped?), or we should create an async `stop` function for Protocol
let _ = self.stop_ch_tx.send(()).ok();
let _ = self.stop_tx.send(()).ok();
}
}
8 changes: 4 additions & 4 deletions utils/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub mod duration_format {

use crate::parse_duration;

/// deseralizes a cluster duration
/// deserializes a cluster duration
#[allow(single_use_lifetimes)] // the false positive case blocks us
pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
Expand Down Expand Up @@ -299,14 +299,14 @@ pub struct LogConfig {
level: LevelConfig,
}

/// `ClusterRange` deserialization formatter
/// `LevelConfig` deserialization formatter
pub mod level_format {
use serde::{self, Deserialize, Deserializer};

use super::LevelConfig;
use crate::parse_log_level;

/// deseralizes a cluster duration
/// deserializes a cluster duration
#[allow(single_use_lifetimes)] // TODO: Think is it necessary to allow this clippy??
pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result<LevelConfig, D::Error>
where
Expand Down Expand Up @@ -463,7 +463,7 @@ mod tests {

#[allow(clippy::unwrap_used)]
#[test]
fn test_xline_server_config_shoule_be_loaded() {
fn test_xline_server_config_should_be_loaded() {
let config: XlineServerConfig = toml::from_str(
r#"[cluster]
name = 'node1'
Expand Down
1 change: 0 additions & 1 deletion xline/src/server/auth_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{

/// Auth Server
#[derive(Debug)]
#[allow(unused)]
pub(crate) struct AuthServer<S>
where
S: StorageApi,
Expand Down
2 changes: 1 addition & 1 deletion xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where
{
/// KV storage
kv_storage: Arc<KvStore<S>>,
/// KV storage
/// Auth storage
auth_storage: Arc<AuthStore<S>>,
/// Consensus client
client: Arc<Client<Command>>,
Expand Down
18 changes: 9 additions & 9 deletions xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ where
S: StorageApi,
{
/// Lease storage
storage: Arc<LeaseStore>,
lease_storage: Arc<LeaseStore>,
/// Auth storage
auth_storage: Arc<AuthStore<S>>,
/// Consensus client
Expand All @@ -62,7 +62,7 @@ where
id_gen: Arc<IdGenerator>,
) -> Arc<Self> {
let lease_server = Arc::new(Self {
storage: lease_storage,
lease_storage,
auth_storage,
client,
name,
Expand All @@ -78,7 +78,7 @@ where
loop {
// only leader will check expired lease
if lease_server.is_leader() {
for id in lease_server.storage.find_expired_leases() {
for id in lease_server.lease_storage.find_expired_leases() {
let _handle = tokio::spawn({
let s = Arc::clone(&lease_server);
let token_option = lease_server.auth_storage.root_token();
Expand Down Expand Up @@ -119,7 +119,7 @@ where
wrapper: RequestWithToken,
) -> Command {
let keys = if let RequestWrapper::LeaseRevokeRequest(ref req) = wrapper.request {
self.storage
self.lease_storage
.get_keys(req.id)
.into_iter()
.map(|k| KeyRange::new(k, ""))
Expand Down Expand Up @@ -178,7 +178,7 @@ where
) -> ReceiverStream<Result<LeaseKeepAliveResponse, tonic::Status>> {
let (response_tx, response_rx) = mpsc::channel(CHANNEL_SIZE);
let _hd = tokio::spawn({
let lease_storage = Arc::clone(&self.storage);
let lease_storage = Arc::clone(&self.lease_storage);
async move {
while let Some(req_result) = request_stream.next().await {
match req_result {
Expand Down Expand Up @@ -331,7 +331,7 @@ where
if self.is_leader() {
// TODO wait applied index
let time_to_live_req = request.into_inner();
let lease = match self.storage.look_up(time_to_live_req.id) {
let lease = match self.lease_storage.look_up(time_to_live_req.id) {
Some(lease) => lease,
None => return Err(tonic::Status::not_found("Lease not found")),
};
Expand All @@ -341,7 +341,7 @@ where
.then(|| lease.keys())
.unwrap_or_default();
let res = LeaseTimeToLiveResponse {
header: Some(self.storage.gen_header()),
header: Some(self.lease_storage.gen_header()),
id: time_to_live_req.id,
ttl: lease.remaining().as_secs().cast(),
granted_ttl: lease.ttl().as_secs().cast(),
Expand All @@ -364,13 +364,13 @@ where
) -> Result<tonic::Response<LeaseLeasesResponse>, tonic::Status> {
debug!("Receive LeaseLeasesRequest {:?}", request);
let leases = self
.storage
.lease_storage
.leases()
.into_iter()
.map(|lease| LeaseStatus { id: lease.id() })
.collect();
let res = LeaseLeasesResponse {
header: Some(self.storage.gen_header()),
header: Some(self.lease_storage.gen_header()),
leases,
};
Ok(tonic::Response::new(res))
Expand Down
3 changes: 2 additions & 1 deletion xline/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ impl State {
mod test {
use std::{sync::Arc, time::Duration};

use super::*;
use tokio::time::timeout;

use super::*;

#[tokio::test]
async fn test_state() -> Result<(), Box<dyn std::error::Error>> {
let state = Arc::new(State::new(
Expand Down
Loading

0 comments on commit 66039fb

Please sign in to comment.