diff --git a/oracle/src/cli_ext.rs b/oracle/src/cli_ext.rs index 4116267..ac85ff3 100644 --- a/oracle/src/cli_ext.rs +++ b/oracle/src/cli_ext.rs @@ -33,7 +33,7 @@ mod tests { #[test] fn test_oracle_ext() { - let cli = CommandParser::::parse_from(&[ + let cli = CommandParser::::parse_from([ "test", "--disc.tcp-port", "30304", diff --git a/oracle/src/main.rs b/oracle/src/main.rs index 5e60122..8b15d76 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -24,13 +24,13 @@ fn main() -> eyre::Result<()> { let handle = builder .node(EthereumNode::default()) .install_exex(ORACLE_EXEX_ID, move |ctx| async move { - let subproto = OracleProtoHandler::new(); + let (subproto, proto_events) = OracleProtoHandler::new(); ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol()); let exex = ExEx::new(ctx); - let network = Network::new(tcp_port, udp_port).await?; + let (network, to_gossip) = Network::new(proto_events, tcp_port, udp_port).await?; let data_feed = DataFeederStream::new(args.binance_symbols).await?; - let oracle = Oracle::new(exex, network, data_feed); + let oracle = Oracle::new(exex, network, data_feed, to_gossip); Ok(oracle) }) .launch() diff --git a/oracle/src/network/discovery.rs b/oracle/src/network/discovery.rs index 8113759..6e328cc 100644 --- a/oracle/src/network/discovery.rs +++ b/oracle/src/network/discovery.rs @@ -67,11 +67,11 @@ impl Future for Discovery { match ready!(this.events.poll_recv(cx)) { Some(evt) => match evt { Event::Discovered(enr) => { - info!(?enr, "Discovered a new peer."); + info!(?enr, "Discovered a new node."); this.add_node(enr)?; } Event::SessionEstablished(enr, socket_addr) => { - info!(?enr, ?socket_addr, "Session established with a new peer."); + info!(?enr, ?socket_addr, "Session established with a new node."); } evt => { info!(?evt, "New discovery event."); diff --git a/oracle/src/network/gossip.rs b/oracle/src/network/gossip.rs new file mode 100644 index 0000000..2bd9afc --- /dev/null +++ b/oracle/src/network/gossip.rs @@ -0,0 +1,59 @@ +use reth_tracing::tracing::error; +use tokio::sync::{ + broadcast::{error::RecvError, Sender}, + mpsc::UnboundedSender, +}; + +use super::proto::{connection::OracleCommand, data::SignedTicker}; + +/// The size of the broadcast channel. +/// +/// This value is based on the estimated message rate and the tolerance for lag. +/// - We assume an average of 10-20 updates per second per symbol. +/// - For 2 symbols (e.g., ETHUSDC and BTCUSDC), this gives approximately 20-40 messages per second. +/// - To allow subscribers to catch up if they fall behind, we provide a lag tolerance of 5 seconds. +/// +/// Thus, the buffer size is calculated as: +/// +/// `Buffer Size = Message Rate per Second * Lag Tolerance` +/// +/// For 2 symbols, we calculate: `40 * 5 = 200`. +const BROADCAST_CHANNEL_SIZE: usize = 200; + +pub(crate) struct Gossip { + pub(crate) inner: Sender, +} + +impl Gossip { + /// Creates a new Broadcast instance. + pub(crate) fn new() -> (Self, tokio::sync::broadcast::Sender) { + let (sender, _receiver) = tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + (Self { inner: sender.clone() }, sender) + } + + /// Starts to gossip data to the connected peers from the broadcast channel. + pub(crate) fn start( + &self, + to_connection: UnboundedSender, + ) -> Result<(), RecvError> { + let mut receiver = self.inner.subscribe(); + + tokio::task::spawn(async move { + loop { + match receiver.recv().await { + Ok(signed_data) => { + if let Err(e) = to_connection.send(OracleCommand::Tick(signed_data)) { + error!(?e, "Failed to broadcast message to peer"); + } + } + Err(e) => { + error!(?e, "Data feed task encountered an error"); + return Err::<(), RecvError>(e); + } + } + } + }); + + Ok(()) + } +} diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index 1a0f42d..54d43e4 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -1,5 +1,7 @@ use discovery::Discovery; use futures::FutureExt; +use gossip::Gossip; +use proto::{data::SignedTicker, ProtocolEvent}; use reth_tracing::tracing::{error, info}; use std::{ future::Future, @@ -7,8 +9,10 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use tokio::sync::broadcast::Sender; mod discovery; +mod gossip; pub(crate) mod proto; /// The Network struct is a long running task that orchestrates discovery of new peers and network @@ -16,14 +20,23 @@ pub(crate) mod proto; pub(crate) struct Network { /// The discovery task for this node. discovery: Discovery, + /// The protocol events channel. + proto_events: proto::ProtoEvents, + /// Helper struct to manage gossiping data to connected peers. + gossip: Gossip, } impl Network { - pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result { + pub(crate) async fn new( + proto_events: proto::ProtoEvents, + tcp_port: u16, + udp_port: u16, + ) -> eyre::Result<(Self, Sender)> { let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port); let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port); let discovery = Discovery::new(disc_addr, rlpx_addr).await?; - Ok(Self { discovery }) + let (gossip, to_gossip) = Gossip::new(); + Ok((Self { discovery, proto_events, gossip }, to_gossip)) } } @@ -42,6 +55,29 @@ impl Future for Network { error!(?e, "Discovery task encountered an error"); return Poll::Ready(Err(e)); } + Poll::Pending => break, + } + } + + loop { + match this.proto_events.poll_recv(cx) { + Poll::Ready(Some(ProtocolEvent::Established { + direction, + peer_id, + to_connection, + })) => { + info!( + ?direction, + ?peer_id, + ?to_connection, + "Established connection, will start gossiping" + ); + this.gossip.start(to_connection)?; + } + Poll::Ready(None) => { + return Poll::Ready(Ok(())); + } + Poll::Pending => {} } } diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs index f58248a..587d6d8 100644 --- a/oracle/src/network/proto/connection.rs +++ b/oracle/src/network/proto/connection.rs @@ -1,35 +1,34 @@ -use super::{OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState}; +use super::{ + data::SignedTicker, OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState, +}; +use alloy_rlp::Encodable; use futures::{Stream, StreamExt}; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, }; use reth_network::protocol::{ConnectionHandler, OnNotSupported}; use reth_network_api::Direction; -use reth_primitives::BytesMut; +use reth_primitives::{Address, BytesMut}; use reth_rpc_types::PeerId; use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; /// The commands supported by the OracleConnection. pub(crate) enum OracleCommand { - /// Sends a message to the peer - Message { - msg: String, - /// The response will be sent to this channel. - response: oneshot::Sender, - }, + /// Sends a signed tick to a peer + Tick(SignedTicker), } /// This struct defines the connection object for the Oracle subprotocol. pub(crate) struct OracleConnection { conn: ProtocolConnection, commands: UnboundedReceiverStream, - pending_pong: Option>, initial_ping: Option, + attestations: Vec
, } impl Stream for OracleConnection { @@ -42,23 +41,41 @@ impl Stream for OracleConnection { return Poll::Ready(Some(initial_ping.encoded())); } - let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + loop { + if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { + return match cmd { + OracleCommand::Tick(tick) => { + Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded())) + } + }; + } + + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; - let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { - return Poll::Ready(None); - }; + let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { + return Poll::Ready(None); + }; - match msg.message { - OracleProtoMessageKind::Ping => { - return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) - } - OracleProtoMessageKind::Pong => {} - OracleProtoMessageKind::SignedTicker(_) => { - // TODO: verify signature and keep count + match msg.message { + OracleProtoMessageKind::Ping => { + return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) + } + OracleProtoMessageKind::Pong => {} + OracleProtoMessageKind::SignedTicker(signed_data) => { + let signer = signed_data.signer; + let sig = signed_data.signature; + + let mut buffer = BytesMut::new(); + signed_data.ticker.encode(&mut buffer); + + let addr = sig.recover_address_from_msg(buffer).ok().unwrap(); + + if addr == signer && !this.attestations.contains(&addr) { + this.attestations.push(addr); + } + } } } - - Poll::Pending } } @@ -98,7 +115,7 @@ impl ConnectionHandler for OracleConnHandler { conn, initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping), commands: UnboundedReceiverStream::new(rx), - pending_pong: None, + attestations: Vec::new(), } } } diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index f3ba4e8..67d4f87 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -18,7 +18,7 @@ pub(crate) mod data; pub(crate) enum OracleProtoMessageId { Ping = 0x00, Pong = 0x01, - TickData = 0x04, + TickData = 0x02, } #[derive(Clone, Debug, PartialEq)] @@ -45,6 +45,14 @@ impl OracleProtoMessage { Protocol::new(Self::capability(), 4) } + /// Creates a signed ticker message + pub(crate) fn signed_ticker(data: SignedTicker) -> Self { + Self { + message_type: OracleProtoMessageId::TickData, + message: OracleProtoMessageKind::SignedTicker(Box::new(data)), + } + } + /// Creates a ping message pub(crate) fn ping() -> Self { Self { message_type: OracleProtoMessageId::Ping, message: OracleProtoMessageKind::Ping } @@ -100,11 +108,13 @@ pub(crate) struct OracleProtoHandler { state: ProtocolState, } +pub(crate) type ProtoEvents = mpsc::UnboundedReceiver; + impl OracleProtoHandler { /// Creates a new `OracleProtoHandler` with the given protocol state. - pub(crate) fn new() -> Self { - let (tx, _) = mpsc::unbounded_channel(); - Self { state: ProtocolState { events: tx } } + pub(crate) fn new() -> (Self, ProtoEvents) { + let (tx, rx) = mpsc::unbounded_channel(); + (Self { state: ProtocolState { events: tx } }, rx) } } diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index e7563a3..9f53cf3 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -1,3 +1,11 @@ +use crate::{ + exex::ExEx, + network::{proto::data::SignedTicker, Network}, + offchain_data::{DataFeederStream, DataFeeds}, +}; +use alloy_rlp::{BytesMut, Encodable}; +use alloy_signer::SignerSync; +use alloy_signer_local::PrivateKeySigner; use futures::{FutureExt, StreamExt}; use reth_node_api::FullNodeComponents; use reth_tracing::tracing::{error, info}; @@ -7,8 +15,6 @@ use std::{ task::{Context, Poll}, }; -use crate::{exex::ExEx, network::Network, offchain_data::DataFeederStream}; - /// The Oracle struct is a long running task that orchestrates discovery of new peers, /// decoding data from chain events (ExEx) and gossiping it to peers. pub(crate) struct Oracle { @@ -19,11 +25,20 @@ pub(crate) struct Oracle { exex: ExEx, /// The offchain data feed stream. data_feed: DataFeederStream, + /// The signer to sign the data feed. + signer: PrivateKeySigner, + /// Half of the broadcast channel to send data to gossip. + to_gossip: tokio::sync::broadcast::Sender, } impl Oracle { - pub(crate) fn new(exex: ExEx, network: Network, data_feed: DataFeederStream) -> Self { - Self { exex, network, data_feed } + pub(crate) fn new( + exex: ExEx, + network: Network, + data_feed: DataFeederStream, + to_gossip: tokio::sync::broadcast::Sender, + ) -> Self { + Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_gossip } } } @@ -52,8 +67,13 @@ impl Future for Oracle { // Poll the data feed future until it's drained while let Poll::Ready(item) = this.data_feed.poll_next_unpin(cx) { match item { - Some(Ok(_data)) => { - // Process the data feed by signing it and sending it to the network + Some(Ok(ticker_data)) => { + let DataFeeds::Binance(ticker) = ticker_data; + let mut buffer = BytesMut::new(); + ticker.encode(&mut buffer); + let signature = this.signer.sign_message_sync(&buffer)?; + let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address()); + this.to_gossip.send(signed_ticker.clone())?; } Some(Err(e)) => { error!(?e, "Data feed task encountered an error");