Skip to content

Commit

Permalink
Overhaul coordinator recovery (again).
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Jul 1, 2023
1 parent 3cb5031 commit 570f3f6
Show file tree
Hide file tree
Showing 13 changed files with 286 additions and 183 deletions.
55 changes: 24 additions & 31 deletions src/bin/maelstrom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ impl Display for IdEnum {
impl FromStr for IdEnum {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(if let Some(n) = s.strip_prefix("n") {
Ok(if let Some(n) = s.strip_prefix('n') {
let n = usize::from_str(n).map_err(|_| ())? - 1;
if n < 3 {
Self::Replica(n)
} else {
Self::App(n - 3)
}
} else {
let n = s.strip_prefix("c").ok_or(())?;
let n = s.strip_prefix('c').ok_or(())?;
let n = usize::from_str(n).map_err(|_| ())? - 1;
Self::Client(n)
})
Expand Down Expand Up @@ -134,7 +134,7 @@ impl Transport for Maelstrom {
address: Self::Address,
message: impl Into<Self::Message> + std::fmt::Debug,
) -> impl futures::Future<Output = R> + Send + 'static {
let id = self.id.clone();
let id = self.id;
let (sender, mut receiver) = tokio::sync::oneshot::channel();
let reply = thread_rng().gen();
self.inner.requests.lock().unwrap().insert(reply, sender);
Expand Down Expand Up @@ -202,7 +202,7 @@ impl Process<LinKv, Wrapper> for KvNode {
_ids: Vec<Id>,
start_msg_id: MsgId,
) {
let ids = (0..3).map(|n| IdEnum::Replica(n)).collect::<Vec<_>>();
let ids = (0..3).map(IdEnum::Replica).collect::<Vec<_>>();
let membership = IrMembership::new(ids);
let id = IdEnum::from_str(&id).unwrap();
let transport = Maelstrom {
Expand Down Expand Up @@ -260,34 +260,27 @@ impl Process<LinKv, Wrapper> for KvNode {
} else {
eprintln!("duplicate reply");
}
} else {
if let KvNodeInner::Replica(replica) = &inner {
if let Some(response) = replica
.receive(src.parse::<IdEnum>().unwrap(), app.message)
{
let response = Msg {
src: transport.id.to_string(),
dest: src.clone(),
body: Body::Application(Wrapper {
message: response,
do_reply_to: None,
is_reply_to: app.do_reply_to,
}),
};
eprintln!("sending response {response:?}");
let _ = transport
.inner
.net
.txq
.send(response)
.await
.unwrap();
} else {
eprintln!("NO RESPONSE");
}
} else if let KvNodeInner::Replica(replica) = &inner {
if let Some(response) =
replica.receive(src.parse::<IdEnum>().unwrap(), app.message)
{
let response = Msg {
src: transport.id.to_string(),
dest: src.clone(),
body: Body::Application(Wrapper {
message: response,
do_reply_to: None,
is_reply_to: app.do_reply_to,
}),
};
eprintln!("sending response {response:?}");
let _ =
transport.inner.net.txq.send(response).await.unwrap();
} else {
eprintln!("(was unsolicited)");
};
eprintln!("NO RESPONSE");
}
} else {
eprintln!("(was unsolicited)");
}
}
Body::Workload(work) => {
Expand Down
28 changes: 26 additions & 2 deletions src/ir/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
};
use crate::{
ir::{membership, Confirm, FinalizeConsensus, Replica, ReplyConsensus, ReplyInconsistent},
util::join,
util::{join, Join, Until},
Transport,
};
use futures::pin_mut;
Expand Down Expand Up @@ -122,6 +122,7 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {
}
}

/// Unlogged request against a single replica.
pub fn invoke_unlogged(&self, index: ReplicaIndex, op: U::UO) -> impl Future<Output = U::UR> {
let inner = Arc::clone(&self.inner);
let address = {
Expand All @@ -131,7 +132,7 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {

let future = inner
.transport
.send::<ReplyUnlogged<U::UR>>(address, RequestUnlogged { op: op.clone() });
.send::<ReplyUnlogged<U::UR>>(address, RequestUnlogged { op });

async move {
let response = future.await;
Expand All @@ -141,6 +142,29 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {
}
}

/// Sends an unlogged operation to every replica in the current membership.
///
/// A clone of `op` is wrapped in a `RequestUnlogged` and sent to each member's
/// address. The pending replies are combined with `join`, keyed by replica
/// index, and returned together with the membership's `size()`.
///
/// This is a consensus-like operation: it can obtain a quorum of replies, but
/// it does not preserve decisions.
///
/// # Panics
///
/// Panics if locking `sync` fails (e.g. the mutex is poisoned).
pub fn invoke_unlogged_joined(
    &self,
    op: U::UO,
) -> (
    Join<ReplicaIndex, impl Future<Output = ReplyUnlogged<U::UR>>>,
    MembershipSize,
) {
    // The guard lives until the function returns, so the membership cannot
    // change while the requests are being issued.
    let guard = self.inner.sync.lock().unwrap();
    let size = guard.membership.size();
    let transport = &self.inner.transport;

    // One request per member; each replica receives its own clone of `op`.
    let requests = guard.membership.iter().map(|(index, address)| {
        let request = RequestUnlogged { op: op.clone() };
        let reply = transport.send::<ReplyUnlogged<U::UR>>(address, request);
        (index, reply)
    });

    (join(requests), size)
}

pub fn invoke_inconsistent(&self, op: U::IO) -> impl Future<Output = ()> {
let client_id = self.id;
let inner = Arc::clone(&self.inner);
Expand Down
1 change: 1 addition & 0 deletions src/ir/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl<T: Transport> Membership<T> {
Size((self.members.len() - 1) / 2)
}

/// Returns the number of members, i.e. `self.members.len()`.
// Clippy's `len_without_is_empty` lint flags a public `len` that has no
// matching `is_empty`; it is silenced here instead of adding that method.
// NOTE(review): no `is_empty` is visible in this excerpt; confirm none exists.
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
self.members.len()
}
Expand Down
21 changes: 17 additions & 4 deletions src/ir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
Message::<U>::ProposeInconsistent(ProposeInconsistent { op_id, op, recent }) => {
if sync.status.is_normal() {
if !recent.is_recent_relative_to(sync.view.number) {
// eprintln!("ancient relative to {:?}", sync.view.number);
eprintln!("ancient relative to {:?}", sync.view.number);
return Some(Message::<U>::ReplyInconsistent(ReplyInconsistent {
op_id,
view_number: sync.view.number,
Expand Down Expand Up @@ -319,7 +319,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
Message::<U>::ProposeConsensus(ProposeConsensus { op_id, op, recent }) => {
if sync.status.is_normal() {
if !recent.is_recent_relative_to(sync.view.number) {
// eprintln!("ancient relative to {:?}", sync.view.number);
eprintln!("ancient relative to {:?}", sync.view.number);
return Some(Message::<U>::ReplyConsensus(ReplyConsensus {
op_id,
view_number: sync.view.number,
Expand Down Expand Up @@ -348,7 +348,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
result_state: Some((result, state)),
}));
} else {
//println!("{:?} abnormal", self.index);
eprintln!("{:?} abnormal", self.index);
}
}
Message::<U>::FinalizeInconsistent(FinalizeInconsistent { op_id }) => {
Expand Down Expand Up @@ -448,7 +448,20 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
if sync.latest_normal_view == latest_normal_view {
latest_records.push(sync.record.clone());
}
eprintln!("have {} latest ({:?})", latest_records.len(), sync.outstanding_do_view_changes.iter().map(|(i, dvt)| (i, dvt.view_number, dvt.addendum.as_ref().unwrap().latest_normal_view)).collect::<Vec<_>>());
eprintln!(
"have {} latest ({:?})",
latest_records.len(),
sync
.outstanding_do_view_changes
.iter()
.map(|(i, dvt)| (*i, dvt.view_number, dvt.addendum.as_ref().unwrap().latest_normal_view))
.chain(
std::iter::once(
(self.index, sync.view.number, sync.latest_normal_view)
)
)
.collect::<Vec<_>>()
);

#[allow(non_snake_case)]
let mut R = Record::<U>::default();
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#![feature(let_chains)]
#![feature(btree_cursors)]
#![allow(unused)]
#![allow(clippy::type_complexity)]

mod ir;
mod mvcc;
Expand Down
6 changes: 4 additions & 2 deletions src/occ/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
commit: TS,
finalized: bool,
) {
eprintln!("preparing {id:?} at {commit:?} (fin = {finalized})");
match self.prepared.entry(id) {
Entry::Vacant(mut vacant) => {
vacant.insert((commit, transaction.clone(), finalized));
Expand All @@ -300,7 +301,7 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
let old_commit = occupied.get().0;
occupied.insert((commit, transaction.clone(), finalized));
self.remove_prepared_inner(id, transaction.clone(), old_commit);
self.add_prepared_inner(id, transaction.clone(), commit);
self.add_prepared_inner(id, transaction, commit);
}
}
}
Expand All @@ -327,7 +328,8 @@ impl<K: Key, V: Value, TS: Timestamp> Store<K, V, TS> {
}

pub fn remove_prepared(&mut self, id: TransactionId) -> bool {
if let Some((commit, transaction, _)) = self.prepared.remove(&id) {
if let Some((commit, transaction, finalized)) = self.prepared.remove(&id) {
eprintln!("removing prepared {id:?} at {commit:?} (fin = {finalized})");
self.remove_prepared_inner(id, transaction, commit);
true
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/tapir/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<K: Key, V: Value, T: Transport<Message = IrMessage<Replica<K, V>>>> Transac
let new_time = inner.client.transport().time().max(proposed.saturating_add(1)).max(min_commit_timestamp);
if new_time != timestamp.time {
timestamp.time = new_time;
continue;
// TODO continue;
}
}
if matches!(result, OccPrepareResult::TooLate | OccPrepareResult::TooOld) {
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<K: Key, V: Value, T: Transport<Message = IrMessage<Replica<K, V>>>> Transac
unreachable!();
}
result = inner => {
return result;
result
}
}
} else {
Expand Down
16 changes: 10 additions & 6 deletions src/tapir/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ pub enum UO<K> {
/// Get a different version instead (not part of normal TAPIR).
timestamp: Option<Timestamp>,
},
/// For backup coordinators.
CheckPrepare {
/// Id of transaction to check the preparedness of.
transaction_id: OccTransactionId,
/// Same as (any) known prepared timestamp.
commit: Timestamp,
},
}

/// Reply payloads, generic over the value type `V`.
// NOTE(review): presumably the result counterpart of the `UO` request enum
// (unlogged operations); the `CheckPrepare` variant name matches
// `UO::CheckPrepare`. Confirm against the replica's handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UR<V> {
/// To clients.
// NOTE(review): presumably the value read (if any) and the timestamp of the
// version it belongs to. TODO confirm.
Get(Option<V>, Timestamp),
/// To backup coordinators.
// Wraps an `OccPrepareResult` parameterized by `Timestamp`.
CheckPrepare(OccPrepareResult<Timestamp>),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -96,9 +106,6 @@ pub enum CO<K, V> {
transaction: OccTransaction<K, V, Timestamp>,
/// Proposed commit timestamp.
commit: Timestamp,
/// `true` when sent by backup coordinator(s), in which case the prepare
/// action is skipped.
backup: bool,
},
RaiseMinPrepareTime {
time: u64,
Expand All @@ -113,19 +120,16 @@ impl<K: Eq + Hash, V: PartialEq> PartialEq for CO<K, V> {
transaction_id,
transaction,
commit,
backup,
},
Self::Prepare {
transaction_id: other_transaction_id,
transaction: other_transaction,
commit: other_commit,
backup: other_backup,
},
) => {
transaction_id == other_transaction_id
&& transaction == other_transaction
&& commit == other_commit
&& backup == other_backup
}
(
Self::RaiseMinPrepareTime { time },
Expand Down
Loading

0 comments on commit 570f3f6

Please sign in to comment.