Skip to content

Commit

Permalink
feat: Added a JSON RPC to simulating L1 for consensus attestation (#2480
Browse files Browse the repository at this point in the history
)

It just checks what is the last L1 batch certificate stored locally and
returns its number (actually, the next number to avoid problems with 0).
ENs will query the main node for this information to determine what is
the number of the next batch that they should vote for.
  • Loading branch information
pompon0 authored Jul 26, 2024
1 parent fb55d1c commit c6b3adf
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 52 additions & 22 deletions core/lib/dal/src/consensus_dal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Context as _;
use bigdecimal::Zero;
use bigdecimal::Zero as _;
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{BlockStoreState, ReplicaState};
use zksync_db_connection::{
Expand Down Expand Up @@ -378,9 +378,7 @@ impl ConsensusDal<'_, '_> {
) -> Result<(), InsertCertificateError> {
use InsertCertificateError as E;
let header = &cert.message.proposal;
let mut txn = self.storage.start_transaction().await?;
let want_payload = txn
.consensus_dal()
let want_payload = self
.block_payload(cert.message.proposal.number)
.await?
.ok_or(E::MissingPayload)?;
Expand All @@ -394,28 +392,24 @@ impl ConsensusDal<'_, '_> {
VALUES
($1, $2)
"#,
header.number.0 as i64,
i64::try_from(header.number.0).context("overflow")?,
zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(),
)
.instrument("insert_block_certificate")
.report_latency()
.execute(&mut txn)
.execute(self.storage)
.await?;
txn.commit().await.context("commit")?;
Ok(())
}

/// Inserts a certificate for the L1 batch.
///
/// Insertion is allowed even if it creates gaps in the L1 batch history.
///
/// This method assumes that all payload validation has been carried out by the caller.
/// Noop if a certificate for the same L1 batch is already present.
/// No verification is performed - it cannot be performed due to circular dependency on
/// `zksync_l1_contract_interface`.
pub async fn insert_batch_certificate(
&mut self,
cert: &attester::BatchQC,
) -> Result<(), InsertCertificateError> {
let l1_batch_number = cert.message.number.0 as i64;

) -> anyhow::Result<()> {
let res = sqlx::query!(
r#"
INSERT INTO
Expand All @@ -424,26 +418,25 @@ impl ConsensusDal<'_, '_> {
($1, $2, NOW(), NOW())
ON CONFLICT (l1_batch_number) DO NOTHING
"#,
l1_batch_number,
i64::try_from(cert.message.number.0).context("overflow")?,
// Unwrap is ok, because serialization should always succeed.
zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(),
)
.instrument("insert_batch_certificate")
.report_latency()
.execute(self.storage)
.await?;

if res.rows_affected().is_zero() {
tracing::debug!(l1_batch_number, "duplicate batch certificate");
tracing::debug!(l1_batch_number = ?cert.message.number, "duplicate batch certificate");
}

Ok(())
}

/// Gets a number of the last L1 batch that was inserted. It might have gaps before it,
/// depending on the order in which votes have been collected over gossip by consensus.
pub async fn get_last_batch_certificate_number(
&mut self,
) -> DalResult<Option<attester::BatchNumber>> {
) -> anyhow::Result<Option<attester::BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
Expand All @@ -457,9 +450,46 @@ impl ConsensusDal<'_, '_> {
.fetch_one(self.storage)
.await?;

Ok(row
.number
.map(|number| attester::BatchNumber(number as u64)))
let Some(n) = row.number else {
return Ok(None);
};
Ok(Some(attester::BatchNumber(
n.try_into().context("overflow")?,
)))
}

/// Next batch that the attesters should vote for.
/// This is a main node only query.
/// ENs should call the attestation_status RPC of the main node.
pub async fn next_batch_to_attest(&mut self) -> anyhow::Result<attester::BatchNumber> {
// First batch that we don't have a certificate for.
if let Some(last) = self
.get_last_batch_certificate_number()
.await
.context("get_last_batch_certificate_number()")?
{
return Ok(last + 1);
}
// Otherwise start with the last sealed L1 batch.
// We don't want to backfill certificates for old batches.
// Note that there is a race condition in case the next
// batch is sealed before the certificate for the current
// last sealed batch is stored. This is only relevant
// for the first certificate though and anyway this is
// a test setup, so we are OK with that race condition.
if let Some(sealed) = self
.storage
.blocks_dal()
.get_sealed_l1_batch_number()
.await
.context("get_sealed_l1_batch_number()")?
{
return Ok(attester::BatchNumber(sealed.0.into()));
}
// Otherwise start with 0.
// Note that main node doesn't start from snapshot
// and doesn't have prunning enabled.
Ok(attester::BatchNumber(0))
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/lib/types/src/api/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ pub struct SyncBlock {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusGenesis(pub serde_json::Value);

/// AttestationStatus maintained by the main node.
/// Used for testing L1 batch signing by consensus attesters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttestationStatus {
pub next_batch_to_attest: L1BatchNumber,
}
7 changes: 7 additions & 0 deletions core/lib/web3_decl/src/namespaces/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ pub trait EnNamespace {
#[method(name = "genesisConfig")]
async fn genesis_config(&self) -> RpcResult<GenesisConfig>;

/// MAIN NODE ONLY:
/// Gets the AttestationStatus of L1 batches.
/// This is a temporary RPC used for testing L1 batch signing
/// by consensus attesters.
#[method(name = "attestationStatus")]
async fn attestation_status(&self) -> RpcResult<en::AttestationStatus>;

/// Get tokens that are white-listed and it can be used by paymasters.
#[method(name = "whitelistedTokensForAA")]
async fn whitelisted_tokens_for_aa(&self) -> RpcResult<Vec<Address>>;
Expand Down
1 change: 1 addition & 0 deletions core/node/api_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ categories.workspace = true

[dependencies]
zksync_config.workspace = true
zksync_consensus_roles.workspace = true
zksync_contracts.workspace = true
zksync_types.workspace = true
zksync_dal.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ impl EnNamespaceServer for EnNamespace {
.map_err(|err| self.current_method().map_err(err))
}

async fn attestation_status(&self) -> RpcResult<en::AttestationStatus> {
self.attestation_status_impl()
.await
.map_err(|err| self.current_method().map_err(err))
}

async fn sync_tokens(&self, block_number: Option<L2BlockNumber>) -> RpcResult<Vec<TokenInfo>> {
self.sync_tokens_impl(block_number)
.await
Expand Down
26 changes: 24 additions & 2 deletions core/node/api_server/src/web3/namespaces/en.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Context as _;
use zksync_config::{configs::EcosystemContracts, GenesisConfig};
use zksync_consensus_roles::attester;
use zksync_dal::{CoreDal, DalError};
use zksync_types::{
api::en, protocol_version::ProtocolSemanticVersion, tokens::TokenInfo, Address, L1BatchNumber,
Expand All @@ -16,14 +17,20 @@ pub(crate) struct EnNamespace {
state: RpcState,
}

fn to_l1_batch_number(n: attester::BatchNumber) -> anyhow::Result<L1BatchNumber> {
Ok(L1BatchNumber(
n.0.try_into().context("L1BatchNumber overflow")?,
))
}

impl EnNamespace {
pub fn new(state: RpcState) -> Self {
Self { state }
}

pub async fn consensus_genesis_impl(&self) -> Result<Option<en::ConsensusGenesis>, Web3Error> {
let mut storage = self.state.acquire_connection().await?;
let Some(genesis) = storage
let mut conn = self.state.acquire_connection().await?;
let Some(genesis) = conn
.consensus_dal()
.genesis()
.await
Expand All @@ -36,6 +43,21 @@ impl EnNamespace {
)))
}

#[tracing::instrument(skip(self))]
pub async fn attestation_status_impl(&self) -> Result<en::AttestationStatus, Web3Error> {
Ok(en::AttestationStatus {
next_batch_to_attest: to_l1_batch_number(
self.state
.acquire_connection()
.await?
.consensus_dal()
.next_batch_to_attest()
.await
.context("next_batch_to_attest()")?,
)?,
})
}

pub(crate) fn current_method(&self) -> &MethodTracer {
&self.state.current_method
}
Expand Down
51 changes: 31 additions & 20 deletions core/node/consensus/src/storage/connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, time};
use zksync_consensus_crypto::keccak256::Keccak256;
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{self as storage, BatchStoreState};
use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError};
use zksync_dal::{consensus_dal, consensus_dal::Payload, Core, CoreDal, DalError};
use zksync_l1_contract_interface::i_executor::structures::StoredBatchInfo;
use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState};
use zksync_state_keeper::io::common::IoCursor;
Expand Down Expand Up @@ -115,35 +116,26 @@ impl<'a> Connection<'a> {
.await??)
}

/// Wrapper for `consensus_dal().insert_batch_certificate()`.
/// Wrapper for `consensus_dal().insert_batch_certificate()`,
/// which additionally verifies that the batch hash matches the stored batch.
pub async fn insert_batch_certificate(
&mut self,
ctx: &ctx::Ctx,
cert: &attester::BatchQC,
) -> Result<(), InsertCertificateError> {
use crate::storage::consensus_dal::InsertCertificateError as E;

let l1_batch_number = L1BatchNumber(cert.message.number.0 as u32);

let Some(l1_batch) = self
.0
.blocks_dal()
.get_l1_batch_metadata(l1_batch_number)
use consensus_dal::InsertCertificateError as E;
let want_hash = self
.batch_hash(ctx, cert.message.number)
.await
.map_err(E::Dal)?
else {
return Err(E::MissingPayload.into());
};

let l1_batch_info = StoredBatchInfo::from(&l1_batch);

if l1_batch_info.hash().0 != *cert.message.hash.0.as_bytes() {
.wrap("batch_hash()")?
.ok_or(E::MissingPayload)?;
if want_hash != cert.message.hash {
return Err(E::PayloadMismatch.into());
}

Ok(ctx
.wait(self.0.consensus_dal().insert_batch_certificate(cert))
.await??)
.await?
.map_err(E::Other)?)
}

/// Wrapper for `consensus_dal().replica_state()`.
Expand All @@ -166,6 +158,25 @@ impl<'a> Connection<'a> {
.context("sqlx")?)
}

/// Wrapper for `consensus_dal().batch_hash()`.
pub async fn batch_hash(
&mut self,
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::BatchHash>> {
let n = L1BatchNumber(number.0.try_into().context("overflow")?);
let Some(meta) = ctx
.wait(self.0.blocks_dal().get_l1_batch_metadata(n))
.await?
.context("get_l1_batch_metadata()")?
else {
return Ok(None);
};
Ok(Some(attester::BatchHash(Keccak256::from_bytes(
StoredBatchInfo::from(&meta).hash().0,
))))
}

/// Wrapper for `blocks_dal().get_l1_batch_metadata()`.
pub async fn batch(
&mut self,
Expand Down
9 changes: 9 additions & 0 deletions core/node/consensus/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ pub enum InsertCertificateError {
Inner(#[from] consensus_dal::InsertCertificateError),
}

impl From<ctx::Error> for InsertCertificateError {
fn from(err: ctx::Error) -> Self {
match err {
ctx::Error::Canceled(err) => Self::Canceled(err),
ctx::Error::Internal(err) => Self::Inner(err.into()),
}
}
}

#[derive(Debug)]
pub(crate) struct PayloadQueue {
inner: IoCursor,
Expand Down
22 changes: 5 additions & 17 deletions core/node/consensus/src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ use std::sync::Arc;
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time};
use zksync_consensus_bft::PayloadManager;
use zksync_consensus_crypto::keccak256::Keccak256;
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{self as storage, BatchStoreState};
use zksync_dal::consensus_dal::{self, Payload};
use zksync_l1_contract_interface::i_executor::structures::StoredBatchInfo;
use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction};
use zksync_types::{L1BatchNumber, L2BlockNumber};
use zksync_types::L2BlockNumber;

use super::{Connection, PayloadQueue};
use crate::storage::{ConnectionPool, InsertCertificateError};
Expand Down Expand Up @@ -526,26 +524,16 @@ impl storage::PersistentBatchStore for Store {
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::Batch>> {
let Some(batch) = self
let Some(hash) = self
.conn(ctx)
.await?
.batch(
ctx,
L1BatchNumber(u32::try_from(number.0).context("number")?),
)
.batch_hash(ctx, number)
.await
.wrap("batch")?
.wrap("batch_hash()")?
else {
return Ok(None);
};

let info = StoredBatchInfo::from(&batch);
let hash = Keccak256::from_bytes(info.hash().0);

Ok(Some(attester::Batch {
number,
hash: attester::BatchHash(hash),
}))
Ok(Some(attester::Batch { number, hash }))
}

/// Returns the QC of the batch with the given number.
Expand Down
Loading

0 comments on commit c6b3adf

Please sign in to comment.