Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Send a status message on block announce handshake #5726

Merged
merged 4 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,31 @@ impl Default for ProtocolConfig {
}
}

/// Handshake sent when we open a block announces substream.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
struct BlockAnnouncesHandshake<B: BlockT> {
/// Roles of the node.
roles: Roles,
/// Best block number.
best_number: NumberFor<B>,
/// Best block hash.
best_hash: B::Hash,
/// Genesis block hash.
genesis_hash: B::Hash,
}

impl<B: BlockT> BlockAnnouncesHandshake<B> {
fn build(protocol_config: &ProtocolConfig, chain: &Arc<dyn Client<B>>) -> Self {
let info = chain.info();
BlockAnnouncesHandshake {
genesis_hash: info.genesis_hash,
roles: protocol_config.roles.into(),
best_number: info.best_number,
best_hash: info.best_hash,
}
}
}

/// Fallback mechanism to use to send a notification if no substream is open.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Fallback {
Expand Down Expand Up @@ -369,7 +394,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.extend(b"/block-announces/1");
proto
});
behaviour.register_notif_protocol(block_announces_protocol.clone(), Vec::new());
behaviour.register_notif_protocol(
block_announces_protocol.clone(),
BlockAnnouncesHandshake::build(&config, &chain).encode()
);
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);

let protocol = Protocol {
Expand Down Expand Up @@ -1325,6 +1353,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn on_block_imported(&mut self, header: &B::Header, is_best: bool) {
if is_best {
self.sync.update_chain_info(header);
self.behaviour.set_notif_protocol_handshake(
&self.block_announces_protocol,
BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode()
);
}
}

Expand Down
28 changes: 28 additions & 0 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,34 @@ impl GenericProto {
self.notif_protocols.push((protocol_name.into(), handshake_msg.into()));
}

/// Modifies the handshake of the given notifications protocol.
///
/// Has no effect if the protocol is unknown.
pub fn set_notif_protocol_handshake(
&mut self,
protocol_name: &[u8],
handshake_message: impl Into<Vec<u8>>
) {
let handshake_message = handshake_message.into();
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) {
protocol.1 = handshake_message.clone();
} else {
return;
}

// Send an event to all the peers we're connected to, updating the handshake message.
for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_open()) {
self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::UpdateHandshake {
protocol_name: Cow::Owned(protocol_name.to_owned()),
handshake_message: handshake_message.clone(),
},
});
}
}

/// Returns the number of discovered nodes that we keep in memory.
pub fn num_discovered_peers(&self) -> usize {
self.peerset.num_discovered_peers()
Expand Down
24 changes: 24 additions & 0 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ pub enum NotifsHandlerIn {
message: Vec<u8>,
},

/// Modifies the handshake message of a notifications protocol.
UpdateHandshake {
/// Name of the protocol for the message.
///
/// Must match one of the registered protocols.
protocol_name: Cow<'static, [u8]>,

/// The new handshake message to send if we open a substream or if the remote opens a
/// substream towards us.
handshake_message: Vec<u8>,
},

/// Sends a notifications message.
SendNotification {
/// Name of the protocol for the message.
Expand Down Expand Up @@ -363,6 +375,18 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::SendLegacy { message } =>
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => {
for (handler, current_handshake) in &mut self.in_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
for (handler, current_handshake) in &mut self.out_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
}
NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() != &protocol_name[..] {
Expand Down