Skip to content

Commit

Permalink
GH-307: Network stops growing (#546)
Browse files Browse the repository at this point in the history
* GH-307: Upgraded log messages, promoted DEBUG to WARN

* GH-307: Fixed dormant multi-port bug

* GH-307: Working on peer_addr/local_addr problem; one test failing

* GH-307: Improved logging and a little instrumentation

* GH-307: Working when tested with TNT; still would like multinode test.

* GH-307: Added multinode test; may have broken other multinode tests, hard to tell

* GH-307: Formatting

* GH-307: Review issues
  • Loading branch information
substratumservices authored Sep 16, 2019
1 parent 4db4cb7 commit 69b1382
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 105 deletions.
8 changes: 7 additions & 1 deletion multinode_integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,13 @@ impl MockNode {
let _ = self.write_control_stream().shutdown(Shutdown::Both);
break;
}
Ok(len) => framer.add_data(&buf[..len]),
Ok(len) => {
if len == 0 {
let _ = self.write_control_stream().shutdown(Shutdown::Both);
break;
}
framer.add_data(&buf[..len])
}
_ => (),
}
}
Expand Down
4 changes: 4 additions & 0 deletions multinode_integration_tests/src/multinode_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::collections::{BTreeSet, HashSet};
use std::convert::{TryFrom, TryInto};
use std::net::IpAddr;

#[derive(PartialEq, Clone, Debug)]
pub enum GossipType {
DebutGossip(SingleNode),
PassGossip(SingleNode),
Expand Down Expand Up @@ -62,6 +63,7 @@ pub trait MultinodeGossip {
fn render(&self) -> Gossip;
}

#[derive(PartialEq, Clone, Debug)]
pub struct SingleNode {
node: AccessibleGossipRecord,
}
Expand Down Expand Up @@ -141,6 +143,7 @@ impl SingleNode {
}
}

#[derive(PartialEq, Clone, Debug)]
pub struct Introduction {
introducer: AccessibleGossipRecord,
introducee: AccessibleGossipRecord,
Expand Down Expand Up @@ -237,6 +240,7 @@ impl Introduction {
}
}

#[derive(PartialEq, Clone, Debug)]
pub struct Standard {
nodes: Vec<AccessibleGossipRecord>,
}
Expand Down
8 changes: 7 additions & 1 deletion multinode_integration_tests/src/substratum_mock_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use serde_cbor;
use std::cell::RefCell;
use std::io;
use std::io::{Error, ErrorKind, Read, Write};
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::net::{IpAddr, Shutdown};
use std::ops::Add;
use std::rc::Rc;
use std::thread;
Expand Down Expand Up @@ -354,6 +354,12 @@ impl SubstratumMockNode {
}
}

pub fn kill(self) {
let mut stream = self.control_stream.borrow_mut();
stream.flush().unwrap();
stream.shutdown(Shutdown::Both).unwrap();
}

fn do_docker_run(node_addr: &NodeAddr, host_node_parent_dir: Option<String>, name: &String) {
let root = match host_node_parent_dir {
Some(dir) => dir,
Expand Down
40 changes: 40 additions & 0 deletions multinode_integration_tests/tests/connection_termination_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2017-2019, Substratum LLC (https://substratum.net) and/or its affiliates. All rights reserved.

use multinode_integration_tests_lib::multinode_gossip::{parse_gossip, GossipType};
use multinode_integration_tests_lib::neighborhood_constructor::construct_neighborhood;
use multinode_integration_tests_lib::substratum_mock_node::SubstratumMockNode;
use multinode_integration_tests_lib::substratum_node::{
Expand Down Expand Up @@ -204,6 +205,45 @@ fn reported_client_drop() {
ensure_no_further_traffic(&mock_node, &masquerader);
}

#[test]
fn downed_nodes_not_offered_in_passes_or_introductions() {
let real_node: NodeRecord = make_node_record(1234, true);
let mut db: NeighborhoodDatabase = db_from_node(&real_node);
let desirable_but_down = db.add_node(make_node_record(2345, true)).unwrap();
let undesirable_but_up = db.add_node(make_node_record(3456, true)).unwrap();
let fictional = db.add_node(make_node_record(4567, true)).unwrap();
db.add_arbitrary_full_neighbor(real_node.public_key(), &desirable_but_down);
db.add_arbitrary_full_neighbor(real_node.public_key(), &undesirable_but_up);
db.add_arbitrary_full_neighbor(&desirable_but_down, &undesirable_but_up);
db.add_arbitrary_full_neighbor(&desirable_but_down, &fictional);

let mut cluster = SubstratumNodeCluster::start().unwrap();
let (_, substratum_real_node, mut node_map) = construct_neighborhood(&mut cluster, db, vec![]);
let desirable_but_down_node = node_map.remove(&desirable_but_down).unwrap();
let undesirable_but_up_node = node_map.remove(&undesirable_but_up).unwrap();
let debuter: NodeRecord = make_node_record(5678, true);
let debuter_node = cluster.start_mock_node_with_public_key(vec![5550], debuter.public_key());

// Kill desirable neighbor
desirable_but_down_node.kill();
// Debut a new Node
debuter_node.transmit_debut(&substratum_real_node).unwrap();
// What's the return Gossip?
let (gossip, ip_addr) = debuter_node
.wait_for_gossip(Duration::from_secs(2))
.unwrap();
match parse_gossip(&gossip, ip_addr) {
GossipType::IntroductionGossip(introduction) => {
// It's an Introduction of the one that didn't go down!
assert_eq!(
introduction.introducee_key(),
undesirable_but_up_node.public_key()
);
}
unexpected => panic!("Unexpected gossip: {:?}", unexpected),
}
}

fn create_neighborhood(
cluster: &mut SubstratumNodeCluster,
) -> (SubstratumRealNode, SubstratumMockNode, PublicKey) {
Expand Down
81 changes: 17 additions & 64 deletions node/src/neighborhood/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use neighborhood_database::NeighborhoodDatabase;
use node_record::NodeRecord;
use std::cmp::Ordering;
use std::convert::TryFrom;
use std::net::IpAddr;
use std::net::{IpAddr, SocketAddr};

pub struct Neighborhood {
cryptde: &'static dyn CryptDE,
Expand Down Expand Up @@ -901,34 +901,30 @@ impl Neighborhood {
if msg.stream_type != RemovedStreamType::Clandestine {
panic!("Neighborhood should never get ShutdownStreamMsg about non-clandestine stream")
}
let (neighbor_key, neighbor_node_addr) = match self
.neighborhood_database
.node_by_ip(&msg.peer_addr.ip())
{
let neighbor_key = match self.neighborhood_database.node_by_ip(&msg.peer_addr.ip()) {
None => {
debug!(self.logger, "Received shutdown notification for stream to {}, but no neighbor found there - ignoring", msg.peer_addr);
warning!(self.logger, "Received shutdown notification for stream to {}, but no Node with that IP is in the database - ignoring", msg.peer_addr.ip());
return;
}
Some(n) => (
n.public_key().clone(),
n.node_addr_opt().expect("NodeAddr suddenly disappeared"),
),
Some(n) => (n.public_key().clone()),
};
if !neighbor_node_addr.ports().contains(&msg.peer_addr.port()) {
debug!(self.logger, "Received shutdown notification for stream to {}, but no neighbor found there - ignoring", msg.peer_addr);
return;
}
match self.neighborhood_database.remove_neighbor(&neighbor_key) {
self.remove_neighbor(&neighbor_key, &msg.peer_addr);
}

fn remove_neighbor(&mut self, neighbor_key: &PublicKey, peer_addr: &SocketAddr) {
match self.neighborhood_database.remove_neighbor(neighbor_key) {
Err(_) => panic!("Node suddenly disappeared"),
Ok(true) => {
debug!(
self.logger,
"Received shutdown notification for {} at {}", neighbor_key, msg.peer_addr
"Received shutdown notification for {} at {}: removing neighborship",
neighbor_key,
peer_addr.ip()
);
self.gossip_to_neighbors()
}
Ok(false) => {
debug!(self.logger, "Received shutdown notification for {} at {}, but that Node is already isolated - ignoring", neighbor_key, msg.peer_addr);
debug!(self.logger, "Received shutdown notification for {} at {}, but that Node is no neighbor - ignoring", neighbor_key, peer_addr.ip());
}
};
}
Expand Down Expand Up @@ -3212,50 +3208,7 @@ mod tests {
assert_eq!(subject.neighborhood_database.keys().len(), 1);
let hopper_recording = hopper_recording_arc.lock().unwrap();
assert_eq!(hopper_recording.len(), 0);
TestLogHandler::new().exists_log_containing(&format!("DEBUG: Neighborhood: Received shutdown notification for stream to {}, but no neighbor found there - ignoring", unrecognized_socket_addr));
}

#[test]
fn handle_stream_shutdown_handles_socket_addr_with_unknown_port() {
init_test_logging();
let (hopper, _, hopper_recording_arc) = make_recorder();
let system = System::new("test");
let neighbor_node = make_node_record(3123, true);
let neighbor_node_addr = neighbor_node.node_addr_opt().unwrap();
let neighbor_node_socket_addr =
SocketAddr::new(neighbor_node_addr.ip_addr(), neighbor_node_addr.ports()[0]);
let unrecognized_socket_addr = SocketAddr::new(
neighbor_node_socket_addr.ip(),
neighbor_node_socket_addr.port() + 1,
);
let subject_node = make_global_cryptde_node_record(1345, true);
let mut subject = neighborhood_from_nodes(&subject_node, None);
subject
.neighborhood_database
.add_node(neighbor_node.clone())
.unwrap();
subject
.neighborhood_database
.add_arbitrary_full_neighbor(subject_node.public_key(), neighbor_node.public_key());
let peer_actors = peer_actors_builder().hopper(hopper).build();
subject.hopper = Some(peer_actors.hopper.from_hopper_client);

subject.handle_stream_shutdown_msg(StreamShutdownMsg {
peer_addr: SocketAddr::new(
neighbor_node_socket_addr.ip(),
neighbor_node_socket_addr.port() + 1,
),
stream_type: RemovedStreamType::Clandestine,
report_to_counterpart: false,
});

System::current().stop_with_code(0);
system.run();

assert_eq!(subject.neighborhood_database.keys().len(), 2);
let hopper_recording = hopper_recording_arc.lock().unwrap();
assert_eq!(hopper_recording.len(), 0);
TestLogHandler::new().exists_log_containing(&format!("DEBUG: Neighborhood: Received shutdown notification for stream to {}, but no neighbor found there - ignoring", unrecognized_socket_addr));
TestLogHandler::new().exists_log_containing(&format!("WARN: Neighborhood: Received shutdown notification for stream to {}, but no Node with that IP is in the database - ignoring", unrecognized_socket_addr.ip()));
}

#[test]
Expand Down Expand Up @@ -3310,7 +3263,7 @@ mod tests {
);
let hopper_recording = hopper_recording_arc.lock().unwrap();
assert_eq!(hopper_recording.len(), 0);
TestLogHandler::new().exists_log_containing(&format!("DEBUG: Neighborhood: Received shutdown notification for {} at {}, but that Node is already isolated - ignoring", inactive_neighbor_node.public_key(), inactive_neighbor_node_socket_addr));
TestLogHandler::new().exists_log_containing(&format!("DEBUG: Neighborhood: Received shutdown notification for {} at {}, but that Node is no neighbor - ignoring", inactive_neighbor_node.public_key(), inactive_neighbor_node_socket_addr.ip()));
}

#[test]
Expand Down Expand Up @@ -3366,9 +3319,9 @@ mod tests {
let hopper_recording = hopper_recording_arc.lock().unwrap();
assert_eq!(hopper_recording.len(), 1);
TestLogHandler::new().exists_log_containing(&format!(
"DEBUG: Neighborhood: Received shutdown notification for {} at {}",
"DEBUG: Neighborhood: Received shutdown notification for {} at {}: removing neighborship",
shutdown_neighbor_node.public_key(),
shutdown_neighbor_node_socket_addr
shutdown_neighbor_node_socket_addr.ip()
));
}

Expand Down
47 changes: 28 additions & 19 deletions node/src/stream_handler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,19 +322,20 @@ impl StreamHandlerPool {
}

fn handle_remove_stream_msg(&mut self, msg: RemoveStreamMsg) {
let stream_writer_key = StreamWriterKey::from(msg.socket_addr);
let stream_writer_key = StreamWriterKey::from(msg.peer_addr);
debug!(
self.logger,
"Stream from {} has closed; removing stream writer {}",
msg.socket_addr,
"Stream from local {} to peer {} has closed; removing writer with key {}",
msg.local_addr,
msg.peer_addr,
stream_writer_key
);
let report_to_counterpart = match self.stream_writers.remove(&stream_writer_key) {
None | Some(None) => false,
Some(Some(_sender_wrapper)) => true,
};
let stream_shutdown_msg = StreamShutdownMsg {
peer_addr: msg.socket_addr,
peer_addr: msg.peer_addr,
stream_type: msg.stream_type,
report_to_counterpart,
};
Expand Down Expand Up @@ -512,7 +513,8 @@ impl StreamHandlerPool {
.map_err(move |err| { // connection was unsuccessful
error!(logger_me, "Stream to {} does not exist and could not be connected; discarding {} bytes: {}", peer_addr, msg_data_len, err);
remove_sub.try_send(RemoveStreamMsg {
socket_addr: peer_addr_e,
peer_addr: peer_addr_e,
local_addr: SocketAddr::new (localhost(), 0), // irrelevant; stream was never opened
stream_type: RemovedStreamType::Clandestine,
sub,
}).expect("StreamHandlerPool is dead");
Expand Down Expand Up @@ -932,7 +934,8 @@ mod tests {
subject_subs
.remove_sub
.try_send(RemoveStreamMsg {
socket_addr: peer_addr,
peer_addr,
local_addr,
stream_type: RemovedStreamType::Clandestine,
sub: peer_actors.dispatcher.stream_shutdown_sub,
})
Expand Down Expand Up @@ -963,15 +966,17 @@ mod tests {
let system = System::new("test");
let sub = recorder.start().recipient::<StreamShutdownMsg>();
let mut subject = StreamHandlerPool::new(vec![]);
let socket_addr = SocketAddr::from_str("127.0.0.1:5678").unwrap();
let sw_key = StreamWriterKey::from(socket_addr);
let sender_wrapper = SenderWrapperMock::new(socket_addr);
let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap();
let local_addr = SocketAddr::from_str("127.0.0.1:0").unwrap();
let sw_key = StreamWriterKey::from(peer_addr);
let sender_wrapper = SenderWrapperMock::new(local_addr);
subject
.stream_writers
.insert(sw_key.clone(), Some(Box::new(sender_wrapper)));

subject.handle_remove_stream_msg(RemoveStreamMsg {
socket_addr,
peer_addr,
local_addr,
stream_type: RemovedStreamType::Clandestine,
sub,
});
Expand All @@ -984,7 +989,7 @@ mod tests {
assert_eq!(
record,
&StreamShutdownMsg {
peer_addr: socket_addr,
peer_addr,
stream_type: RemovedStreamType::Clandestine,
report_to_counterpart: true
}
Expand All @@ -997,11 +1002,13 @@ mod tests {
let system = System::new("test");
let sub = recorder.start().recipient::<StreamShutdownMsg>();
let mut subject = StreamHandlerPool::new(vec![]);
let socket_addr = SocketAddr::from_str("127.0.0.1:5678").unwrap();
let sw_key = StreamWriterKey::from(socket_addr);
let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap();
let local_addr = SocketAddr::from_str("127.0.0.1:0").unwrap();
let sw_key = StreamWriterKey::from(peer_addr);

subject.handle_remove_stream_msg(RemoveStreamMsg {
socket_addr,
peer_addr,
local_addr,
stream_type: RemovedStreamType::NonClandestine(NonClandestineAttributes {
reception_port: HTTP_PORT,
sequence_number: 1234,
Expand All @@ -1017,7 +1024,7 @@ mod tests {
assert_eq!(
record,
&StreamShutdownMsg {
peer_addr: socket_addr,
peer_addr,
stream_type: RemovedStreamType::NonClandestine(NonClandestineAttributes {
reception_port: HTTP_PORT,
sequence_number: 1234
Expand All @@ -1033,12 +1040,14 @@ mod tests {
let system = System::new("test");
let sub = recorder.start().recipient::<StreamShutdownMsg>();
let mut subject = StreamHandlerPool::new(vec![]);
let socket_addr = SocketAddr::from_str("127.0.0.1:5678").unwrap();
let sw_key = StreamWriterKey::from(socket_addr);
let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap();
let local_addr = SocketAddr::from_str("127.0.0.1:0").unwrap();
let sw_key = StreamWriterKey::from(peer_addr);
subject.stream_writers.insert(sw_key.clone(), None);

subject.handle_remove_stream_msg(RemoveStreamMsg {
socket_addr,
peer_addr,
local_addr,
stream_type: RemovedStreamType::Clandestine,
sub,
});
Expand All @@ -1051,7 +1060,7 @@ mod tests {
assert_eq!(
record,
&StreamShutdownMsg {
peer_addr: socket_addr,
peer_addr,
stream_type: RemovedStreamType::Clandestine,
report_to_counterpart: false
}
Expand Down
Loading

0 comments on commit 69b1382

Please sign in to comment.