Enable write-ahead log, recover in-progress TXes at startup. #2518

Merged
merged 11 commits on Jun 16, 2022
Ensure that we never get stuck in an infinite recovery loop
If a poison-pill tx is ever found (one that somehow crashes the validator),
we must stop retrying it every time we restart.
mystenmark committed Jun 16, 2022
commit a892720302bf8261cdc4b09afacd9ad2f938e58e
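The guard this commit adds can be summarized with a small, self-contained Rust sketch. The in-memory types below (MiniWal, the digests, the execute closure) are hypothetical stand-ins for the rocksdb-backed WAL and for certificate execution, not part of the diff: every time a tx is popped for recovery its retry count is bumped, and once the count reaches the cap the tx is committed out of the log instead of being retried forever.

    use std::collections::HashMap;

    const MAX_TX_RECOVERY_RETRY: u32 = 3;

    // In-memory stand-in for the persistent WAL: the log itself plus a per-tx retry count.
    struct MiniWal {
        log: HashMap<u64, &'static str>, // digest -> payload still awaiting completion
        retry_count: HashMap<u64, u32>,  // digest -> recovery attempts so far
        recoverable: Vec<u64>,           // digests queued for recovery at startup
    }

    impl MiniWal {
        // Pop one recoverable tx, bumping its retry count first so that even a crash
        // while processing it is counted against the cap.
        fn pop_one(&mut self) -> Option<(u64, u32)> {
            let digest = self.recoverable.pop()?;
            let count = self.retry_count.entry(digest).or_insert(0);
            *count += 1;
            Some((digest, *count))
        }

        // Remove the tx from the log entirely, so it is never recovered again.
        fn commit(&mut self, digest: u64) {
            self.log.remove(&digest);
            self.retry_count.remove(&digest);
        }

        // Model a restart: everything still in the log becomes recoverable again.
        fn restart(&mut self) {
            self.recoverable = self.log.keys().copied().collect();
        }
    }

    // Drive recovery without ever looping forever on a poison-pill tx.
    fn process_recovery_log(wal: &mut MiniWal, mut execute: impl FnMut(u64) -> Result<(), String>) {
        while let Some((digest, retries)) = wal.pop_one() {
            if retries >= MAX_TX_RECOVERY_RETRY {
                eprintln!("abandoning tx {digest} after {retries} attempts");
                wal.commit(digest); // give up: drop it from the log for good
                continue;
            }
            match execute(digest) {
                Ok(()) => wal.commit(digest),
                // On failure the entry stays in the log and is retried after the next restart.
                Err(e) => eprintln!("tx {digest} failed: {e}"),
            }
        }
    }

    fn main() {
        let mut wal = MiniWal {
            log: HashMap::from([(1, "ok-tx"), (2, "poison-pill")]),
            retry_count: HashMap::new(),
            recoverable: vec![1, 2],
        };
        for _ in 0..MAX_TX_RECOVERY_RETRY + 1 {
            process_recovery_log(&mut wal, |d| if d == 2 { Err("boom".to_string()) } else { Ok(()) });
            wal.restart();
        }
        assert!(wal.log.is_empty()); // the poison pill was eventually abandoned, not retried forever
    }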
27 changes: 25 additions & 2 deletions crates/sui-core/src/authority.rs
@@ -101,6 +101,7 @@ pub mod authority_notifier;
pub const MAX_ITEMS_LIMIT: u64 = 100_000;
const BROADCAST_CAPACITY: usize = 10_000;

const MAX_TX_RECOVERY_RETRY: u32 = 3;
type CertTxGuard<'a> = DBTxGuard<'a, CertifiedTransaction>;

/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
@@ -1058,13 +1059,35 @@ impl AuthorityState {
state
}

// Continually pop in-progress txes from the WAL and try to drive them to completion.
async fn process_recovery_log(&self, limit: Option<usize>) -> SuiResult {
let mut limit = limit.unwrap_or(usize::max_value());
while limit > 0 {
limit -= 1;
if let Some(tx_guard) = self.database.wal.pop_one_recoverable_tx().await {
let cert = self.database.wal.get_tx_data(&tx_guard)?;
self.process_certificate(tx_guard, cert).await?;
let digest = tx_guard.tx_id();

let (cert, retry_count) = self.database.wal.get_tx_data(&tx_guard)?;

if retry_count >= MAX_TX_RECOVERY_RETRY {
// This tx will only be partially executed; however, the store will be in a safe
// state. We will simply never reach eventual consistency for this TX.
// TODO: Should we revert the tx entirely? I'm not sure the effort is
// warranted, since the only way this can happen is if we are repeatedly
// failing to write to the db, in which case a revert probably won't succeed
// either.
error!(
?digest,
"Abandoning in-progress TX after {} retries.", MAX_TX_RECOVERY_RETRY
);
// prevent the tx from going back into the recovery list again.
tx_guard.commit_tx();
continue;
}

if let Err(e) = self.process_certificate(tx_guard, cert).await {
warn!(?digest, "Failed to process in-progress certificate: {}", e);
}
} else {
break;
}
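A side note on the limit: Option<usize> parameter of process_recovery_log above: None means "drain the whole recovery queue", while Some(n) bounds the number of entries processed (useful, e.g., in tests). A minimal stand-alone illustration of that convention (drain is a hypothetical helper, not part of the codebase):

    // `None` drains everything; `Some(n)` stops after n pops.
    fn drain(mut queue: Vec<u32>, limit: Option<usize>) -> Vec<u32> {
        let mut limit = limit.unwrap_or(usize::MAX);
        let mut processed = Vec::new();
        while limit > 0 {
            limit -= 1;
            match queue.pop() {
                Some(item) => processed.push(item), // mirrors popping one recoverable tx
                None => break,                      // recovery queue exhausted
            }
        }
        processed
    }

    fn main() {
        assert_eq!(drain(vec![1, 2, 3], Some(2)), vec![3, 2]); // bounded
        assert_eq!(drain(vec![1, 2, 3], None), vec![3, 2, 1]); // unbounded: drain the whole queue
    }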
2 changes: 1 addition & 1 deletion crates/sui-storage/benches/write_ahead_log.rs
@@ -35,7 +35,7 @@ fn main() {

if let Some(g) = g {
sleep(Duration::from_millis(1)).await;
g.commit_tx().unwrap();
g.commit_tx();
}
}
}));
64 changes: 51 additions & 13 deletions crates/sui-storage/src/write_ahead_log.rs
@@ -76,7 +76,7 @@ pub trait WriteAheadLog<'a, C> {
/// Get the data associated with a given digest - returns an error if no such transaction is
/// currently open.
/// Requires a TxGuard to prevent asking about transactions that aren't in the log.
fn get_tx_data(&self, g: &Self::Guard) -> SuiResult<C>;
fn get_tx_data(&self, g: &Self::Guard) -> SuiResult<(C, u32)>;
}

pub struct DBTxGuard<'a, C: Serialize + DeserializeOwned> {
@@ -139,6 +139,9 @@ where
// A WriteAheadLog implementation built on rocksdb.
pub struct DBWriteAheadLog<C> {
log: DBMap<TransactionDigest, C>,
// We use two tables, because if we instead have one table mapping digest -> (C, u32), we have
// to clone C to make a tuple ref to pass to insert.
retry_count: DBMap<TransactionDigest, u32>,

// Can't use tokio Mutex - must be accessible synchronously from drop trait.
// Only acquire this lock in sync functions to make sure we don't hold it across an await.
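The two-table comment above can be made concrete with a sketch. MiniMap is a hypothetical stand-in for DBMap: like the rocksdb-backed table, its insert takes a borrowed key and value and stores a full copy. With a single digest -> (C, u32) table, bumping the counter forces the certificate to be cloned and rewritten; with a separate retry table it is never touched. (Not part of this diff.)

    use std::collections::HashMap;

    // Hypothetical stand-in for DBMap: `insert` takes borrowed values and stores a full copy.
    struct MiniMap<K, V>(HashMap<K, V>);

    impl<K: std::hash::Hash + Eq + Clone, V: Clone> MiniMap<K, V> {
        fn new() -> Self {
            MiniMap(HashMap::new())
        }
        fn get(&self, k: &K) -> Option<V> {
            self.0.get(k).cloned()
        }
        fn insert(&mut self, k: &K, v: &V) {
            self.0.insert(k.clone(), v.clone());
        }
    }

    #[derive(Clone)]
    struct Cert(Vec<u8>); // stand-in for the certificate type C

    // One combined table: bumping the retry count means cloning the certificate so the
    // whole (C, u32) tuple can be passed to insert again.
    fn bump_combined(log: &mut MiniMap<u64, (Cert, u32)>, digest: u64) {
        if let Some((cert, n)) = log.get(&digest) {
            log.insert(&digest, &(cert, n + 1)); // rewrites the certificate as well
        }
    }

    // Two tables: only the small counter value is read and rewritten.
    fn bump_split(retry: &mut MiniMap<u64, u32>, digest: u64) {
        let n = retry.get(&digest).unwrap_or(0);
        retry.insert(&digest, &(n + 1));
    }

    fn main() {
        let mut combined = MiniMap::new();
        combined.insert(&1u64, &(Cert(vec![0u8; 4]), 0u32));
        bump_combined(&mut combined, 1);

        let mut retry = MiniMap::new();
        bump_split(&mut retry, 1u64);
        assert_eq!(retry.get(&1), Some(1));
    }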
@@ -159,13 +162,18 @@
let db = {
let path = &path;
let db_options = Some(options.clone());
let opt_cfs: &[(&str, &rocksdb::Options)] = &[("tx_write_ahead_log", &options)];
let opt_cfs: &[(&str, &rocksdb::Options)] = &[
("tx_write_ahead_log", &options),
("tx_retry_count", &options)
];
typed_store::rocks::open_cf_opts(path, db_options, opt_cfs)
}
.expect("Cannot open DB.");

let log: DBMap<TransactionDigest, C> =
DBMap::reopen(&db, Some("tx_write_ahead_log")).expect("Cannot open CF.");
let retry_count: DBMap<TransactionDigest, u32> =
DBMap::reopen(&db, Some("tx_retry_count")).expect("Cannot open CF.");

// Read in any digests that were left in the log, e.g. due to a crash.
//
@@ -178,27 +186,55 @@

Self {
log,
retry_count,
recoverable_txes: Mutex::new(recoverable_txes),
mutex_table: MutexTable::new(MUTEX_TABLE_SIZE),
}
}

fn commit_tx(&self, tx: &TransactionDigest) -> SuiResult {
debug!(digest = ?tx, "committing tx");
self.log.remove(tx).map_err(|e| e.into())
let write_batch = self.log.batch();
let write_batch = write_batch.delete_batch(&self.log, std::iter::once(tx))?;
let write_batch = write_batch.delete_batch(&self.retry_count, std::iter::once(tx))?;
write_batch.write().map_err(SuiError::from)
}

fn increment_retry_count(&self, tx: &TransactionDigest) -> SuiResult {
let cur = self.retry_count.get(tx)?.unwrap_or(0);
self.retry_count
.insert(tx, &(cur + 1))
.map_err(SuiError::from)
}

fn implicit_drop_tx(&self, tx: &TransactionDigest) {
// this function should be called very rarely so contention should not be an issue.
// unwrap ok because it is not safe to continue running if the mutex is poisoned.
let mut r = self.recoverable_txes.lock().unwrap();
r.push(*tx);
self.recoverable_txes.lock().unwrap().push(*tx);
}

fn pop_one_tx(&self) -> Option<TransactionDigest> {
// Only acquire this lock inside a sync function to make sure we don't accidentally
// hold it across an .await
self.recoverable_txes.lock().unwrap().pop()
// hold it across an .await - unwrap okay because we should crash if a mutex is
// poisoned.
let recoverable_txes = &mut self.recoverable_txes.lock().unwrap();

while let Some(tx) = recoverable_txes.pop() {
if let Err(e) = self.increment_retry_count(&tx) {
// Note that this does not remove the tx from the log, so we will find it again
// next time we restart. But we will never retry a tx that we can't increment the
// retry count for.
error!(digest = ?tx,
"Failed to increment retry count for recovered tx. \
refusing to return it to avoid possible infinite \
crash loop. Error: {}", e);
continue;
} else {
return Some(tx);
}
}

None
}
}
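The locking discipline described in the comments above (a std Mutex that is only ever taken inside synchronous helpers such as pop_one_tx, never held across an .await) can be illustrated with a small sketch. The types are hypothetical and tokio is assumed only for the async runtime:

    use std::sync::Mutex;

    struct Queue {
        items: Mutex<Vec<u64>>,
    }

    impl Queue {
        // Synchronous helper: the guard is acquired and dropped entirely within this call.
        fn pop_one(&self) -> Option<u64> {
            self.items.lock().unwrap().pop()
        }

        async fn drain(&self) {
            while let Some(item) = self.pop_one() {
                // Safe to await here: no MutexGuard is alive at this point.
                process(item).await;
            }
        }
    }

    async fn process(item: u64) {
        println!("processing {item}");
    }

    #[tokio::main]
    async fn main() {
        let q = Queue { items: Mutex::new(vec![1, 2, 3]) };
        q.drain().await;
    }

Inlining the lock into the while-let condition of drain would keep the temporary guard alive across the .await for the rest of the loop body, which is exactly what routing the lock through a sync helper avoids.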

@@ -248,11 +284,13 @@ where
}
}

fn get_tx_data(&self, g: &DBTxGuard<'a, C>) -> SuiResult<C> {
self.log
.get(&g.tx)
.map_err(SuiError::from)?
.ok_or(SuiError::TransactionNotFound { digest: g.tx })
fn get_tx_data(&self, g: &DBTxGuard<'a, C>) -> SuiResult<(C, u32)> {
let cert = self
.log
.get(&g.tx)?
.ok_or(SuiError::TransactionNotFound { digest: g.tx })?;
let attempt_num = self.retry_count.get(&g.tx)?.unwrap_or(0);
Ok((cert, attempt_num))
}
}

@@ -305,7 +343,7 @@ mod tests {
// recoverable txes still there
let r = log.pop_one_recoverable_tx().await.unwrap();
assert_eq!(r.tx_id(), tx3_id);
assert_eq!(log.get_tx_data(&r).unwrap(), 3);
assert_eq!(log.get_tx_data(&r).unwrap(), (3, 2 /* retry */));
assert!(recover_queue_empty(&log).await);

// commit the recoverable tx