Skip to content

Commit

Permalink
Reopen AuthorityEpochTables under new pathname during reconfiguration (
Browse files Browse the repository at this point in the history
…MystenLabs#5261)

* Reopen AuthorityEpochTables under new pathname during reconfiguration

* Sequence shared objects correctly after epoch change.
  • Loading branch information
mystenmark authored Oct 19, 2022
1 parent 2fea4f8 commit fa3cf60
Show file tree
Hide file tree
Showing 16 changed files with 424 additions and 178 deletions.
3 changes: 2 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1538,7 +1538,8 @@ impl AuthorityState {
None => &default_genesis,
};

let store = Arc::new(AuthorityStore::open(&path.join("store"), None));
// unwrap ok - for testing only.
let store = Arc::new(AuthorityStore::open(&path.join("store"), None).unwrap());
let mut checkpoints = CheckpointStore::open(
&path.join("checkpoints"),
None,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ mod tests {
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();

let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());

let notifier = Arc::new(TransactionNotifier::new(store.clone()).unwrap());

Expand Down
229 changes: 102 additions & 127 deletions crates/sui-core/src/authority/authority_store.rs

Large diffs are not rendered by default.

95 changes: 89 additions & 6 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ impl<S> AuthorityEpochTables<S>
where
S: std::fmt::Debug + Serialize + for<'de> Deserialize<'de>,
{
pub fn path(parent_path: &Path) -> PathBuf {
parent_path.join("epoch")
pub fn path(epoch: EpochId, parent_path: &Path) -> PathBuf {
parent_path.join(format!("epoch_{}", epoch))
}

pub fn open(parent_path: &Path, db_options: Option<Options>) -> Self {
Self::open_tables_read_write(Self::path(parent_path), db_options, None)
pub fn open(epoch: EpochId, parent_path: &Path, db_options: Option<Options>) -> Self {
Self::open_tables_read_write(Self::path(epoch, parent_path), db_options, None)
}

pub fn open_readonly(parent_path: &Path) -> AuthorityEpochTablesReadOnly<S> {
Self::get_read_only_handle(Self::path(parent_path), None, None)
pub fn open_readonly(epoch: EpochId, parent_path: &Path) -> AuthorityEpochTablesReadOnly<S> {
Self::get_read_only_handle(Self::path(epoch, parent_path), None, None)
}
}

Expand Down Expand Up @@ -154,6 +154,89 @@ where
pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly<S> {
Self::get_read_only_handle(Self::path(parent_path), None, None)
}

/// Read an object and return it, or Err(ObjectNotFound) if the object was not found.
pub fn get_object(&self, object_id: &ObjectID) -> Result<Option<Object>, SuiError> {
let obj_entry = self
.objects
.iter()
.skip_prior_to(&ObjectKey::max_for_id(object_id))?
.next();

let obj = match obj_entry {
Some((ObjectKey(obj_id, _), obj)) if obj_id == *object_id => obj,
_ => return Ok(None),
};

// Note that the two reads in this function are (obviously) not atomic, and the
// object may be deleted after we have read it. Hence we check get_latest_parent_entry
// last, so that the write to self.parent_sync gets the last word.
//
// TODO: verify this race is ok.
//
// I believe it is - Even if the reads were atomic, calls to this function would still
// race with object deletions (the object could be deleted between when the function is
// called and when the first read takes place, which would be indistinguishable to the
// caller with the case in which the object is deleted in between the two reads).
let parent_entry = self.get_latest_parent_entry(*object_id)?;

match parent_entry {
None => {
error!(
?object_id,
"Object is missing parent_sync entry, data store is inconsistent"
);
Ok(None)
}
Some((obj_ref, _)) if obj_ref.2.is_alive() => Ok(Some(obj)),
_ => Ok(None),
}
}

pub fn get_latest_parent_entry(
&self,
object_id: ObjectID,
) -> Result<Option<(ObjectRef, TransactionDigest)>, SuiError> {
let mut iterator = self
.parent_sync
.iter()
// Make the max possible entry for this object ID.
.skip_prior_to(&(object_id, SequenceNumber::MAX, ObjectDigest::MAX))?;

Ok(iterator.next().and_then(|(obj_ref, tx_digest)| {
if obj_ref.0 == object_id {
Some((obj_ref, tx_digest))
} else {
None
}
}))
}

pub fn get_sui_system_state_object(&self) -> SuiResult<SuiSystemState> {
let sui_system_object = self
.get_object(&SUI_SYSTEM_STATE_OBJECT_ID)?
.expect("Sui System State object must always exist");
let move_object = sui_system_object
.data
.try_as_move()
.expect("Sui System State object must be a Move object");
let result = bcs::from_bytes::<SuiSystemState>(move_object.contents())
.expect("Sui System State object deserialization cannot fail");
Ok(result)
}

pub fn get_epoch(&self) -> SuiResult<EpochId> {
Ok(self.get_sui_system_state_object()?.epoch)
}

pub fn database_is_empty(&self) -> SuiResult<bool> {
Ok(self
.objects
.iter()
.skip_to(&ObjectKey::ZERO)?
.next()
.is_none())
}
}

#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/src/epoch/reconfiguration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ where

let sui_system_state = self.state.get_sui_system_state_object().await?;
let next_epoch = epoch + 1;

// Create new AuthorityEpochTables for epoch-specific data.
self.state.database.reopen_epoch_db(next_epoch);

let new_committee = sui_system_state.get_next_epoch_committee();
debug!(
?epoch,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/gateway_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl<A> GatewayState<A> {
let gateway_metrics = GatewayMetrics::new(prometheus_registry);
let auth_agg_metrics = AuthAggMetrics::new(prometheus_registry);
let safe_client_metrics = Arc::new(SafeClientMetrics::new(prometheus_registry));
let gateway_store = Arc::new(GatewayStore::open(&base_path.join("store"), None));
let gateway_store = Arc::new(GatewayStore::open(&base_path.join("store"), None)?);
let committee_store = Arc::new(CommitteeStore::new(
base_path.join("epochs"),
&committee,
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1649,7 +1649,7 @@ async fn test_authority_persist() {
fs::create_dir(&path).unwrap();

// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());
let authority =
crate::authority_batch::batch_tests::init_state(committee, authority_key, store).await;

Expand All @@ -1670,7 +1670,7 @@ async fn test_authority_persist() {
crate::authority_batch::batch_tests::init_state_parameters_from_rng(
&mut StdRng::from_seed(seed),
);
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());
let authority2 =
crate::authority_batch::batch_tests::init_state(committee, authority_key, store).await;
let obj2 = authority2.get_object(&object_id).await.unwrap().unwrap();
Expand Down
16 changes: 8 additions & 8 deletions crates/sui-core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn test_open_manager() {
init_state_parameters_from_rng(&mut StdRng::from_seed(seed));
{
// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());
let mut authority_state = init_state(committee, authority_key, store.clone()).await;

// TEST 1: init from an empty database should return to a zero block
Expand All @@ -145,7 +145,7 @@ async fn test_open_manager() {
init_state_parameters_from_rng(&mut StdRng::from_seed(seed));
{
// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());
let mut authority_state = init_state(committee, authority_key, store.clone()).await;

let last_block = authority_state
Expand All @@ -168,7 +168,7 @@ async fn test_open_manager() {
init_state_parameters_from_rng(&mut StdRng::from_seed(seed));
{
// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());
let mut authority_state = init_state(committee, authority_key, store.clone()).await;

let last_block = authority_state.init_batches_from_database().unwrap();
Expand All @@ -189,7 +189,7 @@ async fn test_batch_manager_happy_path() {
fs::create_dir(&path).unwrap();

// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());

// Make a test key pair
let seed = [1u8; 32];
Expand Down Expand Up @@ -250,7 +250,7 @@ async fn test_batch_manager_out_of_order() {
fs::create_dir(&path).unwrap();

// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());

// Make a test key pair
let seed = [1u8; 32];
Expand Down Expand Up @@ -320,7 +320,7 @@ async fn test_batch_manager_drop_out_of_order() {
fs::create_dir(&path).unwrap();

// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());

// Make a test key pair
let seed = [1u8; 32];
Expand Down Expand Up @@ -437,7 +437,7 @@ async fn test_batch_store_retrieval() {
fs::create_dir(&path).unwrap();

// Create an authority
let store = Arc::new(AuthorityStore::open(&path, None));
let store = Arc::new(AuthorityStore::open(&path, None).unwrap());

// Make a test key pair
let seed = [1u8; 32];
Expand Down Expand Up @@ -825,7 +825,7 @@ async fn test_safe_batch_stream() {
authorities.insert(public_key_bytes, 1);
let committee = Committee::new(0, authorities).unwrap();
// Create an authority
let store = Arc::new(AuthorityStore::open(&path.join("store"), None));
let store = Arc::new(AuthorityStore::open(&path.join("store"), None).unwrap());
let state = init_state(committee.clone(), authority_key, store).await;
let committee_store = state.committee_store().clone();

Expand Down
32 changes: 24 additions & 8 deletions crates/sui-core/src/unit_tests/gateway_state_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn create_gateway_state_with_object_basics_ref(
.collect();
let (authorities, _, pkg_ref) = init_local_authorities(4, genesis_objects).await;
let path = tempfile::tempdir().unwrap().into_path();
let gateway_store = Arc::new(GatewayStore::open(&path, None));
let gateway_store = Arc::new(GatewayStore::open(&path, None).unwrap());
let gateway = GatewayState::new_with_authorities(
gateway_store,
authorities,
Expand Down Expand Up @@ -245,7 +245,10 @@ async fn test_coin_split_insufficient_gas() {
// Tx should fail due to out of gas, and no transactions should remain pending.
// Objects are not locked either.
assert!(response.is_err());
assert_eq!(gateway.store().pending_transactions().iter().count(), 0);
assert_eq!(
gateway.store().epoch_tables().transactions.iter().count(),
0
);
assert_eq!(
gateway
.store()
Expand Down Expand Up @@ -342,8 +345,14 @@ async fn test_equivocation_resilient() {
.count(),
1
);
println!("{:?}", gateway.store().pending_transactions().iter().next());
assert_eq!(gateway.store().pending_transactions().iter().count(), 0);
println!(
"{:?}",
gateway.store().epoch_tables().transactions.iter().next()
);
assert_eq!(
gateway.store().epoch_tables().transactions.iter().count(),
0
);
}

#[tokio::test]
Expand Down Expand Up @@ -380,10 +389,14 @@ async fn test_public_transfer_object_with_retry() {
// transactions table.
// However objects in the transaction should no longer be locked since we reset
// them at the last failed retry.
assert_eq!(gateway.store().pending_transactions().iter().count(), 1);
assert_eq!(
gateway.store().epoch_tables().transactions.iter().count(),
1
);
let (tx_digest, tx) = gateway
.store()
.pending_transactions()
.epoch_tables()
.transactions
.iter()
.next()
.unwrap();
Expand All @@ -408,7 +421,10 @@ async fn test_public_transfer_object_with_retry() {
let new_owner = &oref.owner;
assert_eq!(new_owner, &Owner::AddressOwner(addr2));

assert_eq!(gateway.store().pending_transactions().iter().count(), 0);
assert_eq!(
gateway.store().epoch_tables().transactions.iter().count(),
0
);
assert!(gateway
.store()
.get_object_locking_transaction(&coin_object.compute_object_reference())
Expand Down Expand Up @@ -591,7 +607,7 @@ async fn test_multiple_gateways() {
let path = tempfile::tempdir().unwrap().into_path();
// gateway2 shares the same set of authorities as gateway1.
let gateway2 = GatewayState::new_with_authorities(
Arc::new(GatewayStore::open(&path, None)),
Arc::new(GatewayStore::open(&path, None).unwrap()),
gateway1.authorities.clone(),
GatewayMetrics::new_for_tests(),
)
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl SuiNode {

let secret = Arc::pin(config.protocol_key_pair().copy());
let committee = genesis.committee()?;
let store = Arc::new(AuthorityStore::open(&config.db_path().join("store"), None));
let store = Arc::new(AuthorityStore::open(&config.db_path().join("store"), None)?);
let committee_store = Arc::new(CommitteeStore::new(
config.db_path().join("epochs"),
&committee,
Expand Down
Loading

0 comments on commit fa3cf60

Please sign in to comment.