Skip to content

Greg/250/chain observer #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions mithril-aggregator/src/beacon_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
use async_trait::async_trait;
use std::{error::Error, path::PathBuf, sync::Arc};
use tokio::sync::RwLock;

use mithril_common::{chain_observer::ChainObserver, digesters::ImmutableFile, entities::Beacon};

use crate::runtime::RuntimeError;

#[async_trait]
pub trait ImmutableFileObserver
where
Self: Sync + Send,
{
async fn get_last_immutable_number(&self) -> Result<u64, Box<dyn Error + Sync + Send>>;
}

pub struct ImmutableFileSystemObserver {
db_path: PathBuf,
}

impl ImmutableFileSystemObserver {
pub fn new(db_path: &PathBuf) -> Self {
let db_path = db_path.to_owned();

Self { db_path }
}
}

#[async_trait]
impl ImmutableFileObserver for ImmutableFileSystemObserver {
async fn get_last_immutable_number(&self) -> Result<u64, Box<dyn Error + Sync + Send>> {
let immutable_file_number = ImmutableFile::list_completed_in_dir(&self.db_path)
.map_err(RuntimeError::ImmutableFile)?
.into_iter()
.last()
.ok_or_else(|| {
RuntimeError::General("no immutable file was returned".to_string().into())
})?
.number;

Ok(immutable_file_number)
}
}

#[async_trait]
pub trait BeaconProvider
where
Self: Sync + Send,
{
async fn get_current_beacon(&self) -> Result<Beacon, Box<dyn Error + Sync + Send>>;
}

pub struct BeaconProviderImpl {
chain_observer: Arc<RwLock<dyn ChainObserver>>,
immutable_observer: Arc<RwLock<dyn ImmutableFileObserver>>,
network: String,
}

impl BeaconProviderImpl {
pub fn new(
chain_observer: Arc<RwLock<dyn ChainObserver>>,
immutable_observer: Arc<RwLock<dyn ImmutableFileObserver>>,
network: &str,
) -> Self {
let network = network.to_string();

Self {
chain_observer,
immutable_observer,
network,
}
}
}

#[async_trait]
impl BeaconProvider for BeaconProviderImpl {
async fn get_current_beacon(&self) -> Result<Beacon, Box<dyn Error + Sync + Send>> {
let epoch = self
.chain_observer
.read()
.await
.get_current_epoch()
.await?
.ok_or_else(|| RuntimeError::General("could not get Epoch".to_string().into()))?;
let immutable_file_number = self
.immutable_observer
.read()
.await
.get_last_immutable_number()
.await?;

let beacon = Beacon {
network: self.network.clone(),
epoch,
immutable_file_number,
};

Ok(beacon)
}
}

#[cfg(test)]
mod tests {
use std::io::ErrorKind;

use mithril_common::chain_observer::{ChainObserver, ChainObserverError};
use mithril_common::digesters::ImmutableFileListingError;
use mithril_common::entities::{Epoch, StakeDistribution};

use super::*;

struct TestChainObserver {}

#[async_trait]
impl ChainObserver for TestChainObserver {
async fn get_current_epoch(&self) -> Result<Option<Epoch>, ChainObserverError> {
Ok(Some(42))
}

async fn get_current_stake_distribution(
&self,
) -> Result<Option<StakeDistribution>, ChainObserverError> {
Err(ChainObserverError::General(
"this should not be called in the BeaconProvider"
.to_string()
.into(),
))
}
}

struct TestImmutableFileObserver {
shall_return: Option<u64>,
}

impl TestImmutableFileObserver {
pub fn new() -> Self {
Self {
shall_return: Some(119827),
}
}

pub fn shall_return(&mut self, what: Option<u64>) -> &mut Self {
self.shall_return = what;
self
}
}

#[async_trait]
impl ImmutableFileObserver for TestImmutableFileObserver {
async fn get_last_immutable_number(&self) -> Result<u64, Box<dyn Error + Sync + Send>> {
match self.shall_return {
Some(n) => Ok(n),
None => Err(Box::new(ImmutableFileListingError::MetadataParsing(
std::io::Error::new(ErrorKind::Unsupported, "test error"),
))),
}
}
}

#[tokio::test]
async fn test_beacon_ok() {
let beacon_provider = BeaconProviderImpl::new(
Arc::new(RwLock::new(TestChainObserver {})),
Arc::new(RwLock::new(TestImmutableFileObserver::new())),
"whatever",
);
let beacon = beacon_provider.get_current_beacon().await.unwrap();

assert_eq!(42, beacon.epoch);
assert_eq!(119_827, beacon.immutable_file_number);
}

#[tokio::test]
async fn test_beacon_error() {
let mut immutable_observer = TestImmutableFileObserver::new();
immutable_observer.shall_return(None);
let beacon_provider = BeaconProviderImpl::new(
Arc::new(RwLock::new(TestChainObserver {})),
Arc::new(RwLock::new(immutable_observer)),
"whatever",
);

let result = beacon_provider.get_current_beacon().await;
assert!(result.is_err());
}
}
28 changes: 27 additions & 1 deletion mithril-aggregator/src/dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use mithril_common::store::stake_store::StakeStore;
use super::entities::*;
use super::multi_signer::MultiSigner;
use super::snapshot_stores::SnapshotStore;
use crate::beacon_provider::ImmutableFileObserver;
use crate::beacon_store::BeaconStore;
use crate::snapshot_uploaders::SnapshotUploader;
use crate::{
CertificatePendingStore, CertificateStore, SingleSignatureStore, VerificationKeyStore,
BeaconProvider, CertificatePendingStore, CertificateStore, SingleSignatureStore,
VerificationKeyStore,
};

/// BeaconStoreWrapper wraps a BeaconStore
Expand Down Expand Up @@ -43,6 +45,12 @@ pub type SingleSignatureStoreWrapper = Arc<RwLock<SingleSignatureStore>>;
/// ChainObserverWrapper wraps a ChainObserver
pub type ChainObserverWrapper = Arc<RwLock<dyn ChainObserver>>;

/// BeaconProviderWrapper wraps a BeaconProvider
pub type BeaconProviderWrapper = Arc<RwLock<dyn BeaconProvider>>;

/// BeaconProviderWrapper wraps a BeaconProvider
pub type ImmutableFileObserverWrapper = Arc<RwLock<dyn ImmutableFileObserver>>;

/// DependencyManager handles the dependencies
pub struct DependencyManager {
pub config: Config,
Expand All @@ -56,6 +64,8 @@ pub struct DependencyManager {
pub stake_store: Option<StakeStoreWrapper>,
pub single_signature_store: Option<SingleSignatureStoreWrapper>,
pub chain_observer: Option<ChainObserverWrapper>,
pub beacon_provider: Option<BeaconProviderWrapper>,
pub immutable_file_observer: Option<ImmutableFileObserverWrapper>,
}

impl DependencyManager {
Expand All @@ -73,9 +83,17 @@ impl DependencyManager {
stake_store: None,
single_signature_store: None,
chain_observer: None,
beacon_provider: None,
immutable_file_observer: None,
}
}

/// With BeaconProvider middleware
pub fn with_beacon_provider(&mut self, beacon_provider: BeaconProviderWrapper) -> &mut Self {
self.beacon_provider = Some(beacon_provider);
self
}

/// With SnapshotStore middleware
pub fn with_snapshot_store(&mut self, snapshot_store: SnapshotStoreWrapper) -> &mut Self {
self.snapshot_store = Some(snapshot_store);
Expand Down Expand Up @@ -151,6 +169,14 @@ impl DependencyManager {
self
}

pub fn with_immutable_file_observer(
&mut self,
immutable_file_observer: ImmutableFileObserverWrapper,
) -> &mut Self {
self.immutable_file_observer = Some(immutable_file_observer);
self
}

#[cfg(test)]
pub fn fake() -> DependencyManager {
let config = Config {
Expand Down
4 changes: 4 additions & 0 deletions mithril-aggregator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod beacon_provider;
mod beacon_store;
mod dependency;
mod entities;
Expand All @@ -13,6 +14,9 @@ mod tools;
pub use crate::entities::Config;
pub use crate::multi_signer::{MultiSigner, MultiSignerImpl, ProtocolError};
pub use crate::snapshot_stores::{RemoteSnapshotStore, SnapshotStore};
pub use beacon_provider::{
BeaconProvider, BeaconProviderImpl, ImmutableFileObserver, ImmutableFileSystemObserver,
};
pub use beacon_store::{BeaconStore, BeaconStoreError, MemoryBeaconStore};
pub use dependency::DependencyManager;
pub use http_server::Server;
Expand Down
19 changes: 15 additions & 4 deletions mithril-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use clap::Parser;

use config::{Map, Source, Value, ValueKind};
use mithril_aggregator::{
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
CertificateStore, Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl,
Server, SingleSignatureStore, VerificationKeyStore,
AggregatorConfig, AggregatorRunner, AggregatorRuntime, BeaconProviderImpl,
CertificatePendingStore, CertificateStore, Config, DependencyManager,
ImmutableFileSystemObserver, MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server,
SingleSignatureStore, VerificationKeyStore,
};
use mithril_common::chain_observer::FakeObserver;
use mithril_common::fake_data;
Expand Down Expand Up @@ -150,6 +151,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
single_signature_store.clone(),
)));
let chain_observer = Arc::new(RwLock::new(FakeObserver::new()));
let immutable_file_observer = Arc::new(RwLock::new(ImmutableFileSystemObserver::new(
&config.db_directory,
)));
let beacon_provider = Arc::new(RwLock::new(BeaconProviderImpl::new(
chain_observer.clone(),
immutable_file_observer.clone(),
&config.network,
)));
setup_dependencies_fake_data(multi_signer.clone()).await;

// Init dependency manager
Expand All @@ -164,7 +173,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_verification_key_store(verification_key_store.clone())
.with_stake_store(stake_store.clone())
.with_single_signature_store(single_signature_store.clone())
.with_chain_observer(chain_observer.clone());
.with_chain_observer(chain_observer.clone())
.with_beacon_provider(beacon_provider.clone())
.with_immutable_file_observer(immutable_file_observer);
let dependency_manager = Arc::new(dependency_manager);

// Start snapshot uploader
Expand Down
33 changes: 8 additions & 25 deletions mithril-aggregator/src/runtime/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::snapshot_uploaders::SnapshotLocation;
use crate::{DependencyManager, SnapshotError, Snapshotter};
use async_trait::async_trait;
use chrono::Utc;
use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile};
use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester};
use mithril_common::entities::{
Beacon, Certificate, CertificatePending, SignerWithStake, Snapshot,
};
Expand Down Expand Up @@ -124,37 +124,20 @@ impl AggregatorRunnerTrait for AggregatorRunner {
maybe_beacon: Option<Beacon>,
) -> Result<Option<Beacon>, RuntimeError> {
info!("checking if there is a new beacon");
debug!(
"checking immutables in directory {}",
self.config.db_directory.to_string_lossy()
);
let db_path: &Path = self.config.db_directory.as_path();
let immutable_file_number = ImmutableFile::list_completed_in_dir(db_path)
.map_err(RuntimeError::ImmutableFile)?
.into_iter()
.last()
.ok_or_else(|| {
RuntimeError::General("no immutable file was returned".to_string().into())
})?
.number;
let epoch = self
let current_beacon = self
.config
.dependencies
.chain_observer
.beacon_provider
.as_ref()
.ok_or_else(|| {
RuntimeError::General("no chain observer registered".to_string().into())
RuntimeError::General("no beacon provider registered".to_string().into())
})?
.read()
.await
.get_current_epoch()
.await?
.ok_or_else(|| RuntimeError::General("no epoch was returned".to_string().into()))?;
let current_beacon = Beacon {
network: self.config.network.clone(),
epoch,
immutable_file_number,
};
.get_current_beacon()
.await
.map_err(RuntimeError::General)?;

debug!("checking if there is a new beacon: {:?}", current_beacon);

match maybe_beacon {
Expand Down
Loading