Skip to content

Commit

Permalink
Cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Jul 2, 2023
1 parent 7af46e4 commit 7dcd8c1
Show file tree
Hide file tree
Showing 24 changed files with 113 additions and 161 deletions.
33 changes: 16 additions & 17 deletions src/ir/client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use super::{
DoViewChange, FinalizeInconsistent, Membership, MembershipSize, Message, OpId,
ProposeConsensus, ProposeInconsistent, ReplicaIndex, ReplicaUpcalls, ReplyUnlogged,
RequestUnlogged, ViewChangeAddendum, ViewNumber,
Confirm, DoViewChange, FinalizeConsensus, FinalizeInconsistent, Membership, MembershipSize,
Message, OpId, ProposeConsensus, ProposeInconsistent, ReplicaIndex, ReplicaUpcalls,
ReplyConsensus, ReplyInconsistent, ReplyUnlogged, RequestUnlogged, ViewNumber,
};
use crate::{
ir::{membership, Confirm, FinalizeConsensus, Replica, ReplyConsensus, ReplyInconsistent},
util::{join, Join, Until},
util::{join, Join},
Transport,
};
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
use futures::{stream::FuturesUnordered, StreamExt};
use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
use std::{
Expand All @@ -17,23 +16,19 @@ use std::{
future::Future,
hash::Hash,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
sync::{Arc, Mutex},
task::Context,
time::Duration,
};
use tokio::select;

/// Randomly chosen id, unique to each IR client.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub struct Id(pub u64);

impl Id {
fn new() -> Self {
let mut rng = thread_rng();
Self(rng.gen())
Self(thread_rng().gen())
}
}

Expand All @@ -43,6 +38,7 @@ impl Debug for Id {
}
}

/// IR client, capable of invoking operations on an IR replica group.
pub struct Client<U: ReplicaUpcalls, T: Transport> {
id: Id,
inner: Arc<Inner<T>>,
Expand Down Expand Up @@ -72,7 +68,7 @@ struct SyncInner<T: Transport> {

impl<T: Transport> SyncInner<T> {
fn next_number(&mut self) -> u64 {
let mut ret = self.operation_counter;
let ret = self.operation_counter;
self.operation_counter += 1;
ret
}
Expand Down Expand Up @@ -150,7 +146,7 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {
futures.push(future);
}

let mut timeout = std::pin::pin!(T::sleep(Duration::from_millis(250)));
let timeout = std::pin::pin!(T::sleep(Duration::from_millis(250)));

let response = select! {
_ = timeout, if futures.len() < count => {
Expand All @@ -167,6 +163,8 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {
}
}

/// Returns a `Join` over getting an unlogged response from all replicas.
///
/// A consenSUS operation; can get a quorum but doesn't preserve decisions.
pub fn invoke_unlogged_joined(
&self,
Expand All @@ -190,8 +188,8 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {
(future, membership_size)
}

/// Returns when the inconsistent operation is finalized, retrying indefinitely.
pub fn invoke_inconsistent(&self, op: U::IO) -> impl Future<Output = ()> {
let client_id = self.id;
let inner = Arc::clone(&self.inner);

let op_id = {
Expand Down Expand Up @@ -289,6 +287,7 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {
}
}

/// Returns the consensus result, retrying indefinitely.
pub fn invoke_consensus(
&self,
op: U::CO,
Expand Down Expand Up @@ -339,7 +338,7 @@ impl<U: ReplicaUpcalls, T: Transport<Message = Message<U>>> Client<U, T> {
let inner = Arc::clone(&self.inner);

async move {
'retry: loop {
loop {
let (membership_size, op_id, future) = {
let mut sync = inner.sync.lock().unwrap();
let number = sync.next_number();
Expand Down
3 changes: 2 additions & 1 deletion src/ir/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::ReplicaIndex;
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Size(usize);

/// Stores the address of replica group members.
#[derive(Clone)]
pub struct Membership<T: Transport> {
members: Vec<T::Address>,
Expand All @@ -22,8 +23,8 @@ impl<T: Transport> Debug for Membership<T> {
}

impl<T: Transport> Membership<T> {
/// Must have an odd number of replicas.
pub fn new(members: Vec<T::Address>) -> Self {
assert_ne!(members.len(), 0);
assert_eq!(members.len() % 2, 1);
Self { members }
}
Expand Down
2 changes: 1 addition & 1 deletion src/ir/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
record::RecordImpl, OpId, Record, RecordEntryState, ReplicaIndex, ReplicaUpcalls, ViewNumber,
record::RecordImpl, OpId, RecordEntryState, ReplicaIndex, ReplicaUpcalls, ViewNumber,
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
Expand Down
1 change: 0 additions & 1 deletion src/ir/op.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};

use super::ClientId;
use std::fmt::Debug;

Expand Down
4 changes: 3 additions & 1 deletion src/ir/record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{OpId, ReplicaUpcalls, ViewNumber};
use crate::util::vectorize;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Debug};

/// The state of a record entry according to a replica.
Expand Down Expand Up @@ -47,10 +47,12 @@ impl Debug for Consistency {
}

impl Consistency {
#[allow(unused)]
pub fn is_inconsistent(&self) -> bool {
matches!(self, Self::Inconsistent)
}

#[allow(unused)]
pub fn is_consensus(&self) -> bool {
matches!(self, Self::Consensus)
}
Expand Down
15 changes: 9 additions & 6 deletions src/ir/replica.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use super::{
message::ViewChangeAddendum, record::Consistency, Confirm, DoViewChange, FinalizeConsensus,
FinalizeInconsistent, Membership, Message, OpId, ProposeConsensus, ProposeInconsistent, Record,
RecordConsensusEntry, RecordEntryState, RecordInconsistentEntry, ReplyConsensus,
ReplyInconsistent, ReplyUnlogged, RequestUnlogged, StartView, View, ViewNumber,
message::ViewChangeAddendum, Confirm, DoViewChange, FinalizeConsensus, FinalizeInconsistent,
Membership, Message, OpId, ProposeConsensus, ProposeInconsistent, Record, RecordConsensusEntry,
RecordEntryState, RecordInconsistentEntry, ReplyConsensus, ReplyInconsistent, ReplyUnlogged,
RequestUnlogged, StartView, View, ViewNumber,
};
use crate::{Transport, TransportMessage};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
fmt::Debug,
hash::Hash,
sync::{Arc, Mutex, MutexGuard, Weak},
sync::{Arc, Mutex},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -77,6 +77,7 @@ pub trait Upcalls: Sized + Send + Serialize + DeserializeOwned + 'static {
membership: &Membership<T>,
transport: &T,
) {
let _ = (membership, transport);
// No-op.
}
}
Expand Down Expand Up @@ -283,6 +284,8 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
result,
view_number: sync.view.number,
}));
} else {
eprintln!("{:?} abnormal", self.index);
}
}
Message::<U>::ProposeInconsistent(ProposeInconsistent { op_id, op, recent }) => {
Expand Down Expand Up @@ -565,7 +568,7 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
debug_assert_eq!(results_by_opid.len(), entries_by_opid.len());

for (op_id, result) in results_by_opid {
let mut entries = entries_by_opid.get(&op_id).unwrap();
let entries = entries_by_opid.get(&op_id).unwrap();
let entry = &entries[0];
sync.upcalls.finalize_consensus(&entry.op, &result);
R.consensus.insert(
Expand Down
18 changes: 7 additions & 11 deletions src/ir/tests/lock_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
ChannelRegistry, ChannelTransport, IrClient, IrClientId, IrMembership, IrMembershipSize,
IrMessage, IrOpId, IrRecord, IrRecordConsensusEntry, IrReplica, IrReplicaIndex,
IrMessage, IrOpId, IrRecord, IrReplica, IrReplicaIndex,
IrReplicaUpcalls, Transport,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -60,6 +60,7 @@ async fn lock_server(num_replicas: usize) {
type CR = LockResult;

fn exec_unlogged(&mut self, op: Self::UO) -> Self::UR {
let _ = op;
unreachable!();
}
fn exec_inconsistent(&mut self, op: &Self::IO) {
Expand All @@ -80,10 +81,10 @@ async fn lock_server(num_replicas: usize) {

let mut locked = HashSet::<IrClientId>::new();
let mut unlocked = HashSet::<IrClientId>::new();
for (op_id, entry) in &record.inconsistent {
for entry in record.inconsistent.values() {
unlocked.insert(entry.op.0);
}
for (op_id, entry) in &record.consensus {
for entry in record.consensus.values() {
if matches!(entry.result, LockResult::Ok) {
locked.insert(entry.op.0);
}
Expand Down Expand Up @@ -154,13 +155,8 @@ async fn lock_server(num_replicas: usize) {
registry: &ChannelRegistry<Message>,
membership: &IrMembership<ChannelTransport<Message>>,
) -> Arc<IrClient<Upcalls, ChannelTransport<Message>>> {
Arc::new_cyclic(
|weak: &std::sync::Weak<IrClient<Upcalls, ChannelTransport<Message>>>| {
let weak = weak.clone();
let channel = registry.channel(move |_, _| unreachable!());
IrClient::new(membership.clone(), channel)
},
)
let channel = registry.channel(move |_, _| unreachable!());
Arc::new(IrClient::new(membership.clone(), channel))
}

let clients = (0..2)
Expand Down Expand Up @@ -195,7 +191,7 @@ async fn lock_server(num_replicas: usize) {
.invoke_inconsistent(Unlock(clients[0].id()))
.await;

for i in 0..10 {
for _ in 0..10 {
ChannelTransport::<Message>::sleep(Duration::from_secs(5)).await;

eprintln!("@@@@@ INVOKE {replicas:?}");
Expand Down
3 changes: 1 addition & 2 deletions src/ir/view.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use serde::{Deserialize, Serialize};

use super::{Membership, ReplicaIndex};
use crate::transport::Transport;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#![feature(int_roundings)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![allow(unused)]
#![allow(clippy::type_complexity)]

mod ir;
Expand Down
3 changes: 1 addition & 2 deletions src/mvcc/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
collections::{BTreeMap, HashMap},
hash::Hash,
ops::{Bound, Deref, DerefMut},
time,
};

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -104,7 +103,7 @@ impl<K: Hash + Eq, V, TS: Ord + Eq + Copy + Default> Store<K, V, TS> {
self.inner
.get(key)
.map(|versions| {
let mut cursor = versions.upper_bound(Bound::Included(&timestamp));
let cursor = versions.upper_bound(Bound::Included(&timestamp));
if let Some((fk, _)) = cursor.key_value() {
if let Some((lk, _)) = cursor.peek_next() {
(*fk, Some(*lk))
Expand Down
Loading

0 comments on commit 7dcd8c1

Please sign in to comment.