Skip to content

Commit

Permalink
raft: fix more clippy warnings
Browse files Browse the repository at this point in the history
- skip proto
- add clippy check to tests directory
  • Loading branch information
BusyJay committed Feb 17, 2016
1 parent 57e70f2 commit a15a861
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 62 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern crate env_logger;

pub mod util;
pub mod raft;
#[allow(clippy)]
pub mod proto;
pub mod storage;
pub mod raftserver;
12 changes: 4 additions & 8 deletions src/raft/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ impl<T: Storage + Default> Raft<T> {
}

// send persists state to stable storage and then sends to its mailbox.
fn send(&mut self, m: Message) {
let mut m = m;
fn send(&mut self, mut m: Message) {
m.set_from(self.id);
// do not attach term to MsgPropose
// proposals are a way to forward to the leader and
Expand Down Expand Up @@ -788,7 +787,7 @@ impl<T: Storage + Default> Raft<T> {
self.term);
}

fn step_leader(&mut self, m: Message) {
fn step_leader(&mut self, mut m: Message) {
// These message types do not require any progress for m.From.
match m.get_msg_type() {
MessageType::MsgBeat => {
Expand All @@ -814,7 +813,6 @@ impl<T: Storage + Default> Raft<T> {
// drop any new proposals.
return;
}
let mut m = m;
for e in m.mut_entries().iter_mut() {
if e.get_entry_type() == EntryType::EntryConfChange {
if self.pending_conf {
Expand Down Expand Up @@ -905,7 +903,7 @@ impl<T: Storage + Default> Raft<T> {
}
}

fn step_follower(&mut self, m: Message) {
fn step_follower(&mut self, mut m: Message) {
let term = self.term;
match m.get_msg_type() {
MessageType::MsgPropose => {
Expand All @@ -915,7 +913,6 @@ impl<T: Storage + Default> Raft<T> {
term);
return;
}
let mut m = m;
m.set_to(self.lead);
self.send(m);
}
Expand Down Expand Up @@ -1001,8 +998,7 @@ impl<T: Storage + Default> Raft<T> {
self.send(to_send);
}

fn handle_snapshot(&mut self, m: Message) {
let mut m = m;
fn handle_snapshot(&mut self, mut m: Message) {
let (sindex, sterm) = (m.get_snapshot().get_metadata().get_index(),
m.get_snapshot().get_metadata().get_term());
if self.restore(m.take_snapshot()) {
Expand Down
86 changes: 40 additions & 46 deletions tests/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ fn ents(terms: Vec<u64>) -> Interface {
}

fn next_ents(r: &mut Raft<MemStorage>, s: &MemStorage) -> Vec<Entry> {
s.wl().append(&r.raft_log.unstable_entries().unwrap_or(&vec![])).expect("");
s.wl().append(&r.raft_log.unstable_entries().unwrap_or(&[])).expect("");
let (last_idx, last_term) = (r.raft_log.last_index(), r.raft_log.last_term());
r.raft_log.stable_to(last_idx, last_term);
let ents = r.raft_log.next_entries();
let committed = r.raft_log.committed;
r.raft_log.applied_to(committed);
ents.unwrap_or(vec![])
ents.unwrap_or_else(Vec::new)
}

#[derive(Default, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Interface {
}
}

fn initial(&mut self, id: u64, ids: &Vec<u64>) {
fn initial(&mut self, id: u64, ids: &[u64]) {
if self.raft.is_some() {
self.id = id;
self.prs = HashMap::with_capacity(ids.len());
Expand Down Expand Up @@ -217,21 +217,19 @@ impl Network {
// A nil node will be replaced with a new *stateMachine.
// A *stateMachine will get its k, id.
// When using stateMachine, the address list is always [1, n].
pub fn new(peers: Vec<Option<Interface>>) -> Network {
pub fn new(mut peers: Vec<Option<Interface>>) -> Network {
let size = peers.len();
let peer_addrs: Vec<u64> = (1..size as u64 + 1).collect();
let mut nstorage: HashMap<u64, Arc<MemStorage>> = HashMap::new();
let mut npeers: HashMap<u64, Interface> = HashMap::new();
let mut peers = peers;
for (p, id) in peers.drain(..).zip(peer_addrs.clone()) {
match p {
None => {
nstorage.insert(id, new_storage());
let r = new_test_raft(id, peer_addrs.clone(), 10, 1, nstorage[&id].clone());
npeers.insert(id, r);
}
Some(p) => {
let mut p = p;
Some(mut p) => {
p.initial(id, &peer_addrs);
npeers.insert(id, p);
}
Expand All @@ -248,32 +246,29 @@ impl Network {
self.ignorem.insert(t, true);
}

fn filter(&self, msgs: Vec<Message>) -> Vec<Message> {
let mut msgs = msgs;
let msgs: Vec<Message> =
msgs.drain(..)
.filter(|m| {
if self.ignorem.get(&m.get_msg_type()).map(|x| *x).unwrap_or(false) {
return false;
}
// hups never go over the network, so don't drop them but panic
assert!(m.get_msg_type() != MessageType::MsgHup, "unexpected msgHup");
let perc = self.dropm
.get(&Connem {
from: m.get_from(),
to: m.get_to(),
})
.map(|x| *x)
.unwrap_or(0f64);
rand::random::<f64>() >= perc
})
.collect();
msgs
fn filter(&self, mut msgs: Vec<Message>) -> Vec<Message> {
msgs.drain(..)
.filter(|m| {
if self.ignorem.get(&m.get_msg_type()).cloned().unwrap_or(false) {
return false;
}
// hups never go over the network, so don't drop them but panic
assert!(m.get_msg_type() != MessageType::MsgHup, "unexpected msgHup");
let perc = self.dropm
.get(&Connem {
from: m.get_from(),
to: m.get_to(),
})
.cloned()
.unwrap_or(0f64);
rand::random::<f64>() >= perc
})
.collect()
}

pub fn send(&mut self, msgs: Vec<Message>) {
let mut msgs = msgs;
while msgs.len() > 0 {
while !msgs.is_empty() {
let mut new_msgs = vec![];
for m in msgs.drain(..) {
let resp = {
Expand Down Expand Up @@ -493,7 +488,7 @@ fn test_progress_paused() {
m.set_to(1);
m.set_msg_type(MessageType::MsgPropose);
let mut e = Entry::new();
e.set_data("some_data".as_bytes().to_vec());
e.set_data(b"some_data".to_vec());
m.set_entries(RepeatedField::from_vec(vec![e]));
raft.step(m.clone()).expect("");
raft.step(m.clone()).expect("");
Expand Down Expand Up @@ -562,7 +557,7 @@ fn test_log_replicatioin() {
network.send(vec![m.clone()]);
}

for (j, x) in network.peers.iter_mut() {
for (j, x) in &mut network.peers {
if x.raft_log.committed != wcommitted {
panic!("#{}.{}: committed = {}, want {}",
i,
Expand Down Expand Up @@ -732,7 +727,7 @@ fn test_candidate_concede() {

let ents = vec![empty_entry(1, 1), new_entry(1, 2, Some(data))];
let want_log = ltoa(&new_raft_log(ents, 3, 2));
for (id, p) in tt.peers.iter() {
for (id, p) in &tt.peers {
let l = ltoa(&p.raft_log);
if l != want_log {
panic!("#{}: raft_log: {}, want: {}", id, l, want_log);
Expand Down Expand Up @@ -769,7 +764,7 @@ fn test_old_messages() {
new_entry(3, 4, SOME_DATA)];
let ilog = new_raft_log(ents, 5, 4);
let base = ltoa(&ilog);
for (id, p) in tt.peers.iter() {
for (id, p) in &tt.peers {
let l = ltoa(&p.raft_log);
if l != base {
panic!("#{}: raft_log: {}, want: {}", id, l, base);
Expand All @@ -789,8 +784,7 @@ fn test_proposal() {
(Network::new(vec![None, NOP_STEPPER, NOP_STEPPER, None, None]), true),
];

for (j, (network, success)) in tests.drain(..).enumerate() {
let mut nw = network;
for (j, (mut nw, success)) in tests.drain(..).enumerate() {
let send = |nw: &mut Network, m| {
let mut network_wrapper = panic::AssertRecoverSafe::new(nw);
let res = panic::recover(move || network_wrapper.send(vec![m]));
Expand All @@ -807,7 +801,7 @@ fn test_proposal() {
RaftLog::new(new_storage())
};
let base = ltoa(&want_log);
for (id, p) in nw.peers.iter() {
for (id, p) in &nw.peers {
if p.raft.is_some() {
let l = ltoa(&p.raft_log);
if l != base {
Expand Down Expand Up @@ -836,7 +830,7 @@ fn test_proposal_by_proxy() {

let want_log = new_raft_log(vec![empty_entry(1, 1), new_entry(1, 2, SOME_DATA)], 3, 2);
let base = ltoa(&want_log);
for (id, p) in tt.peers.iter() {
for (id, p) in &tt.peers {
if p.raft.is_none() {
continue;
}
Expand Down Expand Up @@ -916,7 +910,7 @@ fn test_is_election_timeout() {
if round {
got = (got * 10.0 + 0.5).floor() / 10.0;
}
if got != wprobability {
if (got - wprobability).abs() > 0.000001 {
panic!("#{}: possibility = {}, want {}", i, got, wprobability);
}
}
Expand Down Expand Up @@ -1400,7 +1394,7 @@ fn test_bcast_beat() {
sm.become_candidate();
sm.become_leader();
for i in 0..10 {
sm.append_entry(&mut vec![empty_entry(0, i as u64 + 1)]);
sm.append_entry(&mut [empty_entry(0, i as u64 + 1)]);
}
// slow follower
let mut_pr = |sm: &mut Interface, n, matched, next_idx| {
Expand Down Expand Up @@ -1518,15 +1512,15 @@ fn test_send_append_for_progress_probe() {
// each round is a heartbeat
for _ in 0..3 {
// we expect that raft will only send out one msgAPP per heartbeat timeout
r.append_entry(&mut vec![new_entry(0, 0, SOME_DATA)]);
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
let mut msg = r.read_messages();
assert_eq!(msg.len(), 1);
assert_eq!(msg[0].get_index(), 0);

assert!(r.prs[&2].paused);
for _ in 0..10 {
r.append_entry(&mut vec![new_entry(0, 0, SOME_DATA)]);
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
assert_eq!(r.read_messages().len(), 0);
}
Expand All @@ -1551,7 +1545,7 @@ fn test_send_append_for_progress_replicate() {
r.prs.get_mut(&2).unwrap().become_replicate();

for _ in 0..10 {
r.append_entry(&mut vec![new_entry(0, 0, SOME_DATA)]);
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
assert_eq!(r.read_messages().len(), 1);
}
Expand All @@ -1566,7 +1560,7 @@ fn test_send_append_for_progress_snapshot() {
r.prs.get_mut(&2).unwrap().become_snapshot(10);

for _ in 0..10 {
r.append_entry(&mut vec![new_entry(0, 0, SOME_DATA)]);
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
assert_eq!(r.read_messages().len(), 0);
}
Expand Down Expand Up @@ -1774,7 +1768,7 @@ fn test_recover_pending_config() {
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
let mut e = Entry::new();
e.set_entry_type(ent_type);
r.append_entry(&mut vec![e]);
r.append_entry(&mut [e]);
r.become_candidate();
r.become_leader();
if r.pending_conf != wpending {
Expand All @@ -1794,8 +1788,8 @@ fn test_recover_double_pending_config() {
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
let mut e = Entry::new();
e.set_entry_type(EntryType::EntryConfChange);
r.append_entry(&mut vec![e.clone()]);
r.append_entry(&mut vec![e]);
r.append_entry(&mut [e.clone()]);
r.append_entry(&mut [e]);
r.become_candidate();
r.become_leader();
}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_raft_flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn test_msg_app_flow_control_full() {
for i in 0..10 {
r.step(new_message(1, 1, MessageType::MsgPropose, 1)).expect("");
let ms = r.read_messages();
if ms.len() != 0 {
if !ms.is_empty() {
panic!("#{}: ms count = {}, want 0", i, ms.len());
}
}
Expand Down
9 changes: 4 additions & 5 deletions tests/test_raft_paper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ fn test_leader_bcast_beat() {
r.become_candidate();
r.become_leader();
for i in 0..10 {
r.append_entry(&mut vec![empty_entry(0, i as u64 + 1)]);
r.append_entry(&mut [empty_entry(0, i as u64 + 1)]);
}

for _ in 0..hi {
Expand Down Expand Up @@ -367,8 +367,8 @@ fn test_nonleaders_election_timeout_nonconfict(state: StateRole) {
let size = 5;
let mut rs = Vec::with_capacity(size);
let ids: Vec<u64> = (1..size as u64 + 1).collect();
for k in 0..size {
rs.push(new_test_raft(ids[k], ids.clone(), et, 1, new_storage()));
for id in ids.iter().take(size) {
rs.push(new_test_raft(*id, ids.clone(), et, 1, new_storage()));
}
let mut conflicts = 0;
for _ in 0..1000 {
Expand Down Expand Up @@ -525,7 +525,7 @@ fn test_leader_commit_preceding_entries() {
vec![empty_entry(1, 1)],
];

for (i, tt) in tests.drain(..).enumerate() {
for (i, mut tt) in tests.drain(..).enumerate() {
let s = new_storage();
s.wl().append(&tt).expect("");
let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, s);
Expand All @@ -538,7 +538,6 @@ fn test_leader_commit_preceding_entries() {
r.step(accept_and_reply(m)).expect("");
}

let mut tt = tt;
let li = tt.len() as u64;
tt.append(&mut vec![empty_entry(3, li + 1), new_entry(3, li + 2, SOME_DATA)]);
let g = r.raft_log.next_entries();
Expand Down
4 changes: 2 additions & 2 deletions tests/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn test_raw_node_propose_and_conf_change() {
s.wl().append(&rd.entries).expect("");
// Once we are the leader, propose a command and a ConfChange.
if !proposed && rd.ss.is_some() && rd.ss.as_ref().unwrap().lead == raw_node.raft.id {
raw_node.propose("somedata".as_bytes().to_vec()).expect("");
raw_node.propose(b"somedata".to_vec()).expect("");

let cc = conf_change(ConfChangeType::ConfChangeAddNode, 1);
ccdata = protobuf::Message::write_to_bytes(&cc).unwrap();
Expand Down Expand Up @@ -149,7 +149,7 @@ fn test_raw_node_start() {
store.wl().append(&rd.entries).expect("");
raw_node.advance(rd);

raw_node.propose("foo".as_bytes().to_vec()).expect("");
raw_node.propose(b"foo".to_vec()).expect("");
let rd = raw_node.ready();
assert_eq!(rd, wants[1]);
store.wl().append(&rd.entries).expect("");
Expand Down
2 changes: 2 additions & 0 deletions tests/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#![feature(std_panic, recover)]
#![feature(plugin)]
#![plugin(clippy)]

#[macro_use]
extern crate log;
Expand Down

0 comments on commit a15a861

Please sign in to comment.