Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ miden-node-utils = { features = ["testing"], workspace = true }
miden-protocol = { default-features = true, workspace = true }
miden-remote-prover-client = { features = ["batch-prover", "block-prover"], workspace = true }
miden-standards = { workspace = true }
miden-tx = { default-features = true, workspace = true }
miden-tx-batch-prover = { workspace = true }
rand = { version = "0.9" }
thiserror = { workspace = true }
Expand All @@ -45,6 +44,7 @@ assert_matches = { workspace = true }
miden-node-store = { workspace = true }
miden-node-test-macro = { workspace = true }
miden-node-utils = { features = ["testing"], workspace = true }
miden-node-validator = { workspace = true }
miden-protocol = { default-features = true, features = ["testing"], workspace = true }
miden-standards = { features = ["testing"], workspace = true }
miden-tx = { features = ["testing"], workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/block-producer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ use crate::store::StoreClient;
use crate::validator::BlockProducerValidatorClient;
use crate::{CACHED_MEMPOOL_STATS_UPDATE_INTERVAL, COMPONENT, SERVER_NUM_BATCH_BUILDERS};

#[cfg(test)]
mod tests;

/// The block producer server.
///
/// Specifies how to connect to the store, batch prover, and block prover components.
Expand Down
215 changes: 95 additions & 120 deletions crates/block-producer/src/server/tests.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
use std::num::NonZeroUsize;
use std::time::Duration;

use miden_air::{ExecutionProof, HashFunction};
use miden_node_proto::generated::{
self as proto, block_producer::api_client as block_producer_client,
};
use miden_node_proto::generated::block_producer::api_client as block_producer_client;
use miden_node_store::{GenesisState, Store};
use miden_protocol::{
Digest,
account::{AccountId, AccountIdVersion, AccountStorageMode, AccountType},
transaction::ProvenTransactionBuilder,
};
use miden_tx::utils::Serializable;
use tokio::{net::TcpListener, runtime, task, time::sleep};
use miden_node_utils::fee::test_fee_params;
use miden_node_validator::Validator;
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::testing::random_signer::RandomBlockSigner as _;
use tokio::net::TcpListener;
use tokio::time::sleep;
use tokio::{runtime, task};
use tonic::transport::{Channel, Endpoint};
use winterfell::Proof;
use url::Url;

use crate::{BlockProducer, SERVER_MAX_BATCHES_PER_BLOCK, SERVER_MAX_TXS_PER_BATCH};
use crate::{BlockProducer, DEFAULT_MAX_BATCHES_PER_BLOCK, DEFAULT_MAX_TXS_PER_BATCH};

/// Tests that the block producer starts up correctly even when the store is not initially
/// available. The block producer should retry with exponential backoff until the store becomes
/// available, then start serving requests.
#[tokio::test]
async fn block_producer_startup_is_robust_to_network_failures() {
// This test starts the block producer and tests that it starts serving only after the store
// is started.

// get the addresses for the store and block producer
let store_addr = {
let store_listener =
Expand All @@ -36,113 +34,103 @@ async fn block_producer_startup_is_robust_to_network_failures() {
.expect("Failed to get block-producer address")
};

let ntx_builder_addr = {
let ntx_builder_address = TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind the ntx builder address");
ntx_builder_address.local_addr().expect("failed to get ntx builder address")
let validator_addr = {
let validator_listener =
TcpListener::bind("127.0.0.1:0").await.expect("failed to bind validator");
validator_listener.local_addr().expect("failed to get validator address")
};

// start the block producer
let grpc_timeout = Duration::from_secs(30);

// start the validator
task::spawn(async move {
Validator {
address: validator_addr,
grpc_timeout,
signer: SecretKey::random(),
}
.serve()
.await
.unwrap();
});

// start the block producer BEFORE the store is available
// this tests the exponential backoff behavior
let store_url = Url::parse(&format!("http://{store_addr}")).expect("Failed to parse store URL");
let validator_url =
Url::parse(&format!("http://{validator_addr}")).expect("Failed to parse validator URL");
task::spawn(async move {
BlockProducer {
block_producer_address: block_producer_addr,
store_address: store_addr,
ntx_builder_address: Some(ntx_builder_addr),
store_url,
validator_url,
batch_prover_url: None,
block_prover_url: None,
batch_interval: Duration::from_millis(500),
block_interval: Duration::from_millis(500),
max_txs_per_batch: SERVER_MAX_TXS_PER_BATCH,
max_batches_per_block: SERVER_MAX_BATCHES_PER_BLOCK,
max_txs_per_batch: DEFAULT_MAX_TXS_PER_BATCH,
max_batches_per_block: DEFAULT_MAX_BATCHES_PER_BLOCK,
grpc_timeout,
mempool_tx_capacity: NonZeroUsize::new(100).unwrap(),
}
.serve()
.await
.unwrap();
});

// test: connecting to the block producer should fail until the store is started
// test: connecting to the block producer should fail because the store is not yet started
// (and therefore the block producer is not yet listening)
let block_producer_endpoint =
Endpoint::try_from(format!("http://{block_producer_addr}")).expect("valid url");
let block_producer_client =
block_producer_client::ApiClient::connect(block_producer_endpoint.clone()).await;
assert!(block_producer_client.is_err());
assert!(
block_producer_client.is_err(),
"Block producer should not be available before store is started"
);

// start the store
let data_directory = tempfile::tempdir().expect("tempdir should be created");
let store_runtime = {
let genesis_state = GenesisState::new(vec![], 1, 1);
Store::bootstrap(genesis_state.clone(), data_directory.path())
.expect("store should bootstrap");
let dir = data_directory.path().to_path_buf();
let rpc_listener =
TcpListener::bind("127.0.0.1:0").await.expect("store should bind the RPC port");
let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind store ntx-builder gRPC endpoint");
let block_producer_listener = TcpListener::bind(store_addr)
.await
.expect("store should bind the block-producer port");
// in order to later kill the store, we need to spawn a new runtime and run the store on
// it. That allows us to kill all the tasks spawned by the store when we
// kill the runtime.
let store_runtime =
runtime::Builder::new_multi_thread().enable_time().enable_io().build().unwrap();
store_runtime.spawn(async move {
Store {
rpc_listener,
ntx_builder_listener,
block_producer_listener,
data_directory: dir,
grpc_timeout: std::time::Duration::from_secs(30),
let store_runtime = start_store(store_addr, data_directory.path()).await;

// wait for the block producer's exponential backoff to connect to the store
// use a retry loop since CI environments may be slower
let block_producer_client = {
let mut attempts = 0;
loop {
attempts += 1;
match block_producer_client::ApiClient::connect(block_producer_endpoint.clone()).await {
Ok(client) => break client,
Err(_) if attempts < 30 => {
sleep(Duration::from_millis(200)).await;
},
Err(e) => panic!(
"block producer client should connect after store is started (after {attempts} attempts): {e}"
),
}
.serve()
.await
.expect("store should start serving");
});
store_runtime
}
};

// we need to wait for the exponential backoff of the block producer to connect to the store
sleep(Duration::from_secs(1)).await;
// test: status request against block-producer should succeed
let response = send_status_request(block_producer_client).await;
assert!(response.is_ok(), "Status request should succeed, got: {:?}", response.err());

let block_producer_client = block_producer_client::ApiClient::connect(block_producer_endpoint)
.await
.expect("block producer client should connect");
// verify the response contains expected data
let status = response.unwrap().into_inner();
assert_eq!(status.status, "connected");

// test: request against block-producer api should succeed
let response = send_request(block_producer_client.clone(), 0).await;
assert!(response.is_ok());

// kill the store
shutdown_store(store_runtime).await;

// test: request against block-producer api should fail immediately
let response = send_request(block_producer_client.clone(), 1).await;
assert!(response.is_err());

// test: restart the store and request should succeed
let store_runtime = restart_store(store_addr, data_directory.path()).await;
let response = send_request(block_producer_client.clone(), 2).await;
assert!(response.is_ok());

// Shutdown the store before data_directory is dropped to allow RocksDB to flush properly
// Shutdown the store before data_directory is dropped to allow the database to flush properly
shutdown_store(store_runtime).await;
}

/// Shuts down the store runtime properly to allow RocksDB to flush before the temp directory is
/// deleted.
async fn shutdown_store(store_runtime: runtime::Runtime) {
task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_millis(500)))
.await
.expect("shutdown should complete");
}

/// Restarts a store using an existing data directory. Returns the runtime handle for shutdown.
async fn restart_store(
/// Starts the store with a fresh genesis state and returns the runtime handle.
async fn start_store(
store_addr: std::net::SocketAddr,
data_directory: &std::path::Path,
) -> runtime::Runtime {
let genesis_state = GenesisState::new(vec![], test_fee_params(), 1, 1, SecretKey::random());
Store::bootstrap(genesis_state.clone(), data_directory).expect("store should bootstrap");

let dir = data_directory.to_path_buf();
let rpc_listener =
TcpListener::bind("127.0.0.1:0").await.expect("store should bind the RPC port");
let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
Expand All @@ -151,16 +139,18 @@ async fn restart_store(
let block_producer_listener = TcpListener::bind(store_addr)
.await
.expect("store should bind the block-producer port");
let dir = data_directory.to_path_buf();

// Use a separate runtime so we can kill all store tasks later
let store_runtime =
runtime::Builder::new_multi_thread().enable_time().enable_io().build().unwrap();
store_runtime.spawn(async move {
Store {
rpc_listener,
ntx_builder_listener,
block_producer_listener,
block_prover_url: None,
data_directory: dir,
grpc_timeout: std::time::Duration::from_secs(30),
grpc_timeout: Duration::from_secs(30),
}
.serve()
.await
Expand All @@ -169,32 +159,17 @@ async fn restart_store(
store_runtime
}

/// Creates a dummy transaction and submits it to the block producer.
async fn send_request(
/// Shuts down the store runtime properly to allow the database to flush before the temp directory
/// is deleted.
async fn shutdown_store(store_runtime: runtime::Runtime) {
task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_millis(500)))
.await
.expect("shutdown should complete");
}

/// Sends a status request to the block producer to verify connectivity.
async fn send_status_request(
mut client: block_producer_client::ApiClient<Channel>,
i: u8,
) -> Result<tonic::Response<proto::blockchain::BlockNumber>, tonic::Status>
{
let tx = ProvenTransactionBuilder::new(
AccountId::dummy(
[0; 15],
AccountIdVersion::Version0,
AccountType::RegularAccountImmutableCode,
AccountStorageMode::Private,
),
Digest::default(),
[i; 32].try_into().unwrap(),
Digest::default(),
0.into(),
Digest::default(),
u32::MAX.into(),
ExecutionProof::new(Proof::new_dummy(), HashFunction::default()),
)
.build()
.unwrap();
let request = proto::transaction::ProvenTransaction {
transaction: tx.to_bytes(),
transaction_replay: None,
};
client.submit_proven_transaction(request).await
) -> Result<tonic::Response<miden_node_proto::generated::rpc::BlockProducerStatus>, tonic::Status> {
client.status(()).await
}
3 changes: 3 additions & 0 deletions crates/rpc/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ async fn rpc_server_rejects_proven_transactions_with_invalid_commitment() {
let (_, rpc_addr, store_addr) = start_rpc().await;
let (store_runtime, _data_directory, genesis) = start_store(store_addr).await;

// Wait for the store to be ready before sending requests.
tokio::time::sleep(Duration::from_millis(100)).await;

// Override the client so that the ACCEPT header is not set.
let mut rpc_client =
miden_node_proto::clients::Builder::new(Url::parse(&format!("http://{rpc_addr}")).unwrap())
Expand Down
Loading