Skip to content

Commit fa2cf90

Browse files
authored
Merge pull request #776 from input-output-hk/ensemble/665/runtime_error_handling
#665 handling errors better
2 parents 52cef88 + 43516f0 commit fa2cf90

28 files changed

+900
-351
lines changed

Cargo.lock

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.2.25"
3+
version = "0.2.26"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/command_args.rs

+53-33
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use clap::{Parser, Subcommand};
22
use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value, ValueKind};
33
use slog::Level;
4-
use slog_scope::debug;
5-
use std::{error::Error, fs, path::PathBuf, sync::Arc};
6-
use tokio::{sync::RwLock, time::Duration};
4+
use slog_scope::{crit, debug, info};
5+
use std::{error::Error, fs, net::IpAddr, path::PathBuf, sync::Arc};
6+
use tokio::{
7+
sync::{oneshot, RwLock},
8+
task::JoinSet,
9+
time::Duration,
10+
};
711

812
use mithril_common::{
913
certificate_chain::MithrilCertificateVerifier,
@@ -24,11 +28,12 @@ use mithril_common::{
2428

2529
use crate::{
2630
event_store::{self, TransmitterService},
31+
http_server::routes::router,
2732
tools::{EraTools, GenesisTools, GenesisToolsDependency},
2833
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
2934
CertificateStore, Configuration, DefaultConfiguration, DependencyManager, GenesisConfiguration,
3035
GzipSnapshotter, MithrilSignerRegisterer, MultiSignerImpl, ProtocolParametersStore,
31-
ProtocolParametersStorer, Server, SingleSignatureStore, VerificationKeyStore,
36+
ProtocolParametersStorer, SingleSignatureStore, VerificationKeyStore,
3237
};
3338

3439
const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3";
@@ -454,51 +459,66 @@ impl ServeCommand {
454459
.await?;
455460

456461
let network = config.get_network()?;
457-
let runtime_dependencies = dependency_manager.clone();
462+
let runtime_dependency_manager = dependency_manager.clone();
458463

459-
// start the monitoring thread
460-
let event_store_thread = tokio::spawn(async move {
461-
event_store
462-
.run(Some(
463-
config.data_stores_directory.join(SQLITE_MONITORING_FILE),
464-
))
465-
.await
466-
.unwrap()
467-
});
464+
// start servers
465+
println!("Starting server...");
466+
println!("Press Ctrl+C to stop");
467+
let (shutdown_tx, shutdown_rx) = oneshot::channel();
468468

469-
// Start Aggregator state machine
470-
let handle = tokio::spawn(async move {
469+
let mut join_set = JoinSet::new();
470+
join_set.spawn(async move {
471471
let config =
472472
AggregatorConfig::new(config.run_interval, network, &config.db_directory.clone());
473473
let mut runtime = AggregatorRuntime::new(
474474
Duration::from_millis(config.interval),
475475
None,
476-
Arc::new(AggregatorRunner::new(config, runtime_dependencies)),
476+
Arc::new(AggregatorRunner::new(config, runtime_dependency_manager)),
477477
)
478478
.await
479479
.unwrap();
480-
runtime.run().await
480+
runtime.run().await.map_err(|e| e.to_string())
481481
});
482+
join_set.spawn(async move {
483+
let routes = router::routes(dependency_manager);
484+
let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(
485+
(
486+
config.server_ip.clone().parse::<IpAddr>().unwrap(),
487+
config.server_port,
488+
),
489+
async {
490+
shutdown_rx.await.ok();
491+
},
492+
);
493+
server.await;
482494

483-
// Start REST server
484-
println!("Starting server...");
485-
println!("Press Ctrl+C to stop...");
486-
let shutdown_signal = async {
487-
tokio::signal::ctrl_c()
495+
Ok(())
496+
});
497+
join_set.spawn(async { tokio::signal::ctrl_c().await.map_err(|e| e.to_string()) });
498+
499+
// start the monitoring thread
500+
let event_store_thread = tokio::spawn(async move {
501+
event_store
502+
.run(Some(
503+
config.data_stores_directory.join(SQLITE_MONITORING_FILE),
504+
))
488505
.await
489-
.expect("failed to install CTRL+C signal handler");
490-
};
491-
let http_server = Server::new(
492-
config.server_ip.clone(),
493-
config.server_port,
494-
dependency_manager.clone(),
495-
);
496-
http_server.start(shutdown_signal).await;
506+
.unwrap()
507+
});
508+
509+
let res = join_set.join_next().await.unwrap()?;
510+
if let Err(e) = res {
511+
crit!("A critical error occurred: {e}");
512+
}
513+
514+
// stop servers
515+
join_set.shutdown().await;
516+
let _ = shutdown_tx.send(());
497517

498-
handle.abort();
518+
info!("Event store is finishing...");
499519
event_store_thread.await?;
520+
println!("Services stopped, exiting.");
500521

501-
println!("Exiting...");
502522
Ok(())
503523
}
504524

mithril-aggregator/src/configuration.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ impl Configuration {
161161
pub fn build_era_reader_adapter(
162162
&self,
163163
chain_observer: Arc<dyn ChainObserver>,
164-
) -> Result<Box<dyn EraReaderAdapter>, Box<dyn Error>> {
164+
) -> Result<Arc<dyn EraReaderAdapter>, Box<dyn Error>> {
165165
Ok(EraReaderAdapterBuilder::new(
166166
&self.era_reader_adapter_type,
167167
&self.era_reader_adapter_params,

mithril-aggregator/src/dependency.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ pub mod tests {
361361
chain_observer.clone(),
362362
verification_key_store.clone(),
363363
));
364-
let era_reader = Arc::new(EraReader::new(Box::new(EraReaderBootstrapAdapter)));
364+
let era_reader = Arc::new(EraReader::new(Arc::new(EraReaderBootstrapAdapter)));
365365
let era_epoch_token = era_reader
366366
.read_era_epoch_token(beacon_provider.get_current_beacon().await.unwrap().epoch)
367367
.await
+2-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
mod routes;
2-
mod server;
1+
pub mod routes;
32

4-
pub use server::{Server, SERVER_BASE_PATH};
3+
pub const SERVER_BASE_PATH: &str = "aggregator";

mithril-aggregator/src/http_server/server.rs

-35
This file was deleted.

mithril-aggregator/src/lib.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ pub use crate::snapshot_stores::{LocalSnapshotStore, RemoteSnapshotStore, Snapsh
3636
pub use certificate_creator::{CertificateCreator, MithrilCertificateCreator};
3737
pub use command_args::MainOpts;
3838
pub use dependency::DependencyManager;
39-
pub use http_server::Server;
4039
pub use message_adapters::{
4140
FromRegisterSignerAdapter, ToCertificatePendingMessageAdapter, ToEpochSettingsMessageAdapter,
4241
};
43-
pub use runtime::{AggregatorConfig, AggregatorRunner, AggregatorRunnerTrait, AggregatorRuntime};
42+
pub use runtime::{
43+
AggregatorConfig, AggregatorRunner, AggregatorRunnerTrait, AggregatorRuntime, RuntimeError,
44+
};
4445
pub use signer_registerer::{
4546
MithrilSignerRegisterer, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound,
4647
SignerRegistrationRoundOpener,
+71-56
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,80 @@
1-
use crate::certificate_creator::CertificateCreationError;
2-
use crate::snapshot_stores::SnapshotStoreError;
3-
use crate::{ProtocolError, SignerRegistrationError, SnapshotError};
4-
5-
use mithril_common::certificate_chain::CertificateVerifierError;
6-
use mithril_common::chain_observer::ChainObserverError;
7-
use mithril_common::digesters::{ImmutableDigesterError, ImmutableFileListingError};
8-
use mithril_common::entities::BeaconComparisonError;
9-
use mithril_common::entities::Epoch;
10-
use mithril_common::store::StoreError;
11-
use mithril_common::BeaconProviderError;
121
use std::error::Error as StdError;
13-
use std::io;
142
use thiserror::Error;
153

4+
/// Error encountered or produced by the Runtime.
5+
/// This enum represents the fate of the errors produced during the state
6+
/// transitions.
167
#[derive(Error, Debug)]
178
pub enum RuntimeError {
18-
#[error("multi signer error: {0}")]
19-
MultiSigner(#[from] ProtocolError),
20-
21-
#[error("snapshotter error: {0}")]
22-
Snapshotter(#[from] SnapshotError),
23-
24-
#[error("digester error: {0}")]
25-
Digester(#[from] ImmutableDigesterError),
26-
27-
#[error("store error: {0}")]
28-
StoreError(#[from] StoreError),
29-
30-
#[error("snapshot uploader error: {0}")]
31-
SnapshotUploader(String),
32-
33-
#[error("snapshot build error: {0}")]
34-
SnapshotBuild(#[from] io::Error),
35-
36-
#[error("immutable file scanning error: {0}")]
37-
ImmutableFile(#[from] ImmutableFileListingError),
38-
39-
#[error("chain observer error: {0}")]
40-
ChainObserver(#[from] ChainObserverError),
41-
42-
#[error("beacon provider error: {0}")]
43-
BeaconProvider(#[from] BeaconProviderError),
44-
45-
#[error("certificate verifier error: {0}")]
46-
CertificateVerifier(#[from] CertificateVerifierError),
47-
48-
#[error("certificate chain gap error: {0} vs {1}")]
49-
CertificateChainEpochGap(Epoch, Epoch),
50-
51-
#[error("snapshot store error: {0}")]
52-
SnapshotStore(#[from] SnapshotStoreError),
53-
54-
#[error("certificate creation error: {0}")]
55-
CertificateCreation(#[from] CertificateCreationError),
9+
/// Errors that need the runtime to try again without changing its state.
10+
#[error("An error occured: {message}. This runtime cycle will be skipped. Nested error: {nested_error:#?}.")]
11+
KeepState {
12+
/// error message
13+
message: String,
14+
15+
/// Underlying error that was caught, if any
16+
nested_error: Option<Box<dyn StdError + Sync + Send>>,
17+
},
18+
/// A Critical error means the Runtime stops and the software exits with an
19+
/// error code.
20+
#[error("Critical error:'{message}'. Nested error: {nested_error:#?}.")]
21+
Critical {
22+
/// error message
23+
message: String,
24+
25+
/// Underlying error that was caught, if any
26+
nested_error: Option<Box<dyn StdError + Sync + Send>>,
27+
},
28+
}
5629

57-
#[error("beacon comparison error: {0}")]
58-
BeaconComparisonError(#[from] BeaconComparisonError),
30+
impl RuntimeError {
31+
/// Create a new KeepState error
32+
pub fn keep_state(message: &str, error: Option<Box<dyn StdError + Sync + Send>>) -> Self {
33+
Self::KeepState {
34+
message: message.to_string(),
35+
nested_error: error,
36+
}
37+
}
38+
39+
/// Create a new Critical error
40+
pub fn critical(message: &str, error: Option<Box<dyn StdError + Sync + Send>>) -> Self {
41+
Self::Critical {
42+
message: message.to_string(),
43+
nested_error: error,
44+
}
45+
}
46+
}
5947

60-
#[error("signer registration error: {0}")]
61-
SignerRegistration(#[from] SignerRegistrationError),
48+
impl From<Box<dyn StdError + Sync + Send>> for RuntimeError {
49+
fn from(value: Box<dyn StdError + Sync + Send>) -> Self {
50+
Self::KeepState {
51+
message: "Error caught, state preserved, will retry to cycle.".to_string(),
52+
nested_error: Some(value),
53+
}
54+
}
55+
}
6256

63-
#[error("general error: {0}")]
64-
General(Box<dyn StdError + Sync + Send>),
57+
/// Errors returned when the runner cannot fulfil its mission even though no
58+
/// subsystem has failed.
59+
#[derive(Debug, Error)]
60+
pub enum RunnerError {
61+
/// Protocol message part is missing
62+
#[error("Missing protocol message: '{0}'.")]
63+
MissingProtocolMessage(String),
64+
65+
/// Epoch out of bounds
66+
#[error("Epoch out of bounds: '{0}'.")]
67+
EpochOutOfBounds(String),
68+
69+
/// No stake distribution found
70+
#[error("Missing stack distribution: '{0}'.")]
71+
MissingStakeDistribution(String),
72+
73+
/// Missing protocol parameters
74+
#[error("Missing protocol parameters: '{0}'.")]
75+
MissingProtocolParameters(String),
76+
77+
/// No multi-signature issued by the multisigner
78+
#[error("No MultiSignature issued: '{0}'.")]
79+
NoComputedMultiSignature(String),
6580
}

0 commit comments

Comments
 (0)