Skip to content

Commit

Permalink
[sync] Let new subscribers know about already connected peers (backwa…
Browse files Browse the repository at this point in the history
…rd-compatible) (paritytech#7344)

Revert paritytech#7011 and replace
it with a backward-compatible solution suitable for backporting to a
release branch.

### Review notes
It's easier to review this PR per commit: the first commit is just a
revert, so it's enough to review only the second one, which is almost a
one-liner.

---------

Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
dmitry-markin and github-actions[bot] authored Jan 27, 2025
1 parent c95e49c commit ee30ec7
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 54 deletions.
14 changes: 14 additions & 0 deletions prdoc/pr_7344.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
title: '[sync] Let new subscribers know about already connected peers (backward-compatible)'
doc:
- audience: Node Dev
description: Revert https://github.com/paritytech/polkadot-sdk/pull/7011 and replace
it with a backward-compatible solution suitable for backporting to a release branch.
crates:
- name: sc-network-gossip
bump: patch
- name: sc-network-statement
bump: patch
- name: sc-network-sync
bump: patch
- name: sc-network-transactions
bump: patch
10 changes: 4 additions & 6 deletions substrate/client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,10 @@ impl<B: BlockT> Future for GossipEngine<B> {

match sync_event_stream {
Poll::Ready(Some(event)) => match event {
SyncEvent::InitialPeers(peer_ids) =>
this.network.add_set_reserved(peer_ids, this.protocol.clone()),
SyncEvent::PeerConnected(peer_id) =>
this.network.add_set_reserved(vec![peer_id], this.protocol.clone()),
SyncEvent::PeerDisconnected(peer_id) =>
this.network.remove_set_reserved(peer_id, this.protocol.clone()),
SyncEvent::PeerConnected(remote) =>
this.network.add_set_reserved(remote, this.protocol.clone()),
SyncEvent::PeerDisconnected(remote) =>
this.network.remove_set_reserved(remote, this.protocol.clone()),
},
// The sync event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => {
Expand Down
13 changes: 5 additions & 8 deletions substrate/client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,15 @@ mod validator;

/// Abstraction over a network.
pub trait Network<B: BlockT>: NetworkPeers + NetworkEventStream {
fn add_set_reserved(&self, peer_ids: Vec<PeerId>, protocol: ProtocolName) {
let addrs = peer_ids
.into_iter()
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
.collect();
let result = self.add_peers_to_reserved_set(protocol, addrs);
fn add_set_reserved(&self, who: PeerId, protocol: ProtocolName) {
let addr = Multiaddr::empty().with(Protocol::P2p(*who.as_ref()));
let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect());
if let Err(err) = result {
log::error!(target: "gossip", "add_set_reserved failed: {}", err);
}
}
fn remove_set_reserved(&self, peer_id: PeerId, protocol: ProtocolName) {
let result = self.remove_peers_from_reserved_set(protocol, iter::once(peer_id).collect());
fn remove_set_reserved(&self, who: PeerId, protocol: ProtocolName) {
let result = self.remove_peers_from_reserved_set(protocol, iter::once(who).collect());
if let Err(err) = result {
log::error!(target: "gossip", "remove_set_reserved failed: {}", err);
}
Expand Down
23 changes: 6 additions & 17 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt}
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::{
config::{NonReservedPeerMode, SetConfig},
error,
multiaddr::{Multiaddr, Protocol},
error, multiaddr,
peer_store::PeerStoreProvider,
service::{
traits::{NotificationEvent, NotificationService, ValidationResult},
Expand Down Expand Up @@ -297,19 +296,9 @@ where

fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
SyncEvent::InitialPeers(peer_ids) => {
let addrs = peer_ids
.into_iter()
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
.collect();
let result =
self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err);
}
},
SyncEvent::PeerConnected(peer_id) => {
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
SyncEvent::PeerConnected(remote) => {
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
.collect::<multiaddr::Multiaddr>();
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
Expand All @@ -318,10 +307,10 @@ where
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
}
},
SyncEvent::PeerDisconnected(peer_id) => {
SyncEvent::PeerDisconnected(remote) => {
let result = self.network.remove_peers_from_reserved_set(
self.protocol_name.clone(),
iter::once(peer_id).collect(),
iter::once(remote).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
Expand Down
6 changes: 4 additions & 2 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,8 +657,10 @@ where
self.strategy.set_sync_fork_request(peers, &hash, number);
},
ToServiceCommand::EventStream(tx) => {
let _ = tx
.unbounded_send(SyncEvent::InitialPeers(self.peers.keys().cloned().collect()));
// Let a new subscriber know about already connected peers.
for peer_id in self.peers.keys() {
let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
}
self.event_streams.push(tx);
},
ToServiceCommand::RequestJustification(hash, number) =>
Expand Down
4 changes: 0 additions & 4 deletions substrate/client/network/sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ where

/// Syncing-related events that other protocols can subscribe to.
pub enum SyncEvent {
/// All connected peers that the syncing implementation is tracking.
/// Always sent as the first message to the stream.
InitialPeers(Vec<PeerId>),

/// Peer that the syncing implementation is tracking connected.
PeerConnected(PeerId),

Expand Down
23 changes: 6 additions & 17 deletions substrate/client/network/transactions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ use log::{debug, trace, warn};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::{
config::{NonReservedPeerMode, ProtocolId, SetConfig},
error,
multiaddr::{Multiaddr, Protocol},
error, multiaddr,
peer_store::PeerStoreProvider,
service::{
traits::{NotificationEvent, NotificationService, ValidationResult},
Expand Down Expand Up @@ -378,19 +377,9 @@ where

fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
SyncEvent::InitialPeers(peer_ids) => {
let addrs = peer_ids
.into_iter()
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
.collect();
let result =
self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err);
}
},
SyncEvent::PeerConnected(peer_id) => {
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
SyncEvent::PeerConnected(remote) => {
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
.collect::<multiaddr::Multiaddr>();
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
Expand All @@ -399,10 +388,10 @@ where
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
}
},
SyncEvent::PeerDisconnected(peer_id) => {
SyncEvent::PeerDisconnected(remote) => {
let result = self.network.remove_peers_from_reserved_set(
self.protocol_name.clone(),
iter::once(peer_id).collect(),
iter::once(remote).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);
Expand Down

0 comments on commit ee30ec7

Please sign in to comment.