Skip to content

Commit

Permalink
feat(gossip): added a broadcast channel to subscribe to the datafeed …
Browse files Browse the repository at this point in the history
…stream on peer established event
  • Loading branch information
loocapro committed Sep 16, 2024
1 parent 4119ec7 commit 55de218
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 42 deletions.
2 changes: 1 addition & 1 deletion oracle/src/cli_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod tests {

#[test]
fn test_oracle_ext() {
let cli = CommandParser::<OracleExt>::parse_from(&[
let cli = CommandParser::<OracleExt>::parse_from([
"test",
"--disc.tcp-port",
"30304",
Expand Down
6 changes: 3 additions & 3 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ fn main() -> eyre::Result<()> {
let handle = builder
.node(EthereumNode::default())
.install_exex(ORACLE_EXEX_ID, move |ctx| async move {
let subproto = OracleProtoHandler::new();
let (subproto, proto_events) = OracleProtoHandler::new();
ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

let exex = ExEx::new(ctx);
let network = Network::new(tcp_port, udp_port).await?;
let (network, to_gossip) = Network::new(proto_events, tcp_port, udp_port).await?;
let data_feed = DataFeederStream::new(args.binance_symbols).await?;
let oracle = Oracle::new(exex, network, data_feed);
let oracle = Oracle::new(exex, network, data_feed, to_gossip);
Ok(oracle)
})
.launch()
Expand Down
4 changes: 2 additions & 2 deletions oracle/src/network/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ impl Future for Discovery {
match ready!(this.events.poll_recv(cx)) {
Some(evt) => match evt {
Event::Discovered(enr) => {
info!(?enr, "Discovered a new peer.");
info!(?enr, "Discovered a new node.");
this.add_node(enr)?;
}
Event::SessionEstablished(enr, socket_addr) => {
info!(?enr, ?socket_addr, "Session established with a new peer.");
info!(?enr, ?socket_addr, "Session established with a new node.");
}
evt => {
info!(?evt, "New discovery event.");
Expand Down
59 changes: 59 additions & 0 deletions oracle/src/network/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use reth_tracing::tracing::error;
use tokio::sync::{
broadcast::{error::RecvError, Sender},
mpsc::UnboundedSender,
};

use super::proto::{connection::OracleCommand, data::SignedTicker};

/// The size of the broadcast channel.
///
/// This value is based on the estimated message rate and the tolerance for lag.
/// - We assume an average of 10-20 updates per second per symbol.
/// - For 2 symbols (e.g., ETHUSDC and BTCUSDC), this gives approximately 20-40 messages per second.
/// - To allow subscribers to catch up if they fall behind, we provide a lag tolerance of 5 seconds.
///
/// Thus, the buffer size is calculated as:
///
/// `Buffer Size = Message Rate per Second * Lag Tolerance`
///
/// For 2 symbols, we calculate: `40 * 5 = 200`.
const BROADCAST_CHANNEL_SIZE: usize = 200;

pub(crate) struct Gossip {
pub(crate) inner: Sender<SignedTicker>,
}

impl Gossip {
/// Creates a new Broadcast instance.
pub(crate) fn new() -> (Self, tokio::sync::broadcast::Sender<SignedTicker>) {
let (sender, _receiver) = tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE);
(Self { inner: sender.clone() }, sender)
}

/// Starts to gossip data to the connected peers from the broadcast channel.
pub(crate) fn start(
&self,
to_connection: UnboundedSender<OracleCommand>,
) -> Result<(), RecvError> {
let mut receiver = self.inner.subscribe();

tokio::task::spawn(async move {
loop {
match receiver.recv().await {
Ok(signed_data) => {
if let Err(e) = to_connection.send(OracleCommand::Tick(signed_data)) {
error!(?e, "Failed to broadcast message to peer");
}
}
Err(e) => {
error!(?e, "Data feed task encountered an error");
return Err::<(), RecvError>(e);
}
}
}
});

Ok(())
}
}
40 changes: 38 additions & 2 deletions oracle/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
use discovery::Discovery;
use futures::FutureExt;
use gossip::Gossip;
use proto::{data::SignedTicker, ProtocolEvent};
use reth_tracing::tracing::{error, info};
use std::{
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::broadcast::Sender;

mod discovery;
mod gossip;
pub(crate) mod proto;

/// The Network struct is a long running task that orchestrates discovery of new peers and network
/// gossiping via the RLPx subprotocol.
pub(crate) struct Network {
/// The discovery task for this node.
discovery: Discovery,
/// The protocol events channel.
proto_events: proto::ProtoEvents,
/// Helper struct to manage gossiping data to connected peers.
gossip: Gossip,
}

impl Network {
pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result<Self> {
pub(crate) async fn new(
proto_events: proto::ProtoEvents,
tcp_port: u16,
udp_port: u16,
) -> eyre::Result<(Self, Sender<SignedTicker>)> {
let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port);
let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port);
let discovery = Discovery::new(disc_addr, rlpx_addr).await?;
Ok(Self { discovery })
let (gossip, to_gossip) = Gossip::new();
Ok((Self { discovery, proto_events, gossip }, to_gossip))
}
}

Expand All @@ -42,6 +55,29 @@ impl Future for Network {
error!(?e, "Discovery task encountered an error");
return Poll::Ready(Err(e));
}
Poll::Pending => break,
}
}

loop {
match this.proto_events.poll_recv(cx) {
Poll::Ready(Some(ProtocolEvent::Established {
direction,
peer_id,
to_connection,
})) => {
info!(
?direction,
?peer_id,
?to_connection,
"Established connection, will start gossiping"
);
this.gossip.start(to_connection)?;
}
Poll::Ready(None) => {
return Poll::Ready(Ok(()));
}

Poll::Pending => {}
}
}
Expand Down
65 changes: 41 additions & 24 deletions oracle/src/network/proto/connection.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
use super::{OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState};
use super::{
data::SignedTicker, OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState,
};
use alloy_rlp::Encodable;
use futures::{Stream, StreamExt};
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network::protocol::{ConnectionHandler, OnNotSupported};
use reth_network_api::Direction;
use reth_primitives::BytesMut;
use reth_primitives::{Address, BytesMut};
use reth_rpc_types::PeerId;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

/// The commands supported by the OracleConnection.
pub(crate) enum OracleCommand {
/// Sends a message to the peer
Message {
msg: String,
/// The response will be sent to this channel.
response: oneshot::Sender<String>,
},
/// Sends a signed tick to a peer
Tick(SignedTicker),
}

/// This struct defines the connection object for the Oracle subprotocol.
pub(crate) struct OracleConnection {
conn: ProtocolConnection,
commands: UnboundedReceiverStream<OracleCommand>,
pending_pong: Option<oneshot::Sender<String>>,
initial_ping: Option<OracleProtoMessage>,
attestations: Vec<Address>,
}

impl Stream for OracleConnection {
Expand All @@ -42,23 +41,41 @@ impl Stream for OracleConnection {
return Poll::Ready(Some(initial_ping.encoded()));
}

let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) };
loop {
if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) {
return match cmd {
OracleCommand::Tick(tick) => {
Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded()))
}
};
}

let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) };

let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else {
return Poll::Ready(None);
};
let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else {
return Poll::Ready(None);
};

match msg.message {
OracleProtoMessageKind::Ping => {
return Poll::Ready(Some(OracleProtoMessage::pong().encoded()))
}
OracleProtoMessageKind::Pong => {}
OracleProtoMessageKind::SignedTicker(_) => {
// TODO: verify signature and keep count
match msg.message {
OracleProtoMessageKind::Ping => {
return Poll::Ready(Some(OracleProtoMessage::pong().encoded()))
}
OracleProtoMessageKind::Pong => {}
OracleProtoMessageKind::SignedTicker(signed_data) => {
let signer = signed_data.signer;
let sig = signed_data.signature;

let mut buffer = BytesMut::new();
signed_data.ticker.encode(&mut buffer);

let addr = sig.recover_address_from_msg(buffer).ok().unwrap();

if addr == signer && !this.attestations.contains(&addr) {
this.attestations.push(addr);
}
}
}
}

Poll::Pending
}
}

Expand Down Expand Up @@ -98,7 +115,7 @@ impl ConnectionHandler for OracleConnHandler {
conn,
initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping),
commands: UnboundedReceiverStream::new(rx),
pending_pong: None,
attestations: Vec::new(),
}
}
}
18 changes: 14 additions & 4 deletions oracle/src/network/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) mod data;
pub(crate) enum OracleProtoMessageId {
Ping = 0x00,
Pong = 0x01,
TickData = 0x04,
TickData = 0x02,
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -45,6 +45,14 @@ impl OracleProtoMessage {
Protocol::new(Self::capability(), 4)
}

/// Creates a signed ticker message
pub(crate) fn signed_ticker(data: SignedTicker) -> Self {
Self {
message_type: OracleProtoMessageId::TickData,
message: OracleProtoMessageKind::SignedTicker(Box::new(data)),
}
}

/// Creates a ping message
pub(crate) fn ping() -> Self {
Self { message_type: OracleProtoMessageId::Ping, message: OracleProtoMessageKind::Ping }
Expand Down Expand Up @@ -100,11 +108,13 @@ pub(crate) struct OracleProtoHandler {
state: ProtocolState,
}

pub(crate) type ProtoEvents = mpsc::UnboundedReceiver<ProtocolEvent>;

impl OracleProtoHandler {
/// Creates a new `OracleProtoHandler` with the given protocol state.
pub(crate) fn new() -> Self {
let (tx, _) = mpsc::unbounded_channel();
Self { state: ProtocolState { events: tx } }
pub(crate) fn new() -> (Self, ProtoEvents) {
let (tx, rx) = mpsc::unbounded_channel();
(Self { state: ProtocolState { events: tx } }, rx)
}
}

Expand Down
32 changes: 26 additions & 6 deletions oracle/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
use crate::{
exex::ExEx,
network::{proto::data::SignedTicker, Network},
offchain_data::{DataFeederStream, DataFeeds},
};
use alloy_rlp::{BytesMut, Encodable};
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use futures::{FutureExt, StreamExt};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::{error, info};
Expand All @@ -7,8 +15,6 @@ use std::{
task::{Context, Poll},
};

use crate::{exex::ExEx, network::Network, offchain_data::DataFeederStream};

/// The Oracle struct is a long running task that orchestrates discovery of new peers,
/// decoding data from chain events (ExEx) and gossiping it to peers.
pub(crate) struct Oracle<Node: FullNodeComponents> {
Expand All @@ -19,11 +25,20 @@ pub(crate) struct Oracle<Node: FullNodeComponents> {
exex: ExEx<Node>,
/// The offchain data feed stream.
data_feed: DataFeederStream,
/// The signer to sign the data feed.
signer: PrivateKeySigner,
/// Half of the broadcast channel to send data to gossip.
to_gossip: tokio::sync::broadcast::Sender<SignedTicker>,
}

impl<Node: FullNodeComponents> Oracle<Node> {
pub(crate) fn new(exex: ExEx<Node>, network: Network, data_feed: DataFeederStream) -> Self {
Self { exex, network, data_feed }
pub(crate) fn new(
exex: ExEx<Node>,
network: Network,
data_feed: DataFeederStream,
to_gossip: tokio::sync::broadcast::Sender<SignedTicker>,
) -> Self {
Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_gossip }
}
}

Expand Down Expand Up @@ -52,8 +67,13 @@ impl<Node: FullNodeComponents> Future for Oracle<Node> {
// Poll the data feed future until it's drained
while let Poll::Ready(item) = this.data_feed.poll_next_unpin(cx) {
match item {
Some(Ok(_data)) => {
// Process the data feed by signing it and sending it to the network
Some(Ok(ticker_data)) => {
let DataFeeds::Binance(ticker) = ticker_data;
let mut buffer = BytesMut::new();
ticker.encode(&mut buffer);
let signature = this.signer.sign_message_sync(&buffer)?;
let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address());
this.to_gossip.send(signed_ticker.clone())?;
}
Some(Err(e)) => {
error!(?e, "Data feed task encountered an error");
Expand Down

0 comments on commit 55de218

Please sign in to comment.