Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions src/comms_handler/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ pub struct Node {
connect_to_handshake_contacts: bool,
/// Threadhandle for a HeartBeat Prober
heartbeat_handle: Option<Arc<JoinHandle<()>>>,
/// Number of connection attempts seen per miner address, used for rate limiting and backoff
miner_connection_attempts: Arc<RwLock<HashMap<SocketAddr, u32>>>,
}

pub(crate) struct Peer {
Expand Down Expand Up @@ -281,6 +283,7 @@ impl Node {
seen_gossip_messages: Arc::new(RwLock::new(HashSet::new())),
connect_to_handshake_contacts: false,
heartbeat_handle: None,
miner_connection_attempts: Arc::new(RwLock::new(HashMap::new())),
};

if !disable_listening {
Expand Down Expand Up @@ -1009,18 +1012,46 @@ impl Node {
address: Some(peer_out_addr),
}))?;

// Check if the peer is a miner and we are a mempool
let mut sub_peers = self.sub_peers.write().await;
// Add rate limiting and backoff for miner connections
if self.node_type == NodeType::Mempool && peer_type == NodeType::Miner {
debug!("Current sub-peers: {:?}", sub_peers);
debug!("Sub-peer limit: {:?}", self.sub_peer_limit);
let mut sub_peers = self.sub_peers.write().await;

// Track connection attempts per miner
let mut attempts_map = self.miner_connection_attempts.write().await;
let attempts = attempts_map
.entry(peer_in_addr)
.and_modify(|counter| *counter += 1)
.or_insert(1);

if sub_peers.len() + 1 > self.sub_peer_limit {
all_peers.remove_entry(&peer_in_addr);
return Err(CommsError::PeerListFull);
} else {
sub_peers.insert(peer_in_addr);
// Calculate exponential backoff based on number of attempts
let backoff = Duration::from_secs(2u64.saturating_pow(*attempts as u32));

// Cap maximum backoff at 5 minutes
let capped_backoff = std::cmp::min(backoff, Duration::from_secs(300));

warn!(
"Miner connection attempt rate limited. Peer: {}, Attempts: {}, Backoff: {}s",
peer_in_addr,
attempts,
capped_backoff.as_secs()
);

tokio::time::sleep(capped_backoff).await;

// Still reject if at capacity after backoff
if sub_peers.len() + 1 > self.sub_peer_limit {
all_peers.remove_entry(&peer_in_addr);
return Err(CommsError::PeerListFull);
}
}

// Reset attempt counter on successful connection
if let Some(counter) = attempts_map.get_mut(&peer_in_addr) {
*counter = 0;
}

sub_peers.insert(peer_in_addr);
}

// We only do DNS validation on mempool and storage nodes
Expand Down
51 changes: 48 additions & 3 deletions src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::{
};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::task;
use tokio::time::Duration;
use tracing::{debug, error, error_span, info, info_span, trace, warn};
use tracing_futures::Instrument;
use tw_chain::primitives::asset::{Asset, TokenAmount};
Expand Down Expand Up @@ -81,6 +82,7 @@ pub enum MinerError {
Serialization(bincode::Error),
AsyncTask(task::JoinError),
WalletError(WalletDbError),
StartupFailed(String),
}

#[derive(Debug, PartialEq, Clone)]
Expand All @@ -103,6 +105,7 @@ impl fmt::Display for MinerError {
Self::AsyncTask(err) => write!(f, "Async task error: {err}"),
Self::Serialization(err) => write!(f, "Serialization error: {err}"),
Self::WalletError(err) => write!(f, "Wallet error: {err}"),
Self::StartupFailed(err) => write!(f, "Startup failed: {err}"),
}
}
}
Expand All @@ -115,6 +118,7 @@ impl Error for MinerError {
Self::AsyncTask(e) => Some(e),
Self::Serialization(ref e) => Some(e),
Self::WalletError(ref e) => Some(e),
Self::StartupFailed(_) => None,
}
}
}
Expand Down Expand Up @@ -203,9 +207,17 @@ impl MinerNode {
let tls_addr = create_socket_addr(&addr).await.unwrap();
let tcp_tls_config = TcpTlsConfig::from_tls_spec(tls_addr, &config.tls_config)?;
let api_addr = SocketAddr::new(tls_addr.ip(), config.miner_api_port);
let api_tls_info = config
.miner_api_use_tls
.then(|| tcp_tls_config.clone_private_info());
let api_tls_info = if config.miner_api_use_tls {
// Make sure the TLS config has a private key
if config.tls_config.key.is_none() {
return Err(MinerError::ConfigError(
"TLS config missing private key for API server",
));
}
Some(tcp_tls_config.clone_private_info())
} else {
None
};
let api_keys = to_api_keys(config.api_keys.clone());
let node = Node::new(
&tcp_tls_config,
Expand Down Expand Up @@ -551,6 +563,11 @@ impl MinerNode {
return res;
}
}
_ = tokio::time::sleep(Duration::from_secs(300)) => {
if let Err(e) = self.monitor_mining_status().await {
error!("Mining monitor error: {}", e);
}
}
_ = self.mining_partition_task.wait() => {
self.wait_partition_task = false;
return Some(Ok(Response {
Expand Down Expand Up @@ -1665,6 +1682,34 @@ impl MinerNode {
_ => None,
}
}

/// Checks whether mining is still running and attempts a recovery if it is not.
///
/// Mining is treated as stopped when the mining status request does not report
/// success, or when the node's pause flag is set. Recovery reconnects to the
/// mempool if the miner is currently disconnected, re-sends the startup
/// requests and finally clears the pause flag.
///
/// # Errors
///
/// Returns `MinerError::StartupFailed` carrying the underlying error text when
/// re-sending the startup requests fails. The pause flag is not modified in
/// that case.
pub async fn monitor_mining_status(&mut self) -> Result<()> {
    let status = self.receive_get_mining_status_request().await;

    // The pause flag is only read when the status request succeeded.
    let mining_active = status.success && !*self.pause_node.read().await;
    if mining_active {
        return Ok(());
    }

    info!("Mining appears to have stopped, attempting recovery...");

    // Re-establish the mempool connection first if it was lost.
    let disconnected = self.is_disconnected().await;
    if disconnected {
        info!("Reconnecting to mempool...");
        self.handle_connect_to_mempool().await;
    }

    // Re-send the startup requests; log and bail out on failure.
    self.send_startup_requests().await.map_err(|err| {
        error!("Failed to restart mining: {}", err);
        MinerError::StartupFailed(err.to_string())
    })?;

    // NOTE(review): this clears the pause flag even if the node was paused
    // on purpose — confirm that is the intended behaviour.
    *self.pause_node.write().await = false;

    info!("Mining recovery attempted");

    Ok(())
}
}

impl MinerInterface for MinerNode {
Expand Down
4 changes: 3 additions & 1 deletion src/miner_pow/opengl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ impl fmt::Display for OpenGlMinerError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::LockFailed => f.write_str("couldn't acquire global lock for using GLFW context"),
Self::InitializeGlfwMsg(cause, msg) => write!(f, "Failed to initialize GLFW: {cause}: {msg}"),
Self::InitializeGlfwMsg(cause, msg) => {
write!(f, "Failed to initialize GLFW: {cause}: {msg}")
}
Self::InitializeGlfw(cause) => write!(f, "Failed to initialize GLFW: {cause}"),
Self::CreateGlfwWindow => f.write_str("Failed to create GLFW window"),
Self::CompileShader(cause) => write!(f, "Failed to compile shader: {cause}"),
Expand Down
Loading