diff --git a/README.md b/README.md index 58bfe2a01..73293dafa 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@
- + ![Webb Logo](./assets/webb_banner_light.png#gh-light-mode-only) ![Webb Logo](./assets/webb_banner_dark.png#gh-dark-mode-only) @@ -29,7 +29,7 @@
  • Testing
  • Contributing
  • License
  • - +

    Getting Started 🎉

    @@ -131,3 +131,13 @@ If you have a contribution in mind, please check out our [Contribution Guide](./ Licensed under GNU General Public License v3.0. Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in this crate by you, as defined in the GNU General Public License v3.0 license, shall be licensed as above, without any additional terms or conditions. + + +## Troubleshooting +The linking phase may fail due to not finding libgmp (i.e., "could not find library -lgmp") when building on a mac M1. To fix this problem, run: + +```bash +brew install gmp +# make sure to run the commands below each time when starting a new env, or, append them to .zshrc +export LIBRARY_PATH=$LIBRARY_PATH:/opt/homebrew/lib +export INCLUDE_PATH=$INCLUDE_PATH:/opt/homebrew/include diff --git a/dkg-gadget/src/async_protocols/mod.rs b/dkg-gadget/src/async_protocols/mod.rs index 790cef693..b7f52607a 100644 --- a/dkg-gadget/src/async_protocols/mod.rs +++ b/dkg-gadget/src/async_protocols/mod.rs @@ -680,6 +680,7 @@ where recipient_id: maybe_recipient_id, payload, session_id: params.session_id, + ssid: params.handle.ssid, }; if let Err(err) = params.engine.sign_and_send_msg(unsigned_dkg_message) { params diff --git a/dkg-gadget/src/async_protocols/remote.rs b/dkg-gadget/src/async_protocols/remote.rs index 8da204078..6898b42fc 100644 --- a/dkg-gadget/src/async_protocols/remote.rs +++ b/dkg-gadget/src/async_protocols/remote.rs @@ -37,6 +37,8 @@ pub struct AsyncProtocolRemote { pub(crate) current_round_blame_tx: Arc>, pub(crate) session_id: SessionId, pub(crate) associated_block_id: u64, + /// The signing set index. For keygen, this is always 0 + pub(crate) ssid: u8, pub(crate) logger: DebugLogger, status_history: Arc>>, } @@ -74,6 +76,7 @@ impl Clone for AsyncProtocolRemote { logger: self.logger.clone(), status_history: self.status_history.clone(), associated_block_id: self.associated_block_id, + ssid: self.ssid, } } } @@ -101,6 +104,7 @@ impl AsyncProtocolRemote { session_id: SessionId, logger: DebugLogger, associated_block_id: u64, + ssid: u8, ) -> Self { let (stop_tx, stop_rx) = tokio::sync::mpsc::unbounded_channel(); let (tx_keygen_signing, rx_keygen_signing) = tokio::sync::mpsc::unbounded_channel(); @@ -113,31 +117,6 @@ impl AsyncProtocolRemote { let status = Arc::new(Atomic::new(MetaHandlerStatus::Beginning)); let status_history = Arc::new(Mutex::new(vec![MetaHandlerStatus::Beginning])); - // let status_debug = status.clone(); - // let status_history_debug = status_history.clone(); - // let logger_debug = logger.clone(); - - // The purpose of this task is to log the status of the meta handler - // in the case that it is stalled/not-progressing. This is useful for debugging. - // tokio::task::spawn(async move { - // loop { - // tokio::time::sleep(std::time::Duration::from_secs(2)).await; - // let status = status_debug.load(Ordering::Relaxed); - // if [MetaHandlerStatus::Terminated, MetaHandlerStatus::Complete].contains(&status) { - // break - // } - // let status_history = status_history_debug.lock(); - - // if status == MetaHandlerStatus::Beginning && status_history.len() == 1 { - // continue - // } - - // logger_debug.debug(format!( - // "AsyncProtocolRemote status: {status:?} ||||| history: {status_history:?} ||||| - // session_id: {session_id:?}", )); - // } - // }); - Self { status, tx_keygen_signing, @@ -156,6 +135,7 @@ impl AsyncProtocolRemote { is_primary_remote: false, session_id, associated_block_id, + ssid, } } diff --git a/dkg-gadget/src/async_protocols/sign/handler.rs b/dkg-gadget/src/async_protocols/sign/handler.rs index c16e1467f..2579bd115 100644 --- a/dkg-gadget/src/async_protocols/sign/handler.rs +++ b/dkg-gadget/src/async_protocols/sign/handler.rs @@ -255,6 +255,7 @@ where recipient_id: None, payload, session_id: params.session_id, + ssid: params.handle.ssid, }; params.engine.sign_and_send_msg(unsigned_dkg_message.clone())?; diff --git a/dkg-gadget/src/dkg_modules/mod.rs b/dkg-gadget/src/dkg_modules/mod.rs index bb4cac9b0..0d0192310 100644 --- a/dkg-gadget/src/dkg_modules/mod.rs +++ b/dkg-gadget/src/dkg_modules/mod.rs @@ -54,6 +54,7 @@ pub enum SigningProtocolSetupParameters { >, signing_set: Vec, associated_block_id: NumberFor, + ssid: u8, }, WTFrost {}, } diff --git a/dkg-gadget/src/dkg_modules/mp_ecdsa.rs b/dkg-gadget/src/dkg_modules/mp_ecdsa.rs index 2e108fb3e..fd3597524 100644 --- a/dkg-gadget/src/dkg_modules/mp_ecdsa.rs +++ b/dkg-gadget/src/dkg_modules/mp_ecdsa.rs @@ -50,6 +50,7 @@ where keygen_protocol_hash, } = params { + const KEYGEN_SSID: u8 = 0; match self.dkg_worker.generate_async_proto_params( best_authorities, authority_public_key, @@ -58,9 +59,10 @@ where stage, crate::DKG_KEYGEN_PROTOCOL_NAME, associated_block, + KEYGEN_SSID, ) { Ok(async_proto_params) => { - let err_handler_tx = self.dkg_worker.error_handler.clone(); + let err_handler_tx = self.dkg_worker.error_handler_channel.tx.clone(); let remote = async_proto_params.handle.clone(); let keygen_manager = self.dkg_worker.keygen_manager.clone(); @@ -152,6 +154,7 @@ where unsigned_proposal_batch, signing_set, associated_block_id, + ssid, } = params { self.dkg_worker.logger.debug(format!("{party_i:?} All Parameters: {best_authorities:?} | authority_pub_key: {authority_public_key:?} | session_id: {session_id:?} | threshold: {threshold:?} | stage: {stage:?} | unsigned_proposal_batch: {unsigned_proposal_batch:?} | signing_set: {signing_set:?} | associated_block_id: {associated_block_id:?}")); @@ -163,11 +166,12 @@ where stage, crate::DKG_SIGNING_PROTOCOL_NAME, associated_block_id, + ssid, )?; let handle = async_proto_params.handle.clone(); - let err_handler_tx = self.dkg_worker.error_handler.clone(); + let err_handler_tx = self.dkg_worker.error_handler_channel.tx.clone(); let meta_handler = GenericAsyncHandler::setup_signing( async_proto_params, threshold, diff --git a/dkg-gadget/src/gossip_messages/misbehaviour_report.rs b/dkg-gadget/src/gossip_messages/misbehaviour_report.rs index 244cf446a..3df12afd0 100644 --- a/dkg-gadget/src/gossip_messages/misbehaviour_report.rs +++ b/dkg-gadget/src/gossip_messages/misbehaviour_report.rs @@ -150,6 +150,7 @@ where recipient_id: None, session_id: report.session_id, payload, + ssid: 0, }; let encoded_dkg_message = message.encode(); diff --git a/dkg-gadget/src/gossip_messages/public_key_gossip.rs b/dkg-gadget/src/gossip_messages/public_key_gossip.rs index 7a2e1c094..e96ebe7cb 100644 --- a/dkg-gadget/src/gossip_messages/public_key_gossip.rs +++ b/dkg-gadget/src/gossip_messages/public_key_gossip.rs @@ -149,6 +149,7 @@ pub(crate) fn gossip_public_key( recipient_id: None, session_id: msg.session_id, payload, + ssid: 0, }; let encoded_dkg_message = message.encode(); diff --git a/dkg-gadget/src/lib.rs b/dkg-gadget/src/lib.rs index b1bc304f9..d2e3dd294 100644 --- a/dkg-gadget/src/lib.rs +++ b/dkg-gadget/src/lib.rs @@ -34,7 +34,7 @@ pub mod keystore; pub mod gossip_engine; mod keygen_manager; -mod signing_manager; +pub mod signing_manager; // mod meta_async_rounds; pub mod db; mod metrics; diff --git a/dkg-gadget/src/signing_manager/mod.rs b/dkg-gadget/src/signing_manager/mod.rs index 07a454681..5a12dcaba 100644 --- a/dkg-gadget/src/signing_manager/mod.rs +++ b/dkg-gadget/src/signing_manager/mod.rs @@ -56,6 +56,7 @@ const MAX_RUNNING_TASKS: usize = 4; const MAX_ENQUEUED_TASKS: usize = 20; // How often to poll the jobs to check completion status const JOB_POLL_INTERVAL_IN_MILLISECONDS: u64 = 500; +pub const MAX_POTENTIAL_SIGNING_SETS_PER_PROPOSAL: u8 = 2; impl SigningManager where @@ -200,65 +201,70 @@ where if we are not, we continue the loop. */ let unsigned_proposal_bytes = batch.encode(); - let concat_data = dkg_pub_key - .clone() - .into_iter() - //.chain(at.encode()) - .chain(unsigned_proposal_bytes) - .collect::>(); - let seed = sp_core::keccak_256(&concat_data); let unsigned_proposal_hash = batch.hash().expect("unable to hash proposal"); - let maybe_set = self - .generate_signers(&seed, threshold, best_authorities.clone(), dkg_worker) - .ok(); - if let Some(signing_set) = maybe_set { - // if we are in the set, send to work manager - if signing_set.contains(&party_i) { - dkg_worker.logger.info(format!( - "🕸️ Session Id {:?} | {}-out-of-{} signers: ({:?})", - session_id, - threshold, - best_authorities.len(), - signing_set, - )); + for ssid in 0..MAX_POTENTIAL_SIGNING_SETS_PER_PROPOSAL { + let concat_data = dkg_pub_key + .clone() + .into_iter() + .chain(unsigned_proposal_bytes.clone()) + .chain(ssid.encode()) + .collect::>(); + let seed = sp_core::keccak_256(&concat_data); - let params = SigningProtocolSetupParameters::MpEcdsa { - best_authorities: best_authorities.clone(), - authority_public_key: authority_public_key.clone(), - party_i, - session_id, - threshold, - stage: ProtoStageType::Signing { unsigned_proposal_hash }, - unsigned_proposal_batch: batch, - signing_set, - associated_block_id: *header.number(), - }; + let maybe_set = self + .generate_signers(&seed, threshold, best_authorities.clone(), dkg_worker) + .ok(); + if let Some(signing_set) = maybe_set { + // if we are in the set, send to work manager + if signing_set.contains(&party_i) { + dkg_worker.logger.info(format!( + "🕸️ Session Id {:?} | SSID {} | {}-out-of-{} signers: ({:?})", + session_id, + ssid, + threshold, + best_authorities.len(), + signing_set, + )); - let signing_protocol = dkg_worker - .dkg_modules - .get_signing_protocol(¶ms) - .expect("Standard signing protocol should exist"); - match signing_protocol.initialize_signing_protocol(params).await { - Ok((handle, task)) => { - // Send task to the work manager. Force start if the type chain ID is - // None, implying this is a proposal needed for rotating sessions and - // thus a priority - let force_start = typed_chain_id == TypedChainId::None; - self.work_manager.push_task( - unsigned_proposal_hash, - force_start, - handle, - task, - )?; - }, - Err(err) => { - dkg_worker - .logger - .error(format!("Error creating signing protocol: {:?}", &err)); - dkg_worker.handle_dkg_error(err.clone()).await; - return Err(err) - }, + let params = SigningProtocolSetupParameters::MpEcdsa { + best_authorities: best_authorities.clone(), + authority_public_key: authority_public_key.clone(), + party_i, + session_id, + threshold, + stage: ProtoStageType::Signing { unsigned_proposal_hash }, + unsigned_proposal_batch: batch.clone(), + signing_set, + associated_block_id: *header.number(), + ssid, + }; + + let signing_protocol = dkg_worker + .dkg_modules + .get_signing_protocol(¶ms) + .expect("Standard signing protocol should exist"); + match signing_protocol.initialize_signing_protocol(params).await { + Ok((handle, task)) => { + // Send task to the work manager. Force start if the type chain ID + // is None, implying this is a proposal needed for rotating sessions + // and thus a priority + let force_start = typed_chain_id == TypedChainId::None; + self.work_manager.push_task( + unsigned_proposal_hash, + force_start, + handle, + task, + )?; + }, + Err(err) => { + dkg_worker + .logger + .error(format!("Error creating signing protocol: {:?}", &err)); + dkg_worker.handle_dkg_error(err.clone()).await; + return Err(err) + }, + } } } } diff --git a/dkg-gadget/src/signing_manager/work_manager.rs b/dkg-gadget/src/signing_manager/work_manager.rs index ea70b6c04..3566b10fa 100644 --- a/dkg-gadget/src/signing_manager/work_manager.rs +++ b/dkg-gadget/src/signing_manager/work_manager.rs @@ -42,7 +42,8 @@ pub struct WorkManager { pub struct WorkManagerInner { pub active_tasks: HashSet>, pub enqueued_tasks: VecDeque>, - pub enqueued_messages: HashMap<[u8; 32], VecDeque>>, + // task hash => SSID => enqueued messages + pub enqueued_messages: HashMap<[u8; 32], HashMap>>>, } #[derive(Debug)] @@ -218,30 +219,41 @@ impl WorkManager { // Next, remove any outdated enqueued messages to prevent RAM bloat let mut to_remove = vec![]; for (hash, queue) in lock.enqueued_messages.iter_mut() { - let before = queue.len(); - // Only keep the messages that are not outdated - queue.retain(|msg| { - associated_block_id_acceptable(now.saturated_into(), msg.msg.associated_block_id) - }); - let after = queue.len(); - - if before != after { - self.logger.info(format!( - "[worker] Removed {} outdated enqueued messages from the queue for {:?}", - before - after, - hex::encode(*hash) - )); - } + for (ssid, queue) in queue.iter_mut() { + let before = queue.len(); + // Only keep the messages that are not outdated + queue.retain(|msg| { + associated_block_id_acceptable( + now.saturated_into(), + msg.msg.associated_block_id, + ) + }); + let after = queue.len(); + + if before != after { + self.logger.info(format!( + "[worker] Removed {} outdated enqueued messages from the queue for {:?}", + before - after, + hex::encode(*hash) + )); + } - if queue.is_empty() { - to_remove.push(*hash); + if queue.is_empty() { + to_remove.push((*hash, *ssid)); + } } } - // Finally, to prevent the existence of piling-up empty queues, remove them - for hash in to_remove { - lock.enqueued_messages.remove(&hash); + // Next, to prevent the existence of piling-up empty *inner* queues, remove them + for (hash, ssid) in to_remove { + lock.enqueued_messages + .get_mut(&hash) + .expect("Should be available") + .remove(&ssid); } + + // Finally, remove any empty outer maps + lock.enqueued_messages.retain(|_, v| !v.is_empty()); } fn start_job_unconditional(&self, job: Job, lock: &mut WorkManagerInner) { @@ -252,24 +264,33 @@ impl WorkManager { .error(format!("Failed to start job {:?}: {err:?}", hex::encode(job.task_hash))); } else { // deliver all the enqueued messages to the protocol now - if let Some(mut enqueued_messages) = lock.enqueued_messages.remove(&job.task_hash) { - self.logger.info(format!( - "Will now deliver {} enqueued message(s) to the async protocol for {:?}", - enqueued_messages.len(), - hex::encode(job.task_hash) - )); - while let Some(message) = enqueued_messages.pop_front() { - if should_deliver(&job, &message, job.task_hash) { - if let Err(err) = job.handle.deliver_message(message) { - self.logger.error(format!( - "Unable to deliver message for job {:?}: {err:?}", - hex::encode(job.task_hash) - )); + if let Some(mut enqueued_messages_map) = lock.enqueued_messages.remove(&job.task_hash) { + let job_ssid = job.handle.ssid; + if let Some(mut enqueued_messages) = enqueued_messages_map.remove(&job_ssid) { + self.logger.info(format!( + "Will now deliver {} enqueued message(s) to the async protocol for {:?}", + enqueued_messages.len(), + hex::encode(job.task_hash) + )); + + while let Some(message) = enqueued_messages.pop_front() { + if should_deliver(&job, &message, job.task_hash) { + if let Err(err) = job.handle.deliver_message(message) { + self.logger.error(format!( + "Unable to deliver message for job {:?}: {err:?}", + hex::encode(job.task_hash) + )); + } + } else { + self.logger.warn("Will not deliver enqueued message to async protocol since the message is no longer acceptable") } - } else { - self.logger.warn("Will not deliver enqueued message to async protocol since the message is no longer acceptable") } } + + // If there are any other messages for other SSIDs, put them back in the map + if !enqueued_messages_map.is_empty() { + lock.enqueued_messages.insert(job.task_hash, enqueued_messages_map); + } } } let task = job.task.clone(); @@ -336,7 +357,12 @@ impl WorkManager { lock.enqueued_tasks.iter().map(|job| job.handle.session_id).collect(); self.logger .info(format!("Enqueuing message for {:?} | current_running_session_ids: {current_running_session_ids:?} | enqueued_session_ids: {enqueued_session_ids:?}", hex::encode(message_task_hash))); - lock.enqueued_messages.entry(message_task_hash).or_default().push_back(msg) + lock.enqueued_messages + .entry(message_task_hash) + .or_default() + .entry(msg.msg.ssid) + .or_default() + .push_back(msg) } } @@ -399,6 +425,7 @@ fn should_deliver( ) -> bool { task.handle.session_id == msg.msg.session_id && task.task_hash == message_task_hash && + task.handle.ssid == msg.msg.ssid && associated_block_id_acceptable( task.handle.associated_block_id, msg.msg.associated_block_id, diff --git a/dkg-gadget/src/worker.rs b/dkg-gadget/src/worker.rs index c9135d464..bb71f4b57 100644 --- a/dkg-gadget/src/worker.rs +++ b/dkg-gadget/src/worker.rs @@ -27,7 +27,7 @@ use sp_consensus::SyncOracle; use crate::signing_manager::SigningManager; use futures::StreamExt; use multi_party_ecdsa::protocols::multi_party_ecdsa::gg_2020::state_machine::keygen::LocalKey; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use sc_client_api::{Backend, FinalityNotification}; use sc_keystore::LocalKeystore; use sp_arithmetic::traits::SaturatedConversion; @@ -38,7 +38,7 @@ use std::{ marker::PhantomData, sync::Arc, }; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::UnboundedSender; use dkg_primitives::{ types::{DKGError, DKGMessage, NetworkMsgPayload, SessionId, SignedDKGMessage}, @@ -132,11 +132,8 @@ where pub aggregated_public_keys: Shared, /// Tracking for the misbehaviour reports pub aggregated_misbehaviour_reports: Shared, - pub misbehaviour_tx: Option>, /// Concrete type that points to the actual local keystore if it exists pub local_keystore: Shared>>, - /// For transmitting errors from parallel threads to the DKGWorker - pub error_handler: tokio::sync::broadcast::Sender, /// Used to keep track of network status pub network: Option>>, /// Used to keep track of sync status @@ -146,10 +143,17 @@ where pub dkg_modules: DKGModules, pub signing_manager: SigningManager, pub keygen_manager: KeygenManager, + pub(crate) error_handler_channel: ErrorHandlerChannel, // keep rustc happy _backend: PhantomData<(BE, MaxProposalLength)>, } +#[derive(Clone)] +pub(crate) struct ErrorHandlerChannel { + pub tx: tokio::sync::mpsc::UnboundedSender, + rx: Arc>>>, +} + /// Used only for tests #[derive(Clone)] pub struct TestBundle { @@ -183,9 +187,7 @@ where queued_validator_set: self.queued_validator_set.clone(), aggregated_public_keys: self.aggregated_public_keys.clone(), aggregated_misbehaviour_reports: self.aggregated_misbehaviour_reports.clone(), - misbehaviour_tx: self.misbehaviour_tx.clone(), local_keystore: self.local_keystore.clone(), - error_handler: self.error_handler.clone(), test_bundle: self.test_bundle.clone(), network: self.network.clone(), sync_service: self.sync_service.clone(), @@ -193,6 +195,7 @@ where dkg_modules: self.dkg_modules.clone(), signing_manager: self.signing_manager.clone(), keygen_manager: self.keygen_manager.clone(), + error_handler_channel: self.error_handler_channel.clone(), _backend: PhantomData, } } @@ -236,15 +239,17 @@ where .. } = worker_params; - let (error_handler, _) = tokio::sync::broadcast::channel(1024); let clock = Clock { latest_header: latest_header.clone() }; let signing_manager = SigningManager::::new(logger.clone(), clock.clone()); // 2 tasks max: 1 for current, 1 for queued let keygen_manager = KeygenManager::new(logger.clone(), clock); let dkg_modules = DKGModules::default(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let error_handler_channel = ErrorHandlerChannel { tx, rx: Arc::new(Mutex::new(Some(rx))) }; + let this = DKGWorker { client, - misbehaviour_tx: None, backend, key_store, db: db_backend, @@ -261,7 +266,7 @@ where aggregated_misbehaviour_reports: Arc::new(RwLock::new(HashMap::new())), local_keystore: Arc::new(RwLock::new(local_keystore)), test_bundle, - error_handler, + error_handler_channel, logger, network, sync_service, @@ -310,6 +315,7 @@ where stage: ProtoStageType, protocol_name: &str, associated_block: NumberFor, + ssid: u8, ) -> Result< AsyncProtocolParameters< DKGProtocolEngine< @@ -332,8 +338,13 @@ where let now = self.get_latest_block_number(); let associated_block_id: u64 = associated_block.saturated_into(); - let status_handle = - AsyncProtocolRemote::new(now, session_id, self.logger.clone(), associated_block_id); + let status_handle = AsyncProtocolRemote::new( + now, + session_id, + self.logger.clone(), + associated_block_id, + ssid, + ); // Fetch the active key. This requires rotating the key to have happened with // full certainty in order to ensure the right key is being used to make signatures. let active_local_key = match stage { @@ -696,6 +707,8 @@ where if let Some(metrics) = self.metrics.as_ref() { metrics.reset_session_metrics(); } + // Delete logs from old sessions to preserve disk space + self.logger.clear_local_logs(); } else { self.logger.info( "🕸️ No update to local session found, not rotating local sessions".to_string(), @@ -1048,9 +1061,8 @@ where // *** Main run loop *** pub async fn run(mut self) { crate::deadlock_detection::deadlock_detect(); - let (misbehaviour_tx, misbehaviour_rx) = tokio::sync::mpsc::unbounded_channel(); - self.misbehaviour_tx = Some(misbehaviour_tx); self.initialization().await; + self.logger.debug("Starting DKG Iteration loop"); // We run all these tasks in parallel and wait for any of them to complete. // If any of them completes, we stop all the other tasks since this means a fatal error has @@ -1060,7 +1072,6 @@ where self.spawn_keygen_messages_stream_task(), self.spawn_signing_messages_stream_task(), self.spawn_error_handling_task(), - self.spawn_misbehaviour_report_task(misbehaviour_rx), ]) .await; self.logger @@ -1125,28 +1136,17 @@ where }) } - fn spawn_misbehaviour_report_task( - &self, - mut misbehaviour_rx: UnboundedReceiver, - ) -> tokio::task::JoinHandle<()> { - let self_ = self.clone(); - tokio::spawn(async move { - while let Some(misbehaviour) = misbehaviour_rx.recv().await { - self_.logger.debug("Going to handle Misbehaviour"); - let gossip = gossip_misbehaviour_report(&self_, misbehaviour).await; - if gossip.is_err() { - self_.logger.info("🕸️ DKG gossip_misbehaviour_report failed!"); - } - } - }) - } - fn spawn_error_handling_task(&self) -> tokio::task::JoinHandle<()> { let self_ = self.clone(); - let mut error_handler_rx = self.error_handler.subscribe(); + let mut error_handler_rx = self + .error_handler_channel + .rx + .lock() + .take() + .expect("Error handler tx already taken"); let logger = self.logger.clone(); tokio::spawn(async move { - while let Ok(error) = error_handler_rx.recv().await { + while let Some(error) = error_handler_rx.recv().await { logger.debug("Going to handle Error"); self_.handle_dkg_error(error).await; } diff --git a/dkg-logging/src/debug_logger.rs b/dkg-logging/src/debug_logger.rs index 980024213..6113656d8 100644 --- a/dkg-logging/src/debug_logger.rs +++ b/dkg-logging/src/debug_logger.rs @@ -1,10 +1,10 @@ #![allow(clippy::unwrap_used)] use crate::{debug, error, info, trace, warn}; use lazy_static::lazy_static; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use serde::Serialize; use sp_core::{bytes::to_hex, hashing::sha2_256}; -use std::{collections::HashMap, fmt::Debug, io::Write, sync::Arc, time::Instant}; +use std::{collections::HashMap, fmt::Debug, io::Write, path::PathBuf, sync::Arc, time::Instant}; #[derive(Clone, Debug)] pub struct DebugLogger { @@ -15,6 +15,7 @@ pub struct DebugLogger { events_file_handle_signing: Arc>>, events_file_handle_voting: Arc>>, checkpoints_enabled: bool, + paths: Arc>>, } struct Checkpoint { @@ -37,6 +38,7 @@ pub enum AsyncProtocolType { enum MessageType { Default(String), Event(RoundsEvent), + Clear, } lazy_static::lazy_static! { @@ -162,10 +164,7 @@ type EventFiles = (Option, Option, Option, Option); impl DebugLogger { - pub fn new( - identifier: T, - file: Option, - ) -> std::io::Result { + pub fn new(identifier: T, file: Option) -> std::io::Result { // use a channel for sending file I/O requests to a dedicated thread to avoid blocking the // DKG workers @@ -188,8 +187,11 @@ impl DebugLogger { } } + let paths = Arc::new(Mutex::new(Vec::new())); + let paths_task = paths.clone(); + let (file, events_file_keygen, events_file_signing, events_file_voting) = - Self::get_files(file)?; + Self::get_files(&paths, file)?; let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let file_handle = Arc::new(RwLock::new(file)); @@ -229,6 +231,14 @@ impl DebugLogger { } }, }, + MessageType::Clear => { + let lock = paths_task.lock(); + for path in &*lock { + if let Err(err) = std::fs::remove_file(path) { + error!(target: "dkg", "Failed to remove file: {err}"); + } + } + }, } } }); @@ -242,17 +252,23 @@ impl DebugLogger { events_file_handle_signing, events_file_handle_voting, checkpoints_enabled, + paths, }) } - fn get_files(base_output: Option) -> std::io::Result { - if let Some(file_path) = &base_output { - let file = std::fs::File::create(file_path)?; - let events_file = std::fs::File::create(format!("{}.keygen.log", file_path.display()))?; - let events_file_signing = - std::fs::File::create(format!("{}.signing.log", file_path.display()))?; - let events_file_voting = - std::fs::File::create(format!("{}.voting.log", file_path.display()))?; + fn get_files( + paths: &Arc>>, + base_output: Option, + ) -> std::io::Result { + if let Some(file_path) = base_output { + let file = std::fs::File::create(&file_path)?; + let keygen_path = PathBuf::from(format!("{}.keygen.log", file_path.display())); + let signing_path = PathBuf::from(format!("{}.signing.log", file_path.display())); + let voting_path = PathBuf::from(format!("{}.voting.log", file_path.display())); + let events_file = std::fs::File::create(&keygen_path)?; + let events_file_signing = std::fs::File::create(&signing_path)?; + let events_file_voting = std::fs::File::create(&voting_path)?; + *paths.lock() = vec![file_path, keygen_path, signing_path, voting_path]; Ok((Some(file), Some(events_file), Some(events_file_signing), Some(events_file_voting))) } else { Ok((None, None, None, None)) @@ -268,8 +284,8 @@ impl DebugLogger { *self.identifier.write() = id; } - pub fn set_output(&self, file: Option) -> std::io::Result<()> { - let (file, event_file, signing_file, voting_file) = Self::get_files(file)?; + pub fn set_output(&self, file: Option) -> std::io::Result<()> { + let (file, event_file, signing_file, voting_file) = Self::get_files(&self.paths, file)?; *self.file_handle.write() = file; *self.events_file_handle_keygen.write() = event_file; *self.events_file_handle_signing.write() = signing_file; @@ -424,6 +440,13 @@ impl DebugLogger { CHECKPOINTS.write().remove(&hash); } } + + /// Completely deletes all local logs associated with this program + pub fn clear_local_logs(&self) { + if let Err(err) = self.to_file_io.send(MessageType::Clear) { + error!(target: "dkg_gadget", "failed to send event message to file: {err:?}"); + } + } } pub fn message_to_string_hash(msg: T) -> String { diff --git a/dkg-mock-blockchain/src/server.rs b/dkg-mock-blockchain/src/server.rs index c85f7fe84..db319f283 100644 --- a/dkg-mock-blockchain/src/server.rs +++ b/dkg-mock-blockchain/src/server.rs @@ -33,6 +33,7 @@ pub struct MockBlockchain { orchestrator_rx: Arc>>>, orchestrator_state: Arc>, blocks_per_session: Arc, + max_signing_sets_per_proposal: Arc, blockchain: T, logger: DebugLogger, } @@ -87,6 +88,7 @@ impl MockBlockchain { blockchain: T, logger: DebugLogger, blocks_per_session: u64, + max_signing_sets_per_proposal: usize, ) -> std::io::Result { let listener = TcpListener::bind(&config.bind).await?; let clients = Arc::new(RwLock::new(HashMap::new())); @@ -103,6 +105,7 @@ impl MockBlockchain { blockchain, logger, blocks_per_session: Arc::new(blocks_per_session), + max_signing_sets_per_proposal: Arc::new(max_signing_sets_per_proposal), }) } @@ -532,11 +535,13 @@ impl MockBlockchain { IntraTestPhase::Signing { trace_id, queued_unsigned_proposals, .. } => { if let Some(unsigned_propos) = queued_unsigned_proposals.clone() { for _ in 0..unsigned_propos.len() { - client - .outstanding_tasks_signing - .entry(*trace_id) - .or_default() - .push(next_case.clone()); + for _ in 0..*self.max_signing_sets_per_proposal { + client + .outstanding_tasks_signing + .entry(*trace_id) + .or_default() + .push(next_case.clone()); + } } self.blockchain.set_should_execute_keygen(false); self.blockchain.set_unsigned_proposals(unsigned_propos); diff --git a/dkg-primitives/src/types.rs b/dkg-primitives/src/types.rs index a50a2d67d..d1211addd 100644 --- a/dkg-primitives/src/types.rs +++ b/dkg-primitives/src/types.rs @@ -52,6 +52,8 @@ pub struct DKGMessage { pub session_id: SessionId, /// The round ID pub associated_block_id: u64, + /// The signing set ID + pub ssid: u8, } #[derive(Debug, Clone, Decode, Encode)] diff --git a/dkg-test-orchestrator/src/main.rs b/dkg-test-orchestrator/src/main.rs index fed21c54a..89e3ef410 100644 --- a/dkg-test-orchestrator/src/main.rs +++ b/dkg-test-orchestrator/src/main.rs @@ -92,11 +92,19 @@ async fn main() -> Result<(), Box> { blocks_per_session, ); + let max_signing_sets_per_proposal = + dkg_gadget::signing_manager::MAX_POTENTIAL_SIGNING_SETS_PER_PROPOSAL; + // first, spawn the orchestrator/mock-blockchain - let orchestrator_task = - MockBlockchain::new(config, api.clone(), dummy_api_logger.clone(), blocks_per_session) - .await? - .execute(); + let orchestrator_task = MockBlockchain::new( + config, + api.clone(), + dummy_api_logger.clone(), + blocks_per_session, + max_signing_sets_per_proposal as usize, + ) + .await? + .execute(); let orchestrator_handle = tokio::task::spawn(orchestrator_task); // give time for the orchestrator to bind tokio::time::sleep(std::time::Duration::from_millis(1000)).await;