Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Pin tool versions in CI ([#1523](https://github.com/0xMiden/miden-node/pull/1523)).
- Add `GetVaultAssetWitnesses` and `GetStorageMapWitness` RPC endpoints to store ([#1529](https://github.com/0xMiden/miden-node/pull/1529)).
- Add check to ensure tree store state is in sync with database storage ([#1532](https://github.com/0xMiden/miden-node/issues/1534)).
- Ensure store terminates on nullifier tree or account tree root vs header mismatch (#[#1569](https://github.com/0xMiden/miden-node/pull/1569)).

### Changes

Expand Down
3 changes: 2 additions & 1 deletion bin/stress-test/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ async fn sync_transactions_paginated(

pub async fn load_state(data_directory: &Path) {
let start = Instant::now();
let _state = State::load(data_directory).await.unwrap();
let (termination_ask, _) = tokio::sync::mpsc::channel(1);
let _state = State::load(data_directory, termination_ask).await.unwrap();
let elapsed = start.elapsed();

// Get database path and run SQL commands to count records
Expand Down
18 changes: 15 additions & 3 deletions crates/store/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tracing::{info, instrument};

use crate::blocks::BlockStore;
use crate::db::Db;
use crate::errors::ApplyBlockError;
use crate::state::State;
use crate::{COMPONENT, GenesisState};

Expand Down Expand Up @@ -91,8 +92,13 @@ impl Store {
block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_timeout,
"Loading database");

let state =
Arc::new(State::load(&self.data_directory).await.context("failed to load state")?);
let (termination_ask, mut termination_signal) =
tokio::sync::mpsc::channel::<ApplyBlockError>(1);
let state = Arc::new(
State::load(&self.data_directory, termination_ask)
.await
.context("failed to load state")?,
);

let rpc_service =
store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
Expand Down Expand Up @@ -173,7 +179,13 @@ impl Store {
);

// SAFETY: The joinset is definitely not empty.
join_set.join_next().await.unwrap()?.map_err(Into::into)
let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) };
tokio::select! {
result = service => result,
Some(err) = termination_signal.recv() => {
Err(anyhow::anyhow!("received termination signal").context(err))
}
}
}
}

Expand Down
25 changes: 23 additions & 2 deletions crates/store/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ pub struct State {
/// To allow readers to access the tree data while an update in being performed, and prevent
/// TOCTOU issues, there must be no concurrent writers. This locks to serialize the writers.
writer: Mutex<()>,

/// Request termination of the process due to a fatal internal state error.
termination_ask: tokio::sync::mpsc::Sender<ApplyBlockError>,
}

impl State {
Expand All @@ -138,7 +141,10 @@ impl State {

/// Loads the state from the data directory.
#[instrument(target = COMPONENT, skip_all)]
pub async fn load(data_path: &Path) -> Result<Self, StateInitializationError> {
pub async fn load(
data_path: &Path,
termination_ask: tokio::sync::mpsc::Sender<ApplyBlockError>,
) -> Result<Self, StateInitializationError> {
let data_directory = DataDirectory::load(data_path.to_path_buf())
.map_err(StateInitializationError::DataDirectoryLoadError)?;

Expand Down Expand Up @@ -178,7 +184,14 @@ impl State {
let writer = Mutex::new(());
let db = Arc::new(db);

Ok(Self { db, block_store, inner, forest, writer })
Ok(Self {
db,
block_store,
inner,
forest,
writer,
termination_ask,
})
}

// STATE MUTATOR
Expand Down Expand Up @@ -304,6 +317,11 @@ impl State {
.map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?;

if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() {
// We do our best here to notify the serve routine, if it doesn't care (dropped the
// receiver) we can't do much.
let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError(
InvalidBlockError::NewBlockInvalidNullifierRoot,
));
return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into());
}

Expand All @@ -327,6 +345,9 @@ impl State {
})?;

if account_tree_update.as_mutation_set().root() != header.account_root() {
let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError(
InvalidBlockError::NewBlockInvalidAccountRoot,
));
return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into());
}

Expand Down