Skip to content

Add chain observer interface #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 22, 2022
16 changes: 14 additions & 2 deletions mithril-aggregator/src/dependency.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use tokio::sync::RwLock;

use mithril_common::chain_observer::ChainObserver;
use mithril_common::store::stake_store::StakeStore;

use super::entities::*;
Expand All @@ -16,6 +17,9 @@ pub type BeaconStoreWrapper = Arc<RwLock<dyn BeaconStore>>;
/// SnapshotStoreWrapper wraps a SnapshotStore
pub type SnapshotStoreWrapper = Arc<RwLock<dyn SnapshotStore>>;

/// SnapshotUploaderWrapper wraps a SnapshotUploader
pub type SnapshotUploaderWrapper = Arc<RwLock<dyn SnapshotUploader>>;

/// MultiSignerWrapper wraps a MultiSigner
pub type MultiSignerWrapper = Arc<RwLock<dyn MultiSigner>>;

Expand All @@ -31,8 +35,8 @@ pub type VerificationKeyStoreWrapper = Arc<RwLock<VerificationKeyStore>>;
/// StakeStoreWrapper wraps a StakeStore
pub type StakeStoreWrapper = Arc<RwLock<StakeStore>>;

/// StakeStoreWrapper wraps a StakeStore
pub type SnapshotUploaderWrapper = Arc<RwLock<dyn SnapshotUploader>>;
/// ChainObserverWrapper wraps a ChainObserver
pub type ChainObserverWrapper = Arc<RwLock<dyn ChainObserver>>;

/// DependencyManager handles the dependencies
pub struct DependencyManager {
Expand All @@ -45,6 +49,7 @@ pub struct DependencyManager {
pub certificate_store: Option<CertificateStoreWrapper>,
pub verification_key_store: Option<VerificationKeyStoreWrapper>,
pub stake_store: Option<StakeStoreWrapper>,
pub chain_observer: Option<ChainObserverWrapper>,
}

impl DependencyManager {
Expand All @@ -60,6 +65,7 @@ impl DependencyManager {
certificate_store: None,
verification_key_store: None,
stake_store: None,
chain_observer: None,
}
}

Expand Down Expand Up @@ -123,6 +129,12 @@ impl DependencyManager {
self
}

/// With ChainObserver middleware
pub fn with_chain_observer(&mut self, chain_observer: ChainObserverWrapper) -> &mut Self {
self.chain_observer = Some(chain_observer);
self
}

#[cfg(test)]
pub fn fake() -> DependencyManager {
let config = Config {
Expand Down
41 changes: 8 additions & 33 deletions mithril-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use clap::Parser;

use config::{Map, Source, Value, ValueKind};
use mithril_aggregator::{
AggregatorConfig, AggregatorRunner, AggregatorRuntime, BeaconStore, CertificatePendingStore,
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
CertificateStore, Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl,
Server, VerificationKeyStore,
};
use mithril_common::crypto_helper::ProtocolStakeDistribution;
use mithril_common::chain_observer::FakeObserver;
use mithril_common::fake_data;
use mithril_common::store::adapter::JsonFileStoreAdapter;
use mithril_common::store::stake_store::StakeStore;
Expand Down Expand Up @@ -150,7 +150,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
verification_key_store.clone(),
stake_store.clone(),
)));
setup_dependencies_fake_data(multi_signer.clone(), beacon_store.clone()).await;
let chain_observer = Arc::new(RwLock::new(FakeObserver::new()));
setup_dependencies_fake_data(multi_signer.clone()).await;

// Init dependency manager
let mut dependency_manager = DependencyManager::new(config.clone());
Expand All @@ -162,7 +163,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_certificate_pending_store(certificate_pending_store.clone())
.with_certificate_store(certificate_store.clone())
.with_verification_key_store(verification_key_store.clone())
.with_stake_store(stake_store.clone());
.with_stake_store(stake_store.clone())
.with_chain_observer(chain_observer.clone());
let dependency_manager = Arc::new(dependency_manager);

// Start snapshot uploader
Expand Down Expand Up @@ -204,21 +206,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

/// Setup dependencies with fake data
// TODO: remove this function when new runtime is implemented + remove protocol parameters from fake data
async fn setup_dependencies_fake_data(
multi_signer: Arc<RwLock<dyn MultiSigner>>,
beacon_store: Arc<RwLock<dyn BeaconStore>>,
) {
// Set current beacon
{
let beacon = fake_data::beacon();
let mut beacon_store = beacon_store.write().await;
beacon_store
.set_current_beacon(beacon.clone())
.await
.expect("fake set current beacon failed");
}

// TODO: remove this function when new protocol parameters are not fake anymore
async fn setup_dependencies_fake_data(multi_signer: Arc<RwLock<dyn MultiSigner>>) {
// Update protocol parameters
{
let mut multi_signer = multi_signer.write().await;
Expand All @@ -228,18 +217,4 @@ async fn setup_dependencies_fake_data(
.await
.expect("fake update protocol parameters failed");
}

// Update stake distribution
{
let mut multi_signer = multi_signer.write().await;
let total_signers = 5;
let stakes: ProtocolStakeDistribution = fake_data::signers_with_stakes(total_signers)
.into_iter()
.map(|signer| signer.into())
.collect::<_>();
multi_signer
.update_stake_distribution(&stakes)
.await
.expect("fake stake distribution update failed");
}
}
21 changes: 15 additions & 6 deletions mithril-aggregator/src/multi_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub trait MultiSigner: Sync + Send {

/// Get signers
async fn get_signers(&self) -> Result<Vec<entities::Signer>, ProtocolError> {
debug!("Get signers");
Ok(self
.get_signers_with_stake()
.await?
Expand Down Expand Up @@ -248,6 +249,7 @@ impl MultiSigner for MultiSignerImpl {

/// Get stake distribution
async fn get_stake_distribution(&self) -> Result<ProtocolStakeDistribution, ProtocolError> {
debug!("Get stake distribution");
#[allow(unused_variables, clippy::identity_op)]
let epoch = self
.beacon_store
Expand All @@ -258,8 +260,10 @@ impl MultiSigner for MultiSignerImpl {
.ok_or_else(ProtocolError::UnavailableBeacon)?
.epoch
- 0; // TODO: Should be -1 or -2
let epoch = 0; // TODO: to remove once the runtime feeds the stake distribution
warn!("Epoch computation is not final and needs to be fixed");
warn!(
"Epoch computation is not final and needs to be fixed: {}",
epoch
);
let signers = self
.stake_store
.read()
Expand Down Expand Up @@ -288,8 +292,6 @@ impl MultiSigner for MultiSignerImpl {
.await?
.ok_or_else(ProtocolError::UnavailableBeacon)?
.epoch;
let epoch = 0; // TODO: to remove once the runtime feeds the stake distribution
warn!("Epoch computation is not final and needs to be fixed");
let mut stake_store = self.stake_store.write().await;
for (party_id, stake) in stakes {
stake_store
Expand Down Expand Up @@ -318,7 +320,10 @@ impl MultiSigner for MultiSignerImpl {
.ok_or_else(ProtocolError::UnavailableBeacon)?
.epoch
- 0; // TODO: Should be -1 or -2
warn!("Epoch computation is not final and needs to be fixed");
warn!(
"Epoch computation is not final and needs to be fixed: {}",
epoch
);
let signers = self
.verification_key_store
.read()
Expand All @@ -338,6 +343,7 @@ impl MultiSigner for MultiSignerImpl {
async fn get_signers_with_stake(
&self,
) -> Result<Vec<entities::SignerWithStake>, ProtocolError> {
debug!("Get signers with stake");
#[allow(clippy::identity_op)]
let epoch = self
.beacon_store
Expand All @@ -348,7 +354,10 @@ impl MultiSigner for MultiSignerImpl {
.ok_or_else(ProtocolError::UnavailableBeacon)?
.epoch
- 0; // TODO: Should be -1 or -2
warn!("Epoch computation is not final and needs to be fixed");
warn!(
"Epoch computation is not final and needs to be fixed: {}",
epoch
);
let signers = self
.verification_key_store
.read()
Expand Down
10 changes: 9 additions & 1 deletion mithril-aggregator/src/runtime/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::snapshot_stores::SnapshotStoreError;
use crate::store::StoreError;
use crate::{BeaconStoreError, ProtocolError, SnapshotError};

use mithril_common::chain_observer::ChainObserverError;
use mithril_common::digesters::{DigesterError, ImmutableFileListingError};
use mithril_common::store::stake_store::StakeStoreError;
use std::io;
use thiserror::Error;

Expand All @@ -23,6 +25,9 @@ pub enum RuntimeError {
#[error("snapshot store error")]
SnapshotStore(#[from] SnapshotStoreError),

#[error("stake store error")]
StakeStore(#[from] StakeStoreError),

#[error("store error")]
StoreError(#[from] StoreError),

Expand All @@ -33,7 +38,10 @@ pub enum RuntimeError {
SnapshotBuild(#[from] io::Error),

#[error("immutable file scanning error")]
ImmutableFileError(#[from] ImmutableFileListingError),
ImmutableFile(#[from] ImmutableFileListingError),

#[error("chain observer error")]
ChainObserver(#[from] ChainObserverError),

#[error("general error")]
General(String),
Expand Down
72 changes: 69 additions & 3 deletions mithril-aggregator/src/runtime/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use crate::{DependencyManager, SnapshotError, Snapshotter};
use async_trait::async_trait;
use chrono::Utc;
use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile};
use mithril_common::entities::{Beacon, Certificate, CertificatePending, Snapshot};
use mithril_common::entities::{
Beacon, Certificate, CertificatePending, SignerWithStake, Snapshot,
};

use mithril_common::store::stake_store::StakeStorer;
use slog_scope::{debug, error, info, trace, warn};
use std::path::Path;
use std::sync::Arc;
Expand Down Expand Up @@ -57,6 +60,10 @@ pub trait AggregatorRunnerTrait: Sync + Send {

async fn compute_digest(&self, new_beacon: &Beacon) -> Result<DigesterResult, RuntimeError>;

async fn update_beacon(&self, new_beacon: &Beacon) -> Result<(), RuntimeError>;

async fn update_stake_distribution(&self, new_beacon: &Beacon) -> Result<(), RuntimeError>;

async fn update_message_in_multisigner(
&self,
digest_result: DigesterResult,
Expand Down Expand Up @@ -123,16 +130,28 @@ impl AggregatorRunnerTrait for AggregatorRunner {
);
let db_path: &Path = self.config.db_directory.as_path();
let immutable_file_number = ImmutableFile::list_completed_in_dir(db_path)
.map_err(RuntimeError::ImmutableFileError)?
.map_err(RuntimeError::ImmutableFile)?
.into_iter()
.last()
.ok_or_else(|| RuntimeError::General("no immutable file was returned".to_string()))?
.number;
let epoch = self
.config
.dependencies
.chain_observer
.as_ref()
.ok_or_else(|| RuntimeError::General("no chain observer registered".to_string()))?
.read()
.await
.get_current_epoch()
.await?
.ok_or_else(|| RuntimeError::General("no epoch was returned".to_string()))?;
let current_beacon = Beacon {
network: self.config.network.clone(),
epoch: 0,
epoch,
immutable_file_number,
};
debug!("checking if there is a new beacon: {:?}", current_beacon);

match maybe_beacon {
Some(beacon) if current_beacon > beacon => Ok(Some(current_beacon)),
Expand Down Expand Up @@ -192,6 +211,53 @@ impl AggregatorRunnerTrait for AggregatorRunner {
}
}

async fn update_beacon(&self, new_beacon: &Beacon) -> Result<(), RuntimeError> {
info!("update beacon"; "beacon" => #?new_beacon);
let _ = self
.config
.dependencies
.beacon_store
.as_ref()
.ok_or_else(|| RuntimeError::General("no beacon store registered".to_string()))?
.write()
.await
.set_current_beacon(new_beacon.to_owned())
.await?;
Ok(())
}

async fn update_stake_distribution(&self, new_beacon: &Beacon) -> Result<(), RuntimeError> {
info!("update stake distribution"; "beacon" => #?new_beacon);
let stake_distribution = self
.config
.dependencies
.chain_observer
.as_ref()
.ok_or_else(|| RuntimeError::General("no chain observer registered".to_string()))?
.read()
.await
.get_current_stake_distribution()
.await?
.ok_or_else(|| RuntimeError::General("no epoch was returned".to_string()))?;
let mut stake_store = self
.config
.dependencies
.stake_store
.as_ref()
.ok_or_else(|| RuntimeError::General("no stake store registered".to_string()))?
.write()
.await;
for (party_id, stake) in &stake_distribution {
stake_store
.save_stake(
new_beacon.epoch,
SignerWithStake::new(*party_id, "".to_string(), *stake),
)
.await?;
}
Ok(())
}

async fn create_new_pending_certificate_from_multisigner(
&self,
beacon: Beacon,
Expand Down
12 changes: 12 additions & 0 deletions mithril-aggregator/src/runtime/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ impl AggregatorRuntime {
new_beacon: Beacon,
) -> Result<SigningState, RuntimeError> {
debug!("launching transition from IDLE to SIGNING state");
let _ = self.runner.update_beacon(&new_beacon).await?;
let _ = self.runner.update_stake_distribution(&new_beacon).await?; // TODO: This should happen only when the epoch is changing. This requires to modify the state machine by keeping track of the previous beacon in the state
let digester_result = self.runner.compute_digest(&new_beacon).await?;
let _ = self
.runner
Expand Down Expand Up @@ -264,6 +266,16 @@ mod tests {
last_immutable_file_number: 123,
})
});
runner
.expect_update_beacon()
.with(predicate::eq(fake_data::beacon()))
.times(1)
.returning(|_| Ok(()));
runner
.expect_update_stake_distribution()
.with(predicate::eq(fake_data::beacon()))
.times(1)
.returning(|_| Ok(()));
runner
.expect_update_message_in_multisigner()
.with(predicate::eq(DigesterResult {
Expand Down
Loading