Skip to content

Commit

Permalink
Bugfix part 3.
Browse files Browse the repository at this point in the history
  • Loading branch information
finnbear committed Jun 28, 2023
1 parent f26d6a7 commit 1b80aa9
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 51 deletions.
3 changes: 2 additions & 1 deletion src/ir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ pub trait Upcalls: Sized + Send + Serialize + DeserializeOwned + 'static {
fn exec_consensus(&mut self, op: &Self::CO) -> Self::CR;
/// Extension to TAPIR: Called when an entry becomes finalized. This
/// addresses a potential issue with `merge` rolling back finalized
/// operations. Not called during follower `sync`.
/// operations. The application assumes responsibility for calling
/// this during `sync` and, if necessary, `merge`.
fn finalize_consensus(&mut self, op: &Self::CO, res: &Self::CR) {
// No-op.
let _ = (op, res);
Expand Down
112 changes: 62 additions & 50 deletions src/tapir/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
.transaction_log
.insert(*transaction_id, (*commit, true));
if let Some((ts, committed)) = old {
debug_assert!(committed);
debug_assert_eq!(ts, *commit);
debug_assert!(committed, "{transaction_id:?}");
debug_assert_eq!(ts, *commit, "{transaction_id:?}");
}
self.inner
.commit(*transaction_id, transaction.clone(), *commit);
Expand Down Expand Up @@ -338,7 +338,7 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
if local
.consensus
.get(op_id)
.map(|local| local.result == entry.result)
.map(|local| local.state.is_finalized() && local.result == entry.result)
.unwrap_or(false)
{
// Record already in local state.
Expand Down Expand Up @@ -399,8 +399,13 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
}
}
for (op_id, entry) in &leader.inconsistent {
if local.inconsistent.contains_key(op_id) {
// Record already in local state.
if local
.inconsistent
.get(op_id)
.map(|e| e.state.is_finalized())
.unwrap_or(false)
{
// Record already finalized in local state.
continue;
}

Expand Down Expand Up @@ -437,53 +442,53 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
commit,
backup,
} => {
ret.insert(
*op_id,
CR::Prepare({
let result = if let &CR::Prepare(result) = reply {
result
} else {
debug_assert!(false);
OccPrepareResult::Abstain
};
let result = CR::Prepare({
let result = if let &CR::Prepare(result) = reply {
result
} else {
debug_assert!(false);
OccPrepareResult::Abstain
};

if self
.transaction_log
.get(transaction_id)
.map(|(ts, c)| !*c || (*c && ts == commit))
.unwrap_or(false)
|| !result.is_ok()
if self
.transaction_log
.get(transaction_id)
.map(|(ts, c)| !*c || (*c && ts == commit))
.unwrap_or(false)
|| !result.is_ok()
{
result
/*
} else if commit.time < self.min_prepare_time
|| self
.inner
.prepared
.get(transaction_id)
.map(|(c, _)| c.time < self.min_prepare_time)
.unwrap_or(false)
|| self
.transaction_log
.get(transaction_id)
.map(|ts| ts.time < self.min_prepare_time)
.unwrap_or(false)
{
result
/*
} else if commit.time < self.min_prepare_time
|| self
.inner
.prepared
.get(transaction_id)
.map(|(c, _)| c.time < self.min_prepare_time)
.unwrap_or(false)
|| self
.transaction_log
.get(transaction_id)
.map(|ts| ts.time < self.min_prepare_time)
.unwrap_or(false)
{
OccPrepareResult::TooLate
*/
} else {
// Analogous to the IR slow path.
//
// Ensure the successful prepare is possible and, if so, durable.
self.inner.prepare(
*transaction_id,
transaction.clone(),
*commit,
*backup,
)
}
}),
);
OccPrepareResult::TooLate
*/
} else {
// Analogous to the IR slow path.
//
// Ensure the successful prepare is possible and, if so, durable.
self.inner.prepare(
*transaction_id,
transaction.clone(),
*commit,
*backup,
)
}
});

self.finalize_consensus(request, &result);
ret.insert(*op_id, result);
}
CO::RaiseMinPrepareTime { time } => {
ret.insert(
Expand Down Expand Up @@ -520,6 +525,7 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
self.inner.prepared.len()
);
let threshold: u64 = transport.time_offset(-500);
let mut governor = 2u8;
for (transaction_id, (commit, transaction, _)) in &self.inner.prepared {
if commit.time > threshold {
// Allow the client to finish on its own.
Expand All @@ -533,6 +539,12 @@ impl<K: Key, V: Value> IrReplicaUpcalls for Replica<K, V> {
transport.clone(),
);
tokio::spawn(future);

if let Some(new_governor) = governor.checked_sub(1) {
governor = new_governor;
} else {
break;
}
}
}
}

0 comments on commit 1b80aa9

Please sign in to comment.