Skip to content

Commit

Permalink
Debug coordinator recovery.
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Jun 27, 2023
1 parent 09753bc commit 159294b
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/ir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
Message::<U>::ProposeInconsistent(ProposeInconsistent { op_id, op, recent }) => {
if sync.status.is_normal() {
if !recent.is_recent_relative_to(sync.view.number) {
eprintln!("ancient relative to {:?}", sync.view.number);
// eprintln!("ancient relative to {:?}", sync.view.number);
return Some(Message::<U>::ReplyInconsistent(ReplyInconsistent {
op_id,
view_number: sync.view.number,
Expand Down Expand Up @@ -311,7 +311,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
Message::<U>::ProposeConsensus(ProposeConsensus { op_id, op, recent }) => {
if sync.status.is_normal() {
if !recent.is_recent_relative_to(sync.view.number) {
eprintln!("ancient relative to {:?}", sync.view.number);
// eprintln!("ancient relative to {:?}", sync.view.number);
return Some(Message::<U>::ReplyConsensus(ReplyConsensus {
op_id,
view_number: sync.view.number,
Expand Down
4 changes: 2 additions & 2 deletions src/ir/tests/lock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ async fn test_lock_server() {
}

async fn lock_server(num_replicas: usize) {
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
struct Lock(IrClientId);

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
struct Unlock(IrClientId);

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
Expand Down
6 changes: 5 additions & 1 deletion src/tapir/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ pub enum IO<K, V> {
///
/// Unlike TAPIR, tolerate `Abort` at any timestamp except
/// that of a successful `Commit`.
///
/// Note: Clients may send spurious aborts so replicas will
/// ignore client aborts during coordinator recovery.
Abort {
transaction_id: OccTransactionId,
/// Same as unsuccessfully prepared transaction.
#[serde(bound(deserialize = "K: Eq + Deserialize<'de> + Hash, V: Deserialize<'de>"))]
transaction: OccTransaction<K, V, Timestamp>,
/// Same as unsuccessfully prepared commit timestamp or `None` to abort at every timestamp.
/// Same as unsuccessfully prepared commit timestamp for backup coordinators or `None`
/// used by clients to abort at every timestamp.
commit: Option<Timestamp>,
},
}
Expand Down
78 changes: 59 additions & 19 deletions src/tapir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct Replica<K, V> {
deserialize = "K: Deserialize<'de> + Hash + Eq, V: Deserialize<'de>"
)
)]
transaction_log: HashMap<OccTransactionId, Timestamp>,
transaction_log: HashMap<OccTransactionId, (Timestamp, bool)>,
/// Extension to TAPIR: Garbage collection watermark time.
/// - All transactions before this are committed/aborted.
/// - Must not prepare transactions before this.
Expand Down Expand Up @@ -176,8 +176,11 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
transaction,
commit,
} => {
let old = self.transaction_log.insert(*transaction_id, *commit);
if let Some(ts) = old {
let old = self
.transaction_log
.insert(*transaction_id, (*commit, true));
if let Some((ts, committed)) = old {
debug_assert!(committed);
debug_assert_eq!(ts, *commit);
}
self.inner
Expand All @@ -196,18 +199,36 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
.map(|(ts, _)| *ts == commit)
.unwrap_or(false)
})
.unwrap_or(true)
.unwrap_or(
self.inner
.prepared
.get(transaction_id)
.map(|(ts, _)| ts.time >= self.min_prepare_time)
.unwrap_or(false),
)
{
self.inner.remove_prepared(*transaction_id);
}
if let Some(commit) = commit {
debug_assert!(!self
.transaction_log
.get(transaction_id)
.map(|ts| ts != commit)
.unwrap_or(true));
debug_assert!(
!self
.transaction_log
.get(transaction_id)
.map(|(ts, c)| *c && ts == commit)
.unwrap_or(false),
"{transaction_id:?} committed at {commit:?}"
);
} else {
debug_assert!(!self.transaction_log.contains_key(transaction_id));
debug_assert!(
!self
.transaction_log
.get(transaction_id)
.map(|(_, c)| *c)
.unwrap_or(false),
"{transaction_id:?} committed"
);
self.transaction_log
.insert(*transaction_id, (Default::default(), false));
}
}
}
Expand All @@ -227,11 +248,19 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
} else if self
.transaction_log
.get(transaction_id)
.map(|ts| ts == commit)
.map(|(ts, c)| *c && ts == commit)
.unwrap_or(false)
{
// Already committed at this timestamp.
OccPrepareResult::Ok
} else if self
.transaction_log
.get(transaction_id)
.map(|(_, c)| !*c)
.unwrap_or(false)
{
// Already aborted by client.
OccPrepareResult::Fail
} else if self
.inner
.prepared
Expand All @@ -251,14 +280,15 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
|| self
.transaction_log
.get(transaction_id)
.map(|ts| ts.time < self.min_prepare_time)
.map(|(ts, _)| ts.time < self.min_prepare_time)
.unwrap_or(false)
{
// Too late to prepare or reprepare.
OccPrepareResult::TooLate
} else if let Some(ts) = self.transaction_log.get(transaction_id) {
} else if let Some((ts, c)) = self.transaction_log.get(transaction_id) {
// Committed at a different timestamp.
debug_assert_ne!(ts, commit);
debug_assert!(*c);
OccPrepareResult::Retry { proposed: ts.time }
} else {
self.inner
Expand Down Expand Up @@ -293,13 +323,22 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
backup,
} => {
if matches!(entry.result, CR::Prepare(OccPrepareResult::Ok)) {
if !self.transaction_log.contains_key(transaction_id) && !*backup {
if !self.inner.prepared.contains_key(transaction_id)
&& !self.transaction_log.contains_key(transaction_id)
&& !*backup
{
// Enough other replicas agreed to prepare
// the transaction so it must be okay.
self.inner
.add_prepared(*transaction_id, transaction.clone(), *commit);
}
} else {
} else if self
.inner
.prepared
.get(transaction_id)
.map(|(ts, _)| ts == commit)
.unwrap_or(false)
{
// TODO: Is it safe for a backup coordinator to
// trigger this code path?
self.inner.remove_prepared(*transaction_id);
Expand Down Expand Up @@ -370,7 +409,7 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
if self
.transaction_log
.get(transaction_id)
.map(|ts| ts == commit)
.map(|(ts, c)| !*c || (*c && ts == commit))
.unwrap_or(false)
|| !result.is_ok()
{
Expand All @@ -392,8 +431,6 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
OccPrepareResult::TooLate
*/
} else {
debug_assert!(!self.transaction_log.contains_key(transaction_id));

// Analogous to the IR slow path.
//
// Ensure the successful prepare is possible and, if so, durable.
Expand Down Expand Up @@ -483,7 +520,10 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
membership: &IrMembership<T>,
transport: &T,
) {
// println!("there are {} prepared transactions", self.inner.prepared.len());
println!(
"there are {} prepared transactions",
self.inner.prepared.len()
);
let threshold: u64 = transport.time_offset(-500);
for (transaction_id, (commit, transaction)) in &self.inner.prepared {
if commit.time > threshold {
Expand Down
6 changes: 3 additions & 3 deletions src/tapir/tests/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ async fn coordinator_recovery() {
conflicting.get(n).await;
tokio::spawn(conflicting.only_prepare());

let conflicting = clients[2].begin();
conflicting.put(n, Some(1));
tokio::spawn(conflicting.only_prepare());
//let conflicting = clients[2].begin();
//conflicting.put(n, Some(1));
//tokio::spawn(conflicting.only_prepare());

let txn = clients[0].begin();
txn.put(n, Some(42));
Expand Down
1 change: 0 additions & 1 deletion src/transport/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ impl<M: Message> Transport for Channel<M> {
}

fn time_offset(&self, offset: i64) -> u64 {
use rand::Rng;
(SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
Expand Down

0 comments on commit 159294b

Please sign in to comment.