Skip to content

Commit

Permalink
[epoch] Index authenticated epoch data structure by its epoch ID (Mys…
Browse files Browse the repository at this point in the history
…tenLabs#3553)

* [epoch] Index authenticated epoch datat structure by its epoch ID

* Address comments
  • Loading branch information
lxfind authored Jul 29, 2022
1 parent b74783d commit 5e06c90
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 129 deletions.
29 changes: 20 additions & 9 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,11 +1031,16 @@ impl AuthorityState {
.bulk_object_insert(&genesis.objects().iter().collect::<Vec<_>>())
.await
.expect("Cannot bulk insert genesis objects");
store
.init_genesis_epoch(genesis_committee.clone())
.expect("Init genesis epoch data must not fail");
genesis_committee
} else if let Some(latest_epoch) = store.get_latest_authenticated_epoch() {
latest_epoch.epoch_info().next_epoch_committee().clone()
} else {
genesis_committee
store
.get_latest_authenticated_epoch()
.epoch_info()
.committee()
.clone()
};

let event_handler = event_store.map(|es| Arc::new(EventHandler::new(store.clone(), es)));
Expand Down Expand Up @@ -1156,17 +1161,23 @@ impl AuthorityState {

pub(crate) fn sign_new_epoch_and_update_committee(
&self,
next_epoch_committee: Committee,
last_checkpoint: CheckpointSequenceNumber,
new_committee: Committee,
next_checkpoint: CheckpointSequenceNumber,
) -> SuiResult {
// TODO: It's likely safer to do the following operations atomically, in case this function
// gets called from different threads. It cannot happen today, but worth the caution.
fp_ensure!(
self.epoch() + 1 == new_committee.epoch,
SuiError::from("Invalid new epoch to sign and update")
);
self.database.sign_new_epoch(
self.epoch(),
next_epoch_committee.clone(),
new_committee.clone(),
self.name,
&*self.secret,
last_checkpoint,
next_checkpoint,
)?;
self.committee.swap(Arc::new(next_epoch_committee));
// TODO: Do we want to make it possible to subscribe to committee changes?
self.committee.swap(Arc::new(new_committee));
Ok(())
}

Expand Down
82 changes: 25 additions & 57 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use sui_storage::{
};
use sui_types::base_types::SequenceNumber;
use sui_types::batch::{SignedBatch, TxSequenceNumber};
use sui_types::committee::EpochId;
use sui_types::crypto::{AuthoritySignInfo, EmptySignInfo};
use sui_types::messages::AuthenticatedEpoch;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::{Owner, OBJECT_START_VERSION};
use tokio::sync::Notify;
Expand All @@ -41,28 +41,6 @@ const NUM_SHARDS: usize = 4096;
// TODO: Make a single table (e.g., called `variables`) storing all our lonely variables in one place.
const LAST_CONSENSUS_INDEX_ADDR: u64 = 0;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum AuthenticatedEpoch {
Signed(SignedEpoch),
Certified(CertifiedEpoch),
}

impl AuthenticatedEpoch {
pub fn epoch(&self) -> EpochId {
match self {
Self::Signed(s) => s.epoch_info.epoch(),
Self::Certified(c) => c.epoch_info.epoch(),
}
}

pub fn epoch_info(&self) -> &EpochInfo {
match self {
Self::Signed(s) => &s.epoch_info,
Self::Certified(c) => &c.epoch_info,
}
}
}

/// ALL_OBJ_VER determines whether we want to store all past
/// versions of every object in the store. Authority doesn't store
/// them, but other entities such as replicas will.
Expand Down Expand Up @@ -1339,57 +1317,47 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {

// Epoch related functions

pub fn init_genesis_epoch(&self, genesis_committee: Committee) -> SuiResult {
assert_eq!(genesis_committee.epoch, 0);
let epoch_data = AuthenticatedEpoch::Genesis(GenesisEpoch::new(genesis_committee));
self.tables.epochs.insert(&0, &epoch_data)?;
Ok(())
}

/// This function should be called at the end of the epoch identified by `epoch`,
/// and after this call, we expect that the node's committee has changed
/// to `next_epoch_committee`.
/// to `new_committee`.
pub fn sign_new_epoch(
&self,
epoch: EpochId,
next_epoch_committee: Committee,
new_committee: Committee,
authority: AuthorityName,
secret: &dyn signature::Signer<AuthoritySignature>,
last_checkpoint: CheckpointSequenceNumber,
next_checkpoint: CheckpointSequenceNumber,
) -> SuiResult {
let cur_epoch = new_committee.epoch;
let latest_epoch = self.get_latest_authenticated_epoch();
match latest_epoch {
Some(a) => {
fp_ensure!(
a.epoch() + 1 == epoch,
SuiError::from("Unexpected new epoch number")
);
}
None => {
fp_ensure!(
epoch == 0,
SuiError::from("Could not find previous epoch information")
);
}
}

let signed_epoch = SignedEpoch::new(
epoch,
next_epoch_committee,
authority,
secret,
last_checkpoint,
)?;
fp_ensure!(
latest_epoch.epoch() + 1 == cur_epoch,
SuiError::from("Unexpected new epoch number")
);

let mut writer = self.tables.epochs.batch();
writer = writer.insert_batch(
&self.tables.epochs,
iter::once((epoch, AuthenticatedEpoch::Signed(signed_epoch))),
)?;
writer.write()?;
let signed_epoch = SignedEpoch::new(new_committee, authority, secret, next_checkpoint);
self.tables
.epochs
.insert(&cur_epoch, &AuthenticatedEpoch::Signed(signed_epoch))?;
Ok(())
}

pub fn get_latest_authenticated_epoch(&self) -> Option<AuthenticatedEpoch> {
pub fn get_latest_authenticated_epoch(&self) -> AuthenticatedEpoch {
self.tables
.epochs
.iter()
.skip_to_last()
.next()
.map(|(_, a)| a)
// unwrap safe because we guarantee there is at least a genesis epoch
// when initializing the store.
.unwrap()
.1
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
authority_store::{AuthenticatedEpoch, InternalSequenceNumber, ObjectKey},
authority_store::{InternalSequenceNumber, ObjectKey},
*,
};
use narwhal_executor::ExecutionIndices;
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/epoch/reconfiguration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ where
pub async fn finish_epoch_change(&self) -> SuiResult {
let epoch = self.state.committee.load().epoch;
info!(?epoch, "Finishing epoch change");
let last_checkpoint = if let Some(checkpoints) = &self.state.checkpoints {
let next_checkpoint = if let Some(checkpoints) = &self.state.checkpoints {
let mut checkpoints = checkpoints.lock();
assert!(
checkpoints.is_ready_to_finish_epoch_change(),
Expand All @@ -85,7 +85,7 @@ where

self.state.database.remove_all_pending_certificates()?;

checkpoints.next_checkpoint() - 1
checkpoints.next_checkpoint()

// drop checkpoints lock
} else {
Expand All @@ -111,7 +111,7 @@ where
"New committee for the next epoch: {:?}", new_committee
);
self.state
.sign_new_epoch_and_update_committee(new_committee.clone(), last_checkpoint)?;
.sign_new_epoch_and_update_committee(new_committee.clone(), next_checkpoint)?;

// Reconnect the network if we have an type of AuthorityClient that has a network.
if A::needs_network_recreation() {
Expand Down
17 changes: 13 additions & 4 deletions crates/sui-core/src/epoch/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use sui_types::{
crypto::{get_key_pair, AccountKeyPair, AuthoritySignature, Signature, SuiAuthoritySignature},
error::SuiError,
gas::SuiGasStatus,
messages::{InputObjects, SignatureAggregator, Transaction, TransactionData},
messages::{
AuthenticatedEpoch, InputObjects, SignatureAggregator, Transaction, TransactionData,
},
object::Object,
SUI_SYSTEM_STATE_OBJECT_ID,
};
Expand All @@ -38,6 +40,12 @@ async fn test_start_epoch_change() {
// Create authority_aggregator and authority states.
let (net, states) = init_local_authorities(4, genesis_objects.clone()).await;
let state = states[0].clone();

// Check that we initialized the genesis epoch.
let init_epoch = state.database.get_latest_authenticated_epoch();
assert!(matches!(init_epoch, AuthenticatedEpoch::Genesis(..)));
assert_eq!(init_epoch.epoch(), 0);

// Set the checkpoint number to be near the end of epoch.

state
Expand Down Expand Up @@ -206,9 +214,10 @@ async fn test_finish_epoch_change() {
for active in actives {
assert_eq!(active.state.epoch(), 1);
assert_eq!(active.net.load().committee.epoch, 1);
let latest_epoch = active.state.db().get_latest_authenticated_epoch().unwrap();
assert_eq!(latest_epoch.epoch(), 0);
assert_eq!(latest_epoch.epoch_info().next_epoch_committee().epoch, 1);
let latest_epoch = active.state.db().get_latest_authenticated_epoch();
assert_eq!(latest_epoch.epoch(), 1);
assert!(matches!(latest_epoch, AuthenticatedEpoch::Signed(..)));
assert_eq!(latest_epoch.epoch_info().epoch(), 1);
// Verify that validator is no longer halted.
assert!(!active.state.is_halted());
let system_state = active.state.get_sui_system_state_object().await.unwrap();
Expand Down
Loading

0 comments on commit 5e06c90

Please sign in to comment.