From 4f3bfa6aa47eab363a5a02391df7a08503e82c3f Mon Sep 17 00:00:00 2001 From: tyshko5 <74514992+tyshko5@users.noreply.github.com> Date: Tue, 13 Dec 2022 11:08:51 +0100 Subject: [PATCH] Refactor oversized method handle_forest_behaviour_event (#2360) Split out the handle_forest_behaviour_event function, type aliases, renaming --- node/forest_libp2p/src/service.rs | 628 +++++++++++++++++------------- 1 file changed, 349 insertions(+), 279 deletions(-) diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index f49ddccef6e9..248e57fc8e23 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -58,6 +58,12 @@ pub const PUBSUB_MSG_STR: &str = "/fil/msgs"; const PUBSUB_TOPICS: [&str; 2] = [PUBSUB_BLOCK_STR, PUBSUB_MSG_STR]; +type HelloRequestTable = + HashMap>>; + +type CxRequestTable = + HashMap>>; + /// Events emitted by this Service. #[allow(clippy::large_enum_variant)] #[derive(Debug)] @@ -154,9 +160,9 @@ pub struct Libp2pService { swarm: Swarm>, cs: Arc>, network_receiver_in: flume::Receiver, - network_sender_in: flume::Sender, + network_sender_in: Sender, network_receiver_out: flume::Receiver, - network_sender_out: flume::Sender, + network_sender_out: Sender, network_name: String, genesis_cid: Cid, } @@ -292,7 +298,7 @@ where } /// Returns a sender which allows sending messages to the libp2p service. - pub fn network_sender(&self) -> flume::Sender { + pub fn network_sender(&self) -> Sender { self.network_sender_in.clone() } @@ -305,15 +311,9 @@ where async fn handle_network_message( swarm: &mut Swarm>, message: NetworkMessage, - network_sender_out: &flume::Sender, - hello_request_table: &mut HashMap< - RequestId, - futures::channel::oneshot::Sender>, - >, - cx_request_table: &mut HashMap< - RequestId, - futures::channel::oneshot::Sender>, - >, + network_sender_out: &Sender, + hello_request_table: &mut HelloRequestTable, + cx_request_table: &mut CxRequestTable, outgoing_bitswap_query_ids: &mut HashMap, ) { match message { @@ -410,271 +410,286 @@ async fn handle_network_message( } } -#[allow(clippy::too_many_arguments)] -async fn handle_forest_behaviour_event( +async fn handle_discovery_event( + discovery_out: DiscoveryOut, swarm: &mut Swarm>, - event: ForestBehaviourEvent

, - db: &Arc>, - genesis_cid: &Cid, - network_sender_out: &flume::Sender, - hello_request_table: &mut HashMap< - RequestId, - futures::channel::oneshot::Sender>, - >, - cx_request_table: &mut HashMap< - RequestId, - futures::channel::oneshot::Sender>, - >, - outgoing_bitswap_query_ids: &mut HashMap, - cx_response_tx: Sender<( - RequestId, - ResponseChannel, - ChainExchangeResponse, - )>, - pubsub_block_str: &str, - pubsub_msg_str: &str, -) where - DB: Blockstore + Store + BitswapStore + Clone + Sync + Send + 'static, -{ + network_sender_out: &Sender, +) { let behaviour = swarm.behaviour_mut(); - match event { - ForestBehaviourEvent::Discovery(discovery_out) => match discovery_out { - DiscoveryOut::Connected(peer_id, addresses) => { - debug!("Peer connected, {:?}", peer_id); - for addr in addresses { - behaviour.bitswap.add_address(&peer_id, addr); - } - emit_event(network_sender_out, NetworkEvent::PeerConnected(peer_id)).await; + match discovery_out { + DiscoveryOut::Connected(peer_id, addresses) => { + debug!("Peer connected, {:?}", peer_id); + for addr in addresses { + behaviour.bitswap.add_address(&peer_id, addr); } - DiscoveryOut::Disconnected(peer_id, addresses) => { - debug!("Peer disconnected, {:?}", peer_id); - for addr in addresses { - behaviour.bitswap.remove_address(&peer_id, &addr); - } - emit_event(network_sender_out, NetworkEvent::PeerDisconnected(peer_id)).await; + emit_event(network_sender_out, NetworkEvent::PeerConnected(peer_id)).await; + } + DiscoveryOut::Disconnected(peer_id, addresses) => { + debug!("Peer disconnected, {:?}", peer_id); + for addr in addresses { + behaviour.bitswap.remove_address(&peer_id, &addr); } - }, - ForestBehaviourEvent::Gossipsub(e) => { - if let GossipsubEvent::Message { - propagation_source: source, - message, - message_id: _, - } = e - { - let topic = message.topic.as_str(); - let message = message.data; - trace!("Got a Gossip Message from {:?}", source); - if topic == pubsub_block_str { - match from_slice::(&message) { - Ok(b) => { - emit_event( - network_sender_out, - NetworkEvent::PubsubMessage { - source, - message: PubsubMessage::Block(b), - }, - ) - .await; - } - Err(e) => { - warn!( - "Gossip Block from peer {source:?} could not be deserialized: {e}", - ); - } - } - } else if topic == pubsub_msg_str { - match from_slice::(&message) { - Ok(m) => { - emit_event( - network_sender_out, - NetworkEvent::PubsubMessage { - source, - message: PubsubMessage::Message(m), - }, - ) - .await; - } - Err(e) => { - warn!( - "Gossip Message from peer {source:?} could not be deserialized: {e}"); - } - } - } else { - warn!("Getting gossip messages from unknown topic: {topic}"); + emit_event(network_sender_out, NetworkEvent::PeerDisconnected(peer_id)).await; + } + } +} + +async fn handle_gossip_event( + e: GossipsubEvent, + network_sender_out: &Sender, + pubsub_block_str: &str, + pubsub_msg_str: &str, +) { + if let GossipsubEvent::Message { + propagation_source: source, + message, + message_id: _, + } = e + { + let topic = message.topic.as_str(); + let message = message.data; + trace!("Got a Gossip Message from {:?}", source); + if topic == pubsub_block_str { + match from_slice::(&message) { + Ok(b) => { + emit_event( + network_sender_out, + NetworkEvent::PubsubMessage { + source, + message: PubsubMessage::Block(b), + }, + ) + .await; + } + Err(e) => { + warn!("Gossip Block from peer {source:?} could not be deserialized: {e}",); } } - } - ForestBehaviourEvent::Hello(rr_event) => match rr_event { - RequestResponseEvent::Message { peer, message } => match message { - RequestResponseMessage::Request { - request, - channel, - request_id: _, - } => { + } else if topic == pubsub_msg_str { + match from_slice::(&message) { + Ok(m) => { emit_event( network_sender_out, - NetworkEvent::HelloRequestInbound { source: peer }, + NetworkEvent::PubsubMessage { + source, + message: PubsubMessage::Message(m), + }, ) .await; + } + Err(e) => { + warn!("Gossip Message from peer {source:?} could not be deserialized: {e}"); + } + } + } else { + warn!("Getting gossip messages from unknown topic: {topic}"); + } + } +} - let arrival = SystemTime::now() +async fn handle_hello_event( + rr_event: RequestResponseEvent, + swarm: &mut Swarm>, + genesis_cid: &Cid, + network_sender_out: &Sender, + hello_request_table: &mut HelloRequestTable, +) { + let behaviour = swarm.behaviour_mut(); + match rr_event { + RequestResponseEvent::Message { peer, message } => match message { + RequestResponseMessage::Request { + request, + channel, + request_id: _, + } => { + emit_event( + network_sender_out, + NetworkEvent::HelloRequestInbound { source: peer }, + ) + .await; + + let arrival = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time before unix epoch") + .as_nanos() + .try_into() + .expect("System time since unix epoch should not exceed u64"); + + trace!("Received hello request: {:?}", request); + if &request.genesis_cid != genesis_cid { + warn!( + "Genesis hash mismatch: {} received, {genesis_cid} expected. Banning {peer}", + request.genesis_cid + ); + swarm.ban_peer_id(peer); + } else { + let sent = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("System time before unix epoch") .as_nanos() .try_into() .expect("System time since unix epoch should not exceed u64"); - trace!("Received hello request: {:?}", request); - if &request.genesis_cid != genesis_cid { - warn!( - "Genesis hash mismatch: {} received, {genesis_cid} expected. Banning {peer}", - request.genesis_cid - ); - swarm.ban_peer_id(peer); + // Send hello response immediately, no need to have the overhead of emitting + // channel and polling future here. + if let Err(e) = behaviour + .hello + .send_response(channel, HelloResponse { arrival, sent }) + { + warn!("Failed to send HelloResponse: {e:?}"); } else { - let sent = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("System time before unix epoch") - .as_nanos() - .try_into() - .expect("System time since unix epoch should not exceed u64"); - - // Send hello response immediately, no need to have the overhead of emitting - // channel and polling future here. - if let Err(e) = behaviour - .hello - .send_response(channel, HelloResponse { arrival, sent }) - { - warn!("Failed to send HelloResponse: {e:?}"); - } else { - emit_event( - network_sender_out, - NetworkEvent::HelloResponseOutbound { - source: peer, - request, - }, - ) - .await; - } + emit_event( + network_sender_out, + NetworkEvent::HelloResponseOutbound { + source: peer, + request, + }, + ) + .await; } } - RequestResponseMessage::Response { - request_id, - response, - } => { - // Send the sucessful response through channel out. - if let Some(tx) = hello_request_table.remove(&request_id) { - if tx.send(Ok(response)).is_err() { - warn!("RPCResponse receive timed out"); - } else { - emit_event( - network_sender_out, - NetworkEvent::HelloResponseInbound { request_id }, - ) - .await; - } - } else { - warn!("RPCResponse receive failed: channel not found"); - }; - } - }, - RequestResponseEvent::OutboundFailure { - peer, + } + RequestResponseMessage::Response { request_id, - error, + response, } => { - debug!( - "Hello outbound error (peer: {:?}) (id: {:?}): {:?}", - peer, request_id, error - ); - - // Send error through channel out. - let tx = hello_request_table.remove(&request_id); - if let Some(tx) = tx { - if tx.send(Err(error.into())).is_err() { - warn!("RPCResponse receive failed"); + // Send the sucessful response through channel out. + if let Some(tx) = hello_request_table.remove(&request_id) { + if tx.send(Ok(response)).is_err() { + warn!("RPCResponse receive timed out"); + } else { + emit_event( + network_sender_out, + NetworkEvent::HelloResponseInbound { request_id }, + ) + .await; } - } - } - RequestResponseEvent::InboundFailure { - peer, - error, - request_id: _, - } => { - debug!("Hello inbound error (peer: {:?}): {:?}", peer, error); + } else { + warn!("RPCResponse receive failed: channel not found"); + }; } - RequestResponseEvent::ResponseSent { .. } => (), }, - ForestBehaviourEvent::Bitswap(bs_event) => { - let get_prefix = |query_id: &libp2p_bitswap::QueryId| { - if outgoing_bitswap_query_ids.contains_key(query_id) { - "Outgoing" - } else { - "Inbound" - } - }; - match bs_event { - BitswapEvent::Progress(query_id, num_missing) => { - let prefix = get_prefix(&query_id); - debug!("{prefix} bitswap query {query_id} in progress, {num_missing} blocks pending"); + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error, + } => { + debug!( + "Hello outbound error (peer: {:?}) (id: {:?}): {:?}", + peer, request_id, error + ); + + // Send error through channel out. + let tx = hello_request_table.remove(&request_id); + if let Some(tx) = tx { + if tx.send(Err(error.into())).is_err() { + warn!("RPCResponse receive failed"); } - BitswapEvent::Complete(query_id, result) => match result { - Ok(()) => { - let prefix = get_prefix(&query_id); - debug!("{prefix} bitswap query {query_id} completed successfully"); - if let Some(cid) = outgoing_bitswap_query_ids.remove(&query_id) { - emit_event( - network_sender_out, - NetworkEvent::BitswapResponseInbound { query_id, cid }, - ) - .await; - } - } - Err(err) => { - let prefix = get_prefix(&query_id); - let msg = format!( - "{prefix} bitswap query {query_id} completed with error: {err}" - ); - if outgoing_bitswap_query_ids.contains_key(&query_id) { - warn!("{msg}"); - } else { - debug!("{msg}"); - } - } - }, } } - ForestBehaviourEvent::Ping(ping_event) => match ping_event.result { - Ok(ping::Success::Ping { rtt }) => { - trace!( - "PingSuccess::Ping rtt to {} is {} ms", - ping_event.peer.to_base58(), - rtt.as_millis() - ); - } - Ok(ping::Success::Pong) => { - trace!("PingSuccess::Pong from {}", ping_event.peer.to_base58()); - } - Err(ping::Failure::Other { error }) => { - warn!( - "PingFailure::Other {}: {}", - ping_event.peer.to_base58(), - error - ); + RequestResponseEvent::InboundFailure { + peer, + error, + request_id: _, + } => { + debug!("Hello inbound error (peer: {:?}): {:?}", peer, error); + } + RequestResponseEvent::ResponseSent { .. } => (), + } +} + +async fn handle_bitswap_event( + bs_event: BitswapEvent, + network_sender_out: &Sender, + outgoing_bitswap_query_ids: &mut HashMap, +) { + let get_prefix = |query_id: &libp2p_bitswap::QueryId| { + if outgoing_bitswap_query_ids.contains_key(query_id) { + "Outgoing" + } else { + "Inbound" + } + }; + match bs_event { + BitswapEvent::Progress(query_id, num_missing) => { + let prefix = get_prefix(&query_id); + debug!("{prefix} bitswap query {query_id} in progress, {num_missing} blocks pending"); + } + BitswapEvent::Complete(query_id, result) => match result { + Ok(()) => { + let prefix = get_prefix(&query_id); + debug!("{prefix} bitswap query {query_id} completed successfully"); + if let Some(cid) = outgoing_bitswap_query_ids.remove(&query_id) { + emit_event( + network_sender_out, + NetworkEvent::BitswapResponseInbound { query_id, cid }, + ) + .await; + } } Err(err) => { - let err = err.to_string(); - let peer = ping_event.peer.to_base58(); - warn!("{err}: {peer}",); - if err.contains("protocol not supported") { - warn!("Banning peer {peer} due to protocol error"); - swarm.ban_peer_id(ping_event.peer); + let prefix = get_prefix(&query_id); + let msg = format!("{prefix} bitswap query {query_id} completed with error: {err}"); + if outgoing_bitswap_query_ids.contains_key(&query_id) { + warn!("{msg}"); + } else { + debug!("{msg}"); } } }, - ForestBehaviourEvent::Identify(_) => {} - ForestBehaviourEvent::ChainExchange(ce_event) => match ce_event { - RequestResponseEvent::Message { peer, message } => match message { + } +} + +fn handle_ping_event( + ping_event: ping::Event, + swarm: &mut Swarm>, +) { + match ping_event.result { + Ok(ping::Success::Ping { rtt }) => { + trace!( + "PingSuccess::Ping rtt to {} is {} ms", + ping_event.peer.to_base58(), + rtt.as_millis() + ); + } + Ok(ping::Success::Pong) => { + trace!("PingSuccess::Pong from {}", ping_event.peer.to_base58()); + } + Err(ping::Failure::Other { error }) => { + warn!( + "PingFailure::Other {}: {}", + ping_event.peer.to_base58(), + error + ); + } + Err(err) => { + let err = err.to_string(); + let peer = ping_event.peer.to_base58(); + warn!("{err}: {peer}",); + if err.contains("protocol not supported") { + warn!("Banning peer {peer} due to protocol error"); + swarm.ban_peer_id(ping_event.peer); + } + } + } +} + +async fn handle_chain_exchange_event( + ce_event: RequestResponseEvent, + db: &Arc>, + network_sender_out: &Sender, + cx_request_table: &mut CxRequestTable, + cx_response_tx: Sender<( + RequestId, + ResponseChannel, + ChainExchangeResponse, + )>, +) where + DB: Blockstore + Store + BitswapStore + Clone + Sync + Send + 'static, +{ + match ce_event { + RequestResponseEvent::Message { peer, message } => { + match message { RequestResponseMessage::Request { request, channel, @@ -716,48 +731,103 @@ async fn handle_forest_behaviour_event( warn!("RPCResponse receive failed: channel not found"); }; } - }, - RequestResponseEvent::OutboundFailure { - peer, - request_id, - error, - } => { - warn!( - "ChainExchange outbound error (peer: {:?}) (id: {:?}): {:?}", - peer, request_id, error - ); + } + } + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error, + } => { + warn!( + "ChainExchange outbound error (peer: {:?}) (id: {:?}): {:?}", + peer, request_id, error + ); - let tx = cx_request_table.remove(&request_id); + let tx = cx_request_table.remove(&request_id); - // Send error through channel out. - if let Some(tx) = tx { - if tx.send(Err(error.into())).is_err() { - warn!("RPCResponse receive failed") - } + // Send error through channel out. + if let Some(tx) = tx { + if tx.send(Err(error.into())).is_err() { + warn!("RPCResponse receive failed") } } - RequestResponseEvent::InboundFailure { - peer, - error, - request_id: _, - } => { - debug!( - "ChainExchange inbound error (peer: {:?}): {:?}", - peer, error - ); - } - RequestResponseEvent::ResponseSent { request_id, .. } => { - emit_event( - network_sender_out, - NetworkEvent::ChainExchangeResponseOutbound { request_id }, - ) - .await; - } - }, + } + RequestResponseEvent::InboundFailure { + peer, + error, + request_id: _, + } => { + debug!( + "ChainExchange inbound error (peer: {:?}): {:?}", + peer, error + ); + } + RequestResponseEvent::ResponseSent { request_id, .. } => { + emit_event( + network_sender_out, + NetworkEvent::ChainExchangeResponseOutbound { request_id }, + ) + .await; + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn handle_forest_behaviour_event( + swarm: &mut Swarm>, + event: ForestBehaviourEvent

, + db: &Arc>, + genesis_cid: &Cid, + network_sender_out: &Sender, + hello_request_table: &mut HelloRequestTable, + cx_request_table: &mut CxRequestTable, + outgoing_bitswap_query_ids: &mut HashMap, + cx_response_tx: Sender<( + RequestId, + ResponseChannel, + ChainExchangeResponse, + )>, + pubsub_block_str: &str, + pubsub_msg_str: &str, +) where + DB: Blockstore + Store + BitswapStore + Clone + Sync + Send + 'static, +{ + match event { + ForestBehaviourEvent::Discovery(discovery_out) => { + handle_discovery_event(discovery_out, swarm, network_sender_out).await + } + ForestBehaviourEvent::Gossipsub(e) => { + handle_gossip_event(e, network_sender_out, pubsub_block_str, pubsub_msg_str).await + } + ForestBehaviourEvent::Hello(rr_event) => { + handle_hello_event( + rr_event, + swarm, + genesis_cid, + network_sender_out, + hello_request_table, + ) + .await + } + ForestBehaviourEvent::Bitswap(bs_event) => { + handle_bitswap_event(bs_event, network_sender_out, outgoing_bitswap_query_ids).await + } + ForestBehaviourEvent::Ping(ping_event) => handle_ping_event(ping_event, swarm), + ForestBehaviourEvent::Identify(_) => {} + ForestBehaviourEvent::ChainExchange(ce_event) => { + handle_chain_exchange_event( + ce_event, + db, + network_sender_out, + cx_request_table, + cx_response_tx, + ) + .await + } } } -async fn emit_event(sender: &flume::Sender, event: NetworkEvent) { +async fn emit_event(sender: &Sender, event: NetworkEvent) { if sender.send_async(event).await.is_err() { error!("Failed to emit event: Network channel receiver has been dropped"); }