Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use std::{
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
sync::{OnceCell, oneshot},
sync::{Notify, OnceCell, oneshot},
task::JoinHandle,
};

Expand Down Expand Up @@ -100,6 +100,7 @@ impl<N: Network> BFT<N> {
trusted_validators: &[SocketAddr],
trusted_peers_only: bool,
storage_mode: StorageMode,
tx_pool_notifier: Option<Arc<Notify>>,
dev: Option<u16>,
) -> Result<Self> {
Ok(Self {
Expand All @@ -112,6 +113,7 @@ impl<N: Network> BFT<N> {
trusted_validators,
trusted_peers_only,
storage_mode,
tx_pool_notifier,
dev,
)?,
dag: Default::default(),
Expand Down
11 changes: 10 additions & 1 deletion node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use std::{
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};
use tokio::{sync::{Notify, OnceCell}, task::JoinHandle};

/// A helper type for an optional proposed batch.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
Expand Down Expand Up @@ -116,6 +116,8 @@ pub struct Primary<N: Network> {
propose_lock: Arc<TMutex<u64>>,
/// The storage mode of the node.
storage_mode: StorageMode,
/// The notifier for the transaction pool.
tx_pool_notifier: Option<Arc<Notify>>,
}

impl<N: Network> Primary<N> {
Expand All @@ -133,6 +135,7 @@ impl<N: Network> Primary<N> {
trusted_validators: &[SocketAddr],
trusted_peers_only: bool,
storage_mode: StorageMode,
tx_pool_notifier: Option<Arc<Notify>>,
dev: Option<u16>,
) -> Result<Self> {
// Initialize the gateway.
Expand Down Expand Up @@ -163,6 +166,7 @@ impl<N: Network> Primary<N> {
handles: Default::default(),
propose_lock: Default::default(),
storage_mode,
tx_pool_notifier,
})
}

Expand Down Expand Up @@ -721,6 +725,11 @@ impl<N: Network> Primary<N> {
*self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
// Set the proposed batch.
*self.proposed_batch.write() = Some(proposal);

// Notify the transaction pool that the workers have been drained.
if let Some(notifier) = &self.tx_pool_notifier {
notifier.notify_one();
}
Ok(())
}

Expand Down
18 changes: 13 additions & 5 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ extern crate snarkos_node_metrics as metrics;
use snarkos_account::Account;
use snarkos_node_bft::{
BFT,
MAX_BATCH_DELAY_IN_MS,
Primary,
helpers::{
ConsensusReceiver,
Expand Down Expand Up @@ -62,8 +61,8 @@ use locktick::parking_lot::{Mutex, RwLock};
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
use tokio::{sync::oneshot, task::JoinHandle};
use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc};
use tokio::{sync::{Notify, oneshot}, task::JoinHandle};

#[cfg(feature = "metrics")]
use std::collections::HashMap;
Expand Down Expand Up @@ -111,6 +110,8 @@ pub struct Consensus<N: Network> {
ping: Arc<Ping<N>>,
/// The block sync logic.
block_sync: Arc<BlockSync<N>>,
/// The notifier for the transaction pool.
tx_pool_notifier: Arc<Notify>,
}

impl<N: Network> Consensus<N> {
Expand All @@ -133,6 +134,8 @@ impl<N: Network> Consensus<N> {
let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
// Initialize the Narwhal storage.
let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
// Initialize the notifier for the transaction pool.
let tx_pool_notifier = Arc::new(Notify::new());
// Initialize the BFT.
let bft = BFT::new(
account,
Expand All @@ -143,6 +146,7 @@ impl<N: Network> Consensus<N> {
trusted_validators,
trusted_peers_only,
storage_mode,
Some(tx_pool_notifier.clone()),
dev,
)?;
// Create a new instance of Consensus.
Expand All @@ -159,6 +163,7 @@ impl<N: Network> Consensus<N> {
transmissions_tracker: Default::default(),
handles: Default::default(),
ping: ping.clone(),
tx_pool_notifier,
};

info!("Starting the consensus instance...");
Expand Down Expand Up @@ -499,8 +504,8 @@ impl<N: Network> Consensus<N> {
let self_ = self.clone();
self.spawn(async move {
loop {
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
// Wait for a notification.
self_.tx_pool_notifier.notified().await;
// Process the unconfirmed transactions in the memory pool.
if let Err(e) = self_.process_unconfirmed_transactions().await {
warn!("Cannot process unconfirmed transactions - {e}");
Expand Down Expand Up @@ -530,6 +535,9 @@ impl<N: Network> Consensus<N> {
error!("{}", flatten_error(e));
// On failure, reinsert the transmissions into the memory pool.
self.reinsert_transmissions(transmissions).await;
} else {
// Notify the transaction pool that the block has been advanced.
self.tx_pool_notifier.notify_one();
}
// Send the callback **after** advancing to the next block.
// Note: We must await the block to be advanced before sending the callback.
Expand Down