Skip to content

Adds bundle support to block builder #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ aws-sdk-kms = "1.15.0"
hex = { package = "const-hex", version = "1", default-features = false, features = [
"alloc",
] }

signet-types = { git = "ssh://git@github.com/init4tech/signet-node.git" }

serde = { version = "1.0.197", features = ["derive"] }
tracing = "0.1.40"

Expand All @@ -41,7 +44,7 @@ openssl = { version = "0.10", features = ["vendored"] }
reqwest = { version = "0.11.24", features = ["blocking", "json"] }
ruint = "1.12.1"
serde_json = "1.0"
thiserror = "1.0.58"
thiserror = "1.0.68"
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
tracing-subscriber = "0.3.18"

Expand Down
24 changes: 18 additions & 6 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use builder::config::BuilderConfig;
use builder::service::serve_builder_with_span;
use builder::tasks::bundler::BundlePoller;
use builder::tasks::oauth::Authenticator;
use builder::tasks::tx_poller::TxPoller;

use tokio::select;
Expand All @@ -11,8 +13,9 @@ async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::try_init().unwrap();
let span = tracing::info_span!("zenith-builder");

let config = BuilderConfig::load_from_env()?;
let config = BuilderConfig::load_from_env()?.clone();
let provider = config.connect_provider().await?;
let authenticator = Authenticator::new(&config);

tracing::debug!(
rpc_url = config.host_rpc_url.as_ref(),
Expand All @@ -23,23 +26,26 @@ async fn main() -> eyre::Result<()> {
let zenith = config.connect_zenith(provider.clone());

let port = config.builder_port;

let tx_poller = TxPoller::new(&config);
let bundle_poller = BundlePoller::new(&config, authenticator.clone()).await;
let builder = builder::tasks::block::BlockBuilder::new(&config);

let submit = builder::tasks::submit::SubmitTask {
authenticator: authenticator.clone(),
provider,
zenith,
client: reqwest::Client::new(),
sequencer_signer,
config,
config: config.clone(),
};

let authenticator_jh = authenticator.spawn();
let (submit_channel, submit_jh) = submit.spawn();
let (build_channel, build_jh) = builder.spawn(submit_channel);
let tx_poller_jh = tx_poller.spawn(build_channel.clone());
let (tx_channel, bundle_channel, build_jh) = builder.spawn(submit_channel);
let tx_poller_jh = tx_poller.spawn(tx_channel.clone());
let bundle_poller_jh = bundle_poller.spawn(bundle_channel);

let server = serve_builder_with_span(build_channel, ([0, 0, 0, 0], port), span);
let server = serve_builder_with_span(tx_channel, ([0, 0, 0, 0], port), span);

select! {
_ = submit_jh => {
Expand All @@ -54,6 +60,12 @@ async fn main() -> eyre::Result<()> {
_ = tx_poller_jh => {
tracing::info!("tx_poller finished");
}
_ = bundle_poller_jh => {
tracing::info!("bundle_poller finished");
}
_ = authenticator_jh => {
tracing::info!("authenticator finished");
}
}

tracing::info!("shutting down");
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const BUILDER_REWARDS_ADDRESS: &str = "BUILDER_REWARDS_ADDRESS";
const ROLLUP_BLOCK_GAS_LIMIT: &str = "ROLLUP_BLOCK_GAS_LIMIT";
const TX_POOL_URL: &str = "TX_POOL_URL";
const TX_POOL_POLL_INTERVAL: &str = "TX_POOL_POLL_INTERVAL";
const AUTH_TOKEN_REFRESH_INTERVAL: &str = "AUTH_TOKEN_REFRESH_INTERVAL";
const TX_POOL_CACHE_DURATION: &str = "TX_POOL_CACHE_DURATION";
const OAUTH_CLIENT_ID: &str = "OAUTH_CLIENT_ID";
const OAUTH_CLIENT_SECRET: &str = "OAUTH_CLIENT_SECRET";
Expand Down Expand Up @@ -82,6 +83,8 @@ pub struct BuilderConfig {
pub oauth_token_url: String,
/// OAuth audience for the builder.
pub oauth_audience: String,
/// The oauth token refresh interval in seconds.
pub oauth_token_refresh_interval: u64,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -159,6 +162,7 @@ impl BuilderConfig {
oauth_authenticate_url: load_string(OAUTH_AUTHENTICATE_URL)?,
oauth_token_url: load_string(OAUTH_TOKEN_URL)?,
oauth_audience: load_string(OAUTH_AUDIENCE)?,
oauth_token_refresh_interval: load_u64(AUTH_TOKEN_REFRESH_INTERVAL)?,
})
}

Expand Down
1 change: 0 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub async fn ingest_raw_handler(
) -> Result<Response, AppError> {
let body = body.strip_prefix("0x").unwrap_or(&body);
let buf = hex::decode(body).map_err(AppError::bad_req)?;

let envelope = TxEnvelope::decode_2718(&mut buf.as_slice()).map_err(AppError::bad_req)?;

ingest_handler(State(state), Json(envelope)).await
Expand Down
57 changes: 50 additions & 7 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use alloy::consensus::{SidecarBuilder, SidecarCoder, TxEnvelope};
use alloy::{
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
eips::eip2718::Decodable2718,
};
use alloy_primitives::{keccak256, Bytes, B256};
use alloy_rlp::Buf;
use std::{sync::OnceLock, time::Duration};
use tokio::{select, sync::mpsc, task::JoinHandle};
use tracing::Instrument;
use zenith_types::{encode_txns, Alloy2718Coder};

use crate::config::BuilderConfig;

use super::bundler::Bundle;

#[derive(Debug, Default, Clone)]
/// A block in progress.
pub struct InProgressBlock {
Expand Down Expand Up @@ -57,6 +63,29 @@ impl InProgressBlock {
self.transactions.push(tx.clone());
}

/// Ingest a bundle into the in-progress block.
/// Ignores Signed Orders for now.
pub fn ingest_bundle(&mut self, bundle: Bundle) {
tracing::info!(bundle = %bundle.id, "ingesting bundle");

let txs = bundle
.bundle
.bundle
.txs
.into_iter()
.map(|tx| TxEnvelope::decode_2718(&mut tx.chunk()))
.collect::<Result<Vec<_>, _>>();

if let Ok(txs) = txs {
self.unseal();
// extend the transactions with the decoded transactions.
// As this builder does not provide bundles landing "top of block", its fine to just extend.
self.transactions.extend(txs);
} else {
tracing::error!("failed to decode bundle. dropping");
}
}

/// Encode the in-progress block
fn encode_raw(&self) -> &Bytes {
self.seal();
Expand Down Expand Up @@ -102,10 +131,15 @@ impl BlockBuilder {
pub fn spawn(
self,
outbound: mpsc::UnboundedSender<InProgressBlock>,
) -> (mpsc::UnboundedSender<TxEnvelope>, JoinHandle<()>) {
) -> (
mpsc::UnboundedSender<TxEnvelope>,
mpsc::UnboundedSender<Bundle>,
JoinHandle<()>,
) {
let mut in_progress = InProgressBlock::default();

let (sender, mut inbound) = mpsc::unbounded_channel();
let (tx_sender, mut tx_inbound) = mpsc::unbounded_channel();
let (bundle_sender, mut bundle_inbound) = mpsc::unbounded_channel();

let mut sleep = Box::pin(tokio::time::sleep(Duration::from_secs(
self.incoming_transactions_buffer,
Expand All @@ -131,9 +165,18 @@ impl BlockBuilder {
// irrespective of whether we have any blocks to build.
sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.incoming_transactions_buffer));
}
item_res = inbound.recv() => {
match item_res {
Some(item) => in_progress.ingest_tx(&item),
tx_resp = tx_inbound.recv() => {
match tx_resp {
Some(tx) => in_progress.ingest_tx(&tx),
None => {
tracing::debug!("upstream task gone");
break
}
}
}
bundle_resp = bundle_inbound.recv() => {
match bundle_resp {
Some(bundle) => in_progress.ingest_bundle(bundle),
None => {
tracing::debug!("upstream task gone");
break
Expand All @@ -146,6 +189,6 @@ impl BlockBuilder {
.in_current_span(),
);

(sender, handle)
(tx_sender, bundle_sender, handle)
}
}
127 changes: 127 additions & 0 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//! Bundler service responsible for polling and submitting bundles to the in-progress block.
use std::time::{Duration, Instant};

pub use crate::config::BuilderConfig;
use alloy_primitives::map::HashMap;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use signet_types::SignetEthBundle;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::debug;

use oauth2::TokenResponse;

use super::oauth::Authenticator;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bundle {
pub id: String,
pub bundle: SignetEthBundle,
}

/// Response from the tx-pool containing a list of bundles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxPoolBundleResponse {
pub bundles: Vec<Bundle>,
}

pub struct BundlePoller {
pub config: BuilderConfig,
pub authenticator: Authenticator,
pub seen_uuids: HashMap<String, Instant>,
}

/// Implements a poller for the block builder to pull bundles from the tx cache.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub async fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
Self {
config: config.clone(),
authenticator,
seen_uuids: HashMap::new(),
}
}

/// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
let mut unique: Vec<Bundle> = Vec::new();

let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
let token = self.authenticator.fetch_oauth_token().await?;

// Add the token to the request headers
let result = reqwest::Client::new()
.get(bundle_url)
.bearer_auth(token.access_token().secret())
.send()
.await?
.error_for_status()?;

let body = result.bytes().await?;
let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?;

bundles.bundles.iter().for_each(|bundle| {
self.check_seen_bundles(bundle.clone(), &mut unique);
});

Ok(unique)
}

/// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec<Bundle>) {
self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| {
// add to the set of unique bundles
unique.push(bundle.clone());
Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration)
});
}

/// Evicts expired bundles from the cache.
fn evict(&mut self) {
let expired_keys: Vec<String> = self
.seen_uuids
.iter()
.filter_map(|(key, expiry)| {
if expiry.elapsed().is_zero() {
Some(key.clone())
} else {
None
}
})
.collect();

for key in expired_keys {
self.seen_uuids.remove(&key);
}
}

pub fn spawn(mut self, bundle_channel: mpsc::UnboundedSender<Bundle>) -> JoinHandle<()> {
let handle: JoinHandle<()> = tokio::spawn(async move {
loop {
let bundle_channel = bundle_channel.clone();
let bundles = self.check_bundle_cache().await;

match bundles {
Ok(bundles) => {
for bundle in bundles {
let result = bundle_channel.send(bundle);
if result.is_err() {
tracing::debug!("bundle_channel failed to send bundle");
}
}
}
Err(err) => {
debug!(?err, "error fetching bundles from tx-pool");
}
}

// evict expired bundles once every loop
self.evict();

tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await;
}
});

handle
}
}
2 changes: 2 additions & 0 deletions src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod block;
pub mod bundler;
pub mod oauth;
pub mod submit;
pub mod tx_poller;
Loading