Skip to content

fix: the right code #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ impl BlockBuilder {

let (sender, mut inbound) = mpsc::unbounded_channel();

let mut sleep = Box::pin(tokio::time::sleep(Duration::from_secs(
self.incoming_transactions_buffer,
)));

let handle = tokio::spawn(
async move {
loop {
let sleep: tokio::time::Sleep = tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer));
tokio::pin!(sleep);

select! {
biased;
Expand All @@ -124,6 +126,10 @@ impl BlockBuilder {
break
}
}

// Reset the sleep timer, as we want to do so when (and only when) our sleep future has elapsed,
// irrespective of whether we have any blocks to build.
sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.incoming_transactions_buffer));
}
item_res = inbound.recv() => {
match item_res {
Expand Down
143 changes: 89 additions & 54 deletions src/tasks/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,37 @@ use crate::{
signer::LocalOrAws,
tasks::block::InProgressBlock,
};
use alloy::consensus::{constants::GWEI_TO_WEI, SimpleCoder};
use alloy::eips::BlockNumberOrTag;
use alloy::network::{TransactionBuilder, TransactionBuilder4844};
use alloy::providers::{Provider as _, WalletProvider};
use alloy::rpc::types::eth::TransactionRequest;
use alloy::signers::Signer;
use alloy::sol_types::SolCall;
use alloy::transports::TransportError;
use alloy::{consensus::SimpleCoder, providers::SendableTx};
use alloy_primitives::{FixedBytes, U256};
use eyre::bail;
use alloy_sol_types::SolError;
use eyre::eyre;
use oauth2::{
basic::BasicClient, basic::BasicTokenType, reqwest::http_client, AuthUrl, ClientId,
ClientSecret, EmptyExtraTokenFields, StandardTokenResponse, TokenResponse, TokenUrl,
};
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{debug, error, instrument, trace, warn};
use zenith_types::{SignRequest, SignResponse, Zenith};

macro_rules! spawn_provider_send {
($provider:expr, $tx:expr) => {
let p = $provider.clone();
let t = $tx.clone();
tokio::spawn(async move {
if let Err(e) = p.send_tx_envelope(t).await {
warn!(%e, "error in transaction broadcast");
}
});

};
}
use tracing::{debug, error, instrument, trace};
use zenith_types::{
SignRequest, SignResponse,
Zenith::{self, IncorrectHostBlock},
};

/// OAuth Audience Claim Name, required param by IdP for client credential grant
const OAUTH_AUDIENCE_CLAIM: &str = "audience";

pub enum ControlFlow {
Retry,
Skip,
Done,
}

/// Submits sidecars in ethereum txns to mainnet ethereum
pub struct SubmitTask {
/// Ethereum Provider
Expand Down Expand Up @@ -102,10 +100,10 @@ impl SubmitTask {
#[instrument(skip_all)]
async fn construct_sig_request(&self, contents: &InProgressBlock) -> eyre::Result<SignRequest> {
let ru_chain_id = U256::from(self.config.ru_chain_id);
let block_height = self.host_block_height().await?;
let next_block_height = self.next_host_block_height().await?;

Ok(SignRequest {
host_block_number: U256::from(block_height),
host_block_number: U256::from(next_block_height),
host_chain_id: U256::from(self.config.host_chain_id),
ru_chain_id,
gas_limit: U256::from(self.config.rollup_block_gas_limit),
Expand Down Expand Up @@ -133,19 +131,23 @@ impl SubmitTask {
let sidecar = in_progress.encode_blob::<SimpleCoder>().build()?;
Ok(TransactionRequest::default()
.with_blob_sidecar(sidecar)
.with_input(data))
.with_input(data)
.with_max_priority_fee_per_gas((GWEI_TO_WEI * 16) as u128))
}

async fn host_block_height(&self) -> eyre::Result<u64> {
async fn next_host_block_height(&self) -> eyre::Result<u64> {
let result = self.provider.get_block_number().await?;
Ok(result)
let next = result
.checked_add(1)
.ok_or_else(|| eyre!("next host block height overflow"))?;
Ok(next)
}

async fn submit_transaction(
&self,
resp: &SignResponse,
in_progress: &InProgressBlock,
) -> eyre::Result<()> {
) -> eyre::Result<ControlFlow> {
let v: u8 = resp.sig.v().y_parity_byte() + 27;
let r: FixedBytes<32> = resp.sig.r().into();
let s: FixedBytes<32> = resp.sig.s().into();
Expand All @@ -161,60 +163,62 @@ impl SubmitTask {
let tx = self
.build_blob_tx(header, v, r, s, in_progress)?
.with_from(self.provider.default_signer_address())
.with_to(self.config.zenith_address);

if let Err(TransportError::ErrorResp(e)) = self.provider.call(&tx).await {
.with_to(self.config.zenith_address)
.with_gas_limit(1_000_000);

if let Err(TransportError::ErrorResp(e)) = self
.provider
.call(&tx)
.block(BlockNumberOrTag::Pending.into())
.await
{
error!(
code = e.code,
message = %e.message,
data = ?e.data,
"error in transaction submission"
);

bail!("bailing transaction submission")
}

self.send_transaction(resp, tx).await?;
if e.as_revert_data() == Some(IncorrectHostBlock::SELECTOR.into()) {
return Ok(ControlFlow::Retry);
}

Ok(())
}
return Ok(ControlFlow::Skip);
}

async fn send_transaction(
&self,
resp: &SignResponse,
tx: TransactionRequest,
) -> Result<(), eyre::Error> {
tracing::debug!(
host_block_number = %resp.req.host_block_number,
gas_limit = %resp.req.gas_limit,
"sending transaction to network"
);

let SendableTx::Envelope(tx) = self.provider.fill(tx).await? else {
bail!("failed to fill transaction")
let _ = match self.provider.send_transaction(tx).await {
Ok(result) => result,
Err(e) => {
error!(error = %e, "error sending transaction");
return Ok(ControlFlow::Skip);
}
};

// Send the tx via the primary provider
spawn_provider_send!(&self.provider, &tx);

// Spawn send_tx futures for all additional broadcast providers
for provider in self.config.connect_additional_broadcast().await? {
spawn_provider_send!(&provider, &tx);
}

tracing::info!(
tx_hash = %tx.tx_hash(),
ru_chain_id = %resp.req.ru_chain_id,
gas_limit = %resp.req.gas_limit,
"dispatched to network"
);
Ok(())

Ok(ControlFlow::Done)
}

#[instrument(skip_all, err)]
async fn handle_inbound(&self, in_progress: &InProgressBlock) -> eyre::Result<()> {
async fn handle_inbound(&self, in_progress: &InProgressBlock) -> eyre::Result<ControlFlow> {
tracing::info!(txns = in_progress.len(), "handling inbound block");
let sig_request = self.construct_sig_request(in_progress).await?;
let sig_request = match self.construct_sig_request(in_progress).await {
Ok(sig_request) => sig_request,
Err(e) => {
tracing::error!(error = %e, "error constructing signature request");
return Ok(ControlFlow::Skip);
}
};

tracing::debug!(
host_block_number = %sig_request.host_block_number,
Expand All @@ -235,7 +239,13 @@ impl SubmitTask {
sig,
}
} else {
let resp: SignResponse = self.sup_quincey(&sig_request).await?;
let resp: SignResponse = match self.sup_quincey(&sig_request).await {
Ok(resp) => resp,
Err(e) => {
tracing::error!(error = %e, "error acquiring signature from quincey");
return Ok(ControlFlow::Retry);
}
};
tracing::debug!(
sig = hex::encode(resp.sig.as_bytes()),
"acquired signature from quincey"
Expand All @@ -252,8 +262,33 @@ impl SubmitTask {
let handle = tokio::spawn(async move {
loop {
if let Some(in_progress) = inbound.recv().await {
if let Err(e) = self.handle_inbound(&in_progress).await {
error!(%e, "error in block submission. Dropping block.");
let mut retries = 0;
loop {
match self.handle_inbound(&in_progress).await {
Ok(ControlFlow::Retry) => {
retries += 1;
if retries > 3 {
tracing::error!(
"error handling inbound block: too many retries"
);
break;
}
tracing::error!("error handling inbound block: retrying");
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
Ok(ControlFlow::Skip) => {
tracing::info!("skipping block");
break;
}
Ok(ControlFlow::Done) => {
tracing::info!("block landed successfully");
break;
}
Err(e) => {
tracing::error!(error = %e, "error handling inbound block");
break;
}
}
}
} else {
tracing::debug!("upstream task gone");
Expand Down