Skip to content

Commit

Permalink
fix: further simplifications with initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
trevelyan committed Jan 17, 2022
1 parent c1cad42 commit 2277e75
Showing 1 changed file with 85 additions and 80 deletions.
165 changes: 85 additions & 80 deletions src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,18 @@ impl Network {
/// connects to any peers configured in our peers list.
/// Opens a socket, does handshake, synchronizes, etc.
async fn run_client(&self) -> crate::Result<()> {
self.initialize_configured_peers().await;
self.spawn_reconnect_to_configured_peers_task(self.wallet_lock.clone())
.await
.unwrap();
self.initialize().await;
// self.spawn_reconnect_to_configured_peers_task(self.wallet_lock.clone())
// .await
// .unwrap();
Ok(())
}

/// Initialize configured peers (peers set in the configuration/*.yml) if any.
/// This does not connect to the peers, it only sets their
/// state and inserts them into PEERS_DB_GLOBAL so that the Task created by
/// spawn_reconnect_to_configured_peers_task will open the connection.
async fn initialize_configured_peers(&self) {
/// Initialize the network class generally, including adding any peers we have
/// configured (peers set in the configuration/*.yml) into our PEERS_DB_GLOBAL
/// data structure.
async fn initialize(&self) {

info!("{:?}", self.peer_conf);
if let Some(peer_settings) = &self.peer_conf {
for peer_setting in peer_settings {
Expand All @@ -132,47 +132,11 @@ impl Network {
}
}
}
}

/// Launch a task which will monitor peers and make sure they stay connected. If a peer in our
/// configured "peers list" becomes disconnected, this task will reconnect to the peer and
/// redo the handshake and blockchain synchronization. For convenience, this task is also
/// used to make initial connections with peers(not only to reconnect).
async fn spawn_reconnect_to_configured_peers_task(
&self,
wallet_lock: Arc<RwLock<Wallet>>,
) -> crate::Result<()> {
tokio::spawn(async move {
loop {
let peer_states: Vec<(SaitoHash, bool)>;
{
let peers_db_global = PEERS_DB_GLOBAL.clone();
let peers_db = peers_db_global.read().await;
peer_states = peers_db
.keys()
.map(|connection_id| {
let peer = peers_db.get(connection_id).unwrap();
let should_try_reconnect = peer.get_is_from_peer_list()
&& !peer.get_is_connected_or_connecting();
(*connection_id, should_try_reconnect)
})
.collect::<Vec<(SaitoHash, bool)>>();
}
for (connection_id, should_try_reconnect) in peer_states {
if should_try_reconnect {
info!("found disconnected peer in peer settings, (re)connecting...");
Network::connect_to_peer(connection_id, wallet_lock.clone()).await;
}
}
sleep(Duration::from_millis(1000)).await;
}
})
.await
.expect("error: spawn_reconnect_to_configured_peers_task failed");
Ok(())
}



/// Connect to a peer via websocket and spawn a Task to handle message received on the socket
/// and pipe them to handle_peer_message().
async fn connect_to_peer(connection_id: SaitoHash, wallet_lock: Arc<RwLock<Wallet>>) {
Expand Down Expand Up @@ -369,36 +333,6 @@ impl Network {



/// Runs warp::serve to listen for incoming connections
pub async fn run_server(network_lock_clone : Arc<RwLock<Network>>) -> crate::Result<()> {

let mut routes;
let mut host;
let mut port;

{
let network = network_lock_clone.read().await;
port = network.port;
host = network.host;
routes = get_block_route_filter(network.blockchain_lock.clone())
.or(post_transaction_route_filter(
network.mempool_lock.clone(),
network.blockchain_lock.clone(),
))
.or(ws_upgrade_route_filter(
network.wallet_lock.clone(),
network.mempool_lock.clone(),
network.blockchain_lock.clone(),
network.broadcast_channel_sender.clone(),
));

info!("Listening for HTTP on port {}", port);
let (_, server) = warp::serve(routes)
.bind_with_graceful_shutdown((host, port), signal_for_shutdown());
server.await;
}
Ok(())
}


pub async fn run(
Expand Down Expand Up @@ -443,6 +377,19 @@ pub async fn run(
}


//
// initialize network
//
{
println!("initializing the network");
let network = network_lock.write().await;
network.initialize().await;
println!("done initializing the network");
}




//
// listen to local and global messages
//
Expand All @@ -461,11 +408,34 @@ pub async fn run(
//
NetworkMessage::LocalNetworkMonitoring => {

// initialize into DB if desired
//
// Check Disconnected Peers
//
let peer_states: Vec<(SaitoHash, bool)>;
{
let peers_db_global = PEERS_DB_GLOBAL.clone();
let peers_db = peers_db_global.read().await;
peer_states = peers_db
.keys()
.map(|connection_id| {
let peer = peers_db.get(connection_id).unwrap();
let should_try_reconnect = peer.get_is_from_peer_list()
&& !peer.get_is_connected_or_connecting();
(*connection_id, should_try_reconnect)
})
.collect::<Vec<(SaitoHash, bool)>>();
}
for (connection_id, should_try_reconnect) in peer_states {
if should_try_reconnect {
info!("found disconnected peer in peer settings, (re)connecting...");
let network = network_lock_clone2.read().await;
let wallet_lock_clone = network.wallet_lock.clone();
Network::connect_to_peer(connection_id, wallet_lock_clone).await;
}
}

// reconnect one-by-one
println!("Monitor Network loop!");

println!("Finished Connecting!");

},
_ => {}
Expand Down Expand Up @@ -509,9 +479,44 @@ pub async fn run(
}
}

Ok(())
//Ok(())

}



/// Runs warp::serve to listen for incoming connections
pub async fn run_server(network_lock_clone : Arc<RwLock<Network>>) -> crate::Result<()> {

let mut routes;
let mut host;
let mut port;

{
let network = network_lock_clone.read().await;
port = network.port;
host = network.host;
routes = get_block_route_filter(network.blockchain_lock.clone())
.or(post_transaction_route_filter(
network.mempool_lock.clone(),
network.blockchain_lock.clone(),
))
.or(ws_upgrade_route_filter(
network.wallet_lock.clone(),
network.mempool_lock.clone(),
network.blockchain_lock.clone(),
network.broadcast_channel_sender.clone(),
));

info!("Listening for HTTP on port {}", port);
let (_, server) = warp::serve(routes)
.bind_with_graceful_shutdown((host, port), signal_for_shutdown());
server.await;
}
Ok(())
}


#[cfg(test)]
mod tests {
use std::convert::TryInto;
Expand Down

0 comments on commit 2277e75

Please sign in to comment.