Skip to content

Greg/221/certificate store #222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions mithril-aggregator/config/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
"url_snapshot_manifest": "https://storage.googleapis.com/cardano-testnet/snapshots.json",
"snapshot_store_type": "local",
"snapshot_uploader_type": "local",
"pending_certificate_store_directory": "/tmp/mithril/cert_db"
}
"pending_certificate_store_directory": "/tmp/mithril/pending_cert_db",
"certificate_store_directory": "/tmp/mithril/cert_db"
}
113 changes: 113 additions & 0 deletions mithril-aggregator/src/certificate_store/certificate_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#![allow(dead_code)]
/// ↑ only until we plug this into main code
use mithril_common::entities::Certificate;

use thiserror::Error;

use super::{store_adapter::AdapterError, StoreAdapter};

#[derive(Debug, Error)]
pub enum StoreError {
#[error("physical adapter returned an error: {0}")]
AdapterError(#[from] AdapterError),
}

type Adapter = Box<dyn StoreAdapter<Key = String, Record = Certificate>>;

pub struct CertificateStore {
adapter: Adapter,
}

impl CertificateStore {
pub fn new(adapter: Adapter) -> Self {
Self { adapter }
}

pub async fn get_from_hash(&self, hash: &str) -> Result<Option<Certificate>, StoreError> {
Ok(self.adapter.get_record(&hash.to_string()).await?)
}

pub async fn save(&mut self, certificate: Certificate) -> Result<(), StoreError> {
Ok(self
.adapter
.store_record(&certificate.hash, &certificate)
.await?)
}

pub async fn get_list(&self, last_n: usize) -> Result<Vec<Certificate>, StoreError> {
let vars = self.adapter.get_last_n_records(last_n).await?;
let result = vars.into_iter().map(|(_, y)| y).collect();

Ok(result)
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::certificate_store::dumb_adapter::DumbStoreAdapter;
use mithril_common::fake_data::{self};

async fn get_certificate_store(size: u64) -> CertificateStore {
let mut adapter: DumbStoreAdapter<String, Certificate> = DumbStoreAdapter::new();

for ix in 0..size {
let certificate = fake_data::certificate(format!("cert_{:0>2}", ix));
adapter
.store_record(&certificate.hash, &certificate)
.await
.unwrap();
}
let store = CertificateStore::new(Box::new(adapter));

store
}

#[tokio::test]
async fn list_is_empty() {
let store = get_certificate_store(0).await;

assert_eq!(0, store.get_list(100).await.unwrap().len());
}

#[tokio::test]
async fn list_has_some_members() {
let store = get_certificate_store(1).await;

assert_eq!(1, store.get_list(100).await.unwrap().len());
}

#[tokio::test]
async fn get_certificate_with_good_hash() {
let store = get_certificate_store(1).await;
let result = store.get_from_hash("cert_00").await.unwrap();
assert!(result.is_some());
}

#[tokio::test]
async fn get_certificate_with_wrong_hash() {
let store = get_certificate_store(1).await;
let result = store.get_from_hash("cert_99").await.unwrap();
assert!(result.is_none());
}

#[tokio::test]
async fn save_certificate_once() {
let mut store = get_certificate_store(1).await;
let certificate = fake_data::certificate(format!("{}", "123".to_string()));

assert!(store.save(certificate).await.is_ok());
}

#[tokio::test]
async fn update_certificate() {
let mut store = get_certificate_store(1).await;
let mut certificate = store.get_from_hash("cert_00").await.unwrap().unwrap();

certificate.previous_hash = "whatever".to_string();
assert!(store.save(certificate).await.is_ok());
let certificate = store.get_from_hash("cert_00").await.unwrap().unwrap();

assert_eq!("whatever".to_string(), certificate.previous_hash);
}
}
2 changes: 2 additions & 0 deletions mithril-aggregator/src/certificate_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod certificate_store;
mod jsonfile_store_adapter;
mod pending_certificate_store;
mod store_adapter;

pub use certificate_store::CertificateStore;
pub use jsonfile_store_adapter::JsonFileStoreAdapter;
pub use pending_certificate_store::CertificatePendingStore;
pub use store_adapter::{AdapterError, StoreAdapter};
Expand Down
14 changes: 13 additions & 1 deletion mithril-aggregator/src/dependency.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::beacon_store::BeaconStore;
use crate::CertificatePendingStore;
use crate::{CertificatePendingStore, CertificateStore};
use std::sync::Arc;
use tokio::sync::RwLock;

Expand All @@ -18,13 +18,16 @@ pub type MultiSignerWrapper = Arc<RwLock<dyn MultiSigner>>;

pub type CertificatePendingStoreWrapper = Arc<RwLock<CertificatePendingStore>>;

pub type CertificateStoreWrapper = Arc<RwLock<CertificateStore>>;

/// DependencyManager handles the dependencies
pub struct DependencyManager {
pub config: Config,
pub snapshot_store: Option<SnapshotStoreWrapper>,
pub multi_signer: Option<MultiSignerWrapper>,
pub beacon_store: Option<BeaconStoreWrapper>,
pub certificate_pending_store: Option<CertificatePendingStoreWrapper>,
pub certificate_store: Option<CertificateStoreWrapper>,
}

impl DependencyManager {
Expand All @@ -36,6 +39,7 @@ impl DependencyManager {
multi_signer: None,
beacon_store: None,
certificate_pending_store: None,
certificate_store: None,
}
}

Expand Down Expand Up @@ -64,4 +68,12 @@ impl DependencyManager {
self.certificate_pending_store = Some(certificate_pending_store);
self
}

pub fn with_certificate_store(
&mut self,
certificate_store: CertificateStoreWrapper,
) -> &mut Self {
self.certificate_store = Some(certificate_store);
self
}
}
3 changes: 3 additions & 0 deletions mithril-aggregator/src/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub struct Config {

/// Directory to store pending certificates
pub pending_certificate_store_directory: PathBuf,

/// Directory to store certificates
pub certificate_store_directory: PathBuf,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
Expand Down
4 changes: 3 additions & 1 deletion mithril-aggregator/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,9 @@ mod tests {
server_url: "http://0.0.0.0:8080".to_string(),
db_directory: Default::default(),
snapshot_directory: Default::default(),
pending_certificate_store_directory: std::env::temp_dir().join("mithril_test_cert_db"),
pending_certificate_store_directory: std::env::temp_dir()
.join("mithril_test_pending_cert_db"),
certificate_store_directory: std::env::temp_dir().join("mithril_test_cert_db"),
};
DependencyManager::new(config)
}
Expand Down
2 changes: 1 addition & 1 deletion mithril-aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use crate::multi_signer::{MultiSigner, MultiSignerImpl, ProtocolError};
pub use crate::snapshot_stores::{RemoteSnapshotStore, SnapshotStore};
pub use beacon_store::{BeaconStore, BeaconStoreError, MemoryBeaconStore};
pub use certificate_store::{
AdapterError, CertificatePendingStore, JsonFileStoreAdapter, StoreAdapter,
AdapterError, CertificatePendingStore, CertificateStore, JsonFileStoreAdapter, StoreAdapter,
};
pub use dependency::DependencyManager;
pub use runtime::AggregatorRuntime;
Expand Down
12 changes: 9 additions & 3 deletions mithril-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use clap::Parser;

use config::{Map, Source, Value, ValueKind};
use mithril_aggregator::{
AggregatorRuntime, CertificatePendingStore, Config, DependencyManager, JsonFileStoreAdapter,
MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server,
AggregatorRuntime, CertificatePendingStore, CertificateStore, Config, DependencyManager,
JsonFileStoreAdapter, MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server,
};
use mithril_common::crypto_helper::ProtocolStakeDistribution;
use mithril_common::fake_data;
Expand Down Expand Up @@ -132,14 +132,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
let certificate_pending_store = Arc::new(RwLock::new(CertificatePendingStore::new(Box::new(
JsonFileStoreAdapter::new(config.pending_certificate_store_directory.clone())?,
))));
let certificate_store = Arc::new(RwLock::new(CertificateStore::new(Box::new(
JsonFileStoreAdapter::new(config.certificate_store_directory.clone())?,
))));

// Init dependency manager
let mut dependency_manager = DependencyManager::new(config.clone());
dependency_manager
.with_snapshot_store(snapshot_store.clone())
.with_multi_signer(multi_signer.clone())
.with_beacon_store(beacon_store.clone())
.with_certificate_pending_store(certificate_pending_store.clone());
.with_certificate_pending_store(certificate_pending_store.clone())
.with_certificate_store(certificate_store.clone());
let dependency_manager = Arc::new(dependency_manager);

// Start snapshot uploader
Expand All @@ -154,6 +158,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
multi_signer.clone(),
snapshot_store.clone(),
snapshot_uploader,
certificate_pending_store.clone(),
certificate_store.clone(),
);
runtime.run().await
});
Expand Down
11 changes: 11 additions & 0 deletions mithril-aggregator/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use mithril_common::digesters::{Digester, DigesterError, ImmutableDigester};
use mithril_common::entities::Beacon;
use mithril_common::fake_data;

use crate::dependency::{CertificatePendingStoreWrapper, CertificateStoreWrapper};
use crate::snapshot_stores::SnapshotStoreError;
use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -68,6 +69,12 @@ pub struct AggregatorRuntime {

/// Snapshot uploader
snapshot_uploader: Box<dyn SnapshotUploader>,

/// Pending certificate store
certificate_pending_store: CertificatePendingStoreWrapper,

/// Certificate store
certificate_store: CertificateStoreWrapper,
}

impl AggregatorRuntime {
Expand All @@ -83,6 +90,8 @@ impl AggregatorRuntime {
multi_signer: MultiSignerWrapper,
snapshot_store: SnapshotStoreWrapper,
snapshot_uploader: Box<dyn SnapshotUploader>,
certificate_pending_store: CertificatePendingStoreWrapper,
certificate_store: CertificateStoreWrapper,
) -> Self {
Self {
interval,
Expand All @@ -93,6 +102,8 @@ impl AggregatorRuntime {
multi_signer,
snapshot_store,
snapshot_uploader,
certificate_pending_store,
certificate_store,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ aggregatorProcess cwd port = do
("URL_SNAPSHOT_MANIFEST", "https://storage.googleapis.com/cardano-testnet/snapshots.json"),
("SNAPSHOT_STORE_TYPE", "local"),
("SNAPSHOT_UPLOADER_TYPE", "local"),
("PENDING_CERTIFICATE_STORE_DIRECTORY", "./pending-certs")
("PENDING_CERTIFICATE_STORE_DIRECTORY", "./store/pending-certs"),
("CERTIFICATE_STORE_DIRECTORY", "./store/certs")
]
<> baseEnv
unlessM (doesFileExist aggregator) $ failure $ "cannot find mithril-aggregator executable in expected location (" <> binDir <> ")"
Expand Down