Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions crates/ntx-builder/src/actor/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use tracing::{Instrument, instrument};
use crate::COMPONENT;
use crate::actor::account_state::TransactionCandidate;
use crate::block_producer::BlockProducerClient;
use crate::db::Db;
use crate::store::StoreClient;

#[derive(Debug, thiserror::Error)]
Expand All @@ -77,6 +78,12 @@ pub enum NtxError {

type NtxResult<T> = Result<T, NtxError>;

/// The result of a successful transaction execution.
///
/// Contains the transaction ID, any notes that failed during filtering, and note scripts fetched
/// from the remote store that should be persisted to the local DB cache.
pub type NtxExecutionResult = (TransactionId, Vec<FailedNote>, Vec<(Word, NoteScript)>);

// NETWORK TRANSACTION CONTEXT
// ================================================================================================

Expand All @@ -100,6 +107,9 @@ pub struct NtxContext {

/// LRU cache for storing retrieved note scripts to avoid repeated store calls.
script_cache: LruCache<Word, NoteScript>,

/// Local database for persistent note script caching.
db: Db,
}

impl NtxContext {
Expand All @@ -110,13 +120,15 @@ impl NtxContext {
prover: Option<RemoteTransactionProver>,
store: StoreClient,
script_cache: LruCache<Word, NoteScript>,
db: Db,
) -> Self {
Self {
block_producer,
validator,
prover,
store,
script_cache,
db,
}
}

Expand All @@ -132,8 +144,9 @@ impl NtxContext {
///
/// # Returns
///
/// On success, returns the [`TransactionId`] of the executed transaction and a list of
/// [`FailedNote`]s representing notes that were filtered out before execution.
/// On success, returns an [`NtxExecutionResult`] containing the transaction ID, any notes
/// that failed during filtering, and note scripts fetched from the remote store that should
/// be persisted to the local DB cache.
///
/// # Errors
///
Expand All @@ -146,7 +159,7 @@ impl NtxContext {
pub fn execute_transaction(
self,
tx: TransactionCandidate,
) -> impl FutureMaybeSend<NtxResult<(TransactionId, Vec<FailedNote>)>> {
) -> impl FutureMaybeSend<NtxResult<NtxExecutionResult>> {
let TransactionCandidate {
account,
notes,
Expand All @@ -168,6 +181,7 @@ impl NtxContext {
chain_mmr,
self.store.clone(),
self.script_cache.clone(),
self.db.clone(),
);

// Filter notes.
Expand All @@ -178,6 +192,9 @@ impl NtxContext {
// Execute transaction.
let executed_tx = Box::pin(self.execute(&data_store, successful_notes)).await?;

// Collect scripts fetched from the remote store during execution.
let scripts_to_cache = data_store.take_fetched_scripts().await;

// Prove transaction.
let tx_inputs: TransactionInputs = executed_tx.into();
let proven_tx = Box::pin(self.prove(&tx_inputs)).await?;
Expand All @@ -188,7 +205,7 @@ impl NtxContext {
// Submit transaction to block producer.
self.submit(&proven_tx).await?;

Ok((proven_tx.id(), failed_notes))
Ok((proven_tx.id(), failed_notes, scripts_to_cache))
})
.in_current_span()
.await
Expand Down Expand Up @@ -334,6 +351,11 @@ struct NtxDataStore {
store: StoreClient,
/// LRU cache for storing retrieved note scripts to avoid repeated store calls.
script_cache: LruCache<Word, NoteScript>,
/// Local database for persistent note script.
db: Db,
/// Scripts fetched from the remote store during execution, to be persisted by the
/// coordinator.
fetched_scripts: Arc<Mutex<Vec<(Word, NoteScript)>>>,
/// Mapping of storage map roots to storage slot names observed during various calls.
///
/// The registered slot names are subsequently used to retrieve storage map witnesses from the
Expand Down Expand Up @@ -366,6 +388,7 @@ impl NtxDataStore {
chain_mmr: Arc<PartialBlockchain>,
store: StoreClient,
script_cache: LruCache<Word, NoteScript>,
db: Db,
) -> Self {
let mast_store = TransactionMastStore::new();
mast_store.load_account_code(account.code());
Expand All @@ -377,10 +400,17 @@ impl NtxDataStore {
mast_store,
store,
script_cache,
db,
fetched_scripts: Arc::new(Mutex::new(Vec::new())),
storage_slots: Arc::new(Mutex::new(BTreeMap::default())),
}
}

/// Returns the list of note scripts fetched from the remote store during execution.
async fn take_fetched_scripts(&self) -> Vec<(Word, NoteScript)> {
self.fetched_scripts.lock().await.drain(..).collect()
}

/// Registers storage map slot names for the given account ID and storage header.
///
/// These slot names are subsequently used to query for storage map witnesses against the store.
Expand Down Expand Up @@ -507,28 +537,40 @@ impl DataStore for NtxDataStore {

/// Retrieves a note script by its root hash.
///
/// This implementation uses the configured RPC client to call the `GetNoteScriptByRoot`
/// endpoint on the RPC server.
/// Uses a 3-tier lookup strategy:
/// 1. In-memory LRU cache.
/// 2. Local SQLite database.
/// 3. Remote store via gRPC.
fn get_note_script(
&self,
script_root: Word,
) -> impl FutureMaybeSend<Result<Option<NoteScript>, DataStoreError>> {
async move {
// Attempt to retrieve the script from the cache.
// 1. In-memory LRU cache.
if let Some(cached_script) = self.script_cache.get(&script_root).await {
return Ok(Some(cached_script));
}

// Retrieve the script from the store.
// 2. Local DB.
if let Some(script) = self.db.lookup_note_script(script_root).await.map_err(|err| {
DataStoreError::other_with_source("failed to look up note script in local DB", err)
})? {
self.script_cache.put(script_root, script.clone()).await;
return Ok(Some(script));
}

// 3. Remote store.
let maybe_script =
self.store.get_note_script_by_root(script_root).await.map_err(|err| {
DataStoreError::Other {
error_msg: "failed to retrieve note script from store".to_string().into(),
source: Some(err.into()),
}
DataStoreError::other_with_source(
"failed to retrieve note script from store",
err,
)
})?;
// Handle response.

if let Some(script) = maybe_script {
// Collect for later persistence by the coordinator.
self.fetched_scripts.lock().await.push((script_root, script.clone()));
self.script_cache.put(script_root, script.clone()).await;
Ok(Some(script))
} else {
Expand Down
60 changes: 44 additions & 16 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
use miden_protocol::account::{Account, AccountDelta};
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{Note, NoteScript};
use miden_protocol::note::{Note, NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{AcquireError, RwLock, Semaphore, mpsc};
Expand All @@ -29,6 +29,21 @@ use crate::builder::ChainState;
use crate::db::Db;
use crate::store::StoreClient;

// ACTOR NOTIFICATION
// ================================================================================================

/// A notification sent from an account actor to the coordinator.
pub enum ActorNotification {
/// One or more notes failed during transaction execution and should have their attempt
/// counters incremented.
NotesFailed {
nullifiers: Vec<Nullifier>,
block_num: BlockNumber,
},
/// A note script was fetched from the remote store and should be persisted to the local DB.
CacheNoteScript { script_root: Word, script: NoteScript },
}

// ACTOR SHUTDOWN REASON
// ================================================================================================

Expand Down Expand Up @@ -72,6 +87,8 @@ pub struct AccountActorContext {
pub max_note_attempts: usize,
/// Database for persistent state.
pub db: Db,
/// Channel for sending notifications to the coordinator (via the builder event loop).
pub notification_tx: mpsc::Sender<ActorNotification>,
}

// ACCOUNT ORIGIN
Expand Down Expand Up @@ -173,6 +190,8 @@ pub struct AccountActor {
max_notes_per_tx: NonZeroUsize,
/// Maximum number of note execution attempts before dropping a note.
max_note_attempts: usize,
/// Channel for sending notifications to the coordinator.
notification_tx: mpsc::Sender<ActorNotification>,
}

impl AccountActor {
Expand Down Expand Up @@ -207,6 +226,7 @@ impl AccountActor {
script_cache: actor_context.script_cache.clone(),
max_notes_per_tx: actor_context.max_notes_per_tx,
max_note_attempts: actor_context.max_note_attempts,
notification_tx: actor_context.notification_tx.clone(),
}
}

Expand Down Expand Up @@ -272,11 +292,6 @@ impl AccountActor {
// Read the chain state.
let chain_state = self.chain_state.read().await.clone();

// Drop notes that have failed too many times.
if let Err(err) = self.db.drop_failing_notes(account_id, self.max_note_attempts).await {
tracing::error!(err = %err, "failed to drop failing notes");
}

// Query DB for latest account and available notes.
let tx_candidate = self.select_candidate_from_db(
account_id,
Expand Down Expand Up @@ -348,17 +363,20 @@ impl AccountActor {
self.prover.clone(),
self.store.clone(),
self.script_cache.clone(),
self.db.clone(),
);

let notes = tx_candidate.notes.clone();
let execution_result = context.execute_transaction(tx_candidate).await;
match execution_result {
// Execution completed without failed notes.
Ok((tx_id, failed)) if failed.is_empty() => {
Ok((tx_id, failed, scripts_to_cache)) if failed.is_empty() => {
self.cache_note_scripts(scripts_to_cache).await;
self.mode = ActorMode::TransactionInflight(tx_id);
},
// Execution completed with some failed notes.
Ok((tx_id, failed)) => {
Ok((tx_id, failed, scripts_to_cache)) => {
self.cache_note_scripts(scripts_to_cache).await;
let nullifiers: Vec<_> =
failed.into_iter().map(|note| note.note.nullifier()).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
Expand All @@ -377,16 +395,26 @@ impl AccountActor {
}
}

/// Marks notes as failed in the DB.
async fn mark_notes_failed(
&self,
nullifiers: &[miden_protocol::note::Nullifier],
block_num: BlockNumber,
) {
if let Err(err) = self.db.notes_failed(nullifiers.to_vec(), block_num).await {
tracing::error!(err = %err, "failed to mark notes as failed");
/// Sends notifications to the coordinator to cache note scripts fetched from the remote store.
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
for (script_root, script) in scripts {
let _ = self
.notification_tx
.send(ActorNotification::CacheNoteScript { script_root, script })
.await;
}
}

/// Sends a notification to the coordinator to mark notes as failed.
async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) {
let _ = self
.notification_tx
.send(ActorNotification::NotesFailed {
nullifiers: nullifiers.to_vec(),
block_num,
})
.await;
}
}

// HELPERS
Expand Down
27 changes: 26 additions & 1 deletion crates/ntx-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_stream::StreamExt;
use tonic::Status;

use crate::NtxBuilderConfig;
use crate::actor::{AccountActorContext, AccountOrigin};
use crate::actor::{AccountActorContext, AccountOrigin, ActorNotification};
use crate::coordinator::Coordinator;
use crate::db::Db;
use crate::store::StoreClient;
Expand Down Expand Up @@ -98,9 +98,12 @@ pub struct NetworkTransactionBuilder {
actor_context: AccountActorContext,
/// Stream of mempool events from the block producer.
mempool_events: MempoolEventStream,
/// Receiver for notifications from account actors (e.g., note failures).
notification_rx: mpsc::Receiver<ActorNotification>,
}

impl NetworkTransactionBuilder {
#[expect(clippy::too_many_arguments)]
pub(crate) fn new(
config: NtxBuilderConfig,
coordinator: Coordinator,
Expand All @@ -109,6 +112,7 @@ impl NetworkTransactionBuilder {
chain_state: Arc<RwLock<ChainState>>,
actor_context: AccountActorContext,
mempool_events: MempoolEventStream,
notification_rx: mpsc::Receiver<ActorNotification>,
) -> Self {
Self {
config,
Expand All @@ -118,6 +122,7 @@ impl NetworkTransactionBuilder {
chain_state,
actor_context,
mempool_events,
notification_rx,
}
}

Expand Down Expand Up @@ -167,6 +172,10 @@ impl NetworkTransactionBuilder {
Some(account_id) = account_rx.recv() => {
self.handle_loaded_account(account_id).await?;
},
// Handle actor notifications (DB writes delegated from actors).
Some(notification) = self.notification_rx.recv() => {
self.handle_actor_notification(notification).await;
},
// Handle account loader task completion/failure.
// If the task fails, we abort since the builder would be in a degraded state
// where existing notes against network accounts won't be processed.
Expand Down Expand Up @@ -285,6 +294,22 @@ impl NetworkTransactionBuilder {
}
}

/// Processes a notification from an account actor by performing the corresponding DB write.
async fn handle_actor_notification(&mut self, notification: ActorNotification) {
match notification {
ActorNotification::NotesFailed { nullifiers, block_num } => {
if let Err(err) = self.db.notes_failed(nullifiers, block_num).await {
tracing::error!(err = %err, "failed to mark notes as failed");
}
},
ActorNotification::CacheNoteScript { script_root, script } => {
if let Err(err) = self.db.insert_note_script(script_root, &script).await {
tracing::error!(err = %err, "failed to cache note script");
}
},
}
}

/// Updates the chain tip and prunes old blocks from the MMR.
async fn update_chain_tip(&mut self, tip: BlockHeader) {
let mut chain_state = self.chain_state.write().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ CREATE TABLE notes (
CREATE INDEX idx_notes_account ON notes(account_id);
CREATE INDEX idx_notes_created_by ON notes(created_by) WHERE created_by IS NOT NULL;
CREATE INDEX idx_notes_consumed_by ON notes(consumed_by) WHERE consumed_by IS NOT NULL;

-- Persistent cache of note scripts, keyed by script root hash.
-- Survives restarts so scripts don't need to be re-fetched from the store.
CREATE TABLE note_scripts (
-- Script root hash (Word serialized to 32 bytes).
script_root BLOB PRIMARY KEY,
-- Serialized NoteScript bytes.
script_data BLOB NOT NULL
) WITHOUT ROWID;
Loading
Loading