diff --git a/Cargo.lock b/Cargo.lock index d3219f456893..f3bae86946e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3921,6 +3921,22 @@ dependencies = [ "libc", ] +[[package]] +name = "manual-p2p" +version = "0.0.0" +dependencies = [ + "eyre", + "futures", + "once_cell", + "reth-discv4", + "reth-ecies", + "reth-eth-wire", + "reth-network", + "reth-primitives", + "secp256k1", + "tokio", +] + [[package]] name = "match_cfg" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0e3f72f62c9f..25d1245fbbca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ members = [ "examples", "examples/additional-rpc-namespace-in-cli", "examples/rpc-db", + "examples/manual-p2p", ] default-members = ["bin/reth"] @@ -97,6 +98,9 @@ reth-tasks = { path = "./crates/tasks" } reth-network = { path = "./crates/net/network" } reth-network-api = { path = "./crates/net/network-api" } reth-rpc-types-compat = { path = "./crates/rpc/rpc-types-compat" } +reth-discv4 = { path = "./crates/net/discv4" } +reth-eth-wire = { path = "./crates/net/eth-wire" } +reth-ecies = { path = "./crates/net/ecies" } # revm revm = { git = "https://github.com/bluealloy/revm", rev = "516f62cc" } @@ -172,4 +176,4 @@ assert_matches = "1.5.0" proptest = "1.0" proptest-derive = "0.4" -serial_test = "2" \ No newline at end of file +serial_test = "2" diff --git a/examples/manual-p2p/Cargo.toml b/examples/manual-p2p/Cargo.toml new file mode 100644 index 000000000000..f31a579081ee --- /dev/null +++ b/examples/manual-p2p/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "manual-p2p" +version = "0.0.0" +publish = false +edition.workspace = true +license.workspace = true + +[dependencies] +once_cell = "1.17.0" +eyre = "0.6.8" + +reth-primitives.workspace = true +reth-network.workspace = true +reth-discv4.workspace = true +reth-eth-wire.workspace = true +reth-ecies.workspace = true + +futures.workspace = true +secp256k1.workspace = true +tokio.workspace = true diff --git a/examples/manual-p2p/src/main.rs b/examples/manual-p2p/src/main.rs new file mode 100644 index 000000000000..9088d3a002be --- /dev/null +++ b/examples/manual-p2p/src/main.rs @@ -0,0 +1,150 @@ +//! Low level example of connecting to and communicating with a peer. +//! +//! Run with +//! +//! ```not_rust +//! cargo run -p manual-p2p +//! ``` + +use std::time::Duration; + +use futures::StreamExt; +use once_cell::sync::Lazy; +use reth_discv4::{DiscoveryUpdate, Discv4, Discv4ConfigBuilder, DEFAULT_DISCOVERY_ADDRESS}; +use reth_ecies::{stream::ECIESStream, util::pk2id}; +use reth_eth_wire::{ + EthMessage, EthStream, HelloMessage, P2PStream, Status, UnauthedEthStream, UnauthedP2PStream, +}; +use reth_network::config::rng_secret_key; +use reth_primitives::{mainnet_nodes, Chain, Hardfork, Head, NodeRecord, MAINNET, MAINNET_GENESIS}; +use secp256k1::{SecretKey, SECP256K1}; +use tokio::net::TcpStream; + +type AuthedP2PStream = P2PStream>; +type AuthedEthStream = EthStream>>; + +pub static MAINNET_BOOT_NODES: Lazy> = Lazy::new(mainnet_nodes); + +#[tokio::main] +async fn main() -> eyre::Result<()> { + // Setup configs related to this 'node' by creating a new random + let our_key = rng_secret_key(); + let our_enr = NodeRecord::from_secret_key(DEFAULT_DISCOVERY_ADDRESS, &our_key); + + // Setup discovery v4 protocol to find peers to talk to + let mut discv4_cfg = Discv4ConfigBuilder::default(); + discv4_cfg.add_boot_nodes(MAINNET_BOOT_NODES.clone()).lookup_interval(Duration::from_secs(1)); + + // Start discovery protocol + let discv4 = Discv4::spawn(our_enr.udp_addr(), our_enr, our_key, discv4_cfg.build()).await?; + let mut discv4_stream = discv4.update_stream().await?; + + while let Some(update) = discv4_stream.next().await { + tokio::spawn(async move { + if let DiscoveryUpdate::Added(peer) = update { + // Boot nodes hard at work, lets not disturb them + if MAINNET_BOOT_NODES.contains(&peer) { + return + } + + let (p2p_stream, their_hello) = match handshake_p2p(peer, our_key).await { + Ok(s) => s, + Err(e) => { + println!("Failed P2P handshake with peer {}, {}", peer.address, e); + return + } + }; + + let (eth_stream, their_status) = match handshake_eth(p2p_stream).await { + Ok(s) => s, + Err(e) => { + println!("Failed ETH handshake with peer {}, {}", peer.address, e); + return + } + }; + + println!( + "Successfully connected to a peer at {}:{} ({}) using eth-wire version eth/{}", + peer.address, peer.tcp_port, their_hello.client_version, their_status.version + ); + + snoop(peer, eth_stream).await; + } + }); + } + + Ok(()) +} + +// Perform a P2P handshake with a peer +async fn handshake_p2p( + peer: NodeRecord, + key: SecretKey, +) -> eyre::Result<(AuthedP2PStream, HelloMessage)> { + let outgoing = TcpStream::connect((peer.address, peer.tcp_port)).await?; + let ecies_stream = ECIESStream::connect(outgoing, key, peer.id).await?; + + let our_peer_id = pk2id(&key.public_key(SECP256K1)); + let our_hello = HelloMessage::builder(our_peer_id).build(); + + Ok(UnauthedP2PStream::new(ecies_stream).handshake(our_hello).await?) +} + +// Perform a ETH Wire handshake with a peer +async fn handshake_eth(p2p_stream: AuthedP2PStream) -> eyre::Result<(AuthedEthStream, Status)> { + let fork_filter = MAINNET.fork_filter(Head { + timestamp: MAINNET.fork(Hardfork::Shanghai).as_timestamp().unwrap(), + ..Default::default() + }); + + let status = Status::builder() + .chain(Chain::mainnet()) + .genesis(MAINNET_GENESIS) + .forkid(Hardfork::Shanghai.fork_id(&MAINNET).unwrap()) + .build(); + + let status = Status { version: p2p_stream.shared_capability().version(), ..status }; + let eth_unauthed = UnauthedEthStream::new(p2p_stream); + Ok(eth_unauthed.handshake(status, fork_filter).await?) +} + +// Snoop by greedily capturing all broadcasts that the peer emits +// note: this node cannot handle request so will be disconnected by peer when challenged +async fn snoop(peer: NodeRecord, mut eth_stream: AuthedEthStream) { + while let Some(Ok(update)) = eth_stream.next().await { + match update { + EthMessage::NewPooledTransactionHashes66(txs) => { + println!("Got {} new tx hashes from peer {}", txs.0.len(), peer.address); + } + EthMessage::NewBlock(block) => { + println!("Got new block data {:?} from peer {}", block, peer.address); + } + EthMessage::NewPooledTransactionHashes68(txs) => { + println!("Got {} new tx hashes from peer {}", txs.hashes.len(), peer.address); + } + EthMessage::NewBlockHashes(block_hashes) => { + println!( + "Got {} new block hashes from peer {}", + block_hashes.0.len(), + peer.address + ); + } + EthMessage::GetNodeData(_) => { + println!("Unable to serve GetNodeData request to peer {}", peer.address); + } + EthMessage::GetReceipts(_) => { + println!("Unable to serve GetReceipts request to peer {}", peer.address); + } + EthMessage::GetBlockHeaders(_) => { + println!("Unable to serve GetBlockHeaders request to peer {}", peer.address); + } + EthMessage::GetBlockBodies(_) => { + println!("Unable to serve GetBlockBodies request to peer {}", peer.address); + } + EthMessage::GetPooledTransactions(_) => { + println!("Unable to serve GetPooledTransactions request to peer {}", peer.address); + } + _ => {} + } + } +}