Skip to content

Commit

Permalink
chain_exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 committed Nov 3, 2022
1 parent 8dfb2c6 commit ae20d1b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
2 changes: 1 addition & 1 deletion node/forest_libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub(crate) struct ForestBehaviour<P: StoreParams> {
// TODO would be nice to have this handled together and generic, to avoid duplicated polling
// but is fine for now, since the protocols are handled slightly differently.
pub(super) hello: RequestResponse<HelloCodec>,
chain_exchange: RequestResponse<ChainExchangeCodec>,
pub(super) chain_exchange: RequestResponse<ChainExchangeCodec>,
pub(super) bitswap: Bitswap<P>,
// #[behaviour(ignore)]
// events: Vec<ForestBehaviourEvent<P>>,
Expand Down
65 changes: 64 additions & 1 deletion node/forest_libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ where
RequestId,
futures::channel::oneshot::Sender<Result<HelloResponse, RequestResponseError>>,
> = HashMap::new();
let mut cx_request_table: HashMap<
RequestId,
futures::channel::oneshot::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
> = HashMap::new();
loop {
select! {
swarm_event = swarm_stream.next() => match swarm_event {
Expand Down Expand Up @@ -271,6 +275,7 @@ where
warn!("Getting gossip messages from unknown topic: {}", topic);
}
},
ForestBehaviourEvent::Gossipsub(_) => {},
ForestBehaviourEvent::Hello(rr_event) => match rr_event {
RequestResponseEvent::Message { peer, message } => match message {
RequestResponseMessage::Request {
Expand Down Expand Up @@ -397,7 +402,65 @@ where
identify::Event::Pushed { .. } => (),
identify::Event::Error { .. } => (),
},
_ => {
ForestBehaviourEvent::ChainExchange(ce_event) => match ce_event {
RequestResponseEvent::Message { peer, message } => match message {
RequestResponseMessage::Request {
request,
channel,
request_id: _,
} => {
debug!("Received chain_exchange request (peer_id: {:?})", peer);
let db = self.cs.clone();
let bh_mut = swarm_stream.get_mut().behaviour_mut();
// TODO: Make make_chain_exchange_response async
bh_mut.chain_exchange.send_response(channel, make_chain_exchange_response(db.as_ref(), &request).await)?;
}
RequestResponseMessage::Response {
request_id,
response,
} => {
let tx = cx_request_table.remove(&request_id);

// Send the sucessful response through channel out.
if let Some(tx) = tx {
if tx.send(Ok(response)).is_err() {
debug!("RPCResponse receive timed out")
}
} else {
debug!("RPCResponse receive failed: channel not found");
};
}
},
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error,
} => {
debug!(
"ChainExchange outbound error (peer: {:?}) (id: {:?}): {:?}",
peer, request_id, error
);

let tx = cx_request_table.remove(&request_id);

// Send error through channel out.
if let Some(tx) = tx {
if tx.send(Err(error.into())).is_err() {
debug!("RPCResponse receive failed")
}
}
}
RequestResponseEvent::InboundFailure {
peer,
error,
request_id: _,
} => {
debug!(
"ChainExchange inbound error (peer: {:?}): {:?}",
peer, error
);
}
_ => {}
},
},
None => { break; },
Expand Down

0 comments on commit ae20d1b

Please sign in to comment.