Skip to content

New competition bid source. #652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
833 changes: 86 additions & 747 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ alloy-primitives = { version = "1.1.0", default-features = false }
alloy-rlp = "0.3.10"
alloy-chains = "0.2.0"
alloy-evm = { version = "0.10", default-features = false }
alloy-provider = { version = "1.0.9", features = ["ipc", "pubsub"] }
alloy-provider = { version = "1.0.9", features = ["ipc", "pubsub", "ws"] }
alloy-pubsub = { version = "1.0.9" }
alloy-eips = { version = "1.0.9" }
alloy-rpc-types = { version = "1.0.9" }
Expand Down Expand Up @@ -171,6 +171,7 @@ ahash = "0.8.6"
time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }

eth-sparse-mpt = { path = "crates/eth-sparse-mpt" }
bid-scraper = { path = "crates/bid-scraper" }
rbuilder = { path = "crates/rbuilder" }
sysperf = { path = "crates/sysperf" }
metrics_macros = { path = "crates/rbuilder/src/telemetry/metrics_macros"}
3 changes: 3 additions & 0 deletions config-live-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ redacted_telemetry_server_ip = "0.0.0.0"
full_telemetry_server_port = 6060
full_telemetry_server_ip = "0.0.0.0"


chain = "mainnet"
reth_datadir = "/mnt/data/reth"

Expand All @@ -20,6 +21,8 @@ jsonrpc_server_ip = "0.0.0.0"
el_node_ipc_path = "/tmp/reth.ipc"
extra_data = "⚡🤖"

scraped_bids_publisher_url = "tcp://0.0.0.0:5555"

blocklist_file_path = "./blocklist.json"

ignore_cancellable_orders = true
Expand Down
12 changes: 8 additions & 4 deletions crates/bid-scraper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ repository.workspace = true
exclude.workspace = true

[dependencies]
ethers = { version = "^2", default-features = false, features = ["ws", "rustls"] }
tokio = { version = "1.40", default-features = false, features = ["rt", "rt-multi-thread", "macros","signal"] }
tokio-util = "0.7.12"
tracing = "0.1.37"
Expand All @@ -21,22 +20,27 @@ serde = { workspace = true }
serde_with = { version = "3.9.0", features = ["time_0_3"] }
chrono = "^0.4"
futures = "^0.3"
futures-retry = "0.6"
rand = "^0.8"
runng = "^0.3"
reqwest = { version = "^0.11", default-features = false, features = ["rustls", "gzip", "brotli", "deflate", "json"] }
reqwest = { version = "^0.11", default-features = false, features = ["rustls", "gzip", "brotli", "deflate", "json", "rustls-tls"] }
lru = "^0.10"
async-trait = "^0.1"
tokio-tungstenite = { version = "^0.19", features = ["rustls-tls-native-roots"] }
futures-util = "^0.3"
ethereum_ssz = "^0.5"
ethereum_ssz_derive = "^0.5"
ethereum_ssz_derive = "0.9"
ethereum_ssz = "0.9"
ssz_types = "^0.5"
hex = "^0.4"
derivative = "2.2"
toml = "0.8.8"
eyre = "0.6.12"
thiserror = { version = "1.0.64" }
parking_lot = { version = "0.12.3" }
strum = { version = "0.25", features = ["derive"] }
alloy-primitives.workspace = true
alloy-provider.workspace = true
alloy-rpc-types-beacon.workspace = true

[dev-dependencies]
proptest = "1.4"
69 changes: 69 additions & 0 deletions crates/bid-scraper/src/bid_scraper_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use futures_retry::{FutureRetry, RetryPolicy};
use runng::{
asyncio::{AsyncSocket, ReadAsync},
latest::ProtocolFactory,
protocol::Subscribe,
Dial,
};
use std::{sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::types::BlockBid;

/// Sink for scraped bids received from the NNG subscription.
///
/// Implementations must be thread-safe (`Send + Sync`) because bids are
/// delivered from the async subscriber task.
pub trait ScrapedBidsObs: Send + Sync {
    /// Called once per scraped bid.
    ///
    /// Be careful: no filtering is applied upstream, so the stream may
    /// include bids we published ourselves.
    fn update_new_bid(&self, bid: BlockBid);
}

/// Runs the NNG bid subscriber forever, reconnecting on every error.
///
/// * `obs` - sink notified for each scraped bid.
/// * `cancel` - cancelling this token is the only way this function returns.
/// * `publisher_url` - NNG publisher endpoint to dial (e.g. "tcp://host:5555").
/// * `timeout` - if we don't get a new bid in this time we reconnect.
/// * `retry_wait` - time we wait before reconnecting after an error.
pub async fn run_nng_subscriber_with_retries(
    obs: Arc<dyn ScrapedBidsObs>,
    cancel: CancellationToken,
    publisher_url: String,
    timeout: Duration,
    retry_wait: Duration,
) {
    let url = publisher_url.clone(); // for reuse in error handler
    tokio::select! {
        result = FutureRetry::new(
            move || run_nng_subscriber(obs.clone(), publisher_url.clone(), timeout),
            move |error: Box<dyn std::error::Error>| {
                tracing::error!("Subscriber to {url} returned an error: {error:?}");
                RetryPolicy::<()>::WaitRetry(retry_wait)
            },
        ) => {
            // The error handler always answers WaitRetry, so the FutureRetry
            // future can never resolve; reaching this arm is a logic error.
            let attempts = match result {
                Ok((_, attempts)) => attempts,
                Err((_, attempts)) => attempts,
            };
            unreachable!("NNG subscription exited after {attempts} attempts")
        }
        _ = cancel.cancelled() => {
            info!("bid scraper NNG subscription cancelled");
        }
    }
}

/// Single NNG subscription session: dials the publisher, subscribes to all
/// topics and forwards every decoded bid to `obs`.
///
/// Returns an error (which triggers the caller's retry logic) when dialing,
/// subscribing, receiving (including `timeout` elapsing with no message) or
/// JSON decoding fails.
async fn run_nng_subscriber(
    obs: Arc<dyn ScrapedBidsObs>,
    publisher_url: String,
    timeout: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut socket = ProtocolFactory::default().subscriber_open()?;
    socket.dial(&publisher_url)?;
    // Empty prefix = subscribe to every topic. Propagate the error with `?`
    // instead of panicking (`expect`), so the infinite-retry wrapper can
    // recover from transient subscription failures.
    socket.subscribe_str("")?;

    let mut nng_reader = socket.create_async()?;
    tracing::info!(target: "bidder", publisher_url, "Created nanomsg socket and subscribed");

    loop {
        // Outer `?`: no message within `timeout` -> treat connection as stale.
        // Inner `?`: transport-level receive error.
        let msg = tokio::time::timeout(timeout, nng_reader.receive()).await??;
        let block_bid: BlockBid = serde_json::from_slice(msg.body())?;
        obs.update_new_bid(block_bid);
    }
}
63 changes: 28 additions & 35 deletions crates/bid-scraper/src/bids_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use crate::{
types::{BlockBid, PublisherType},
DynResult, RPC_TIMEOUT,
};
use alloy_primitives::{Address, BlockHash, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_beacon::BlsPublicKey;
use async_trait::async_trait;
use ethers::{
abi::{AbiDecode, AbiEncode},
prelude::*,
};
use eyre::{eyre, Context};
use eyre::Context;
use lru::LruCache;
use parking_lot::{Mutex, MutexGuard};
use serde::Deserialize;
Expand Down Expand Up @@ -115,13 +114,11 @@ impl Service<RelayBidsPublisherConfig> for BidsPublisherService {
.simple_relay_publisher_config()
.eth_provider_uri
.clone();
let provider = timeout(
RPC_TIMEOUT,
Provider::<Ws>::connect(eth_provider_uri.clone()),
)
.await
.wrap_err("could not connect to node in time")?
.wrap_err("unable to connect to node?")?;
let ws_conn = alloy_provider::WsConnect::new(eth_provider_uri);
let provider = timeout(RPC_TIMEOUT, ProviderBuilder::new().connect_ws(ws_conn))
.await
.wrap_err("could not connect to node in time")?
.wrap_err("unable to connect to node?")?;

let mut subscription = provider
.subscribe_blocks()
Expand All @@ -130,16 +127,16 @@ impl Service<RelayBidsPublisherConfig> for BidsPublisherService {
info!("New blocks subscriber connected and ready. Waiting for the first block...");
let cancel_token = self.cancellation_token();
while !cancel_token.is_cancelled() {
let block = timeout(RPC_TIMEOUT, subscription.next())
let block = timeout(RPC_TIMEOUT, subscription.recv())
.await
.wrap_err("didn't receive a new block in time")?
.ok_or(eyre!("didn't receive a new block"))?;
.wrap_err("didn't receive a new block")?;
{
trace!("got block {:?}", block);
let mut inner = self.inner();
inner.last_block_number = block.number.ok_or(eyre!("no block number"))?.as_u64();
inner.last_block_hash = block.hash.ok_or(eyre!("no block hash"))?.encode_hex();
inner.last_slot = slot::get_slot_number(block.timestamp.as_u64());
inner.last_block_number = block.number;
inner.last_block_hash = block.hash.to_string();
inner.last_slot = slot::get_slot_number(block.timestamp);
info!(
"New block {} ({}).",
inner.last_block_number, inner.last_block_hash,
Expand All @@ -158,11 +155,10 @@ impl BidsPublisherService {
client: &reqwest::Client,
) -> DynResult<Vec<BlockBid>> {
debug!("Getting bids for relay {relay_name}");

let block_number = self.inner().last_block_number + 1;
let url = format!(
"{}/relay/v1/data/bidtraces/builder_blocks_received?block_number={}&order_by=-value",
relay_endpoint,
self.inner().last_block_number + 1
relay_endpoint, block_number
);
// By default it's ordered by slot (so, no effect). So we order by decreasing value
// instead, it's more interesting to us.
Expand All @@ -183,19 +179,18 @@ impl BidsPublisherService {
let bid = BlockBid {
publisher_name: self.name.clone(),
publisher_type: PublisherType::RelayBids,
builder_pubkey: Some(
builder_pubkey: Some(BlsPublicKey::from_str(
json_bid["builder_pubkey"]
.as_str()
.ok_or("unable to parse builder_pubkey")?
.to_lowercase(),
),
.ok_or("unable to parse builder_pubkey")?,
)?),
relay_name: relay_name.to_string(),
parent_hash: H256::decode_hex(
parent_hash: BlockHash::from_str(
json_bid["parent_hash"]
.as_str()
.ok_or("unable to parse parent_hash")?,
)?,
block_hash: H256::decode_hex(
block_hash: BlockHash::from_str(
json_bid["block_hash"]
.as_str()
.ok_or("unable to parse block_hash")?,
Expand All @@ -219,26 +214,24 @@ impl BidsPublisherService {
.ok_or("unable to parse value")?
.parse::<u128>()?,
),
slot_number: U64::from(
json_bid["slot"]
.as_str()
.ok_or("unable to parse slot")?
.parse::<u64>()?,
),
gas_used: Some(U64::from(
slot_number: json_bid["slot"]
.as_str()
.ok_or("unable to parse slot")?
.parse::<u64>()?,
gas_used: Some(
json_bid["gas_used"]
.as_str()
.ok_or("unable to parse gas_used")?
.parse::<u64>()?,
)),
),
proposer_fee_recipient: Some(Address::from_str(
json_bid["proposer_fee_recipient"]
.as_str()
.ok_or("unable to parse proposer_fee_recipient")?,
)?),
fee_recipient: None,
optimistic_submission: json_bid["optimistic_submission"].as_bool(),
block_number: None,
block_number,
extra_data: None,
};
debug!("Found bid: {bid:?}");
Expand Down
58 changes: 58 additions & 0 deletions crates/bid-scraper/src/bin/bid-scraper-test-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::{env, sync::Arc, time::Duration};

use alloy_primitives::utils::format_ether;
use bid_scraper::{
bid_scraper_client::{run_nng_subscriber_with_retries, ScrapedBidsObs},
code_from_rbuilder::{setup_tracing_subscriber, LoggerConfig},
types::BlockBid,
};
use tokio::signal::ctrl_c;
use tokio_util::sync::CancellationToken;

/// `ScrapedBidsObs` implementation that dumps every scraped bid to stdout.
struct ScrapedBidsPrinter {}

impl ScrapedBidsObs for ScrapedBidsPrinter {
    /// Prints a one-line summary (publisher, type, block, ether value) per bid.
    fn update_new_bid(&self, bid: BlockBid) {
        let value_eth = format_ether(bid.value);
        let summary = format!(
            "New bid {:?} ({:?}) Block {:?} val {:?}",
            bid.publisher_name, bid.publisher_type, bid.block_number, value_eth
        );
        println!("{summary}");
    }
}

/// Minimal test client: connects to a bid-scraper NNG publisher (URL given as
/// the single CLI argument) and prints every received bid until Ctrl-C.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    let args: Vec<String> = env::args().collect();
    if args.len() != 2 {
        // Fixed the example address typo (was "127.0.01", a malformed IP).
        println!("How do you expect me to know where to connect to get your bids? Should I guess? Try all ips and ports at random?\nHere is an idea: Pass me as a single parameter where to connect to.\nSomething like: {} tcp://127.0.0.1:5555", args[0]);
        return Ok(());
    }
    let log_config = LoggerConfig {
        env_filter: "info".to_owned(),
        log_json: false,
        log_color: true,
    };
    setup_tracing_subscriber(log_config)?;

    // Cancel the subscriber on Ctrl-C. If listening for the signal fails we
    // also cancel, so the client can't end up unstoppable.
    let cancel = CancellationToken::new();
    tokio::spawn({
        let cancel = cancel.clone();
        async move {
            ctrl_c().await.unwrap_or_default();
            cancel.cancel()
        }
    });
    let publisher_url = args[1].clone();
    println!("Connecting to publishers..");
    // Await the subscriber directly: the previous `tokio::spawn(...).await`
    // only added a JoinHandle whose result was discarded.
    run_nng_subscriber_with_retries(
        Arc::new(ScrapedBidsPrinter {}),
        cancel,
        publisher_url,
        Duration::from_secs(10),
        Duration::from_secs(10),
    )
    .await;
    Ok(())
}
22 changes: 14 additions & 8 deletions crates/bid-scraper/src/bloxroute_ws_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ use crate::{
ws_publisher::{ConnectionHandler, Service},
DynResult, RPC_TIMEOUT,
};
use ethers::prelude::*;
use futures::stream::{SplitSink, SplitStream};
use alloy_primitives::{Address, BlockHash, U256};
use alloy_rpc_types_beacon::BlsPublicKey;
use futures::{
stream::{SplitSink, SplitStream},
StreamExt,
};
use futures_util::SinkExt;
use serde::Deserialize;
use serde_json::json;
use std::str::FromStr;
use tokio::{net::TcpStream, time::timeout};
use tokio_tungstenite::{
tungstenite::{http::Request, protocol::Message},
Expand All @@ -32,10 +37,11 @@ pub struct BloxrouteWsPublisherConfig {
struct BloxrouteWsBid {
relay_type: String,
builder_pubkey: String,
parent_hash: H256,
block_hash: H256,
parent_hash: BlockHash,
block_hash: BlockHash,
timestamp_ms: u64,
block_value: u128,
block_number: u64,
slot_number: u64,
#[serde(default)]
gas_used: u64,
Expand Down Expand Up @@ -67,19 +73,19 @@ impl BloxrouteWsConnectionHandler {
let bid = BlockBid {
publisher_name: self.name.clone(),
publisher_type: PublisherType::BloxrouteWs,
builder_pubkey: Some(parsed.builder_pubkey.to_lowercase()),
builder_pubkey: Some(BlsPublicKey::from_str(&parsed.builder_pubkey)?),
relay_name,
parent_hash: parsed.parent_hash,
block_hash: parsed.block_hash,
seen_time: get_timestamp_f64(),
relay_time: Some(parsed.timestamp_ms as f64 / 1000.),
value: U256::from(parsed.block_value),
slot_number: U64::from(parsed.slot_number),
gas_used: Some(U64::from(parsed.gas_used)),
slot_number: parsed.slot_number,
gas_used: Some(parsed.gas_used),
proposer_fee_recipient: Some(parsed.proposer_fee_recipient),
fee_recipient: None,
optimistic_submission: parsed.optimistic_submission,
block_number: None,
block_number: parsed.block_number,
extra_data: None,
};
debug!("Found bid: {bid:?}");
Expand Down
Loading
Loading