Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 7 additions & 52 deletions bin/stress-test/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,7 @@ pub async fn bench_sync_chain_mmr(

let request = |_| {
let mut client = store_client.clone();
tokio::spawn(async move {
sync_chain_mmr_paginated(&mut client, chain_tip, block_range_size).await
})
tokio::spawn(async move { sync_chain_mmr(&mut client, chain_tip, block_range_size).await })
};

let results = stream::iter(0..iterations)
Expand All @@ -456,77 +454,34 @@ pub async fn bench_sync_chain_mmr(
print_summary(&timers_accumulator);

let total_runs = results.len();
let paginated_runs = results.iter().filter(|r| r.pages > 1).count();
#[expect(clippy::cast_precision_loss)]
let pagination_rate = if total_runs > 0 {
(paginated_runs as f64 / total_runs as f64) * 100.0
} else {
0.0
};
#[expect(clippy::cast_precision_loss)]
let avg_pages = if total_runs > 0 {
results.iter().map(|r| r.pages as f64).sum::<f64>() / total_runs as f64
} else {
0.0
};

println!("Pagination statistics:");
println!(" Total runs: {total_runs}");
println!(" Runs triggering pagination: {paginated_runs}");
println!(" Pagination rate: {pagination_rate:.2}%");
println!(" Average pages per run: {avg_pages:.2}");
}

/// Sends a single `sync_chain_mmr` request to the store and returns a tuple with:
/// - the elapsed time.
/// - the response.
pub async fn sync_chain_mmr(
async fn sync_chain_mmr(
api_client: &mut RpcClient<InterceptedService<Channel, OtelInterceptor>>,
block_from: u32,
block_to: u32,
) -> (Duration, proto::rpc::SyncChainMmrResponse) {
) -> SyncChainMmrRun {
let sync_request = proto::rpc::SyncChainMmrRequest {
block_range: Some(proto::rpc::BlockRange { block_from, block_to: Some(block_to) }),
};

let start = Instant::now();
let response = api_client.sync_chain_mmr(sync_request).await.unwrap();
(start.elapsed(), response.into_inner())
let elapsed = start.elapsed();
let response = response.into_inner();
let _mmr_delta = response.mmr_delta.expect("mmr_delta should exist");
SyncChainMmrRun { duration: elapsed }
}

#[derive(Clone)]
struct SyncChainMmrRun {
duration: Duration,
pages: usize,
}

async fn sync_chain_mmr_paginated(
api_client: &mut RpcClient<InterceptedService<Channel, OtelInterceptor>>,
chain_tip: u32,
block_range_size: u32,
) -> SyncChainMmrRun {
let mut total_duration = Duration::default();
let mut pages = 0usize;
let mut next_block_from = 0u32;

loop {
let target_block_to = next_block_from.saturating_add(block_range_size).min(chain_tip);
let (elapsed, response) =
sync_chain_mmr(api_client, next_block_from, target_block_to).await;
total_duration += elapsed;
pages += 1;

let pagination_info = response.pagination_info.expect("pagination_info should exist");
let _mmr_delta = response.mmr_delta.expect("mmr_delta should exist");

if pagination_info.block_num >= pagination_info.chain_tip {
break;
}

next_block_from = pagination_info.block_num;
}

SyncChainMmrRun { duration: total_duration, pages }
}

// LOAD STATE
Expand Down
8 changes: 5 additions & 3 deletions crates/proto/src/generated/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,11 +442,11 @@ pub struct SyncChainMmrRequest {
/// Represents the result of syncing chain MMR.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SyncChainMmrResponse {
/// Pagination information.
/// For which block range the MMR delta is returned.
#[prost(message, optional, tag = "1")]
pub pagination_info: ::core::option::Option<PaginationInfo>,
pub block_range: ::core::option::Option<BlockRange>,
/// Data needed to update the partial MMR from `request.block_range.block_from + 1` to
/// `pagination_info.block_num`.
/// `response.block_range.block_to` or the chain tip.
#[prost(message, optional, tag = "2")]
pub mmr_delta: ::core::option::Option<super::primitives::MmrDelta>,
}
Expand Down Expand Up @@ -1052,6 +1052,7 @@ pub mod api_client {
.insert(GrpcMethod::new("rpc.Api", "SyncAccountStorageMaps"));
self.inner.unary(req, path, codec).await
}
/// Returns MMR delta needed to synchronize the chain MMR within the requested block range.
pub async fn sync_chain_mmr(
&mut self,
request: impl tonic::IntoRequest<super::SyncChainMmrRequest>,
Expand Down Expand Up @@ -1236,6 +1237,7 @@ pub mod api_server {
tonic::Response<super::SyncAccountStorageMapsResponse>,
tonic::Status,
>;
/// Returns MMR delta needed to synchronize the chain MMR within the requested block range.
async fn sync_chain_mmr(
&self,
request: tonic::Request<super::SyncChainMmrRequest>,
Expand Down
9 changes: 9 additions & 0 deletions crates/rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The full gRPC method definitions can be found in the [proto](../proto/README.md)
- [SyncAccountVault](#SyncAccountVault)
- [SyncNotes](#syncnotes)
- [SyncAccountStorageMaps](#syncaccountstoragemaps)
- [SyncChainMmr](#syncchainmmr)
- [SyncTransactions](#synctransactions)

<!--toc:end-->
Expand Down Expand Up @@ -236,6 +237,14 @@ When storage map synchronization fails, detailed error information is provided t

---

### SyncChainMmr

Returns MMR delta information needed to synchronize the chain MMR within a block range.

Caller specifies the `block_range`, starting from the last block already represented in its local MMR. The response contains the MMR delta for the requested range along with pagination info so the caller can continue syncing until the chain tip.

---

### SyncTransactions

Returns transaction records for specific accounts within a block range.
Expand Down
4 changes: 0 additions & 4 deletions crates/rpc/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,10 +559,6 @@ async fn sync_chain_mmr_returns_delta() {
let response = rpc_client.sync_chain_mmr(request).await.expect("sync_chain_mmr should succeed");
let response = response.into_inner();

let pagination_info = response.pagination_info.expect("pagination_info should exist");
assert_eq!(pagination_info.chain_tip, 0);
assert_eq!(pagination_info.block_num, 0);

let mmr_delta = response.mmr_delta.expect("mmr_delta should exist");
assert_eq!(mmr_delta.forest, 0);
assert!(mmr_delta.data.is_empty());
Expand Down
9 changes: 9 additions & 0 deletions crates/store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The full gRPC API can be found [here](../../proto/proto/store.proto).
- [SyncAccountVault](#syncaccountvault)
- [SyncNotes](#syncnotes)
- [SyncAccountStorageMaps](#syncaccountstoragemaps)
- [SyncChainMmr](#syncchainmmr)
- [SyncTransactions](#synctransactions)
<!--toc:end-->

Expand Down Expand Up @@ -249,6 +250,14 @@ When storage map synchronization fails, detailed error information is provided t

---

### SyncChainMmr

Returns MMR delta information needed to synchronize the chain MMR within a block range.

Caller specifies the `block_range`, starting from the last block already represented in its local MMR. The response contains the MMR delta for the requested range and the returned `block_range` reflects the last block included, which may be the chain tip.

---

### SyncTransactions

Returns transaction records for specific accounts within a block range.
Expand Down
23 changes: 5 additions & 18 deletions crates/store/src/server/rpc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ impl rpc_server::Rpc for StoreApi {
&self,
request: Request<proto::rpc::SyncChainMmrRequest>,
) -> Result<Response<proto::rpc::SyncChainMmrResponse>, Status> {
// TODO find a reasonable upper boundary
const MAX_BLOCKS: u32 = 1 << 20;

let request = request.into_inner();
let chain_tip = self.state.latest_block_num().await;

Expand All @@ -183,23 +180,13 @@ impl rpc_server::Rpc for StoreApi {
}))?;
}
let block_range = block_from..=block_to;
let len = 1 + block_range.end().as_u32() - block_range.start().as_u32();
let trimmed_block_range = if len > MAX_BLOCKS {
block_from..=BlockNumber::from(block_from.as_u32() + MAX_BLOCKS)
} else {
block_range
};

let mmr_delta = self
.state
.sync_chain_mmr(trimmed_block_range.clone())
.await
.map_err(internal_error)?;
let mmr_delta =
self.state.sync_chain_mmr(block_range.clone()).await.map_err(internal_error)?;

Ok(Response::new(proto::rpc::SyncChainMmrResponse {
pagination_info: Some(proto::rpc::PaginationInfo {
chain_tip: chain_tip.as_u32(),
block_num: trimmed_block_range.end().as_u32(),
block_range: Some(proto::rpc::BlockRange {
block_from: block_range.start().as_u32(),
block_to: Some(block_range.end().as_u32()),
}),
mmr_delta: Some(mmr_delta.into()),
}))
Expand Down
10 changes: 0 additions & 10 deletions crates/utils/src/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,6 @@ impl QueryParamLimiter for QueryParamBlockLimit {
const LIMIT: usize = GENERAL_REQUEST_LIMIT;
}

/// Used for the following RPC endpoints:
/// * `sync_chain_mmr`
///
/// Capped at 1000 blocks to keep MMR deltas within the 4 MB payload budget.
pub struct QueryParamBlockRangeLimit;
impl QueryParamLimiter for QueryParamBlockRangeLimit {
const PARAM_NAME: &str = "block_range";
const LIMIT: usize = GENERAL_REQUEST_LIMIT;
}

/// Used for the following RPC endpoints
/// * `get_account`
///
Expand Down
7 changes: 7 additions & 0 deletions docs/external/src/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The gRPC service definition can be found in the Miden node's `proto` [directory]
- [SyncAccountVault](#syncaccountvault)
- [SyncNotes](#syncnotes)
- [SyncAccountStorageMaps](#syncaccountstoragemaps)
- [SyncChainMmr](#syncchainmmr)
- [SyncTransactions](#synctransactions)
- [Status](#status)

Expand Down Expand Up @@ -216,6 +217,12 @@ Caller specifies the `account_id` of the public account and the block range (`bl

This endpoint enables clients to maintain an updated view of account storage.

### SyncChainMmr

Returns MMR delta information needed to synchronize the chain MMR within a block range.

Caller specifies the `block_range`, starting from the last block already represented in its local MMR. The response contains the MMR delta for the requested range, but at most to (including) the chain tip.

### SyncTransactions

Returns transaction records for specific accounts within a block range.
Expand Down
8 changes: 4 additions & 4 deletions proto/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ service Api {
// Returns storage map updates for specified account and storage slots within a block range.
rpc SyncAccountStorageMaps(SyncAccountStorageMapsRequest) returns (SyncAccountStorageMapsResponse) {}

// Returns MMR delta needed to synchronize the chain MMR within the requested block range.
rpc SyncChainMmr(SyncChainMmrRequest) returns (SyncChainMmrResponse) {}
}

Expand Down Expand Up @@ -494,11 +495,10 @@ message SyncChainMmrRequest {

// Represents the result of syncing chain MMR.
message SyncChainMmrResponse {
// Pagination information.
PaginationInfo pagination_info = 1;

// For which block range the MMR delta is returned.
BlockRange block_range = 1;
// Data needed to update the partial MMR from `request.block_range.block_from + 1` to
// `pagination_info.block_num`.
// `response.block_range.block_to` or the chain tip.
primitives.MmrDelta mmr_delta = 2;
}

Expand Down
Loading