From 570f3f6128e651c8348dfc06b1fa5cd1d9ff8b22 Mon Sep 17 00:00:00 2001 From: Finn Bear Date: Fri, 30 Jun 2023 20:34:22 -0700 Subject: [PATCH] Overhaul coordinator recovery (again). --- src/bin/maelstrom.rs | 55 ++++----- src/ir/client.rs | 28 ++++- src/ir/membership.rs | 1 + src/ir/replica.rs | 21 +++- src/lib.rs | 1 + src/occ/store.rs | 6 +- src/tapir/client.rs | 4 +- src/tapir/message.rs | 16 ++- src/tapir/replica.rs | 237 ++++++++++++++++++++++---------------- src/tapir/shard_client.rs | 66 ++++++----- src/tapir/tests/kv.rs | 30 ++++- src/transport/channel.rs | 2 +- src/util/mod.rs | 2 +- 13 files changed, 286 insertions(+), 183 deletions(-) diff --git a/src/bin/maelstrom.rs b/src/bin/maelstrom.rs index 4ed32f7..2c13e65 100644 --- a/src/bin/maelstrom.rs +++ b/src/bin/maelstrom.rs @@ -84,7 +84,7 @@ impl Display for IdEnum { impl FromStr for IdEnum { type Err = (); fn from_str(s: &str) -> Result { - Ok(if let Some(n) = s.strip_prefix("n") { + Ok(if let Some(n) = s.strip_prefix('n') { let n = usize::from_str(n).map_err(|_| ())? - 1; if n < 3 { Self::Replica(n) @@ -92,7 +92,7 @@ impl FromStr for IdEnum { Self::App(n - 3) } } else { - let n = s.strip_prefix("c").ok_or(())?; + let n = s.strip_prefix('c').ok_or(())?; let n = usize::from_str(n).map_err(|_| ())? - 1; Self::Client(n) }) @@ -134,7 +134,7 @@ impl Transport for Maelstrom { address: Self::Address, message: impl Into + std::fmt::Debug, ) -> impl futures::Future + Send + 'static { - let id = self.id.clone(); + let id = self.id; let (sender, mut receiver) = tokio::sync::oneshot::channel(); let reply = thread_rng().gen(); self.inner.requests.lock().unwrap().insert(reply, sender); @@ -202,7 +202,7 @@ impl Process for KvNode { _ids: Vec, start_msg_id: MsgId, ) { - let ids = (0..3).map(|n| IdEnum::Replica(n)).collect::>(); + let ids = (0..3).map(IdEnum::Replica).collect::>(); let membership = IrMembership::new(ids); let id = IdEnum::from_str(&id).unwrap(); let transport = Maelstrom { @@ -260,34 +260,27 @@ impl Process for KvNode { } else { eprintln!("duplicate reply"); } - } else { - if let KvNodeInner::Replica(replica) = &inner { - if let Some(response) = replica - .receive(src.parse::().unwrap(), app.message) - { - let response = Msg { - src: transport.id.to_string(), - dest: src.clone(), - body: Body::Application(Wrapper { - message: response, - do_reply_to: None, - is_reply_to: app.do_reply_to, - }), - }; - eprintln!("sending response {response:?}"); - let _ = transport - .inner - .net - .txq - .send(response) - .await - .unwrap(); - } else { - eprintln!("NO RESPONSE"); - } + } else if let KvNodeInner::Replica(replica) = &inner { + if let Some(response) = + replica.receive(src.parse::().unwrap(), app.message) + { + let response = Msg { + src: transport.id.to_string(), + dest: src.clone(), + body: Body::Application(Wrapper { + message: response, + do_reply_to: None, + is_reply_to: app.do_reply_to, + }), + }; + eprintln!("sending response {response:?}"); + let _ = + transport.inner.net.txq.send(response).await.unwrap(); } else { - eprintln!("(was unsolicited)"); - }; + eprintln!("NO RESPONSE"); + } + } else { + eprintln!("(was unsolicited)"); } } Body::Workload(work) => { diff --git a/src/ir/client.rs b/src/ir/client.rs index 006da2a..6e6364e 100644 --- a/src/ir/client.rs +++ b/src/ir/client.rs @@ -5,7 +5,7 @@ use super::{ }; use crate::{ ir::{membership, Confirm, FinalizeConsensus, Replica, ReplyConsensus, ReplyInconsistent}, - util::join, + util::{join, Join, Until}, Transport, }; use futures::pin_mut; @@ -122,6 +122,7 @@ impl>> Client { } } + /// Unlogged request against a single replica. pub fn invoke_unlogged(&self, index: ReplicaIndex, op: U::UO) -> impl Future { let inner = Arc::clone(&self.inner); let address = { @@ -131,7 +132,7 @@ impl>> Client { let future = inner .transport - .send::>(address, RequestUnlogged { op: op.clone() }); + .send::>(address, RequestUnlogged { op }); async move { let response = future.await; @@ -141,6 +142,29 @@ impl>> Client { } } + /// A consenSUS operation; can get a quorum but doesn't preserve decisions. + pub fn invoke_unlogged_joined( + &self, + op: U::UO, + ) -> ( + Join>>, + MembershipSize, + ) { + let sync = self.inner.sync.lock().unwrap(); + let membership_size = sync.membership.size(); + + let future = join(sync.membership.iter().map(|(index, address)| { + ( + index, + self.inner + .transport + .send::>(address, RequestUnlogged { op: op.clone() }), + ) + })); + + (future, membership_size) + } + pub fn invoke_inconsistent(&self, op: U::IO) -> impl Future { let client_id = self.id; let inner = Arc::clone(&self.inner); diff --git a/src/ir/membership.rs b/src/ir/membership.rs index b520db5..cf3b523 100644 --- a/src/ir/membership.rs +++ b/src/ir/membership.rs @@ -36,6 +36,7 @@ impl Membership { Size((self.members.len() - 1) / 2) } + #[allow(clippy::len_without_is_empty)] pub fn len(&self) -> usize { self.members.len() } diff --git a/src/ir/replica.rs b/src/ir/replica.rs index c8bb091..4e44793 100644 --- a/src/ir/replica.rs +++ b/src/ir/replica.rs @@ -288,7 +288,7 @@ impl>> Replica { Message::::ProposeInconsistent(ProposeInconsistent { op_id, op, recent }) => { if sync.status.is_normal() { if !recent.is_recent_relative_to(sync.view.number) { - // eprintln!("ancient relative to {:?}", sync.view.number); + eprintln!("ancient relative to {:?}", sync.view.number); return Some(Message::::ReplyInconsistent(ReplyInconsistent { op_id, view_number: sync.view.number, @@ -319,7 +319,7 @@ impl>> Replica { Message::::ProposeConsensus(ProposeConsensus { op_id, op, recent }) => { if sync.status.is_normal() { if !recent.is_recent_relative_to(sync.view.number) { - // eprintln!("ancient relative to {:?}", sync.view.number); + eprintln!("ancient relative to {:?}", sync.view.number); return Some(Message::::ReplyConsensus(ReplyConsensus { op_id, view_number: sync.view.number, @@ -348,7 +348,7 @@ impl>> Replica { result_state: Some((result, state)), })); } else { - //println!("{:?} abnormal", self.index); + eprintln!("{:?} abnormal", self.index); } } Message::::FinalizeInconsistent(FinalizeInconsistent { op_id }) => { @@ -448,7 +448,20 @@ impl>> Replica { if sync.latest_normal_view == latest_normal_view { latest_records.push(sync.record.clone()); } - eprintln!("have {} latest ({:?})", latest_records.len(), sync.outstanding_do_view_changes.iter().map(|(i, dvt)| (i, dvt.view_number, dvt.addendum.as_ref().unwrap().latest_normal_view)).collect::>()); + eprintln!( + "have {} latest ({:?})", + latest_records.len(), + sync + .outstanding_do_view_changes + .iter() + .map(|(i, dvt)| (*i, dvt.view_number, dvt.addendum.as_ref().unwrap().latest_normal_view)) + .chain( + std::iter::once( + (self.index, sync.view.number, sync.latest_normal_view) + ) + ) + .collect::>() + ); #[allow(non_snake_case)] let mut R = Record::::default(); diff --git a/src/lib.rs b/src/lib.rs index ace8e80..cf79d10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ #![feature(let_chains)] #![feature(btree_cursors)] #![allow(unused)] +#![allow(clippy::type_complexity)] mod ir; mod mvcc; diff --git a/src/occ/store.rs b/src/occ/store.rs index 64d4d44..47dc0e5 100644 --- a/src/occ/store.rs +++ b/src/occ/store.rs @@ -287,6 +287,7 @@ impl Store { commit: TS, finalized: bool, ) { + eprintln!("preparing {id:?} at {commit:?} (fin = {finalized})"); match self.prepared.entry(id) { Entry::Vacant(mut vacant) => { vacant.insert((commit, transaction.clone(), finalized)); @@ -300,7 +301,7 @@ impl Store { let old_commit = occupied.get().0; occupied.insert((commit, transaction.clone(), finalized)); self.remove_prepared_inner(id, transaction.clone(), old_commit); - self.add_prepared_inner(id, transaction.clone(), commit); + self.add_prepared_inner(id, transaction, commit); } } } @@ -327,7 +328,8 @@ impl Store { } pub fn remove_prepared(&mut self, id: TransactionId) -> bool { - if let Some((commit, transaction, _)) = self.prepared.remove(&id) { + if let Some((commit, transaction, finalized)) = self.prepared.remove(&id) { + eprintln!("removing prepared {id:?} at {commit:?} (fin = {finalized})"); self.remove_prepared_inner(id, transaction, commit); true } else { diff --git a/src/tapir/client.rs b/src/tapir/client.rs index b66dd19..0262b44 100644 --- a/src/tapir/client.rs +++ b/src/tapir/client.rs @@ -77,7 +77,7 @@ impl>>> Transac let new_time = inner.client.transport().time().max(proposed.saturating_add(1)).max(min_commit_timestamp); if new_time != timestamp.time { timestamp.time = new_time; - continue; + // TODO continue; } } if matches!(result, OccPrepareResult::TooLate | OccPrepareResult::TooOld) { @@ -122,7 +122,7 @@ impl>>> Transac unreachable!(); } result = inner => { - return result; + result } } } else { diff --git a/src/tapir/message.rs b/src/tapir/message.rs index d151de6..15abddb 100644 --- a/src/tapir/message.rs +++ b/src/tapir/message.rs @@ -12,11 +12,21 @@ pub enum UO { /// Get a different version instead (not part of normal TAPIR). timestamp: Option, }, + /// For backup coordinators. + CheckPrepare { + /// Id of transaction to check the preparedness of. + transaction_id: OccTransactionId, + /// Same as (any) known prepared timestamp. + commit: Timestamp, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UR { + /// To clients. Get(Option, Timestamp), + /// To backup coordinators. + CheckPrepare(OccPrepareResult), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -96,9 +106,6 @@ pub enum CO { transaction: OccTransaction, /// Proposed commit timestamp. commit: Timestamp, - /// `true` when sent by backup coordinator(s), in which case the prepare - /// action is skipped. - backup: bool, }, RaiseMinPrepareTime { time: u64, @@ -113,19 +120,16 @@ impl PartialEq for CO { transaction_id, transaction, commit, - backup, }, Self::Prepare { transaction_id: other_transaction_id, transaction: other_transaction, commit: other_commit, - backup: other_backup, }, ) => { transaction_id == other_transaction_id && transaction == other_transaction && commit == other_commit - && backup == other_backup } ( Self::RaiseMinPrepareTime { time }, diff --git a/src/tapir/replica.rs b/src/tapir/replica.rs index a81cf29..5b205a7 100644 --- a/src/tapir/replica.rs +++ b/src/tapir/replica.rs @@ -1,10 +1,12 @@ use super::{Key, Timestamp, Value, CO, CR, IO, UO, UR}; +use crate::ir::ReplyUnlogged; use crate::util::vectorize; use crate::{ - IrClient, IrMembership, IrMessage, IrOpId, IrRecord, IrReplicaUpcalls, OccPrepareResult, - OccStore, OccTransaction, OccTransactionId, Transport, + IrClient, IrMembership, IrMembershipSize, IrMessage, IrOpId, IrRecord, IrReplicaIndex, + IrReplicaUpcalls, OccPrepareResult, OccStore, OccTransaction, OccTransactionId, Transport, }; use serde::{Deserialize, Serialize}; +use std::task::Context; use std::{collections::HashMap, fmt::Debug, future::Future, hash::Hash}; /// Diverge from TAPIR and don't maintain a no-vote list. Instead, wait for a @@ -107,46 +109,59 @@ impl Replica { return; } - let result = client - .invoke_consensus( - CO::Prepare { - transaction_id, - transaction: transaction.clone(), - commit, - backup: true, + fn decide( + results: &HashMap>>, + membership: IrMembershipSize, + ) -> Option> { + let highest_view = results.values().map(|r| r.view_number).max()?; + Some( + if results + .values() + .any(|r| matches!(r.result, UR::CheckPrepare(OccPrepareResult::Fail))) + { + OccPrepareResult::Fail + } else if results + .values() + .filter(|r| { + r.view_number == highest_view + && matches!(r.result, UR::CheckPrepare(OccPrepareResult::Ok)) + }) + .count() + >= membership.f_plus_one() + { + OccPrepareResult::Ok + } else if results + .values() + .filter(|r| { + r.view_number == highest_view + && matches!(r.result, UR::CheckPrepare(OccPrepareResult::TooLate)) + }) + .count() + >= membership.f_plus_one() + { + // TODO: Check views too. + OccPrepareResult::TooLate + } else { + return None; }, - |results, membership| { - let decision = if results.contains_key(&CR::Prepare(OccPrepareResult::Fail)) { - OccPrepareResult::Fail - } else if results - .get(&CR::Prepare(OccPrepareResult::TooLate)) - .copied() - .unwrap_or_default() - >= membership.f_plus_one() - { - OccPrepareResult::TooLate - } else if results - .get(&CR::Prepare(OccPrepareResult::Ok)) - .copied() - .unwrap_or_default() - >= membership.f_plus_one() - { - OccPrepareResult::Ok - } else { - OccPrepareResult::Retry { - proposed: commit.time, - } - }; - eprintln!( - "backup coordinator deciding on {results:?} -> {decision:?} for {transaction_id:?}" - ); - CR::Prepare(decision) + ) + } + + let (future, membership) = client.invoke_unlogged_joined(UO::CheckPrepare { + transaction_id, + commit, + }); + + let results = future + .until( + |results: &HashMap>>, + cx: &mut Context<'_>| { + decide(results, membership).is_some() }, ) .await; - let CR::Prepare(result) = result else { - debug_assert!(false); + let Some(result) = decide(&results, membership) else { return; }; @@ -194,6 +209,52 @@ impl IrReplicaUpcalls for Replica { }; UR::Get(v.cloned(), ts) } + UO::CheckPrepare { + transaction_id, + commit, + } => { + UR::CheckPrepare(if commit.time < self.gc_watermark { + // In theory, could check the other conditions first, but + // that might hide bugs. + OccPrepareResult::TooOld + } else if let Some((ts, c)) = self.transaction_log.get(&transaction_id) { + if *c && *ts == commit { + // Already committed at this timestamp. + OccPrepareResult::Ok + } else { + // Didn't (and will never) commit at this timestamp. + OccPrepareResult::Fail + } + } else if let Some(f) = self + .inner + .prepared + .get(&transaction_id) + .filter(|(ts, _, f)| *ts == commit) + .map(|(_, _, f)| *f) + { + // Already prepared at this timestamp. + if f { + // Prepare was finalized. + OccPrepareResult::Ok + } else { + // Prepare wasn't finalized, can't be sure yet. + OccPrepareResult::Abstain + } + } else if commit.time < self.min_prepare_time + || self + .inner + .prepared + .get(&transaction_id) + .map(|(c, _, _)| c.time < self.min_prepare_time) + .unwrap_or(false) + { + // Too late for the client to prepare. + OccPrepareResult::TooLate + } else { + /// Not sure. + OccPrepareResult::Abstain + }) + } } } @@ -222,6 +283,7 @@ impl IrReplicaUpcalls for Replica { transaction, commit, } => { + #[allow(clippy::blocks_in_if_conditions)] if commit .map(|commit| { debug_assert!( @@ -265,7 +327,6 @@ impl IrReplicaUpcalls for Replica { transaction_id, transaction, commit, - backup, } => CR::Prepare(if commit.time < self.gc_watermark { // In theory, could check the other conditions first, but // that might hide bugs. @@ -275,9 +336,6 @@ impl IrReplicaUpcalls for Replica { if ts == commit { // Already committed at this timestamp. OccPrepareResult::Ok - } else if *backup { - // Didn't (and will never) commit at this timestamp. - OccPrepareResult::Fail } else { // Committed at a different timestamp. OccPrepareResult::Retry { proposed: ts.time } @@ -286,23 +344,15 @@ impl IrReplicaUpcalls for Replica { // Already aborted by client. OccPrepareResult::Fail } - } else if let Some(f) = self + } else if self .inner .prepared .get(transaction_id) - .filter(|(ts, _, _)| *ts == *commit) - .map(|(_, _, f)| *f) + .map(|(ts, _, _)| *ts == *commit) + .unwrap_or(false) { // Already prepared at this timestamp. - if f || !*backup { - OccPrepareResult::Ok - } else { - // Backup coordinator needs to wait to finalized results, - // which won't simply evaporate on a view change. - OccPrepareResult::Retry { - proposed: commit.time, - } - } + OccPrepareResult::Ok } else if commit.time < self.min_prepare_time || self .inner @@ -315,7 +365,7 @@ impl IrReplicaUpcalls for Replica { OccPrepareResult::TooLate } else { self.inner - .prepare(*transaction_id, transaction.clone(), *commit, *backup) + .prepare(*transaction_id, transaction.clone(), *commit, false) }), CO::RaiseMinPrepareTime { time } => { // Want to avoid tentative prepare operations materializing later on... @@ -343,12 +393,10 @@ impl IrReplicaUpcalls for Replica { transaction_id, transaction, commit, - backup, } => { - if !*backup && matches!(res, CR::Prepare(OccPrepareResult::Ok)) && let Some((ts, _, finalized)) = self.inner.prepared.get_mut(transaction_id) { - if *commit == *ts { - *finalized = true; - } + if matches!(res, CR::Prepare(OccPrepareResult::Ok)) && let Some((ts, _, finalized)) = self.inner.prepared.get_mut(transaction_id) && *commit == *ts { + println!("confirming prepare {transaction_id:?} at {commit:?}"); + *finalized = true; } } CO::RaiseMinPrepareTime { time } => { @@ -375,45 +423,42 @@ impl IrReplicaUpcalls for Replica { transaction_id, transaction, commit, - backup, } => { // Backup coordinator prepares don't change state. - if !*backup { - if matches!(entry.result, CR::Prepare(OccPrepareResult::Ok)) { - if self - .inner - .prepared - .get(transaction_id) - .map(|(ts, _, _)| ts == commit) - .unwrap_or(true) - && !self.transaction_log.contains_key(transaction_id) - { - // Enough other replicas agreed to prepare - // the transaction so it must be okay. - // - // Finalize it immediately since we are syncing - // from the leader's record. - eprintln!("syncing successful {op_id:?} prepare for {transaction_id:?} at {commit:?}"); - self.inner.add_prepared( - *transaction_id, - transaction.clone(), - *commit, - true, - ); - } - } else if self + if matches!(entry.result, CR::Prepare(OccPrepareResult::Ok)) { + if self .inner .prepared .get(transaction_id) .map(|(ts, _, _)| ts == commit) - .unwrap_or(false) + .unwrap_or(true) + && !self.transaction_log.contains_key(transaction_id) { - eprintln!( - "syncing {:?} {op_id:?} prepare for {transaction_id:?} at {commit:?}", - entry.result + // Enough other replicas agreed to prepare + // the transaction so it must be okay. + // + // Finalize it immediately since we are syncing + // from the leader's record. + eprintln!("syncing successful {op_id:?} prepare for {transaction_id:?} at {commit:?} (had {:?})", self.inner.prepared.get(transaction_id)); + self.inner.add_prepared( + *transaction_id, + transaction.clone(), + *commit, + true, ); - self.inner.remove_prepared(*transaction_id); } + } else if self + .inner + .prepared + .get(transaction_id) + .map(|(ts, _, _)| ts == commit) + .unwrap_or(false) + { + eprintln!( + "syncing {:?} {op_id:?} prepare for {transaction_id:?} at {commit:?}", + entry.result + ); + self.inner.remove_prepared(*transaction_id); } } CO::RaiseMinPrepareTime { .. } => { @@ -474,7 +519,6 @@ impl IrReplicaUpcalls for Replica { transaction_id, transaction, commit, - backup, } => { let result = if matches!(reply, CR::Prepare(OccPrepareResult::Ok)) { // Possibly successful fast quorum. @@ -539,11 +583,12 @@ impl IrReplicaUpcalls for Replica { self.inner.prepared.len() ); let threshold: u64 = transport.time_offset(-500); - let mut governor = 2u8; - for (transaction_id, (commit, transaction, _)) in &self.inner.prepared { + if let Some((transaction_id, (commit, transaction, _))) = + self.inner.prepared.iter().min_by_key(|(_, (c, _, _))| *c) + { if commit.time > threshold { // Allow the client to finish on its own. - continue; + return; } let future = Self::recover_coordination( *transaction_id, @@ -553,12 +598,6 @@ impl IrReplicaUpcalls for Replica { transport.clone(), ); tokio::spawn(future); - - if let Some(new_governor) = governor.checked_sub(1) { - governor = new_governor; - } else { - break; - } } } } diff --git a/src/tapir/shard_client.rs b/src/tapir/shard_client.rs index 5f03c28..ff25291 100644 --- a/src/tapir/shard_client.rs +++ b/src/tapir/shard_client.rs @@ -87,8 +87,38 @@ impl>>> ShardTr let inner = Arc::clone(&self.inner); async move { - { - let lock = inner.lock().unwrap(); + loop { + { + let lock = inner.lock().unwrap(); + + // Read own writes. + if let Some(write) = lock.inner.write_set.get(&key) { + return write.as_ref().cloned(); + } + + // Consistent reads. + if let Some(read) = lock.read_cache.get(&key) { + return read.as_ref().cloned(); + } + } + + use rand::Rng; + let future = client.invoke_unlogged( + IrReplicaIndex(rand::thread_rng().gen_range(0..3)), + UO::Get { + key: key.clone(), + timestamp: None, + }, + ); + + let reply = future.await; + + let UR::Get(value, timestamp) = reply else { + debug_assert!(false); + continue; + }; + + let mut lock = inner.lock().unwrap(); // Read own writes. if let Some(write) = lock.inner.write_set.get(&key) { @@ -99,36 +129,11 @@ impl>>> ShardTr if let Some(read) = lock.read_cache.get(&key) { return read.as_ref().cloned(); } - } - - use rand::Rng; - let future = client.invoke_unlogged( - IrReplicaIndex(rand::thread_rng().gen_range(0..3)), - UO::Get { - key: key.clone(), - timestamp: None, - }, - ); - - let reply = future.await; - - let UR::Get(value, timestamp) = reply; - let mut lock = inner.lock().unwrap(); - - // Read own writes. - if let Some(write) = lock.inner.write_set.get(&key) { - return write.as_ref().cloned(); - } - - // Consistent reads. - if let Some(read) = lock.read_cache.get(&key) { - return read.as_ref().cloned(); + lock.read_cache.insert(key.clone(), value.clone()); + lock.inner.add_read(key, timestamp); + return value; } - - lock.read_cache.insert(key.clone(), value.clone()); - lock.inner.add_read(key, timestamp); - value } } @@ -147,7 +152,6 @@ impl>>> ShardTr transaction_id: lock.id, transaction: lock.inner.clone(), commit: timestamp, - backup: false, }, |results, membership_size| { let mut ok_count = 0; diff --git a/src/tapir/tests/kv.rs b/src/tapir/tests/kv.rs index 81a02b3..4f411c3 100644 --- a/src/tapir/tests/kv.rs +++ b/src/tapir/tests/kv.rs @@ -275,23 +275,45 @@ async fn throughput(linearizable: bool, num_replicas: usize, num_clients: usize) .await; } +#[ignore] #[tokio::test] -async fn coordinator_recovery_3() { +async fn coordinator_recovery_3_loop() { loop { - coordinator_recovery(3).await; + timeout(Duration::from_secs(120), coordinator_recovery(3)) + .await + .unwrap(); } } +#[tokio::test] +async fn coordinator_recovery_3() { + timeout(Duration::from_secs(120), coordinator_recovery(3)) + .await + .unwrap(); +} + #[tokio::test] async fn coordinator_recovery_5() { + timeout(Duration::from_secs(180), coordinator_recovery(5)) + .await + .unwrap(); +} + +#[ignore] +#[tokio::test] +async fn coordinator_recovery_7_loop() { loop { - coordinator_recovery(5).await; + timeout(Duration::from_secs(240), coordinator_recovery(7)) + .await + .unwrap(); } } #[tokio::test] async fn coordinator_recovery_7() { - coordinator_recovery(7).await; + timeout(Duration::from_secs(240), coordinator_recovery(7)) + .await + .unwrap(); } async fn coordinator_recovery(num_replicas: usize) { diff --git a/src/transport/channel.rs b/src/transport/channel.rs index 82478cf..7e6c75d 100644 --- a/src/transport/channel.rs +++ b/src/transport/channel.rs @@ -71,7 +71,7 @@ impl Clone for Channel { impl Channel { fn should_drop(from: usize, to: usize) -> bool { //return false; - // return from == 1 || to == 1; + return (from == 1) ^ (to == 1); use rand::Rng; rand::thread_rng().gen_bool(1.0 / 5.0) diff --git a/src/util/mod.rs b/src/util/mod.rs index 1fe7f0f..7e76fec 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -2,4 +2,4 @@ mod join_until; pub(crate) mod vectorize; pub(crate) mod vectorize_btree; -pub use join_until::join; +pub use join_until::{join, Join, JoinUntil, Until};