Skip to content

Commit

Permalink
deps: bump Reth and use new notifications API (#28)
Browse files Browse the repository at this point in the history
* deps: bump Reth and use new notifications API

* remove unused import
  • Loading branch information
shekhirin authored Oct 7, 2024
1 parent bd1c173 commit c58e2a7
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 137 deletions.
253 changes: 128 additions & 125 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion backfill/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<Node: FullNodeComponents> BackfillExEx<Node> {
loop {
tokio::select! {
Some(notification) = self.ctx.notifications.next() => {
self.handle_notification(notification).await?;
self.handle_notification(notification?).await?;
}
Some(message) = self.backfill_rx.recv() => {
self.handle_backfill_message(message).await;
Expand Down
4 changes: 2 additions & 2 deletions discv5/src/exex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use eyre::Result;
use futures::{Future, FutureExt, StreamExt};
use futures::{Future, FutureExt, TryStreamExt};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::{error, info};
Expand Down Expand Up @@ -45,7 +45,7 @@ impl<Node: FullNodeComponents> Future for ExEx<Node> {

// Continuously poll the ExExContext notifications
loop {
if let Some(notification) = ready!(self.exex.notifications.poll_next_unpin(cx)) {
if let Some(notification) = ready!(self.exex.notifications.try_next().poll_unpin(cx))? {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
4 changes: 2 additions & 2 deletions in-memory-state/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![warn(unused_crate_dependencies)]

use futures_util::StreamExt;
use futures_util::{FutureExt, TryStreamExt};
use reth_execution_types::ExecutionOutcome;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
Expand Down Expand Up @@ -33,7 +33,7 @@ impl<Node: FullNodeComponents + Unpin> Future for InMemoryStateExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
4 changes: 2 additions & 2 deletions op-bridge/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy_primitives::{address, Address};
use alloy_sol_types::{sol, SolEventInterface};
use futures::{Future, FutureExt, StreamExt};
use futures::{Future, FutureExt, TryStreamExt};
use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn op_bridge_exex<Node: FullNodeComponents>(
connection: Connection,
) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = ctx.notifications.next().await {
while let Some(notification) = ctx.notifications.try_next().await? {
// Revert all deposits and withdrawals
if let Some(reverted_chain) = notification.reverted_chain() {
let events = decode_chain_into_events(&reverted_chain);
Expand Down
4 changes: 2 additions & 2 deletions oracle/src/exex.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use eyre::Result;
use futures::{Future, StreamExt};
use futures::{Future, FutureExt, TryStreamExt};
use reth::providers::ExecutionOutcome;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
Expand Down Expand Up @@ -28,7 +28,7 @@ impl<Node: FullNodeComponents> Future for ExEx<Node> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Continuously poll the ExExContext notifications
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
Expand Down
2 changes: 1 addition & 1 deletion remote/bin/exex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: broadcast::Sender<ExExNotification>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.next().await {
while let Some(notification) = ctx.notifications.try_next().await? {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
Expand Down
4 changes: 2 additions & 2 deletions rollup/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use alloy_primitives::{address, Address, U256};
use alloy_sol_types::{sol, SolEventInterface, SolInterface};
use db::Database;
use execution::execute_block;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use once_cell::sync::Lazy;
use reth_chainspec::{ChainSpec, ChainSpecBuilder};
use reth_execution_types::Chain;
Expand Down Expand Up @@ -54,7 +54,7 @@ impl<Node: FullNodeComponents> Rollup<Node> {

async fn start(mut self) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = self.ctx.notifications.next().await {
while let Some(notification) = self.ctx.notifications.try_next().await? {
if let Some(reverted_chain) = notification.reverted_chain() {
self.revert(&reverted_chain)?;
}
Expand Down

0 comments on commit c58e2a7

Please sign in to comment.