Skip to content

Commit

Permalink
Store pending messages separately
Browse files Browse the repository at this point in the history
In case we are sending the same message to a lot of peers, this should
reduce the memory footprint as we are only storing a pointer to the
message until we actually hand it to the connection handler.
  • Loading branch information
thomaseizinger committed Dec 15, 2022
1 parent 416d78a commit 4fc8ce4
Showing 1 changed file with 51 additions and 40 deletions.
91 changes: 51 additions & 40 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
collections::HashSet,
collections::VecDeque,
collections::{BTreeSet, HashMap},
fmt,
fmt, iter,
net::IpAddr,
task::{Context, Poll},
time::Duration,
Expand Down Expand Up @@ -66,6 +66,7 @@ use crate::types::{
};
use crate::types::{GossipsubRpc, PeerConnections, PeerKind};
use crate::{rpc_proto, TopicScoreParams};
use std::sync::Arc;
use std::{cmp::Ordering::Equal, fmt::Debug};
use wasm_timer::Interval;

Expand Down Expand Up @@ -221,6 +222,9 @@ pub struct Gossipsub<
/// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,

/// Messages queued for sending to peers, drained in `poll`. The `Arc` lets
/// a single encoded RPC be shared across many recipients until it is handed
/// to a connection handler.
pending_messages: VecDeque<(PeerId, Arc<rpc_proto::Rpc>)>,

/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,

Expand Down Expand Up @@ -422,6 +426,7 @@ where
Ok(Gossipsub {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
pending_messages: Default::default(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
Expand Down Expand Up @@ -530,11 +535,10 @@ where
}
.into_protobuf();

for peer in peer_list {
debug!("Sending SUBSCRIBE to peer: {:?}", peer);
self.send_message(peer, event.clone())
.map_err(SubscriptionError::PublishError)?;
}
debug!("Sending SUBSCRIBE to peers: {:?}", &peer_list);

self.send_message(peer_list.iter(), event)
.map_err(SubscriptionError::PublishError)?;
}

// call JOIN(topic)
Expand Down Expand Up @@ -570,10 +574,8 @@ where
}
.into_protobuf();

for peer in peer_list {
debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
self.send_message(peer, event.clone())?;
}
debug!("Sending UNSUBSCRIBE to peers: {:?}", peer_list);
self.send_message(peer_list.iter(), event)?;
}

// call LEAVE(topic)
Expand Down Expand Up @@ -728,13 +730,12 @@ where

// Send to peers we know are subscribed to the topic.
let msg_bytes = event.encoded_len();
for peer_id in recipient_peers.iter() {
trace!("Sending message to peer: {:?}", peer_id);
self.send_message(*peer_id, event.clone())?;

if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&topic_hash, msg_bytes);
}
trace!("Sending message to peers: {:?}", recipient_peers);
self.send_message(recipient_peers.iter(), event)?;

if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&topic_hash, msg_bytes * recipient_peers.len());
}

debug!("Published message: {:?}", &msg_id);
Expand Down Expand Up @@ -1348,7 +1349,7 @@ where

let msg_bytes = message.encoded_len();

if self.send_message(*peer_id, message).is_err() {
if self.send_message(iter::once(peer_id), message).is_err() {
error!("Failed to send cached messages. Messages too large");
} else if let Some(m) = self.metrics.as_mut() {
// Sending of messages succeeded, register them on the internal metrics.
Expand Down Expand Up @@ -1511,7 +1512,7 @@ where
);

if let Err(e) = self.send_message(
*peer_id,
iter::once(peer_id),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand Down Expand Up @@ -2058,7 +2059,7 @@ where
if !topics_to_graft.is_empty()
&& self
.send_message(
*propagation_source,
iter::once(propagation_source),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand Down Expand Up @@ -2627,7 +2628,7 @@ where
// send the control messages
if self
.send_message(
peer,
iter::once(&peer),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand Down Expand Up @@ -2667,7 +2668,7 @@ where

if self
.send_message(
*peer,
iter::once(peer),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand Down Expand Up @@ -2743,13 +2744,17 @@ where
.into_protobuf();

let msg_bytes = event.encoded_len();
for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
self.send_message(*peer, event.clone())?;
if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&message.topic, msg_bytes);
}

debug!(
"Sending message: {:?} to peers {:?}",
msg_id, recipient_peers
);
self.send_message(recipient_peers.iter(), event)?;

if let Some(m) = self.metrics.as_mut() {
m.msg_sent(&message.topic, msg_bytes * recipient_peers.len());
}

debug!("Completed forwarding message");
Ok(true)
} else {
Expand Down Expand Up @@ -2864,7 +2869,7 @@ where
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
if self
.send_message(
peer,
iter::once(&peer),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
Expand All @@ -2882,26 +2887,22 @@ where
self.pending_iwant_msgs.clear();
}

/// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(
/// Send a GossipsubRpc message to a list of peers.
fn send_message<'p>(
&mut self,
peer_id: PeerId,
peers: impl Iterator<Item = &'p PeerId> + Clone,
message: rpc_proto::Rpc,
) -> Result<(), PublishError> {
// If the message is oversized, try and fragment it. If it cannot be fragmented, log an
// error and drop the message (all individual messages should be small enough to fit in the
// max_transmit_size)

let messages = self.fragment_message(message)?;
let messages = self.fragment_message(message)?.into_iter().map(Arc::new);

for message in messages {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: GossipsubHandlerIn::Message(message),
handler: NotifyHandler::Any,
})
for peer in peers.clone() {
self.pending_messages.push_back((*peer, message.clone()));
}
}
Ok(())
}
Expand Down Expand Up @@ -3092,7 +3093,7 @@ where
// send our subscriptions to the peer
if self
.send_message(
peer_id,
iter::once(&peer_id),
GossipsubRpc {
messages: Vec::new(),
subscriptions,
Expand Down Expand Up @@ -3448,6 +3449,16 @@ where
return Poll::Ready(event);
}

if let Some((peer, message)) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::Any,
event: GossipsubHandlerIn::Message(
Arc::try_unwrap(message).unwrap_or_else(|m| (*m).clone()),
),
});
}

// update scores
if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) {
Expand Down

0 comments on commit 4fc8ce4

Please sign in to comment.