Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement gossipsub IDONTWANT #5422

Merged
merged 30 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
675bd55
move gossipsub into a separate crate
jxs Mar 11, 2024
30967b4
Merge branch 'unstable' of github.com:sigp/lighthouse into separate-g…
jxs Mar 20, 2024
a022ca6
update rpc.proto and generate rust bindings
jxs Mar 15, 2024
0384495
gossipsub: implement IDONTWANT messages
jxs Mar 17, 2024
ae0b6b2
address review
jxs Mar 18, 2024
6467769
move GossipPromises out of PeerScore
jxs Mar 20, 2024
4f7e639
impl PeerKind::is_gossipsub
jxs Mar 20, 2024
fb150e9
address review 2
jxs Mar 21, 2024
94c9569
Merge branch 'separate-gossipsub' of github.com:sigp/lighthouse into …
jxs Mar 21, 2024
8c490b5
Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossi…
jxs Mar 26, 2024
77f4666
add metrics
jxs Mar 27, 2024
db1bfa6
add tests
jxs Apr 9, 2024
34edff5
make 1.2 beta before spec is merged
jxs Apr 21, 2024
35afb28
Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossi…
jxs Apr 21, 2024
1a41120
cargo clippy
jxs Apr 21, 2024
5966019
Collect decoded IDONTWANT messages
ackintosh Apr 23, 2024
0cc18cc
Use the beta tag in most places to simplify the transition
AgeManning Apr 30, 2024
c4c3a4b
Fix failed test by using fresh message-ids
ackintosh Apr 30, 2024
5946982
Gossipsub v1.2-beta
ackintosh Apr 30, 2024
e570ebc
Merge latest unstable
AgeManning Apr 30, 2024
1130392
Cargo update
AgeManning Apr 30, 2024
ec56b69
Merge pull request #5 from ackintosh/impl-gossipsub-idontwant-ackinto…
jxs May 1, 2024
c5b5a95
Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossi…
jxs May 3, 2024
1abba86
update CHANGELOG.md
jxs May 3, 2024
686f14f
remove beta for 1.2 IDONTWANT spec has been merged
jxs May 15, 2024
aaa7e9e
Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossi…
jxs Jun 11, 2024
8c3bb60
Merge branch 'impl-gossipsub-idontwant' of github.com:jxs/lighthouse …
jxs Jun 11, 2024
630369b
Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossi…
jxs Jul 5, 2024
62069dd
improve comments wording
jxs Jul 5, 2024
053a272
Merge branch 'impl-gossipsub-idontwant' of github.com:jxs/lighthouse …
jxs Jul 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
gossipsub: implement IDONTWANT messages
  • Loading branch information
jxs committed Mar 20, 2024
commit 0384495245e1f8a6a9309136de788f70d6f8c502
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tracing = "0.1.37"
void = "1.0.2"

prometheus-client = "0.22.0"
lru.workspace = true

[dev-dependencies]
quickcheck = { workspace = true }
Expand Down
76 changes: 76 additions & 0 deletions beacon_node/lighthouse_network/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use std::{
collections::{BTreeSet, HashMap},
fmt,
net::IpAddr,
num::NonZeroUsize,
task::{Context, Poll},
time::Duration,
};

use futures::StreamExt;
use futures_ticker::Ticker;
use lru::LruCache;
use prometheus_client::registry::Registry;
use rand::{seq::SliceRandom, thread_rng};

Expand All @@ -45,6 +47,8 @@ use libp2p::swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::types::IDontWant;

use super::gossip_promises::GossipPromises;
use super::handler::{Handler, HandlerEvent, HandlerIn};
use super::mcache::MessageCache;
Expand Down Expand Up @@ -74,6 +78,9 @@ use std::{cmp::Ordering::Equal, fmt::Debug};
#[cfg(test)]
mod tests;

/// IDONTWANT Cache capacity.
const IDONTWANT_CAP: usize = 100;

/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
Expand Down Expand Up @@ -1796,6 +1803,9 @@ where
// Calculate the message id on the transformed data.
let msg_id = self.config.message_id(&message);

// Broadcast IDONTWANT messages.
self.send_idontwant(&raw_message, &msg_id, propagation_source);

// Check the validity of the message
// Peers get penalized if this message is invalid. We don't add it to the duplicate cache
// and instead continually penalize peers that repeatedly send this message.
Expand Down Expand Up @@ -2656,6 +2666,54 @@ where
}
}

/// Helper function which sends an IDONTWANT message to mesh\[topic\] peers.
fn send_idontwant(
&mut self,
message: &RawMessage,
msg_id: &MessageId,
propagation_source: &PeerId,
) {
let Some(mesh_peers) = self.mesh.get(&message.topic) else {
return;
};

let recipient_peers = mesh_peers.iter().filter(|peer_id| {
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
});

for peer_id in recipient_peers {
let Some(peer) = self.connected_peers.get_mut(peer_id) else {
tracing::error!(peer = %peer_id,
"Could not IDONTWANT, peer doesn't exist in connected peer list");
continue;
};

// Only gossipsub 1.2 peers support IDONTWANT.
if peer.kind == PeerKind::Gossipsubv1_2 {
continue;
}

if peer
.sender
.idontwant(IDontWant {
message_ids: vec![msg_id.clone()],
})
.is_err()
{
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IDONTWANT");

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.failed_message_slow_peer(peer_id);
}
// Increment failed message count
self.failed_messages
.entry(*peer_id)
.or_default()
.non_priority += 1;
}
}
}

/// Helper function which forwards a message to mesh\[topic\] peers.
///
/// Returns true if at least one peer was messaged.
Expand Down Expand Up @@ -2709,6 +2767,11 @@ where
if !recipient_peers.is_empty() {
for peer_id in recipient_peers.iter() {
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
if peer.dont_send.get(msg_id).is_some() {
tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
continue;
}

tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer");
if peer
.sender
Expand Down Expand Up @@ -3058,6 +3121,7 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
});
// Add the new connection
connected_peer.connections.push(connection_id);
Expand Down Expand Up @@ -3088,6 +3152,7 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
});
// Add the new connection
connected_peer.connections.push(connection_id);
Expand Down Expand Up @@ -3246,6 +3311,17 @@ where
peers,
backoff,
}) => prune_msgs.push((topic_hash, peers, backoff)),
ControlAction::IDontWant(IDontWant { message_ids }) => {
let Some(peer) = self.connected_peers.get_mut(&propagation_source)
else {
tracing::error!(peer = %propagation_source,
"Could not handle IDONTWANT, peer doesn't exist in connected peer list");
continue;
};
for message_id in message_ids {
peer.dont_send.push(message_id, ());
}
}
}
}
if !ihave_msgs.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ where
kind: kind.clone().unwrap_or(PeerKind::Floodsub),
connections: vec![connection_id],
topics: Default::default(),
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
sender,
},
);
Expand Down Expand Up @@ -624,6 +625,7 @@ fn test_join() {
kind: PeerKind::Floodsub,
connections: vec![connection_id],
topics: Default::default(),
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
sender,
},
);
Expand Down Expand Up @@ -1019,6 +1021,7 @@ fn test_get_random_peers() {
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(gs.config.connection_handler_queue_len()),
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
},
);
}
Expand Down
10 changes: 9 additions & 1 deletion beacon_node/lighthouse_network/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ use void::Void;

pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";

pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
protocol: StreamProtocol::new("/meshsub/1.2.0"),
kind: PeerKind::Gossipsubv1_1,
};
pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
protocol: StreamProtocol::new("/meshsub/1.1.0"),
kind: PeerKind::Gossipsubv1_1,
Expand Down Expand Up @@ -69,7 +73,11 @@ impl Default for ProtocolConfig {
Self {
max_transmit_size: 65536,
validation_mode: ValidationMode::Strict,
protocol_ids: vec![GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL],
protocol_ids: vec![
GOSSIPSUB_1_2_0_PROTOCOL,
GOSSIPSUB_1_1_0_PROTOCOL,
GOSSIPSUB_1_0_0_PROTOCOL,
],
}
}
}
Expand Down
50 changes: 50 additions & 0 deletions beacon_node/lighthouse_network/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use futures_timer::Delay;
use instant::Duration;
use libp2p::identity::PeerId;
use libp2p::swarm::ConnectionId;
use lru::LruCache;
use prometheus_client::encoding::EncodeLabelValue;
use quick_protobuf::MessageWrite;
use std::collections::BTreeSet;
Expand Down Expand Up @@ -121,11 +122,15 @@ pub(crate) struct PeerConnections {
pub(crate) sender: RpcSender,
/// Subscribed topics.
pub(crate) topics: BTreeSet<TopicHash>,
/// Don't send messages.
pub(crate) dont_send: LruCache<MessageId, ()>,
}

/// Describes the types of peers that can exist in the gossipsub context.
#[derive(Debug, Clone, PartialEq, Hash, EncodeLabelValue, Eq)]
pub enum PeerKind {
/// A gossipsub 1.2 peer.
Gossipsubv1_2,
/// A gossipsub 1.1 peer.
Gossipsubv1_1,
/// A gossipsub 1.0 peer.
Expand Down Expand Up @@ -257,6 +262,8 @@ pub enum ControlAction {
Graft(Graft),
/// The node has been removed from the mesh - Prune control message.
Prune(Prune),
/// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant control message.
IDontWant(IDontWant),
}

/// Node broadcasts known messages per topic - IHave control message.
Expand Down Expand Up @@ -293,6 +300,13 @@ pub struct Prune {
pub(crate) backoff: Option<u64>,
}

/// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IDontWant {
/// A list of known message ids (peer_id + sequence _number) as a string.
pub(crate) message_ids: Vec<MessageId>,
}

/// A Gossipsub RPC message sent.
#[derive(Debug)]
pub enum RpcOut {
Expand All @@ -314,6 +328,8 @@ pub enum RpcOut {
IHave(IHave),
/// Send a IWant control message.
IWant(IWant),
/// Send a IDontWant control message.
IDontWant(IDontWant),
}

impl RpcOut {
Expand Down Expand Up @@ -374,6 +390,7 @@ impl From<RpcOut> for proto::RPC {
iwant: vec![],
graft: vec![],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::IWant(IWant { message_ids }) => proto::RPC {
Expand All @@ -386,6 +403,7 @@ impl From<RpcOut> for proto::RPC {
}],
graft: vec![],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
Expand All @@ -398,6 +416,7 @@ impl From<RpcOut> for proto::RPC {
topic_id: Some(topic_hash.into_string()),
}],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::Prune(Prune {
Expand All @@ -424,9 +443,23 @@ impl From<RpcOut> for proto::RPC {
.collect(),
backoff,
}],
idontwant: vec![],
}),
}
}
RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
publish: Vec::new(),
subscriptions: Vec::new(),
control: Some(proto::ControlMessage {
ihave: vec![],
iwant: vec![],
graft: vec![],
prune: vec![],
idontwant: vec![proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
}],
}),
},
}
}
}
Expand Down Expand Up @@ -485,6 +518,7 @@ impl From<Rpc> for proto::RPC {
iwant: Vec::new(),
graft: Vec::new(),
prune: Vec::new(),
idontwant: Vec::new(),
};

let empty_control_msg = rpc.control_msgs.is_empty();
Expand Down Expand Up @@ -533,6 +567,12 @@ impl From<Rpc> for proto::RPC {
};
control.prune.push(rpc_prune);
}
ControlAction::IDontWant(IDontWant { message_ids }) => {
let rpc_iwant = proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.idontwant.push(rpc_iwant);
}
}
}

Expand Down Expand Up @@ -571,6 +611,7 @@ impl PeerKind {
Self::Floodsub => "Floodsub",
Self::Gossipsub => "Gossipsub v1.0",
Self::Gossipsubv1_1 => "Gossipsub v1.1",
Self::Gossipsubv1_2 => "Gossipsub v1.2",
}
}
}
Expand Down Expand Up @@ -657,6 +698,15 @@ impl RpcSender {
.map_err(|err| err.into_inner())
}

/// Send a `RpcOut::IWant` message to the `RpcReceiver`
/// this is low priority, if the queue is full an Err is returned.
#[allow(clippy::result_large_err)]
pub(crate) fn idontwant(&mut self, idontwant: IDontWant) -> Result<(), RpcOut> {
self.non_priority_sender
.try_send(RpcOut::IDontWant(idontwant))
.map_err(|err| err.into_inner())
}

/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn subscribe(&mut self, topic: TopicHash) {
Expand Down