Skip to content

Commit

Permalink
raft: port transfer leader feature from etcd (tikv#534)
Browse files Browse the repository at this point in the history
*: suppress compile warning
  • Loading branch information
tiancaiamao committed May 10, 2016
1 parent 8dca943 commit 768d909
Show file tree
Hide file tree
Showing 13 changed files with 393 additions and 20 deletions.
1 change: 0 additions & 1 deletion src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use util::codec::rpc;
use util::make_std_tcp_conn;
use protobuf::MessageStatic;

use kvproto::pdpb::{Request, Response};
use kvproto::msgpb::{Message, MessageType};
Expand Down
151 changes: 144 additions & 7 deletions src/raft/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ pub struct Raft<T: Storage> {
/// the leader id
pub leader_id: u64,

/// lead_transferee is id of the leader transfer target when its value is not None.
/// Follow the procedure defined in raft thesis 3.10.
pub lead_transferee: Option<u64>,

/// New configuration is ignored if there exists unapplied configuration.
pub pending_conf: bool,

Expand Down Expand Up @@ -236,6 +240,7 @@ impl<T: Storage> Raft<T> {
votes: Default::default(),
msgs: Default::default(),
leader_id: Default::default(),
lead_transferee: None,
term: Default::default(),
election_elapsed: Default::default(),
pending_conf: Default::default(),
Expand Down Expand Up @@ -387,7 +392,8 @@ impl<T: Storage> Raft<T> {
}
ProgressState::Probe => pr.pause(),
_ => {
panic!("{} is sending append in unhandled state {:?}",
panic!("{} {} is sending append in unhandled state {:?}",
self.tag,
self.id,
pr.state)
}
Expand Down Expand Up @@ -486,6 +492,8 @@ impl<T: Storage> Raft<T> {
self.heartbeat_elapsed = 0;
self.reset_randomized_election_timeout();

self.lead_transferee = None;

self.votes = HashMap::new();
let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
let self_id = self.id;
Expand Down Expand Up @@ -543,6 +551,9 @@ impl<T: Storage> Raft<T> {
let m = new_message(INVALID_ID, MessageType::MsgCheckQuorum, Some(self.id));
self.step(m).is_ok();
}
if self.state == StateRole::Leader && self.lead_transferee.is_some() {
self.abort_leader_transfer()
}
}

if self.state != StateRole::Leader {
Expand Down Expand Up @@ -668,6 +679,14 @@ impl<T: Storage> Raft<T> {
}
return Ok(());
}
if m.get_msg_type() == MessageType::MsgTransferLeader && self.state != StateRole::Leader {
debug!("{} {} [term {} state {:?}] ignoring MsgTransferLeader to {}",
self.tag,
self.id,
self.term,
self.state,
m.get_from());
}

if m.get_term() == 0 {
// local message
Expand Down Expand Up @@ -710,9 +729,9 @@ impl<T: Storage> Raft<T> {
old_paused: &mut bool,
send_append: &mut bool,
maybe_commit: &mut bool) {
let pr = self.prs.get_mut(&m.get_from()).unwrap();
pr.recent_active = true;
self.prs.get_mut(&m.get_from()).unwrap().recent_active = true;
if m.get_reject() {
let pr = self.prs.get_mut(&m.get_from()).unwrap();
debug!("{} {} received msgAppend rejection(lastindex: {}) from {} for index {}",
self.tag,
self.id,
Expand All @@ -733,11 +752,27 @@ impl<T: Storage> Raft<T> {
return;
}

*old_paused = pr.is_paused();
if !pr.maybe_update(m.get_index()) {
return;
{
let pr = self.prs.get_mut(&m.get_from()).unwrap();
*old_paused = pr.is_paused();
if !pr.maybe_update(m.get_index()) {
return;
}
}

// Transfer leadership is in progress.
if let Some(lead_transferee) = self.lead_transferee {
if m.get_from() == lead_transferee &&
self.prs.get_mut(&m.get_from()).unwrap().matched == self.raft_log.last_index() {
info!("{} {} sent MsgTimeoutNow to {} after received MsgAppResp",
self.tag,
self.id,
m.get_from());
self.send_timeout_now(m.get_from());
}
}

let pr = self.prs.get_mut(&m.get_from()).unwrap();
match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
Expand All @@ -757,6 +792,64 @@ impl<T: Storage> Raft<T> {
*maybe_commit = true;
}

fn handle_transfer_leader(&mut self, m: &Message) {
let lead_transferee = m.get_from();
let last_lead_transferee = self.lead_transferee;
if last_lead_transferee.is_some() {
if last_lead_transferee.unwrap() == lead_transferee {
info!("{} {} [term {}] transfer leadership to {} is in progress, ignores request \
to same node {}",
self.tag,
self.id,
self.term,
lead_transferee,
lead_transferee);
return;
}
self.abort_leader_transfer();
info!("{} {} [term {}] abort transfer leadership to {}",
self.tag,
self.id,
self.term,
last_lead_transferee.unwrap());
}
if lead_transferee == self.id {
if last_lead_transferee.is_none() {
debug!("{} {} is already leader. Ignored transfer leadership to {}",
self.tag,
self.id,
self.id);
} else {
debug!("{} {} abort transfer leadership to {}, transfer to current leader {}.",
self.tag,
self.id,
self.lead_transferee.unwrap(),
self.id);
}
return;
}
// Transfer leadership to third party.
info!("{} {} [term {}] starts to transfer leadership to {}",
self.tag,
self.id,
self.term,
lead_transferee);
// Transfer leadership should be finished in one electionTimeout
// so reset r.electionElapsed.
self.election_elapsed = 0;
self.lead_transferee = Some(lead_transferee);
if self.prs.get(&m.get_from()).unwrap().matched == self.raft_log.last_index() {
self.send_timeout_now(lead_transferee);
info!("{} {} sends MsgTimeoutNow to {} immediately as {} already has up-to-date log",
self.tag,
self.id,
lead_transferee,
lead_transferee);
} else {
self.send_append(lead_transferee);
}
}

fn handle_snapshot_status(&mut self, m: &Message) {
let pr = self.prs.get_mut(&m.get_from()).unwrap();
if m.get_reject() {
Expand Down Expand Up @@ -793,7 +886,7 @@ impl<T: Storage> Raft<T> {
}
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
self.handle_append_response(m, old_paused, send_append, maybe_commit)
self.handle_append_response(m, old_paused, send_append, maybe_commit);
}
MessageType::MsgHeartbeatResponse => {
let pr = self.prs.get_mut(&m.get_from()).unwrap();
Expand Down Expand Up @@ -826,6 +919,9 @@ impl<T: Storage> Raft<T> {
m.get_from(),
pr);
}
MessageType::MsgTransferLeader => {
self.handle_transfer_leader(m);
}
_ => {}
}
}
Expand Down Expand Up @@ -885,6 +981,16 @@ impl<T: Storage> Raft<T> {
// drop any new proposals.
return;
}
if self.lead_transferee.is_some() {
debug!("{} {} [term {}] transfer leadership to {} is in progress; dropping \
proposal",
self.tag,
self.id,
self.term,
self.lead_transferee.unwrap());
return;
}

for e in m.mut_entries().iter_mut() {
if e.get_entry_type() == EntryType::EntryConfChange {
if self.pending_conf {
Expand Down Expand Up @@ -923,6 +1029,7 @@ impl<T: Storage> Raft<T> {
send_append = true;
}
}

if send_append {
self.send_append(m.get_from());
}
Expand Down Expand Up @@ -973,6 +1080,14 @@ impl<T: Storage> Raft<T> {
self.become_follower(term, INVALID_ID);
}
}
MessageType::MsgTimeoutNow => {
debug!("{} {} [term {} state {:?}] ignored MsgTimeoutNow from {}",
self.tag,
self.id,
self.term,
self.state,
m.get_from())
}
_ => {}
}
}
Expand Down Expand Up @@ -1022,6 +1137,15 @@ impl<T: Storage> Raft<T> {
self.send(to_send);
}
}
MessageType::MsgTimeoutNow => {
info!("{} {} [term {}] received MsgTimeoutNow from {} and starts an election to \
get leadership.",
self.tag,
self.id,
self.term,
m.get_from());
self.campaign();
}
_ => {}
}
}
Expand Down Expand Up @@ -1185,6 +1309,10 @@ impl<T: Storage> Raft<T> {
if self.maybe_commit() {
self.bcast_append();
}
// If the removed node is the lead_transferee, then abort the leadership transferring.
if self.state == StateRole::Leader && self.lead_transferee == Some(id) {
self.abort_leader_transfer()
}
}

pub fn reset_pending_conf(&mut self) {
Expand Down Expand Up @@ -1251,4 +1379,13 @@ impl<T: Storage> Raft<T> {
}
act >= self.quorum()
}

pub fn send_timeout_now(&mut self, to: u64) {
let msg = new_message(to, MessageType::MsgTimeoutNow, None);
self.send(msg);
}

pub fn abort_leader_transfer(&mut self) {
self.lead_transferee = None;
}
}
2 changes: 1 addition & 1 deletion src/raft/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ impl<T> RaftLog<T>
#[cfg(test)]
mod test {
use raft::raft_log::{self, RaftLog};
use raft::storage::{MemStorage, Storage};
use raft::storage::MemStorage;
use std::sync::Arc;
use kvproto::raftpb;
use raft::errors::{Error, StorageError};
Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/coprocessor/split_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod test {
use kvproto::raft_cmdpb::{SplitRequest, AdminRequest, AdminCmdType};
use util::codec::{datum, table, Datum};
use util::codec::number::NumberEncoder;
use byteorder::{ByteOrder, BigEndian, WriteBytesExt};
use byteorder::{BigEndian, WriteBytesExt};

fn new_peer_storage(path: &TempDir) -> PeerStorage {
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
Expand Down
1 change: 0 additions & 1 deletion src/raftstore/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ mod test {
use raft::{StorageError, Error as RaftError};
use tempdir::*;
use protobuf;
use raft::Storage;
use raftstore::store::bootstrap;

fn new_storage(path: &TempDir) -> RaftStorage {
Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::sync::{Arc, RwLock};
use std::option::Option;
use std::collections::{HashMap, HashSet, BTreeMap};
use std::boxed::{Box, FnBox};
use std::boxed::Box;
use std::collections::Bound::{Excluded, Unbounded};
use std::time::Duration;

Expand Down
1 change: 0 additions & 1 deletion src/raftstore/store/worker/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

use raftstore::store::{self, RaftStorage, SnapState};
use util::HandyRwLock;

use std::sync::Arc;
use std::fmt::{self, Formatter, Display};
Expand Down
2 changes: 1 addition & 1 deletion src/server/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use raftstore::store::{self, Msg, Store, Config as StoreConfig, keys, Peekable,
use super::Result;
use util::HandyRwLock;
use super::config::Config;
use storage::{Storage, Engine, RaftKv};
use storage::{Storage, RaftKv};
use super::transport::ServerRaftStoreRouter;

pub fn create_raft_storage<T, Trans>(node: Node<T, Trans>, db: Arc<DB>) -> Result<Storage>
Expand Down
1 change: 0 additions & 1 deletion src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::net::SocketAddr;
use mio::{Token, Handler, EventLoop, EventSet, PollOpt};
use mio::tcp::{TcpListener, TcpStream, Shutdown};

use raftstore::store::Transport;
use kvproto::raft_cmdpb::RaftCmdRequest;
use kvproto::msgpb::{MessageType, Message};
use kvproto::raftpb::MessageType as RaftMessageType;
Expand Down
2 changes: 1 addition & 1 deletion src/util/codec/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// limitations under the License.

use std::vec::Vec;
use std::io::{Read, Write};
use std::io::Write;

use super::{Result, Error};
use util::codec::number::{NumberEncoder, NumberDecoder};
Expand Down
4 changes: 2 additions & 2 deletions src/util/codec/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use byteorder::{ByteOrder, BigEndian, ReadBytesExt, WriteBytesExt};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, ErrorKind, Write, Read};
use std::mem;

Expand Down Expand Up @@ -182,7 +182,7 @@ mod test {

use std::{i64, u64, f64, f32};
use protobuf::CodedOutputStream;
use std::io::{Write, ErrorKind};
use std::io::ErrorKind;

const U64_TESTS: &'static [u64] = &[i64::MIN as u64,
i64::MAX as u64,
Expand Down
Loading

0 comments on commit 768d909

Please sign in to comment.