Skip to content

Feat: implement a signature processor for DMQ #2477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
15 changes: 15 additions & 0 deletions mithril-aggregator/src/commands/serve_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ impl ServeCommand {
Ok(())
});

let signature_processor = dependencies_builder
.create_signature_processor()
.await
.with_context(|| "Dependencies Builder can not create signature processor")?;
let signature_processor_clone = signature_processor.clone();
join_set.spawn(async move {
signature_processor_clone
.run()
.await
.map_err(|e| e.to_string())?;

Ok(())
});

// Create a SignersImporter only if the `cexplorer_pools_url` is provided in the config.
if let Some(cexplorer_pools_url) = config.cexplorer_pools_url {
match dependencies_builder
Expand Down Expand Up @@ -272,6 +286,7 @@ impl ServeCommand {
if !preload_task.is_finished() {
preload_task.abort();
}
signature_processor.stop().await?;

info!(root_logger, "Event store is finishing...");
event_store_thread.await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::dependency_injection::{DependenciesBuilder, Result};
use crate::get_dependency;
use crate::services::{
AggregatorClient, AggregatorHTTPClient, MessageService, MithrilMessageService,
SequentialSignatureProcessor, SignatureConsumer, SignatureConsumerNoop, SignatureProcessor,
};
impl DependenciesBuilder {
async fn build_signed_entity_type_lock(&mut self) -> Result<Arc<SignedEntityTypeLock>> {
Expand All @@ -26,7 +27,7 @@ impl DependenciesBuilder {
get_dependency!(self.signed_entity_type_lock)
}

/// build HTTP message service
/// Builds HTTP message service
pub async fn build_message_service(&mut self) -> Result<Arc<dyn MessageService>> {
let certificate_repository = Arc::new(CertificateRepository::new(
self.get_sqlite_connection().await?,
Expand All @@ -49,7 +50,7 @@ impl DependenciesBuilder {
get_dependency!(self.message_service)
}

/// build an [AggregatorClient]
/// Builds an [AggregatorClient]
pub async fn build_leader_aggregator_client(&mut self) -> Result<Arc<dyn AggregatorClient>> {
let leader_aggregator_endpoint = self
.configuration
Expand All @@ -70,4 +71,22 @@ impl DependenciesBuilder {
pub async fn get_leader_aggregator_client(&mut self) -> Result<Arc<dyn AggregatorClient>> {
get_dependency!(self.leader_aggregator_client)
}

/// Builds a [SignatureConsumer]
pub async fn build_signature_consumer(&mut self) -> Result<Arc<dyn SignatureConsumer>> {
let signature_consumer = SignatureConsumerNoop;

Ok(Arc::new(signature_consumer))
}

/// Builds a [SignatureProcessor]
pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
let signature_processor = SequentialSignatureProcessor::new(
self.build_signature_consumer().await?,
self.get_certifier_service().await?,
self.root_logger(),
);

Ok(Arc::new(signature_processor))
}
}
4 changes: 4 additions & 0 deletions mithril-aggregator/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ mod epoch_service;
mod message;
mod prover;
mod signable_builder;
mod signature_consumer;
mod signature_processor;
mod signed_entity;
mod signer_registration;
mod snapshotter;
Expand All @@ -30,6 +32,8 @@ pub use epoch_service::*;
pub use message::*;
pub use prover::*;
pub use signable_builder::*;
pub use signature_consumer::*;
pub use signature_processor::*;
pub use signed_entity::*;
pub use signer_registration::*;
pub use snapshotter::*;
Expand Down
12 changes: 12 additions & 0 deletions mithril-aggregator/src/services/signature_consumer/interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use mithril_common::{
entities::{SignedEntityType, SingleSignature},
StdResult,
};

/// A signature consumer which blocks until a messages are available.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// A signature consumer which blocks until a messages are available.
/// A signature consumer which blocks until a message is available.

#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait SignatureConsumer: Sync + Send {
/// Returns signatures when available
async fn get_signatures(&self) -> StdResult<Vec<(SingleSignature, SignedEntityType)>>;
}
5 changes: 5 additions & 0 deletions mithril-aggregator/src/services/signature_consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod interface;
mod noop;

pub use interface::*;
pub use noop::*;
42 changes: 42 additions & 0 deletions mithril-aggregator/src/services/signature_consumer/noop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::future;

use async_trait::async_trait;

use super::SignatureConsumer;

/// A no-op implementation of the [SignatureConsumer] trait that will never return signatures.
pub struct SignatureConsumerNoop;

#[async_trait]
impl SignatureConsumer for SignatureConsumerNoop {
async fn get_signatures(
&self,
) -> mithril_common::StdResult<
Vec<(
mithril_common::entities::SingleSignature,
mithril_common::entities::SignedEntityType,
)>,
> {
future::pending().await
}
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
use tokio::time::{sleep, Duration};

use super::*;

#[tokio::test]
async fn signature_consumer_noop_never_returns() {
let consumer = SignatureConsumerNoop;

let result = tokio::select!(
_res = sleep(Duration::from_millis(100)) => {Err(anyhow!("Timeout"))},
_res = consumer.get_signatures() => {Ok(())},
);

result.expect_err("Should have timed out");
}
}
209 changes: 209 additions & 0 deletions mithril-aggregator/src/services/signature_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
use std::sync::Arc;

use slog::{error, warn, Logger};

use mithril_common::{logging::LoggerExtensions, StdResult};
use tokio::sync::Mutex;

use super::{CertifierService, SignatureConsumer};

/// A signature processor which receives signature and processes them.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait SignatureProcessor: Sync + Send {
/// Processes the signatures received from the consumer.
async fn process_signatures(&self) -> StdResult<()>;

/// Starts the processor, which will run indefinitely, processing signatures as they arrive.
async fn run(&self) -> StdResult<()> {
loop {
self.process_signatures().await?;
}
}

/// Stops the processor. This method should be called to gracefully shut down the processor.
async fn stop(&self) -> StdResult<()>;
}

/// A sequential signature processor receives messages and processes them sequentially
pub struct SequentialSignatureProcessor {
consumer: Arc<dyn SignatureConsumer>,
certifier: Arc<dyn CertifierService>,
logger: Logger,
stop: Mutex<bool>,
}

impl SequentialSignatureProcessor {
/// Creates a new `SignatureProcessor` instance.
pub fn new(
consumer: Arc<dyn SignatureConsumer>,
certifier: Arc<dyn CertifierService>,
logger: Logger,
) -> Self {
Self {
consumer,
certifier,
logger: logger.new_with_component_name::<Self>(),
stop: Mutex::new(false),
}
}
}

#[async_trait::async_trait]
impl SignatureProcessor for SequentialSignatureProcessor {
async fn process_signatures(&self) -> StdResult<()> {
if *self.stop.lock().await {
warn!(self.logger, "Stopped signature processor");
return Ok(());
}

match self.consumer.get_signatures().await {
Ok(signatures) => {
for (signature, signed_entity_type) in signatures {
if let Err(e) = self
.certifier
.register_single_signature(&signed_entity_type, &signature)
.await
{
error!(self.logger, "Error dispatching single signature"; "error" => ?e);
}
}
}
Err(e) => {
error!(self.logger, "Error consuming single signatures"; "error" => ?e);
}
}

Ok(())
}

async fn stop(&self) -> StdResult<()> {
warn!(self.logger, "Stopping signature processor...");
*self.stop.lock().await = true;

Ok(())
}
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
use mithril_common::{
entities::{Epoch, SignedEntityType},
test_utils::fake_data,
};
use mockall::predicate::eq;
use tokio::time::{sleep, Duration};

use crate::{
services::{MockCertifierService, MockSignatureConsumer, SignatureRegistrationStatus},
test_tools::TestLogger,
};

use super::*;

#[tokio::test]
async fn processor_process_signatures_succeeds() {
let logger = TestLogger::stdout();
let mock_consumer = {
let mut mock_consumer = MockSignatureConsumer::new();
mock_consumer
.expect_get_signatures()
.returning(|| {
Ok(vec![
(
fake_data::single_signature(vec![1, 2, 3]),
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
),
(
fake_data::single_signature(vec![4, 5, 6]),
SignedEntityType::MithrilStakeDistribution(Epoch(2)),
),
])
})
.times(1);
mock_consumer
};
let mock_certifier = {
let mut mock_certifier = MockCertifierService::new();
mock_certifier
.expect_register_single_signature()
.with(
eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
eq(fake_data::single_signature(vec![1, 2, 3])),
)
.returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
.times(1);
mock_certifier
.expect_register_single_signature()
.with(
eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
eq(fake_data::single_signature(vec![4, 5, 6])),
)
.returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
.times(1);

mock_certifier
};
let processor = SequentialSignatureProcessor::new(
Arc::new(mock_consumer),
Arc::new(mock_certifier),
logger,
);

processor
.process_signatures()
.await
.expect("Failed to process signatures");
}

#[tokio::test]
async fn processor_run_succeeds() {
let logger = TestLogger::stdout();
let mock_consumer = {
let mut mock_consumer = MockSignatureConsumer::new();
mock_consumer
.expect_get_signatures()
.returning(|| Err(anyhow!("Error consuming signatures")))
.times(1);
mock_consumer
.expect_get_signatures()
.returning(|| {
Ok(vec![(
fake_data::single_signature(vec![1, 2, 3]),
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
)])
})
.times(1);
mock_consumer
.expect_get_signatures()
.returning(|| Ok(vec![]));
mock_consumer
};
let mock_certifier = {
let mut mock_certifier = MockCertifierService::new();
mock_certifier
.expect_register_single_signature()
.with(
eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
eq(fake_data::single_signature(vec![1, 2, 3])),
)
.returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
.times(1);

mock_certifier
};
let processor = SequentialSignatureProcessor::new(
Arc::new(mock_consumer),
Arc::new(mock_certifier),
logger,
);

tokio::select!(
_res = processor.run() => {},
_res = sleep(Duration::from_millis(10)) => {
processor.stop().await.expect("Failed to stop processor");
},
);
}
}
12 changes: 6 additions & 6 deletions mithril-signer/src/services/signature_publisher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
mod delayer;
mod http;
mod interface;
mod signature_publisher_delayer;
mod signature_publisher_noop;
mod signature_publisher_retrier;
mod noop;
mod retrier;

pub use delayer::*;
pub use interface::*;
pub use signature_publisher_delayer::*;
pub use signature_publisher_noop::*;
pub use signature_publisher_retrier::*;
pub use noop::*;
pub use retrier::*;