Skip to content

Commit

Permalink
Bugfix part 9.
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Jun 29, 2023
1 parent 664212c commit 3cb5031
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 140 deletions.
4 changes: 4 additions & 0 deletions src/ir/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,22 @@ impl<'a, T: Transport> IntoIterator for &'a Membership<T> {
}

impl Size {
/// In a replica group of size 3, this is 1.
pub fn f(&self) -> usize {
self.0
}

/// In a replica group of size 3, this is 2.
pub fn f_plus_one(&self) -> usize {
self.f() + 1
}

/// In a replica group of size 3, this is 3.
pub fn three_over_two_f_plus_one(&self) -> usize {
(self.f() * 3).div_ceil(2) + 1
}

/// In a replica group of size 3, this is 2.
pub fn f_over_two_plus_one(&self) -> usize {
self.f().div_ceil(2) + 1
}
Expand Down
278 changes: 141 additions & 137 deletions src/ir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,170 +420,174 @@ impl<U: Upcalls, T: Transport<Message = Message<U>>> Replica<U, T> {
}

let threshold = sync.view.membership.size().f();
for do_view_change in sync.outstanding_do_view_changes.values() {
let matching = sync
.outstanding_do_view_changes
.values()
.filter(|other| other.view_number == do_view_change.view_number);

if matching.clone().count() >= threshold {
eprintln!("DOING VIEW CHANGE");
{
let latest_normal_view = sync.latest_normal_view.max(
matching
.clone()
.map(|r| {
r.addendum.as_ref().unwrap().latest_normal_view
})
.max()
.unwrap(),
);
let mut latest_records = matching
let matching = sync
.outstanding_do_view_changes
.values()
.filter(|other| other.view_number == sync.view.number);

if matching.clone().count() >= threshold {
eprintln!("DOING VIEW CHANGE");
{
let latest_normal_view = sync.latest_normal_view.max(
matching
.clone()
.filter(|r| {
.map(|r| {
r.addendum.as_ref().unwrap().latest_normal_view
== latest_normal_view
})
.map(|r| r.addendum.as_ref().unwrap().record.clone())
.collect::<Vec<_>>();
if sync.latest_normal_view == latest_normal_view {
latest_records.push(sync.record.clone());
}
eprintln!("have {} latest", latest_records.len());

#[allow(non_snake_case)]
let mut R = Record::<U>::default();
let mut entries_by_opid =
HashMap::<OpId, Vec<RecordConsensusEntry<U::CO, U::CR>>>::new();
let mut finalized = HashSet::new();
for r in latest_records {
for (op_id, entry) in r.inconsistent.clone() {
match R.inconsistent.entry(op_id) {
Entry::Vacant(vacant) => {
vacant.insert(entry);
}
Entry::Occupied(mut occupied) => {
if let RecordEntryState::Finalized(view) = entry.state {
let state = &mut occupied.get_mut().state;
*state = RecordEntryState::Finalized(view);
}
.max()
.unwrap(),
);
let mut latest_records = matching
.clone()
.filter(|r| {
r.addendum.as_ref().unwrap().latest_normal_view
== latest_normal_view
})
.map(|r| r.addendum.as_ref().unwrap().record.clone())
.collect::<Vec<_>>();
if sync.latest_normal_view == latest_normal_view {
latest_records.push(sync.record.clone());
}
eprintln!("have {} latest ({:?})", latest_records.len(), sync.outstanding_do_view_changes.iter().map(|(i, dvt)| (i, dvt.view_number, dvt.addendum.as_ref().unwrap().latest_normal_view)).collect::<Vec<_>>());

#[allow(non_snake_case)]
let mut R = Record::<U>::default();
let mut entries_by_opid =
HashMap::<OpId, Vec<RecordConsensusEntry<U::CO, U::CR>>>::new();
let mut finalized = HashSet::new();
for r in latest_records {
for (op_id, entry) in r.inconsistent.clone() {
match R.inconsistent.entry(op_id) {
Entry::Vacant(vacant) => {
// Mark as finalized as `sync` will execute it.
vacant.insert(entry).state = RecordEntryState::Finalized(sync.view.number);
}
Entry::Occupied(mut occupied) => {
if let RecordEntryState::Finalized(view) = entry.state {
let state = &mut occupied.get_mut().state;
*state = RecordEntryState::Finalized(view);
}
}
}
for (op_id, entry) in r.consensus.clone() {
match entry.state {
RecordEntryState::Finalized(_) => {
match R.consensus.entry(op_id) {
Entry::Vacant(vacant) => {
}
for (op_id, entry) in r.consensus.clone() {
match entry.state {
RecordEntryState::Finalized(_) => {
match R.consensus.entry(op_id) {
Entry::Vacant(vacant) => {
sync.upcalls.finalize_consensus(&entry.op, &entry.result);
vacant.insert(entry);
}
Entry::Occupied(mut occupied) => {
if occupied.get().state.is_tentative() {
sync.upcalls.finalize_consensus(&entry.op, &entry.result);
vacant.insert(entry);
}
Entry::Occupied(mut occupied) => {
if occupied.get().state.is_tentative() {
sync.upcalls.finalize_consensus(&entry.op, &entry.result);
occupied.insert(entry);
} else {
debug_assert_eq!(occupied.get().result, entry.result);
}
occupied.insert(entry);
} else {
debug_assert_eq!(occupied.get().result, entry.result);
}
}
finalized.insert(op_id);
entries_by_opid.remove(&op_id);
}
RecordEntryState::Tentative => {
if !finalized.contains(&op_id) {
entries_by_opid
.entry(op_id)
.or_default()
.push(entry);
}
finalized.insert(op_id);
entries_by_opid.remove(&op_id);
}
RecordEntryState::Tentative => {
if !finalized.contains(&op_id) {
entries_by_opid
.entry(op_id)
.or_default()
.push(entry);
}
}
}
}
}

// build d and u
let mut d =
HashMap::<OpId, (U::CO, U::CR)>::new();
let mut u =
Vec::<(OpId, U::CO, U::CR)>::new();

for (op_id, entries) in entries_by_opid.clone() {
let mut majority_result_in_d = None;

for entry in &entries {
let matches = entries
.iter()
.filter(|other| other.result == entry.result)
.count();

if matches
>= sync.view.membership.size().f_over_two_plus_one()
{
majority_result_in_d = Some(entry.result.clone());
break;
}
}
// build d and u
let mut d =
HashMap::<OpId, (U::CO, U::CR)>::new();
let mut u =
Vec::<(OpId, U::CO, U::CR)>::new();

if let Some(majority_result_in_d) = majority_result_in_d {
d.insert(op_id, (entries[0].op.clone(), majority_result_in_d));
} else {
u.extend(entries.into_iter().map(|e| (op_id, e.op, e.result)));
}
}
for (op_id, entries) in entries_by_opid.clone() {
debug_assert!(!finalized.contains(&op_id));

let mut majority_result_in_d = None;

// println!("d = {d:?}");
// println!("u = {u:?}");
for entry in &entries {
debug_assert!(entry.state.is_tentative());

{
let sync = &mut *sync;
sync.upcalls.sync(&sync.record, &R);
let matches = entries
.iter()
.filter(|other| other.result == entry.result)
.count();

if matches
>= sync.view.membership.size().f_over_two_plus_one()
{
majority_result_in_d = Some(entry.result.clone());
break;
}
}

let results_by_opid =
sync.upcalls.merge(d, u);

debug_assert_eq!(results_by_opid.len(), entries_by_opid.len());

for (op_id, result) in results_by_opid {
let mut entries = entries_by_opid.get(&op_id).unwrap();
let entry = &entries[0];
sync.upcalls.finalize_consensus(&entry.op, &result);
R.consensus.insert(
op_id,
RecordConsensusEntry {
op: entry.op.clone(),
result: result.clone(),
state: RecordEntryState::Finalized(sync.view.number),
},
);
if let Some(majority_result_in_d) = majority_result_in_d {
eprintln!("merge majority replied {:?} to {op_id:?}", majority_result_in_d);
d.insert(op_id, (entries[0].op.clone(), majority_result_in_d));
} else {
eprintln!("merge no majority for {op_id:?}; deciding among {:?}", entries.iter().map(|entry| (entry.result.clone(), entry.state)).collect::<Vec<_>>());
u.extend(entries.into_iter().map(|e| (op_id, e.op, e.result)));
}
}

sync.record = R;
// println!("d = {d:?}");
// println!("u = {u:?}");

{
let sync = &mut *sync;
sync.upcalls.sync(&sync.record, &R);
}
sync.changed_view_recently = true;
sync.status = Status::Normal;
sync.view.number = msg_view_number;
sync.latest_normal_view = msg_view_number;
self.persist_view_info(&*sync);
for (index, address) in &sync.view.membership {
if index == self.index {
continue;
}
self.inner.transport.do_send(
address,
Message::<U>::StartView(StartView {
record: sync.record.clone(),
view_number: sync.view.number,
}),

let results_by_opid =
sync.upcalls.merge(d, u);

debug_assert_eq!(results_by_opid.len(), entries_by_opid.len());

for (op_id, result) in results_by_opid {
let mut entries = entries_by_opid.get(&op_id).unwrap();
let entry = &entries[0];
sync.upcalls.finalize_consensus(&entry.op, &result);
R.consensus.insert(
op_id,
RecordConsensusEntry {
op: entry.op.clone(),
result: result.clone(),
state: RecordEntryState::Finalized(sync.view.number),
},
);
}
self.inner.transport.persist(
&format!("checkpoint_{}", sync.view.number.0),
Some(&sync.upcalls),

sync.record = R;
}
sync.changed_view_recently = true;
sync.status = Status::Normal;
sync.view.number = msg_view_number;
sync.latest_normal_view = msg_view_number;
self.persist_view_info(&*sync);
for (index, address) in &sync.view.membership {
if index == self.index {
continue;
}
self.inner.transport.do_send(
address,
Message::<U>::StartView(StartView {
record: sync.record.clone(),
view_number: sync.view.number,
}),
);
break;
}
self.inner.transport.persist(
&format!("checkpoint_{}", sync.view.number.0),
Some(&sync.upcalls),
);
}
}
}
Expand Down
Loading

0 comments on commit 3cb5031

Please sign in to comment.