Skip to content

Commit

Permalink
[core] Add effects digest to follower API + checkpoints (MystenLabs#2421
Browse files Browse the repository at this point in the history
)

* Add structures for ExecutionDigest
* Into point
* Add effects digest to follower / checkpoints
* Add const length

Co-authored-by: George Danezis <george@danez.is>
  • Loading branch information
gdanezis and George Danezis authored Jun 6, 2022
1 parent 6b831e2 commit 50300a4
Show file tree
Hide file tree
Showing 23 changed files with 416 additions and 291 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ impl AuthorityState {
.skip_to(&next_expected_tx)
.expect("Seeking batches should never fail at this point")
{
let transactions: Vec<(TxSequenceNumber, TransactionDigest)> = state
let transactions: Vec<(TxSequenceNumber, ExecutionDigests)> = state
.database
.executed_sequence
.iter()
Expand Down
18 changes: 9 additions & 9 deletions crates/sui-core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl TransactionNotifier {
pub fn iter_from(
self: &Arc<Self>,
next_seq: u64,
) -> SuiResult<impl futures::Stream<Item = (TxSequenceNumber, TransactionDigest)> + Unpin> {
) -> SuiResult<impl futures::Stream<Item = (TxSequenceNumber, ExecutionDigests)> + Unpin> {
if self
.has_stream
.compare_exchange(
Expand All @@ -93,7 +93,7 @@ impl TransactionNotifier {

// The state we inject in the async stream
let transaction_notifier = self.clone();
let temp_buffer: VecDeque<(TxSequenceNumber, TransactionDigest)> = VecDeque::new();
let temp_buffer: VecDeque<(TxSequenceNumber, ExecutionDigests)> = VecDeque::new();
let uniquess_guard = IterUniquenessGuard(transaction_notifier.clone());
let initial_state = (transaction_notifier, temp_buffer, next_seq, uniquess_guard);

Expand Down Expand Up @@ -238,17 +238,17 @@ mod tests {

{
let t0 = &notifier.ticket().expect("ok");
store.side_sequence(t0.seq(), &TransactionDigest::random());
store.side_sequence(t0.seq(), &ExecutionDigests::random());
}

{
let t0 = &notifier.ticket().expect("ok");
store.side_sequence(t0.seq(), &TransactionDigest::random());
store.side_sequence(t0.seq(), &ExecutionDigests::random());
}

{
let t0 = &notifier.ticket().expect("ok");
store.side_sequence(t0.seq(), &TransactionDigest::random());
store.side_sequence(t0.seq(), &ExecutionDigests::random());
}

let mut iter = notifier.iter_from(0).unwrap();
Expand Down Expand Up @@ -276,7 +276,7 @@ mod tests {

{
let t0 = &notifier.ticket().expect("ok");
store.side_sequence(t0.seq(), &TransactionDigest::random());
store.side_sequence(t0.seq(), &ExecutionDigests::random());
}

let x = iter.next().await;
Expand All @@ -293,15 +293,15 @@ mod tests {
let t7 = notifier.ticket().expect("ok");
let t8 = notifier.ticket().expect("ok");

store.side_sequence(t6.seq(), &TransactionDigest::random());
store.side_sequence(t6.seq(), &ExecutionDigests::random());
drop(t6);

store.side_sequence(t5.seq(), &TransactionDigest::random());
store.side_sequence(t5.seq(), &ExecutionDigests::random());
drop(t5);

drop(t7);

store.side_sequence(t8.seq(), &TransactionDigest::random());
store.side_sequence(t8.seq(), &ExecutionDigests::random());
drop(t8);

assert!(matches!(iter.next().await, Some((5, _))));
Expand Down
24 changes: 15 additions & 9 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct SuiDataStore<const ALL_OBJ_VER: bool, const USE_LOCKS: bool, S> {

// Tables used for authority batch structure
/// A sequence on all executed certificates and effects.
pub executed_sequence: DBMap<TxSequenceNumber, TransactionDigest>,
pub executed_sequence: DBMap<TxSequenceNumber, ExecutionDigests>,

/// A sequence of batches indexing into the sequence of executed transactions.
pub batches: DBMap<TxSequenceNumber, SignedBatch>,
Expand Down Expand Up @@ -252,7 +252,7 @@ impl<
}

#[cfg(test)]
pub fn side_sequence(&self, seq: TxSequenceNumber, digest: &TransactionDigest) {
pub fn side_sequence(&self, seq: TxSequenceNumber, digest: &ExecutionDigests) {
self.executed_sequence.insert(&seq, digest).unwrap();
}

Expand Down Expand Up @@ -561,6 +561,7 @@ impl<
)?;

// Store the signed effects of the transaction
let effects_digest = effects.effects.digest();
write_batch = write_batch.insert_batch(
&self.effects,
std::iter::once((transaction_digest, effects)),
Expand All @@ -575,7 +576,7 @@ impl<
write_batch,
temporary_store,
*transaction_digest,
sequence_number,
sequence_number.map(|seq| (seq, effects_digest)),
)
.await
}
Expand Down Expand Up @@ -625,6 +626,7 @@ impl<
)?;

// Store the unsigned effects of the transaction
let effects_digest = effects.effects.digest();
write_batch = write_batch.insert_batch(
&self.effects,
std::iter::once((transaction_digest, effects)),
Expand All @@ -639,7 +641,7 @@ impl<
write_batch,
temporary_store,
*transaction_digest,
Some(sequence_number),
Some((sequence_number, effects_digest)),
)
.await
}
Expand All @@ -650,7 +652,7 @@ impl<
mut write_batch: DBBatch,
temporary_store: AuthorityTemporaryStore<BackingPackageStore>,
transaction_digest: TransactionDigest,
seq_opt: Option<TxSequenceNumber>,
seq_opt: Option<(TxSequenceNumber, TransactionEffectsDigest)>,
) -> Result<(), SuiError> {
let (objects, active_inputs, written, deleted, _events) = temporary_store.into_inner();
trace!(written =? written.values().map(|((obj_id, ver, _), _)| (obj_id, ver)).collect::<Vec<_>>(),
Expand Down Expand Up @@ -771,7 +773,7 @@ impl<
}
}

if let Some(next_seq) = seq_opt {
if let Some((next_seq, effects_digest)) = seq_opt {
// Now we are sure we are going to execute, add to the sequence
// number and insert into authority sequence.
//
Expand All @@ -781,7 +783,10 @@ impl<
// full sequence, and the batching logic needs to deal with this.
write_batch = write_batch.insert_batch(
&self.executed_sequence,
std::iter::once((next_seq, transaction_digest)),
std::iter::once((
next_seq,
ExecutionDigests::new(transaction_digest, effects_digest),
)),
)?;
}

Expand Down Expand Up @@ -922,6 +927,7 @@ impl<
.iter()
.skip_to(&start)?
.take_while(|(seq, _tx)| *seq < end)
.map(|(seq, exec)| (seq, exec.transaction))
.collect())
}

Expand All @@ -942,7 +948,7 @@ impl<
&self,
start: u64,
end: u64,
) -> Result<(Vec<SignedBatch>, Vec<(TxSequenceNumber, TransactionDigest)>), SuiError> {
) -> Result<(Vec<SignedBatch>, Vec<(TxSequenceNumber, ExecutionDigests)>), SuiError> {
/*
Get all batches that include requested transactions. This includes the signed batch
prior to the first requested transaction, the batch including the last requested
Expand Down Expand Up @@ -1002,7 +1008,7 @@ impl<
sequence misses items. This will confuse calling logic, so we filter them out and allow
callers to use the subscription API to catch the latest items in order. */

let transactions: Vec<(TxSequenceNumber, TransactionDigest)> = self
let transactions: Vec<(TxSequenceNumber, ExecutionDigests)> = self
.executed_sequence
.iter()
.skip_to(&first_seq)?
Expand Down
20 changes: 12 additions & 8 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use parking_lot::Mutex;
use sui_types::{
base_types::{AuthorityName, TransactionDigest},
base_types::{AuthorityName, ExecutionDigests, TransactionDigest},
error::SuiError,
messages::{CertifiedTransaction, ConfirmationTransaction, TransactionInfoRequest},
messages_checkpoint::{
Expand Down Expand Up @@ -685,17 +685,17 @@ pub async fn augment_fragment_with_diff_transactions<A>(
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let mut diff_certs: BTreeMap<TransactionDigest, CertifiedTransaction> = BTreeMap::new();
let mut diff_certs: BTreeMap<ExecutionDigests, CertifiedTransaction> = BTreeMap::new();

// These are the trasnactions that we have that the other validator does not
// have, so we can read them from our local database.
for tx_digest in &fragment.diff.second.items {
let cert = active_authority
.state
.read_certificate(tx_digest)
.read_certificate(&tx_digest.transaction)
.await?
.ok_or(SuiError::CertificateNotfound {
certificate_digest: *tx_digest,
certificate_digest: tx_digest.transaction,
})?;
diff_certs.insert(*tx_digest, cert);
}
Expand All @@ -707,12 +707,12 @@ where
.clone_client(&fragment.other.0.authority);
for tx_digest in &fragment.diff.first.items {
let response = client
.handle_transaction_info_request(TransactionInfoRequest::from(*tx_digest))
.handle_transaction_info_request(TransactionInfoRequest::from(tx_digest.transaction))
.await?;
let cert = response
.certified_transaction
.ok_or(SuiError::CertificateNotfound {
certificate_digest: *tx_digest,
certificate_digest: tx_digest.transaction,
})?;
diff_certs.insert(*tx_digest, cert);
}
Expand Down Expand Up @@ -772,7 +772,11 @@ where

for digest in &unprocessed_digests {
// If we have processed this continue with the next cert, nothing to do
if active_authority.state.database.effects_exists(digest)? {
if active_authority
.state
.database
.effects_exists(&digest.transaction)?
{
continue;
}

Expand All @@ -781,7 +785,7 @@ where
if let Err(err) = sync_digest(
active_authority.state.name,
active_authority.net.clone(),
*digest,
digest.transaction,
per_other_authority_delay,
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn fix() {

#[derive(Clone)]
pub struct TestBatch {
pub digests: Vec<TransactionDigest>,
pub digests: Vec<ExecutionDigests>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -210,7 +210,7 @@ pub async fn init_configurable_authorities(
) -> (
BTreeMap<AuthorityName, ConfigurableBatchActionClient>,
Vec<Arc<AuthorityState>>,
Vec<TransactionDigest>,
Vec<ExecutionDigests>,
) {
let authority_count = 4;
let (addr1, key1) = get_key_pair();
Expand Down Expand Up @@ -248,7 +248,7 @@ pub async fn init_configurable_authorities(

// Execute transactions for every EmitUpdateItem Action, use the digest of the transaction to
// create a batch action internal sequence.
let mut executed_digests = Vec::new();
let mut to_be_executed_digests = Vec::new();
let mut batch_action_internal = Vec::new();
let framework_obj_ref = genesis::get_framework_object_ref();

Expand All @@ -269,10 +269,14 @@ pub async fn init_configurable_authorities(
}
// Add the digest and number to the internal actions.
let t_b = TestBatch {
digests: vec![*transaction.digest()],
// TODO: need to put in here the real effects digest
digests: vec![ExecutionDigests::new(
*transaction.digest(),
TransactionEffectsDigest::random(),
)],
};
batch_action_internal.push(BatchActionInternal::EmitUpdateItem(t_b));
executed_digests.push(*transaction.digest());
to_be_executed_digests.push(*transaction.digest());
}
if let BatchAction::EmitError() = action {
batch_action_internal.push(BatchActionInternal::EmitError());
Expand All @@ -285,8 +289,9 @@ pub async fn init_configurable_authorities(
authority_clients.insert(name, client);
}

let mut executed_digests = Vec::new();
// Execute certificate for each digest, and register the action sequence on the authorities who executed the certificates.
for digest in executed_digests.clone() {
for digest in to_be_executed_digests.clone() {
// Get a cert
let authority_clients_ref: Vec<_> = authority_clients.values().collect();
let authority_clients_slice = authority_clients_ref.as_slice();
Expand All @@ -298,7 +303,8 @@ pub async fn init_configurable_authorities(
// TODO: This only works when every validator has equal stake
.take(committee.quorum_threshold() as usize)
{
_ = do_cert(cert_client, &cert1).await;
let effects = do_cert(cert_client, &cert1).await;
executed_digests.push(ExecutionDigests::new(digest, effects.digest()));

// Register the internal actions to client
cert_client
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ where

// Upon receiving a transaction digest, store it if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest))))) => {
if !self.state.database.effects_exists(&digest)? {
if !self.state.database.effects_exists(&digest.transaction)? {
queue.push(async move {
tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await;
digest
Expand Down Expand Up @@ -267,9 +267,9 @@ where
},
digest = &mut queue.next() , if !queue.is_empty() => {
let digest = digest.unwrap();
if !self.state.database.effects_exists(&digest)? {
if !self.state.database.effects_exists(&digest.transaction)? {
// Download the certificate
let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest)).await?;
let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest.transaction)).await?;
self.process_response(response).await?;
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn test_gossip() {
for digest in &digests {
let result1 = client
.handle_transaction_info_request(TransactionInfoRequest {
transaction_digest: *digest,
transaction_digest: digest.transaction,
})
.await;

Expand Down Expand Up @@ -78,7 +78,7 @@ pub async fn test_gossip_error() {
for digest in &digests {
let result1 = client
.handle_transaction_info_request(TransactionInfoRequest {
transaction_digest: *digest,
transaction_digest: digest.transaction,
})
.await;

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl crate::authority::AuthorityState {
// The structures we use to build the next batch. The current_batch holds the sequence
// of transactions in order, following the last batch. The loose transactions holds
// transactions we may have received out of order.
let mut current_batch: Vec<(TxSequenceNumber, TransactionDigest)> = Vec::new();
let mut current_batch: Vec<(TxSequenceNumber, ExecutionDigests)> = Vec::new();

while !exit {
// Reset the flags.
Expand Down
Loading

0 comments on commit 50300a4

Please sign in to comment.