Skip to content

Commit

Permalink
rewrite graph struct to be 25 times faster
Browse files Browse the repository at this point in the history
  • Loading branch information
pmnoxx committed Feb 3, 2021
1 parent 4ac07bf commit d504147
Show file tree
Hide file tree
Showing 14 changed files with 1,151 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ byteorder = "1.2"
lazy_static = "1.4"
tracing = "0.1.13"
strum = { version = "0.18", features = ["derive"] }
rustc-hash = "1.1.0"

borsh = "0.7.1"
cached = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl MetricRecorder {
self
}

pub fn set_graph(&mut self, graph: &HashMap<PeerId, HashSet<PeerId>>) {
pub fn set_graph(&mut self, graph: HashMap<PeerId, HashSet<PeerId>>) {
self.graph.clear();
for (u, u_adj) in graph.iter() {
for v in u_adj {
Expand Down
199 changes: 145 additions & 54 deletions chain/network/src/routing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use std::ops::Sub;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -30,6 +30,8 @@ use crate::{
};
#[cfg(feature = "delay_detector")]
use delay_detector::DelayDetector;
use rustc_hash::{FxHashMap, FxHashSet};
use std::iter::FromIterator;

const ANNOUNCE_ACCOUNT_CACHE_SIZE: usize = 10_000;
const ROUTE_BACK_CACHE_SIZE: u64 = 100_000;
Expand Down Expand Up @@ -257,7 +259,7 @@ pub struct RoutingTable {
/// PeerId associated for every known account id.
account_peers: SizedCache<AccountId, AnnounceAccount>,
/// Active PeerId that are part of the shortest path to each PeerId.
pub peer_forwarding: HashMap<PeerId, HashSet<PeerId>>,
pub peer_forwarding: HashMap<PeerId, Vec<PeerId>>,
/// Store last update for known edges.
pub edges_info: HashMap<(PeerId, PeerId), Edge>,
/// Hash of messages that requires routing back to respective previous hop.
Expand Down Expand Up @@ -620,7 +622,17 @@ impl RoutingTable {
.into_iter()
.map(|announce_account| (announce_account.account_id, announce_account.peer_id))
.collect();
RoutingTableInfo { account_peers, peer_forwarding: self.peer_forwarding.clone() }
RoutingTableInfo {
account_peers,
peer_forwarding: self
.peer_forwarding
.iter()
.map(|(k, v)| {
let r: HashSet<PeerId> = HashSet::from_iter(v.iter().cloned());
(k.clone(), r)
})
.collect(),
}
}

fn try_save_edges(&mut self) {
Expand Down Expand Up @@ -739,8 +751,18 @@ impl RoutingTable {
}
}

pub fn get_raw_graph(&self) -> &HashMap<PeerId, HashSet<PeerId>> {
&self.raw_graph.adjacency
// This method is slow
pub fn get_raw_graph(&self) -> HashMap<PeerId, HashSet<PeerId>> {
let mut result = HashMap::with_capacity(self.raw_graph.adjacency.len());
for (key, neighbors) in self.raw_graph.adjacency.iter() {
let mut tmp: HashSet<PeerId> = HashSet::with_capacity(neighbors.len());
for node in neighbors.iter() {
tmp.insert(self.raw_graph.id2p.get(node).cloned().unwrap());
}
let key = self.raw_graph.id2p.get(&key).cloned().unwrap();
result.insert(key, tmp);
}
result
}
}

Expand All @@ -758,100 +780,169 @@ pub struct RoutingTableInfo {
#[derive(Clone)]
pub struct Graph {
pub source: PeerId,
adjacency: HashMap<PeerId, HashSet<PeerId>>,
p2id: HashMap<PeerId, usize>,
id2p: FxHashMap<usize, PeerId>,
adjacency: FxHashMap<usize, FxHashSet<usize>>,
total_active_edges: u64,
unused: FxHashSet<usize>,
}

impl Graph {
/// Creates an empty graph rooted at `source`.
///
/// The source peer is eagerly registered in both id maps and is always
/// assigned the numeric id 0, which the routing code treats as special
/// (it is never recycled).
pub fn new(source: PeerId) -> Self {
    // Pre-register the source under the reserved id 0.
    let mut id2p = FxHashMap::default();
    id2p.insert(0, source.clone());
    let mut p2id = HashMap::default();
    p2id.insert(source.clone(), 0);

    Self {
        source,
        p2id,
        id2p,
        adjacency: FxHashMap::default(),
        total_active_edges: 0,
        unused: FxHashSet::default(),
    }
}

fn contains_edge(&mut self, peer0: &PeerId, peer1: &PeerId) -> bool {
if let Some(adj) = self.adjacency.get(&peer0) {
if adj.contains(&peer1) {
return true;
fn contains_edge(&self, peer0: &PeerId, peer1: &PeerId) -> bool {
if let Some(id0) = self.p2id.get(&peer0) {
if let Some(id1) = self.p2id.get(&peer1) {
if let Some(adj) = self.adjacency.get(id0) {
if adj.contains(&id1) {
return true;
}
}
}
}

false
}

fn add_directed_edge(&mut self, peer0: PeerId, peer1: PeerId) {
self.adjacency.entry(peer0).or_insert_with(HashSet::new).insert(peer1);
/// Recycles the numeric id of a peer that has no remaining neighbors by
/// moving it into `unused`, so `get_id` can hand it out again. Id 0 (the
/// source node, fixed in `Graph::new`) is never recycled.
fn remove_if_unused(&mut self, id: usize) {
    // NOTE(review): `entry().or_default()` inserts an empty adjacency set as
    // a side effect when `id` has none; for id 0 that empty set is then kept.
    let entry = self.adjacency.entry(id).or_default();

    if entry.len() == 0 && id != 0 {
        // NOTE(review): `.take()` here acts on the *temporary* Option returned
        // by `get()`, so it is a no-op — the `id2p` entry is NOT removed and
        // `id2p` grows monotonically. This looks unintended, but `get_id`
        // currently uses `id2p.len()` as the next fresh id and
        // `calculate_distance` sizes its vectors from `unused.len() +
        // id2p.len()`, both of which rely on this; removing the entry here
        // would need a coordinated change across those methods — verify
        // before "fixing".
        let peer = self.id2p.get(&id).take().cloned().unwrap();
        self.p2id.remove(&peer);
        self.unused.insert(id);
        self.adjacency.remove(&id);
    }
}

/// Returns the numeric id assigned to `peer0`, allocating one if the peer is
/// not yet known. Recycled ids from `unused` are preferred; otherwise a fresh
/// id equal to `id2p.len()` is used (valid because `id2p` only ever grows —
/// see `remove_if_unused`).
fn get_id(&mut self, peer0: &PeerId) -> usize {
    match self.p2id.entry(peer0.clone()) {
        Entry::Occupied(occupied) => *occupied.get(),
        Entry::Vacant(vacant) => {
            // BUG FIX: the previous code used `self.unused.drain().next()`.
            // Dropping a partially consumed `Drain` removes ALL remaining
            // elements, so taking one recycled id silently emptied the whole
            // `unused` set and leaked every other recyclable id. Pop exactly
            // one id instead, leaving the rest available for reuse.
            let val = match self.unused.iter().next().copied() {
                Some(recycled) => {
                    self.unused.remove(&recycled);
                    recycled
                }
                None => self.id2p.len(),
            };

            vacant.insert(val);
            self.id2p.insert(val, peer0.clone());
            val
        }
    }
}

/// Records the undirected edge (peer0, peer1) by inserting both directed
/// entries into the adjacency map. Self-loops are ignored.
fn add_edge_internal(&mut self, peer0: PeerId, peer1: PeerId) {
    // A peer connected to itself carries no routing information.
    if peer0 == peer1 {
        return;
    }

    let first = self.get_id(&peer0);
    let second = self.get_id(&peer1);

    // Store the edge in both directions so lookups are symmetric.
    for &(from, to) in [(first, second), (second, first)].iter() {
        self.adjacency.entry(from).or_default().insert(to);
    }
}

fn remove_directed_edge(&mut self, peer0: &PeerId, peer1: &PeerId) {
self.adjacency.get_mut(&peer0).unwrap().remove(&peer1);
/// Removes the undirected edge (peer0, peer1): drops both directed adjacency
/// entries and recycles either peer's id if it is left with no neighbors.
fn remove_edge_internal(&mut self, peer0: &PeerId, peer1: &PeerId) {
    if peer0 != peer1 {
        // NOTE(review): `get_id` ALLOCATES a fresh id for an unknown peer, and
        // the `unwrap()`s below panic if a peer has no adjacency entry. This
        // is only safe because callers gate on `contains_edge()` first —
        // confirm before invoking from any new code path.
        let id0 = self.get_id(&peer0);
        let id1 = self.get_id(&peer1);

        // Remove both directions of the undirected edge.
        self.adjacency.get_mut(&id0).unwrap().remove(&id1);
        self.adjacency.get_mut(&id1).unwrap().remove(&id0);

        // Free the ids of peers that no longer have any neighbors.
        self.remove_if_unused(id0);
        self.remove_if_unused(id1);
    }
}

pub fn add_edge(&mut self, peer0: PeerId, peer1: PeerId) {
if !self.contains_edge(&peer0, &peer1) {
self.add_directed_edge(peer0.clone(), peer1.clone());
self.add_directed_edge(peer1, peer0);
self.add_edge_internal(peer0, peer1);
self.total_active_edges += 1;
}
}

pub fn remove_edge(&mut self, peer0: &PeerId, peer1: &PeerId) {
if self.contains_edge(&peer0, &peer1) {
self.remove_directed_edge(&peer0, &peer1);
self.remove_directed_edge(&peer1, &peer0);
self.remove_edge_internal(&peer0, &peer1);
self.total_active_edges -= 1;
}
}

// TODO(MarX, #1363): This is too slow right now. (See benchmarks)
/// Compute for every node `u` on the graph (other than `source`) which are the neighbors of
/// `sources` which belong to the shortest path from `source` to `u`. Nodes that are
/// not connected to `source` will not appear in the result.
pub fn calculate_distance(&self) -> HashMap<PeerId, HashSet<PeerId>> {
let mut queue = vec![];
let mut distance = HashMap::new();
// TODO(MarX, #1363): Represent routes more efficiently at least while calculating distances
let mut routes: HashMap<PeerId, HashSet<PeerId>> = HashMap::new();

distance.insert(&self.source, 0);

// Add active connections
if let Some(neighbors) = self.adjacency.get(&self.source) {
for neighbor in neighbors {
queue.push(neighbor);
distance.insert(neighbor, 1);
routes.insert(neighbor.clone(), vec![neighbor.clone()].drain(..).collect());
pub fn calculate_distance(&self) -> HashMap<PeerId, Vec<PeerId>> {
let mut queue = VecDeque::new();

let nodes = self.unused.len() + self.id2p.len();
let mut distance: Vec<i32> = vec![-1; nodes];
let mut routes: Vec<u128> = vec![0; nodes];

let source_id = 0;
distance[source_id] = 0;

if let Some(neighbors) = self.adjacency.get(&source_id) {
for (id, neighbor) in neighbors.iter().enumerate().take(128) {
queue.push_back(*neighbor);
distance[*neighbor] = 1;
routes[*neighbor] = (1 as u128) << id;
}
}

let mut head = 0;

while head < queue.len() {
let cur_peer = queue[head];
let cur_distance = *distance.get(cur_peer).unwrap();
head += 1;
while let Some(cur_peer) = queue.pop_front() {
let cur_distance = distance[cur_peer];

if let Some(neighbors) = self.adjacency.get(&cur_peer) {
for neighbor in neighbors {
if let Entry::Vacant(entry) = distance.entry(neighbor) {
queue.push(entry.key());
entry.insert(cur_distance + 1);
routes.insert(neighbor.clone(), HashSet::new());
if distance[*neighbor] == -1 {
distance[*neighbor] = cur_distance + 1;
queue.push_back(*neighbor);
}

// If this edge belong to a shortest path, all paths to
// the closer nodes are also valid for the current node.
if *distance.get(neighbor).unwrap() == cur_distance + 1 {
let adding_routes = routes.get(cur_peer).unwrap().clone();
let target_routes = routes.get_mut(neighbor).unwrap();

for route in adding_routes {
target_routes.insert(route.clone());
}
if distance[*neighbor] == cur_distance + 1 {
routes[*neighbor] |= routes[cur_peer];
}
}
}
}

routes.into_iter().filter(|(_, hops)| !hops.is_empty()).collect()
self.compute_result(&mut routes)
}

/// Expands the per-node route bitmasks produced by `calculate_distance` into
/// the final map: for every reachable peer, the list of the source's direct
/// neighbors that lie on some shortest path to it.
///
/// `routes[id]` is a u128 bitmask; bit `i` means "the i-th neighbor of the
/// source (in adjacency-set iteration order) is on a shortest path to `id`".
/// NOTE(review): this re-enumerates `self.adjacency[&0]` and assumes the
/// iteration order is IDENTICAL to the enumeration done when the bits were
/// set in `calculate_distance`. That holds only while the set is not mutated
/// between the two calls — confirm callers never interleave edge updates.
/// Only the first 128 neighbors can be represented (`take(128)`), matching
/// the producer side.
fn compute_result(&self, routes: &mut Vec<u128>) -> HashMap<PeerId, Vec<PeerId>> {
    let source_id = 0;
    let mut result = HashMap::with_capacity(routes.len());

    if let Some(neighbors) = self.adjacency.get(&source_id) {
        // Freeze the neighbor order so bit index -> neighbor is stable below.
        let neighbors = Vec::from_iter(neighbors.into_iter());
        for (key, cur_route) in routes.iter().enumerate() {
            // The source itself never appears in the forwarding table.
            if key == source_id {
                continue;
            }
            // count_ones() gives the exact number of first-hop candidates.
            let mut peer_set: Vec<PeerId> = Vec::with_capacity(cur_route.count_ones() as usize);

            for (id, neighbor) in neighbors.iter().enumerate().take(128) {
                if (cur_route & ((1 as u128) << id)) != 0 {
                    peer_set.push(self.id2p.get(neighbor).unwrap().clone());
                };
            }
            // Nodes with an empty mask are unreachable from the source; omit them.
            if peer_set.len() != 0 {
                result.insert(self.id2p.get(&key).unwrap().clone(), peer_set);
            }
        }
    }
    result
}
}

Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub fn random_epoch_id() -> EpochId {
}

pub fn expected_routing_tables(
current: HashMap<PeerId, HashSet<PeerId>>,
current: HashMap<PeerId, Vec<PeerId>>,
expected: Vec<(PeerId, Vec<PeerId>)>,
) -> bool {
if current.len() != expected.len() {
Expand Down
1 change: 1 addition & 0 deletions utils/deepsize/.cargo-ok
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ok
5 changes: 5 additions & 0 deletions utils/deepsize/.cargo_vcs_info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"git": {
"sha1": "f5a2b192d7b15e108807bd69f89037083e08c0c5"
}
}
54 changes: 54 additions & 0 deletions utils/deepsize/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
#
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
# to registry (e.g., crates.io) dependencies
#
# If you believe there's an error in this file please file an
# issue against the rust-lang/cargo repository. If you're
# editing this file be aware that the upstream Cargo.toml
# will likely look very different (and much more reasonable)

[package]
edition = "2018"
name = "deepsize"
version = "0.1.2"
authors = ["Aeledfyr <aeledfyr@gmail.com>"]
include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"]
description = "A crate for measuring the total size of object on the stack and heap"
readme = "README.md"
keywords = ["size", "stack", "heap"]
categories = ["development-tools::profiling", "memory-management"]
license = "MIT"
repository = "https://github.com/Aeledfyr/deepsize/"

[dependencies]

tokio = { version = "0.2", features = ["time", "sync"] }
tokio-util = { version = "0.2", features = ["codec"] }
prometheus = "0.8.0"
actix = "0.9"
log = "0.4"
linked-hash-map = "0.5.3"
cached = "0.12"
chrono = { version = "0.4.4", features = ["serde"] }
num-rational = "0.2.4"
ed25519-dalek = "1.0.1"
backtrace = "0.3"

[dependencies.deepsize_derive]
version = "0.1.1"
optional = true

[dependencies.slotmap]
version = "^0.3"
optional = true
[dev-dependencies.deepsize_derive]
version = "0.1.1"

[features]
default = ["std", "derive"]
derive = ["deepsize_derive"]
slotmap_support = ["slotmap"]
std = []
Loading

0 comments on commit d504147

Please sign in to comment.