Skip to content

Commit

Permalink
fix(external-node): make fetcher rely on unsealed batches (#3088)
Browse files Browse the repository at this point in the history
## What ❔

Originally though I should just make ENs ignore `OpenBatch` command when
the batch is already present (see
19537f9),
but while writing a test I have realized that under normal execution EN
cannot receive two `OpenBatch` commands for the same batch (even with
re-execution/restart).

Looking further I think fetcher logic is the actual culprit - the bug
only reproduces when an unsealed batch is inserted but no associated L2
blocks have made it into the DB yet.

## Why ❔

Existing logic causes EN to panic sometimes

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk_supervisor fmt` and `zk_supervisor
lint`.

---------

Co-authored-by: Danil <deniallugo@gmail.com>
  • Loading branch information
itegulov and Deniallugo authored Oct 14, 2024
1 parent 114834f commit bb5d147
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 22 deletions.
14 changes: 0 additions & 14 deletions core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2184,20 +2184,6 @@ impl BlocksDal<'_, '_> {
Ok(Some((L2BlockNumber(min as u32), L2BlockNumber(max as u32))))
}

/// Returns `true` if there exists a non-sealed batch (i.e. there is one+ stored L2 block that isn't assigned
/// to any batch yet).
pub async fn pending_batch_exists(&mut self) -> DalResult<bool> {
let count = sqlx::query_scalar!(
"SELECT COUNT(miniblocks.number) FROM miniblocks WHERE l1_batch_number IS NULL"
)
.instrument("pending_batch_exists")
.fetch_one(self.storage)
.await?
.unwrap_or(0);

Ok(count != 0)
}

// methods used for measuring Eth tx stage transition latencies
// and emitting metrics base on these measured data
pub async fn oldest_uncommitted_batch_timestamp(&mut self) -> DalResult<Option<u64>> {
Expand Down
9 changes: 5 additions & 4 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,11 @@ impl StateKeeper {
.wait(IoCursor::for_fetcher(&mut conn.0))
.await?
.context("IoCursor::new()")?;
let pending_batch = ctx
.wait(conn.0.blocks_dal().pending_batch_exists())
let batch_sealed = ctx
.wait(conn.0.blocks_dal().get_unsealed_l1_batch())
.await?
.context("pending_batch_exists()")?;
.context("get_unsealed_l1_batch()")?
.is_none();
let (actions_sender, actions_queue) = ActionQueue::new();
let addr = sync::watch::channel(None).0;
let sync_state = SyncState::default();
Expand Down Expand Up @@ -258,7 +259,7 @@ impl StateKeeper {
last_batch: cursor.l1_batch,
last_block: cursor.next_l2_block - 1,
last_timestamp: cursor.prev_l2_block_timestamp,
batch_sealed: !pending_batch,
batch_sealed,
next_priority_op: PriorityOpId(1),
actions_sender,
sync_state: sync_state.clone(),
Expand Down
2 changes: 1 addition & 1 deletion core/node/node_sync/src/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl StateKeeperIO for ExternalIO {
);

self.pool
.connection()
.connection_tagged("sync_layer")
.await?
.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
Expand Down
36 changes: 34 additions & 2 deletions core/node/node_sync/src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ impl IoCursorExt for IoCursor {
let mut this = Self::new(storage).await?;
// It's important to know whether we have opened a new batch already or just sealed the previous one.
// Depending on it, we must either insert `OpenBatch` item into the queue, or not.
let was_new_batch_open = storage.blocks_dal().pending_batch_exists().await?;
if !was_new_batch_open {
let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?;
if unsealed_batch.is_none() {
this.l1_batch -= 1; // Should continue from the last L1 batch present in the storage
}
Ok(this)
Expand Down Expand Up @@ -201,3 +201,35 @@ impl IoCursorExt for IoCursor {
new_actions
}
}

#[cfg(test)]
mod tests {
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
use zksync_state_keeper::io::IoCursor;
use zksync_types::{block::UnsealedL1BatchHeader, L1BatchNumber};

use crate::fetcher::IoCursorExt;

#[tokio::test]
async fn io_cursor_recognizes_empty_unsealed_batch() -> anyhow::Result<()> {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
insert_genesis_batch(&mut conn, &GenesisParams::mock())
.await
.unwrap();
conn.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
number: L1BatchNumber(1),
timestamp: 1,
protocol_version: None,
fee_address: Default::default(),
fee_input: Default::default(),
})
.await?;

let io_cursor = IoCursor::for_fetcher(&mut conn).await?;
assert_eq!(io_cursor.l1_batch, L1BatchNumber(1));
Ok(())
}
}
2 changes: 1 addition & 1 deletion core/node/state_keeper/src/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl StateKeeperIO for MempoolIO {
}

self.pool
.connection()
.connection_tagged("state_keeper")
.await?
.blocks_dal()
.insert_l1_batch(UnsealedL1BatchHeader {
Expand Down

0 comments on commit bb5d147

Please sign in to comment.