Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Sweep panickers from IO and network #3018

Merged
merged 2 commits into from
Oct 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions util/io/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
let thread = thread::spawn(move || {
let p = panic.clone();
panic.catch_panic(move || {
IoManager::<Message>::start(p, &mut event_loop, h).unwrap();
}).unwrap()
IoManager::<Message>::start(p, &mut event_loop, h).expect("Error starting IO service");
}).expect("Error starting panic handler")
});
Ok(IoService {
panic_handler: panic_handler,
Expand Down Expand Up @@ -434,7 +434,11 @@ impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
self.thread.take().unwrap().join().ok();
if let Some(thread) = self.thread.take() {
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
});
}
trace!(target: "shutdown", "[IoService] Closed.");
}
}
Expand Down
12 changes: 6 additions & 6 deletions util/io/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::mem;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
Expand Down Expand Up @@ -81,7 +80,7 @@ impl Worker {
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
panic_handler.catch_panic(move || {
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
}).unwrap()
}).expect("Error starting panic handler")
})
.expect("Error creating worker thread"));
worker
Expand All @@ -94,7 +93,7 @@ impl Worker {
where Message: Send + Sync + Clone + 'static {
loop {
{
let lock = wait_mutex.lock().unwrap();
let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
if deleting.load(AtomicOrdering::Acquire) {
return;
}
Expand Down Expand Up @@ -134,11 +133,12 @@ impl Worker {
impl Drop for Worker {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoWorker] Closing...");
let _ = self.wait_mutex.lock().unwrap();
let _ = self.wait_mutex.lock().expect("Poisoned work_loop mutex");
self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all();
let thread = mem::replace(&mut self.thread, None).unwrap();
thread.join().ok();
if let Some(thread) = self.thread.take() {
thread.join().ok();
}
trace!(target: "shutdown", "[IoWorker] Closed");
}
}
10 changes: 5 additions & 5 deletions util/network/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {

/// Writable IO handler. Called when the socket is ready to send.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static {
if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete)
}
{
let buf = self.send_queue.front_mut().unwrap();
let buf = match self.send_queue.front_mut() {
Some(buf) => buf,
None => return Ok(WriteStatus::Complete),
};
let send_size = buf.get_ref().len();
let pos = buf.position() as usize;
if (pos as usize) >= send_size {
Expand Down Expand Up @@ -439,7 +439,7 @@ impl EncryptedConnection {
let mut prev = H128::new();
mac.clone().finalize(&mut prev);
let mut enc = H128::new();
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).unwrap();
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).expect("Error updating MAC");
mac_encoder.reset();

enc = enc ^ if seed.is_empty() { prev } else { H128::from_slice(seed) };
Expand Down
14 changes: 7 additions & 7 deletions util/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Discovery {
trace!(target: "discovery", "Inserting {:?}", &e);
let id_hash = e.id.sha3();
let ping = {
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id_hash) as usize).unwrap();
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id_hash) as usize];
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.timeout = None;
Expand All @@ -169,8 +169,9 @@ impl Discovery {

if bucket.nodes.len() > BUCKET_SIZE {
//ping least active node
bucket.nodes.back_mut().unwrap().timeout = Some(time::precise_time_ns());
Some(bucket.nodes.back().unwrap().address.endpoint.clone())
let mut last = bucket.nodes.back_mut().expect("Last item is always present when len() > 0");
last.timeout = Some(time::precise_time_ns());
Some(last.address.endpoint.clone())
} else { None }
};
if let Some(endpoint) = ping {
Expand All @@ -179,7 +180,7 @@ impl Discovery {
}

fn clear_ping(&mut self, id: &NodeId) {
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id.sha3()) as usize).unwrap();
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id.sha3()) as usize];
if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) {
node.timeout = None;
}
Expand Down Expand Up @@ -294,7 +295,7 @@ impl Discovery {
if count == BUCKET_SIZE {
// delete the most distant element
let remove = {
let (key, last) = found.iter_mut().next_back().unwrap();
let (key, last) = found.iter_mut().next_back().expect("Last element is always Some when count > 0");
last.pop();
if last.is_empty() { Some(key.clone()) } else { None }
};
Expand All @@ -316,8 +317,7 @@ impl Discovery {
}

pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
while !self.send_queue.is_empty() {
let data = self.send_queue.pop_front().unwrap();
while let Some(data) = self.send_queue.pop_front() {
match self.udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => {
},
Expand Down
56 changes: 31 additions & 25 deletions util/network/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::net::SocketAddr;
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -48,6 +48,8 @@ type Slab<T> = ::slab::Slab<T, usize>;
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 1024;

const DEFAULT_PORT: u16 = 30303;

// Tokens
const TCP_ACCEPT: usize = SYS_TIMER + 1;
const IDLE: usize = SYS_TIMER + 2;
Expand Down Expand Up @@ -135,14 +137,14 @@ impl NetworkConfiguration {
/// Create new default configuration with sepcified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap());
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)));
config
}

/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap());
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)));
config.nat_enabled = false;
config
}
Expand Down Expand Up @@ -259,7 +261,7 @@ impl<'s> NetworkContext<'s> {
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
assert!(self.session.is_some(), "Respond called without network context");
self.send(self.session_id.unwrap(), packet_id, data)
self.session_id.map_or_else(|| Err(NetworkError::Expired), |id| self.send(id, packet_id, data))
}

/// Get an IoChannel.
Expand Down Expand Up @@ -382,16 +384,16 @@ impl Host {
trace!(target: "host", "Creating new Host object");

let mut listen_address = match config.listen_address {
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)),
Some(addr) => addr,
};

let keys = if let Some(ref secret) = config.use_secret {
KeyPair::from_secret(secret.clone()).unwrap()
try!(KeyPair::from_secret(secret.clone()))
} else {
config.config_path.clone().and_then(|ref p| load_key(Path::new(&p)))
.map_or_else(|| {
let key = Random.generate().unwrap();
let key = Random.generate().expect("Error generating random key pair");
if let Some(path) = config.config_path.clone() {
save_key(Path::new(&path), key.secret());
}
Expand Down Expand Up @@ -488,7 +490,7 @@ impl Host {
let mut s = e.lock();
{
let id = s.id();
if id.is_some() && reserved.contains(id.unwrap()) {
if id.map_or(false, |id| reserved.contains(id)) {
continue;
}
}
Expand Down Expand Up @@ -814,11 +816,12 @@ impl Host {
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};

// Check for the session limit. session_counts accounts for the new session.
if reserved_only ||
(s.info.originated && session_count >= min_peers) ||
(!s.info.originated && session_count >= max_peers) {
(s.info.originated && session_count > min_peers) ||
(!s.info.originated && session_count > max_peers) {
// only proceed if the connecting peer is reserved.
if !self.reserved_nodes.read().contains(s.id().unwrap()) {
if !self.reserved_nodes.read().contains(s.id().expect("Ready session always has id")) {
s.disconnect(io, DisconnectReason::TooManyPeers);
return;
}
Expand All @@ -827,7 +830,7 @@ impl Host {
// Add it to the node table
if !s.info.originated {
if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
let entry = NodeEntry { id: s.id().expect("Ready session always has id").clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.lock();
if let Some(ref mut discovery) = *discovery {
Expand Down Expand Up @@ -861,15 +864,17 @@ impl Host {
}
let handlers = self.handlers.read();
for p in ready_data {
let h = handlers.get(&p).unwrap().clone();
self.stats.inc_sessions();
let reserved = self.reserved_nodes.read();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
if let Some(h) = handlers.get(&p).clone() {
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
}
for (p, packet_id, data) in packet_data {
let h = handlers.get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
if let Some(h) = handlers.get(&p).clone() {
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
}
}

Expand Down Expand Up @@ -909,9 +914,10 @@ impl Host {
}
}
for p in to_disconnect {
let h = self.handlers.read().get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
if let Some(h) = self.handlers.read().get(&p).clone() {
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
}
if deregister {
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
Expand Down Expand Up @@ -975,7 +981,7 @@ impl IoHandler<NetworkIoMessage> for Host {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
let node_changes = { self.discovery.lock().as_mut().unwrap().readable(io) };
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
Expand All @@ -992,7 +998,7 @@ impl IoHandler<NetworkIoMessage> for Host {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
self.discovery.lock().as_mut().unwrap().writable(io);
self.discovery.lock().as_mut().map(|d| d.writable(io));
}
_ => panic!("Received unknown writable token"),
}
Expand All @@ -1006,11 +1012,11 @@ impl IoHandler<NetworkIoMessage> for Host {
IDLE => self.maintain_network(io),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
self.discovery.lock().as_mut().unwrap().refresh();
self.discovery.lock().as_mut().map(|d| d.refresh());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
DISCOVERY_ROUND => {
let node_changes = { self.discovery.lock().as_mut().unwrap().round() };
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.round()) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
Expand Down Expand Up @@ -1102,7 +1108,7 @@ impl IoHandler<NetworkIoMessage> for Host {
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
}
}
DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(event_loop).ok()).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
Expand Down Expand Up @@ -1130,7 +1136,7 @@ impl IoHandler<NetworkIoMessage> for Host {
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
}
}
DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(event_loop).ok()).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
Expand Down Expand Up @@ -1200,7 +1206,7 @@ fn key_save_load() {

#[test]
fn host_client_url() {
let mut config = NetworkConfiguration::new();
let mut config = NetworkConfiguration::new_local();
let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".into();
config.use_secret = Some(key);
let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion util/network/src/node_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl NodeTable {
json.push_str("\"nodes\": [\n");
let node_ids = self.nodes(AllowIP::All);
for i in 0 .. node_ids.len() {
let node = self.nodes.get(&node_ids[i]).unwrap();
let node = self.nodes.get(&node_ids[i]).expect("self.nodes() only returns node IDs from self.nodes");
json.push_str(&format!("\t{{ \"url\": \"{}\", \"failures\": {} }}{}\n", node, node.failures, if i == node_ids.len() - 1 {""} else {","}))
}
json.push_str("]\n");
Expand Down
5 changes: 3 additions & 2 deletions util/network/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,9 @@ impl Session {
Ok(SessionData::Continue)
},
PACKET_PONG => {
self.pong_time_ns = Some(time::precise_time_ns());
self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000);
let time = time::precise_time_ns();
self.pong_time_ns = Some(time);
self.info.ping_ms = Some((time - self.ping_time_ns) / 1000_000);
Ok(SessionData::Continue)
},
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
Expand Down