diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index d2905e4da4453..6e8def57f50ea 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -23,7 +23,7 @@ use assert_matches::assert_matches; use environment::HasVoted; use sc_network_test::{ Block, BlockImportAdapter, Hash, PassThroughVerifier, Peer, PeersClient, PeersFullClient, - TestClient, TestNetFactory, + TestClient, TestNetFactory, FullPeerConfig, }; use sc_network::config::{ProtocolConfig, BoxFinalityProofRequestBuilder}; use parking_lot::Mutex; @@ -94,6 +94,15 @@ impl TestNetFactory for GrandpaTestNet { ProtocolConfig::default() } + fn add_full_peer(&mut self) { + self.add_full_peer_with_config(FullPeerConfig { + notifications_protocols: vec![ + (communication::GRANDPA_ENGINE_ID, communication::GRANDPA_PROTOCOL_NAME.into()) + ], + ..Default::default() + }) + } + fn make_verifier( &self, _client: PeersClient, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index dac52bc314af5..a585f91145ede 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -39,13 +39,13 @@ use sp_consensus::{ use codec::{Decode, Encode}; use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification}; use sp_runtime::traits::{ - Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub + Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub }; use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, Message}; use message::generic::{Message as GenericMessage, Roles}; use prometheus_endpoint::{ - Registry, Gauge, Counter, CounterVec, GaugeVec, + Registry, Gauge, Counter, GaugeVec, PrometheusError, Opts, register, U64 }; use sync::{ChainSync, SyncState}; @@ -53,7 +53,7 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry}; use std::sync::Arc; use std::fmt::Write; -use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time}; +use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time}; use log::{log, Level, trace, debug, warn, error}; use wasm_timer::Instant; @@ -86,11 +86,6 @@ pub(crate) const CURRENT_VERSION: u32 = 6; /// Lowest version we support pub(crate) const MIN_VERSION: u32 = 3; -// Maximum allowed entries in `BlockResponse` -const MAX_BLOCK_DATA_RESPONSE: u32 = 128; -// Maximum total bytes allowed for block bodies in `BlockResponse` -const MAX_BODIES_BYTES: usize = 8 * 1024 * 1024; - /// When light node connects to the full node and the full node is behind light node /// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it not useful /// and disconnect to free connection slot. @@ -119,8 +114,6 @@ mod rep { pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet"); /// We received an unexpected transaction packet. pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet"); - /// We received an unexpected light node request. - pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet"); /// Peer has different genesis. pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch"); /// Peer is on unsupported protocol version. @@ -139,7 +132,6 @@ struct Metrics { finality_proofs: GaugeVec, justifications: GaugeVec, propagated_transactions: Counter, - legacy_requests_received: CounterVec, } impl Metrics { @@ -185,13 +177,6 @@ impl Metrics { "sync_propagated_transactions", "Number of transactions propagated to at least one peer", )?, r)?, - legacy_requests_received: register(CounterVec::new( - Opts::new( - "sync_legacy_requests_received", - "Number of block/finality/light-client requests received on the legacy substream", - ), - &["kind"] - )?, r)?, }) } } @@ -604,12 +589,6 @@ impl Protocol { match message { GenericMessage::Status(_) => debug!(target: "sub-libp2p", "Received unexpected Status"), - GenericMessage::BlockRequest(r) => self.on_block_request(who, r), - GenericMessage::BlockResponse(r) => { - let outcome = self.on_block_response(who.clone(), r); - self.update_peer_info(&who); - return outcome - }, GenericMessage::BlockAnnounce(announce) => { let outcome = self.on_block_announce(who.clone(), announce); self.update_peer_info(&who); @@ -617,6 +596,8 @@ impl Protocol { }, GenericMessage::Transactions(m) => self.on_transactions(who, m), + GenericMessage::BlockResponse(_) => + warn!(target: "sub-libp2p", "Received unexpected BlockResponse"), GenericMessage::RemoteCallResponse(_) => warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"), GenericMessage::RemoteReadResponse(_) => @@ -627,6 +608,7 @@ impl Protocol { warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"), GenericMessage::FinalityProofResponse(_) => warn!(target: "sub-libp2p", "Received unexpected FinalityProofResponse"), + GenericMessage::BlockRequest(_) | GenericMessage::FinalityProofRequest(_) | GenericMessage::RemoteReadChildRequest(_) | GenericMessage::RemoteCallRequest(_) | @@ -678,21 +660,6 @@ impl Protocol { CustomMessageOutcome::None } - fn send_message( - &mut self, - who: &PeerId, - message: Option<(Cow<'static, str>, Vec)>, - legacy: Message, - ) { - send_message::( - &mut self.behaviour, - &mut self.context_data.stats, - who, - message, - legacy, - ); - } - fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest) { update_peer_request::(&mut self.context_data.peers, who, request) } @@ -718,92 +685,6 @@ impl Protocol { } } - fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest) { - if let Some(metrics) = &self.metrics { - metrics.legacy_requests_received.with_label_values(&["block-request"]).inc(); - } - - trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}", - request.id, - peer, - request.from, - request.to, - request.max, - request.fields, - ); - - // sending block requests to the node that is unable to serve it is considered a bad behavior - if !self.config.roles.is_full() { - trace!(target: "sync", "Peer {} is trying to sync from the light node", peer); - self.behaviour.disconnect_peer(&peer); - self.peerset_handle.report_peer(peer, rep::UNEXPECTED_REQUEST); - return; - } - - let mut blocks = Vec::new(); - let mut id = match request.from { - message::FromBlock::Hash(h) => BlockId::Hash(h), - message::FromBlock::Number(n) => BlockId::Number(n), - }; - let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize; - let get_header = request.fields.contains(message::BlockAttributes::HEADER); - let get_body = request.fields.contains(message::BlockAttributes::BODY); - let get_justification = request - .fields - .contains(message::BlockAttributes::JUSTIFICATION); - let mut total_size = 0; - while let Some(header) = self.context_data.chain.header(id).unwrap_or(None) { - if blocks.len() >= max || (blocks.len() >= 1 && total_size > MAX_BODIES_BYTES) { - break; - } - let number = *header.number(); - let hash = header.hash(); - let parent_hash = *header.parent_hash(); - let justification = if get_justification { - self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) - } else { - None - }; - let block_data = message::generic::BlockData { - hash, - header: if get_header { Some(header) } else { None }, - body: if get_body { - self.context_data - .chain - .block_body(&BlockId::Hash(hash)) - .unwrap_or(None) - } else { - None - }, - receipt: None, - message_queue: None, - justification, - }; - // Stop if we don't have requested block body - if get_body && block_data.body.is_none() { - trace!(target: "sync", "Missing data for block request."); - break; - } - total_size += block_data.body.as_ref().map_or(0, |b| b.len()); - blocks.push(block_data); - match request.direction { - message::Direction::Ascending => id = BlockId::Number(number + One::one()), - message::Direction::Descending => { - if number.is_zero() { - break; - } - id = BlockId::Hash(parent_hash) - } - } - } - let response = message::generic::BlockResponse { - id: request.id, - blocks, - }; - trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(&peer, None, GenericMessage::BlockResponse(response)) - } - /// Adjusts the reputation of a node. pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) { self.peerset_handle.report_peer(who, reputation) @@ -1207,14 +1088,11 @@ impl Protocol { .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - let encoded = to_send.encode(); - send_message:: ( - &mut self.behaviour, - &mut self.context_data.stats, - &who, - Some((self.transactions_protocol.clone(), encoded)), - GenericMessage::Transactions(to_send) - ) + self.behaviour.write_notification( + who, + self.transactions_protocol.clone(), + to_send.encode() + ); } } @@ -1289,15 +1167,11 @@ impl Protocol { }, }; - let encoded = message.encode(); - - send_message:: ( - &mut self.behaviour, - &mut self.context_data.stats, - &who, - Some((self.block_announces_protocol.clone(), encoded)), - Message::::BlockAnnounce(message), - ) + self.behaviour.write_notification( + who, + self.block_announces_protocol.clone(), + message.encode() + ); } } } @@ -1605,24 +1479,6 @@ fn update_peer_request( } } -fn send_message( - behaviour: &mut GenericProto, - stats: &mut HashMap<&'static str, PacketStats>, - who: &PeerId, - message: Option<(Cow<'static, str>, Vec)>, - legacy_message: Message, -) { - let encoded = legacy_message.encode(); - let mut stats = stats.entry(legacy_message.id()).or_default(); - stats.bytes_out += encoded.len() as u64; - stats.count_out += 1; - if let Some((proto, msg)) = message { - behaviour.write_notification(who, proto, msg, encoded); - } else { - behaviour.send_packet(who, encoded); - } -} - impl NetworkBehaviour for Protocol { type ProtocolsHandler = ::ProtocolsHandler; type OutEvent = CustomMessageOutcome; diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 56a5b3fb0ab2d..996a810605d13 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -553,7 +553,6 @@ impl GenericProto { target: &PeerId, protocol_name: Cow<'static, str>, message: impl Into>, - encoded_fallback_message: Vec, ) { let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) { None => { @@ -574,33 +573,10 @@ impl GenericProto { trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); notifs_sink.send_sync_notification( protocol_name, - encoded_fallback_message, message ); } - /// Sends a message to a peer. - /// - /// Has no effect if the custom protocol is not open with the given peer. - /// - /// Also note that even we have a valid open substream, it may in fact be already closed - /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: Vec) { - let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) { - None => { - debug!(target: "sub-libp2p", - "Tried to sent packet to {:?} without an open channel.", - target); - return - } - Some(sink) => sink - }; - - trace!(target: "sub-libp2p", "External API => Packet for {:?}", target); - trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - notifs_sink.send_legacy(message); - } - /// Returns the state of the peerset manager, for debugging purposes. pub fn peerset_debug_info(&mut self) -> serde_json::Value { self.peerset.debug_info() diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 6804dd3c789da..acb241af2ad2d 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -262,16 +262,10 @@ struct NotificationsSinkInner { /// dedicated to the peer. #[derive(Debug)] enum NotificationsSinkMessage { - /// Message emitted by [`NotificationsSink::send_legacy`]. - Legacy { - message: Vec, - }, - /// Message emitted by [`NotificationsSink::reserve_notification`] and /// [`NotificationsSink::write_notification_now`]. Notification { protocol_name: Cow<'static, str>, - encoded_fallback_message: Vec, message: Vec, }, @@ -280,26 +274,6 @@ enum NotificationsSinkMessage { } impl NotificationsSink { - /// Sends a message to the peer using the legacy substream. - /// - /// If too many messages are already buffered, the message is silently discarded and the - /// connection to the peer will be closed shortly after. - /// - /// This method will be removed in a future version. - pub fn send_legacy<'a>(&'a self, message: impl Into>) { - let mut lock = self.inner.sync_channel.lock(); - let result = lock.try_send(NotificationsSinkMessage::Legacy { - message: message.into() - }); - - if result.is_err() { - // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the - // buffer, and therefore that `try_send` will succeed. - let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose); - debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected())); - } - } - /// Sends a notification to the peer. /// /// If too many messages are already buffered, the notification is silently discarded and the @@ -312,13 +286,11 @@ impl NotificationsSink { pub fn send_sync_notification<'a>( &'a self, protocol_name: Cow<'static, str>, - encoded_fallback_message: impl Into>, message: impl Into> ) { let mut lock = self.inner.sync_channel.lock(); let result = lock.try_send(NotificationsSinkMessage::Notification { - protocol_name: protocol_name, - encoded_fallback_message: encoded_fallback_message.into(), + protocol_name, message: message.into() }); @@ -364,12 +336,10 @@ impl<'a> Ready<'a> { /// Returns an error if the substream has been closed. pub fn send( mut self, - encoded_fallback_message: impl Into>, notification: impl Into> ) -> Result<(), ()> { self.lock.start_send(NotificationsSinkMessage::Notification { protocol_name: self.protocol_name, - encoded_fallback_message: encoded_fallback_message.into(), message: notification.into(), }).map_err(|_| ()) } @@ -602,26 +572,38 @@ impl ProtocolsHandler for NotifsHandler { }; match message { - NotificationsSinkMessage::Legacy { message } => { - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { - message - }); - } NotificationsSinkMessage::Notification { protocol_name, - encoded_fallback_message, message } => { + let mut found_any_with_name = false; + for (handler, _) in &mut self.out_handlers { - if *handler.protocol_name() == protocol_name && handler.is_open() { - handler.send_or_discard(message); - continue 'poll_notifs_sink; + if *handler.protocol_name() == protocol_name { + found_any_with_name = true; + if handler.is_open() { + handler.send_or_discard(message); + continue 'poll_notifs_sink; + } } } - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { - message: encoded_fallback_message, - }); + // This code can be reached via the following scenarios: + // + // - User tried to send a notification on a non-existing protocol. This + // most likely relates to https://github.com/paritytech/substrate/issues/6827 + // - User tried to send a notification to a peer we're not or no longer + // connected to. This happens in a normal scenario due to the racy nature + // of connections and disconnections, and is benign. + // + // We print a warning in the former condition. + if !found_any_with_name { + log::warn!( + target: "sub-libp2p", + "Tried to send a notification on non-registered protocol: {:?}", + protocol_name + ); + } } NotificationsSinkMessage::ForceClose => { return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)); diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index 7d31ed323a43b..d98d864dfc6fa 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -204,12 +204,6 @@ pub enum LegacyProtoHandlerIn { /// The node should stop using custom protocols. Disable, - - /// Sends a message through a custom protocol substream. - SendCustomMessage { - /// The message to send. - message: Vec, - }, } /// Event that can be emitted by a `LegacyProtoHandler`. @@ -495,17 +489,6 @@ impl LegacyProtoHandler { ProtocolState::KillAsap => ProtocolState::KillAsap, }; } - - /// Sends a message to the remote. - fn send_message(&mut self, message: Vec) { - match self.state { - ProtocolState::Normal { ref mut substreams, .. } => - substreams[0].send_message(message), - - _ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \ - with {:?}", self.remote_peer_id) - } - } } impl ProtocolsHandler for LegacyProtoHandler { @@ -539,12 +522,9 @@ impl ProtocolsHandler for LegacyProtoHandler { match message { LegacyProtoHandlerIn::Disable => self.disable(), LegacyProtoHandlerIn::Enable => self.enable(), - LegacyProtoHandlerIn::SendCustomMessage { message } => - self.send_message(message), } } - #[inline] fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr) { let is_severe = match err { ProtocolsHandlerUpgrErr::Upgrade(_) => true, diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index 15c4a17df8d3a..dbe02c350100f 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -16,19 +16,16 @@ #![cfg(test)] -use futures::{prelude::*, ready}; -use codec::{Encode, Decode}; -use libp2p::core::connection::{ConnectionId, ListenerId}; -use libp2p::core::ConnectedPoint; -use libp2p::swarm::{Swarm, ProtocolsHandler, IntoProtocolsHandler}; -use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction}; +use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; + +use futures::prelude::*; use libp2p::{PeerId, Multiaddr, Transport}; -use rand::seq::SliceRandom; +use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint}; +use libp2p::swarm::{ + Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters, + NetworkBehaviour, NetworkBehaviourAction +}; use std::{error, io, task::Context, task::Poll, time::Duration}; -use std::collections::HashSet; -use crate::protocol::message::{generic::BlockResponse, Message}; -use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; -use sp_test_primitives::Block; /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. @@ -216,137 +213,6 @@ impl NetworkBehaviour for CustomProtoWithAddr { } } -#[test] -fn two_nodes_transfer_lots_of_packets() { - // We spawn two nodes, then make the first one send lots of packets to the second one. The test - // ends when the second one has received all of them. - - // This test consists in transferring this given number of packets. Considering that (by - // design) the connection gets closed if one of the remotes can't follow the pace, this number - // should not exceed the size of the buffer of pending notifications. - const NUM_PACKETS: u32 = 512; - - let (mut service1, mut service2) = build_nodes(); - - let fut1 = future::poll_fn(move |cx| -> Poll<()> { - loop { - match ready!(service1.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => { - for n in 0 .. NUM_PACKETS { - service1.send_packet( - &peer_id, - Message::::BlockResponse(BlockResponse { - id: n as _, - blocks: Vec::new(), - }).encode() - ); - } - }, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - _ => panic!(), - } - } - }); - - let mut packet_counter = 0u32; - let fut2 = future::poll_fn(move |cx| { - loop { - match ready!(service2.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - Some(GenericProtoOut::LegacyMessage { message, .. }) => { - match Message::::decode(&mut &message[..]).unwrap() { - Message::::BlockResponse(BlockResponse { id: _, blocks }) => { - assert!(blocks.is_empty()); - packet_counter += 1; - if packet_counter == NUM_PACKETS { - return Poll::Ready(()) - } - }, - _ => panic!(), - } - } - _ => panic!(), - } - } - }); - - futures::executor::block_on(async move { - future::select(fut1, fut2).await; - }); -} - -#[test] -fn basic_two_nodes_requests_in_parallel() { - let (mut service1, mut service2) = build_nodes(); - - // Generate random messages with or without a request id. - let mut to_send = { - let mut to_send = Vec::new(); - let mut existing_ids = HashSet::new(); - for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. - let req_id = loop { - let req_id = rand::random::(); - - // ensure uniqueness - odds of randomly sampling collisions - // is unlikely, but possible to cause spurious test failures. - if existing_ids.insert(req_id) { - break req_id; - } - }; - - to_send.push(Message::::BlockResponse( - BlockResponse { id: req_id, blocks: Vec::new() } - )); - } - to_send - }; - - // Clone `to_send` in `to_receive`. Below we will remove from `to_receive` the messages we - // receive, until the list is empty. - let mut to_receive = to_send.clone(); - to_send.shuffle(&mut rand::thread_rng()); - - let fut1 = future::poll_fn(move |cx| -> Poll<()> { - loop { - match ready!(service1.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => { - for msg in to_send.drain(..) { - service1.send_packet(&peer_id, msg.encode()); - } - }, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - _ => panic!(), - } - } - }); - - let fut2 = future::poll_fn(move |cx| { - loop { - match ready!(service2.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - Some(GenericProtoOut::LegacyMessage { message, .. }) => { - let pos = to_receive.iter().position(|m| m.encode() == message).unwrap(); - to_receive.remove(pos); - if to_receive.is_empty() { - return Poll::Ready(()) - } - } - _ => panic!(), - } - } - }); - - futures::executor::block_on(async move { - future::select(fut1, fut2).await; - }); -} - #[test] fn reconnect_after_disconnect() { // We connect two nodes together, then force a disconnect (through the API of the `Service`), diff --git a/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/client/network/src/protocol/generic_proto/upgrade/legacy.rs index 0937a7798be98..1b2b97253d1ae 100644 --- a/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -123,15 +123,6 @@ impl RegisteredProtocolSubstream { self.is_closing = true; self.send_queue.clear(); } - - /// Sends a message to the substream. - pub fn send_message(&mut self, data: Vec) { - if self.is_closing { - return - } - - self.send_queue.push_back(From::from(&data[..])); - } } /// Event produced by the `RegisteredProtocolSubstream`. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index a3ac8371dc739..4fa37c64c75e2 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -635,18 +635,7 @@ impl NetworkService { // Determine the wire protocol name corresponding to this `engine_id`. let protocol_name = self.protocol_name_by_engine.lock().get(&engine_id).cloned(); if let Some(protocol_name) = protocol_name { - // For backwards-compatibility reason, we have to duplicate the message and pass it - // in the situation where the remote still uses the legacy substream. - let fallback = codec::Encode::encode(&{ - protocol::message::generic::Message::<(), (), (), ()>::Consensus({ - protocol::message::generic::ConsensusMessage { - engine_id, - data: message.clone(), - } - }) - }); - - sink.send_sync_notification(protocol_name, fallback, message); + sink.send_sync_notification(protocol_name, message); } else { return; } @@ -751,7 +740,6 @@ impl NetworkService { Ok(NotificationSender { sink, protocol_name, - engine_id, notification_size_metric: self.notifications_sizes_metric.as_ref().map(|histogram| { histogram.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)]) }), @@ -1064,9 +1052,6 @@ pub struct NotificationSender { /// Name of the protocol on the wire. protocol_name: Cow<'static, str>, - /// Engine ID used for the fallback message. - engine_id: ConsensusEngineId, - /// Field extracted from the [`Metrics`] struct and necessary to report the /// notifications-related metrics. notification_size_metric: Option, @@ -1080,7 +1065,6 @@ impl NotificationSender { Ok(r) => r, Err(()) => return Err(NotificationSenderError::Closed), }, - engine_id: self.engine_id, notification_size_metric: self.notification_size_metric.clone(), }) } @@ -1091,9 +1075,6 @@ impl NotificationSender { pub struct NotificationSenderReady<'a> { ready: Ready<'a>, - /// Engine ID used for the fallback message. - engine_id: ConsensusEngineId, - /// Field extracted from the [`Metrics`] struct and necessary to report the /// notifications-related metrics. notification_size_metric: Option, @@ -1108,18 +1089,8 @@ impl<'a> NotificationSenderReady<'a> { notification_size_metric.observe(notification.len() as f64); } - // For backwards-compatibility reason, we have to duplicate the message and pass it - // in the situation where the remote still uses the legacy substream. - let fallback = codec::Encode::encode(&{ - protocol::message::generic::Message::<(), (), (), ()>::Consensus({ - protocol::message::generic::ConsensusMessage { - engine_id: self.engine_id, - data: notification.clone(), - } - }) - }); - - self.ready.send(fallback, notification) + self.ready + .send(notification) .map_err(|()| NotificationSenderError::Closed) } } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index d269842386cdd..587feebe55c14 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -22,7 +22,10 @@ mod block_import; #[cfg(test)] mod sync; -use std::{collections::HashMap, pin::Pin, sync::Arc, marker::PhantomData, task::{Poll, Context as FutureContext}}; +use std::{ + borrow::Cow, collections::HashMap, pin::Pin, sync::Arc, marker::PhantomData, + task::{Poll, Context as FutureContext} +}; use libp2p::build_multiaddr; use log::trace; @@ -55,7 +58,7 @@ use sp_core::H256; use sc_network::config::ProtocolConfig; use sp_runtime::generic::{BlockId, OpaqueDigestItemId}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use sp_runtime::Justification; +use sp_runtime::{ConsensusEngineId, Justification}; use substrate_test_runtime_client::{self, AccountKeyring}; use sc_service::client::Client; pub use sc_network::config::EmptyTransactionPool; @@ -553,6 +556,8 @@ pub struct FullPeerConfig { pub keep_blocks: Option, /// Block announce validator. pub block_announce_validator: Option + Send + Sync>>, + /// List of notification protocols that the network must support. + pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, str>)>, } pub trait TestNetFactory: Sized { @@ -663,6 +668,7 @@ pub trait TestNetFactory: Sized { network_config.transport = TransportConfig::MemoryOnly; network_config.listen_addresses = vec![listen_addr.clone()]; network_config.allow_non_globals_in_dht = true; + network_config.notifications_protocols = config.notifications_protocols; let network = NetworkWorker::new(sc_network::config::Params { role: Role::Full,