Skip to content

Commit

Permalink
Coordinator recovery WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Jun 25, 2023
1 parent 82d70ea commit c65545d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ bench:

maelstrom:
cargo build --release --features maelstrom --bin maelstrom
maelstrom test -w lin-kv --bin target/release/maelstrom --latency 0 --rate 30 --time-limit 30 --concurrency 20 # --nemesis partition --nemesis-interval 10
maelstrom test -w lin-kv --bin target/release/maelstrom --latency 0 --rate 30 --time-limit 60 --concurrency 10 # --nemesis partition --nemesis-interval 10
4 changes: 3 additions & 1 deletion src/bin/maelstrom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,9 @@ impl Process<LinKv, Wrapper> for KvNode {
dest: src,
body: Body::Error(Error {
in_reply_to: msg_id,
text: String::from("read txn conflict"),
text: String::from(
"write txn conflict",
),
code: 30,
}),
})
Expand Down
12 changes: 7 additions & 5 deletions src/tapir/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
future::Future,
hash::Hash,
sync::atomic::{AtomicU64, Ordering},
time::SystemTime,
time::{Duration, SystemTime},
};

pub struct Client<K: Key, V: Value, T: Transport> {
Expand Down Expand Up @@ -77,11 +77,12 @@ impl<K: Key, V: Value, T: Transport<Message = IrMessage<Replica<K, V>>>> Transac
continue;
}
}
if matches!(result, OccPrepareResult::TooLate | OccPrepareResult::TooOld) {
continue;
}
let ok = matches!(result, OccPrepareResult::Ok);
use rand::Rng;
if inject_fault {
// Induce a coordinator failure.
return None;
T::sleep(Duration::from_secs(u64::MAX / 4)).await;
}
inner.end(timestamp, ok).await;

Expand All @@ -95,6 +96,7 @@ impl<K: Key, V: Value, T: Transport<Message = IrMessage<Replica<K, V>>>> Transac
}

pub fn commit(&self) -> impl Future<Output = Option<Timestamp>> {
self.commit_inner(false)
use rand::Rng;
self.commit_inner(rand::thread_rng().gen_bool(0.02))
}
}
2 changes: 1 addition & 1 deletion src/tapir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
membership: &IrMembership<T>,
transport: &T,
) {
let threshold: u64 = transport.time() - 5 * 1000 * 1000 * 1000;
let threshold: u64 = transport.time() - 10 * 1000 * 1000;
for (transaction_id, (commit, transaction)) in &self.inner.prepared {
if commit.time > threshold {
// Allow the client to finish on its own.
Expand Down
7 changes: 6 additions & 1 deletion src/tapir/shard_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ impl<K: Key, V: Value, T: Transport<Message = IrMessage<Replica<K, V>>>> ShardTr
OccPrepareResult::Fail => {
return CR::Prepare(OccPrepareResult::Fail);
}
OccPrepareResult::TooLate | OccPrepareResult::TooOld => unimplemented!(),
OccPrepareResult::TooLate => {
return CR::Prepare(OccPrepareResult::TooLate);
}
OccPrepareResult::TooOld => {
return CR::Prepare(OccPrepareResult::TooOld);
}
}
}

Expand Down

0 comments on commit c65545d

Please sign in to comment.