2 changes: 2 additions & 0 deletions Cargo.toml
@@ -89,12 +89,14 @@ url = { features = ["serde"], version = "2.5" }
pedantic = { level = "warn", priority = -1 }

cast_possible_truncation = "allow" # Overly many instances especially regarding indices.
collapsible-if = "allow" # Too new to enforce.
from_iter_instead_of_collect = "allow" # at times `FromIter` is much more readable
ignored_unit_patterns = "allow" # Stylistic choice.
large_types_passed_by_value = "allow" # Triggered by BlockHeader being Copy + 334 bytes.
missing_errors_doc = "allow" # TODO: fixup and enable this.
missing_panics_doc = "allow" # TODO: fixup and enable this.
module_name_repetitions = "allow" # Many triggers, and is a stylistic choice.
must_use_candidate = "allow" # This marks many fn's which isn't helpful.
needless_for_each = "allow" # Context dependent if that's useful.
should_panic_without_expect = "allow" # We don't care about the specific panic message.
# End of pedantic lints.
19 changes: 9 additions & 10 deletions bin/faucet/src/faucet/mod.rs
@@ -443,16 +443,15 @@ impl Faucet {
) -> MintResult<ExecutedTransaction> {
let executor =
TransactionExecutor::new(self.data_store.as_ref(), Some(&self.authenticator));
executor
.execute_transaction(
self.id.account_id,
BlockNumber::GENESIS,
InputNotes::default(),
tx_args,
Arc::new(DefaultSourceManager::default()),
)
.await
.map_err(MintError::Execution)
Box::pin(executor.execute_transaction(
self.id.account_id,
BlockNumber::GENESIS,
InputNotes::default(),
tx_args,
Arc::new(DefaultSourceManager::default()),
))
.await
.map_err(MintError::Execution)
}

async fn submit_transaction(
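The hunk above wraps the executor call in `Box::pin` before awaiting it, presumably to keep the large transaction-execution state machine off the caller's stack. A minimal sketch of the pattern (all names here are illustrative, not from this PR):

```rust
use std::future::Future;
use std::mem::size_of_val;
use std::pin::Pin;

// Illustrative stand-in for the executor call: an async fn whose state
// machine must keep a large value alive across an await point.
async fn execute_transaction_sketch() -> usize {
    let witness = [0u8; 4096]; // stand-in for bulky execution state
    std::future::ready(()).await; // `witness` survives this await
    witness.len()
}

fn main() {
    // Held inline, the caller's frame contains the whole 4 KiB state machine.
    let inline = execute_transaction_sketch();
    assert!(size_of_val(&inline) >= 4096);

    // `Box::pin` moves the state machine to the heap; the caller keeps only
    // a pointer-sized (fat, for `dyn`) handle.
    let boxed: Pin<Box<dyn Future<Output = usize>>> =
        Box::pin(execute_transaction_sketch());
    assert!(size_of_val(&boxed) <= 16);
}
```

The boxed future is also `Unpin`, which can simplify holding it across awaits in generic code.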
2 changes: 1 addition & 1 deletion bin/remote-prover/src/commands/update_workers.rs
@@ -46,7 +46,7 @@ pub enum Action {
Remove,
}

/// Update workers in the proxy performing the specified [Action]
/// Update workers in the proxy performing the specified [`Action`]
#[derive(Debug, Parser, Clone, Serialize, Deserialize)]
pub struct UpdateWorkers {
pub action: Action,
2 changes: 1 addition & 1 deletion bin/remote-prover/src/proxy/health_check.rs
@@ -7,7 +7,7 @@ use tracing::{debug_span, error};

use super::LoadBalancerState;

/// Implement the BackgroundService trait for the LoadBalancer
/// Implement the `BackgroundService` trait for the `LoadBalancer`
///
/// A [BackgroundService] can be run as part of a Pingora application to add supporting logic that
/// exists outside of the request/response lifecycle.
32 changes: 16 additions & 16 deletions bin/remote-prover/src/proxy/mod.rs
@@ -350,21 +350,21 @@ pub struct LoadBalancer(pub Arc<LoadBalancerState>);
/// Implements load-balancing of incoming requests across a pool of workers.
///
/// At the backend-level, a request lifecycle works as follows:
/// - When a new requests arrives, [LoadBalancer::request_filter()] method is called. In this method
/// we apply IP-based rate-limiting to the request and check if the request queue is full. In this
/// method we also handle the special case update workers request.
/// - Next, the [Self::upstream_peer()] method is called. We use it to figure out which worker will
/// process the request. Inside `upstream_peer()`, we add the request to the queue of requests.
/// Once the request gets to the front of the queue, we forward it to an available worker. This
/// step is also in charge of setting the SNI, timeouts, and enabling HTTP/2. Finally, we
/// establish a connection with the worker.
/// - When a new requests arrives, [`LoadBalancer::request_filter()`] method is called. In this
/// method we apply IP-based rate-limiting to the request and check if the request queue is full.
/// In this method we also handle the special case update workers request.
/// - Next, the [`Self::upstream_peer()`] method is called. We use it to figure out which worker
/// will process the request. Inside `upstream_peer()`, we add the request to the queue of
/// requests. Once the request gets to the front of the queue, we forward it to an available
/// worker. This step is also in charge of setting the SNI, timeouts, and enabling HTTP/2.
/// Finally, we establish a connection with the worker.
/// - Before sending the request to the upstream server and if the connection succeed, the
/// [Self::upstream_request_filter()] method is called. In this method, we ensure that the correct
/// headers are forwarded for gRPC requests.
/// - If the connection fails, the [Self::fail_to_connect()] method is called. In this method, we
/// retry the request [self.max_retries_per_request] times.
/// [`Self::upstream_request_filter()`] method is called. In this method, we ensure that the
/// correct headers are forwarded for gRPC requests.
/// - If the connection fails, the [`Self::fail_to_connect()`] method is called. In this method, we
/// retry the request [`self.max_retries_per_request`] times.
/// - Once the worker processes the request (either successfully or with a failure),
/// [Self::logging()] method is called. In this method, we log the request lifecycle and set the
/// [`Self::logging()`] method is called. In this method, we log the request lifecycle and set the
/// worker as available.
#[async_trait]
impl ProxyHttp for LoadBalancer {
@@ -446,14 +446,14 @@ impl ProxyHttp for LoadBalancer {
Ok(false)
}

/// Returns [HttpPeer] corresponding to the worker that will handle the current request.
/// Returns [`HttpPeer`] corresponding to the worker that will handle the current request.
///
/// Here we enqueue the request and wait for it to be at the front of the queue and a worker
/// becomes available, then we dequeue the request and process it. We then set the SNI,
/// timeouts, and enable HTTP/2.
///
/// Note that the request will be assigned a worker here, and the worker will be removed from
/// the list of available workers once it reaches the [Self::logging] method.
/// the list of available workers once it reaches the [`Self::logging`] method.
#[tracing::instrument(name = "proxy.upstream_peer", parent = &ctx.parent_span, skip(_session))]
async fn upstream_peer(
&self,
@@ -512,7 +512,7 @@ impl ProxyHttp for LoadBalancer {
///
/// Here we ensure that the correct headers are forwarded for gRPC requests.
///
/// This method is called right after [Self::upstream_peer()] returns a [HttpPeer] and a
/// This method is called right after [`Self::upstream_peer()`] returns a [`HttpPeer`] and a
/// connection is established with the worker.
#[tracing::instrument(name = "proxy.upstream_request_filter", parent = &_ctx.parent_span, skip(_session))]
async fn upstream_request_filter(
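The request lifecycle documented in this file — enqueue on arrival, dispatch from the front of the queue once a worker is idle, release the worker in `logging()` — can be modeled with a toy dispatcher. This is a hedged sketch; the real logic lives in Pingora's `ProxyHttp` callbacks and all names below are illustrative:

```rust
use std::collections::VecDeque;

// Toy model of the documented lifecycle. `upstream_peer()` dispatches,
// `logging()` releases the worker.
struct Dispatcher {
    queue: VecDeque<u32>,         // pending request ids, FIFO
    available: Vec<&'static str>, // idle worker addresses
}

impl Dispatcher {
    // request_filter(): admit the request into the queue.
    fn enqueue(&mut self, request_id: u32) {
        self.queue.push_back(request_id);
    }

    // upstream_peer(): hand the front request to an idle worker, if any.
    fn dispatch(&mut self) -> Option<(u32, &'static str)> {
        if self.available.is_empty() {
            return None; // request keeps waiting at the front of the queue
        }
        let id = self.queue.pop_front()?;
        let worker = self.available.pop()?;
        Some((id, worker))
    }

    // logging(): the response went out, so the worker is idle again.
    fn release(&mut self, worker: &'static str) {
        self.available.push(worker);
    }
}

fn main() {
    let mut d = Dispatcher { queue: VecDeque::new(), available: vec!["worker-a"] };
    d.enqueue(1);
    d.enqueue(2);
    assert_eq!(d.dispatch(), Some((1, "worker-a")));
    assert_eq!(d.dispatch(), None); // request 2 waits until a worker frees up
    d.release("worker-a");
    assert_eq!(d.dispatch(), Some((2, "worker-a")));
}
```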
42 changes: 21 additions & 21 deletions crates/block-producer/src/mempool/graph/mod.rs
@@ -14,27 +14,27 @@ mod tests;
///
/// # Node lifecycle
/// ```text
/// │
/// │
/// insert_pending│
/// ┌─────▼─────┐
/// │ pending │────┐
/// └─────┬─────┘ │
/// │ │
/// promote_pending│ │
/// ┌─────▼─────┐ │
/// ┌──────────► in queue │────│
/// │ └─────┬─────┘ │
/// revert_processed│ │ │
/// │ process_root│ │
/// │ ┌─────▼─────┐ │
/// └──────────┼ processed │────│
/// └─────┬─────┘ │
/// │ │
/// │
/// │
/// insert_pending│
/// ┌─────▼─────┐
/// │ pending │────┐
/// └─────┬─────┘ │
/// │ │
/// promote_pending│ │
/// ┌─────▼─────┐ │
/// ┌──────────► in queue │────│
/// │ └─────┬─────┘ │
/// revert_processed│ │ │
/// │ process_root│ │
/// │ ┌─────▼─────┐ │
/// └──────────┼ processed │────│
/// └─────┬─────┘ │
/// │ │
/// prune_processed│ │purge_subgraphs
/// ┌─────▼─────┐ │
/// │ <null> ◄────┘
/// └───────────┘
/// ┌─────▼─────┐ │
/// │ <null> ◄────┘
/// └───────────┘
/// ```
#[derive(Clone, PartialEq, Eq)]
pub struct DependencyGraph<K, V> {
@@ -405,7 +405,7 @@ impl<K: Ord + Copy + Display + Debug, V: Clone> DependencyGraph<K, V> {
self.vertices.get(key)
}

/// Returns the parents of the node, or [None] if the node does not exist.
/// Returns the parents of the node, or `None` if the node does not exist.
pub fn parents(&self, key: &K) -> Option<&BTreeSet<K>> {
self.parents.get(key)
}
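The node-lifecycle diagram in this file (re-indented by the PR) describes a small state machine. A simplified sketch of those transitions — the real `DependencyGraph` tracks parents and whole subgraphs, not just a per-node state, so this is only a model of the diagram:

```rust
// Simplified encoding of the lifecycle diagram. `insert_pending` is the
// entry point into `Pending`; `None` means the node leaves the graph.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NodeState {
    Pending,
    InQueue,
    Processed,
}

fn step(state: NodeState, op: &str) -> Option<NodeState> {
    use NodeState::*;
    match (state, op) {
        (Pending, "promote_pending") => Some(InQueue),
        (InQueue, "process_root") => Some(Processed),
        (Processed, "revert_processed") => Some(InQueue),
        (Processed, "prune_processed") => None,
        // The right-hand edge of the diagram: purging removes a node
        // from any of the three states.
        (_, "purge_subgraphs") => None,
        _ => None, // not a legal transition in this simplified model
    }
}

fn main() {
    let mut state = NodeState::Pending; // after insert_pending
    state = step(state, "promote_pending").unwrap();
    state = step(state, "process_root").unwrap();
    assert_eq!(state, NodeState::Processed);
    assert_eq!(step(state, "revert_processed"), Some(NodeState::InQueue));
    assert_eq!(step(state, "prune_processed"), None);
}
```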
47 changes: 22 additions & 25 deletions crates/ntx-builder/src/transaction.rs
@@ -1,7 +1,6 @@
use std::collections::BTreeSet;
use std::sync::Arc;

use futures::TryFutureExt;
use miden_node_utils::ErrorReport;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_objects::account::{Account, AccountId};
@@ -112,11 +111,11 @@ impl NtxContext {
let data_store =
NtxDataStore::new(account, chain_tip_header, chain_mmr);

self.filter_notes(&data_store, notes)
.and_then(|notes| self.execute(&data_store, notes))
.and_then(|tx| self.prove(tx))
.and_then(|tx| self.submit(tx))
.await
let notes = Box::pin(self.filter_notes(&data_store, notes)).await?;
let executed = Box::pin(self.execute(&data_store, notes)).await?;
let proven = Box::pin(self.prove(executed)).await?;
self.submit(proven).await?;
Ok(())
}
.in_current_span(),
)
@@ -149,15 +148,14 @@ impl NtxContext {
TransactionExecutor::new(data_store, None);
let checker = NoteConsumptionChecker::new(&executor);

let notes = match checker
.check_notes_consumability(
data_store.account.id(),
data_store.reference_header.block_num(),
notes.clone(),
TransactionArgs::default(),
Arc::new(DefaultSourceManager::default()),
)
.await
let notes = match Box::pin(checker.check_notes_consumability(
data_store.account.id(),
data_store.reference_header.block_num(),
notes.clone(),
TransactionArgs::default(),
Arc::new(DefaultSourceManager::default()),
))
.await
{
Ok(NoteAccountExecution::Success) => notes,
Ok(NoteAccountExecution::Failure { successful_notes, error, .. }) => {
@@ -194,16 +192,15 @@ impl NtxContext {
let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> =
TransactionExecutor::new(data_store, None);

executor
.execute_transaction(
data_store.account.id(),
data_store.reference_header.block_num(),
notes,
TransactionArgs::default(),
Arc::new(DefaultSourceManager::default()),
)
.await
.map_err(NtxError::Execution)
Box::pin(executor.execute_transaction(
data_store.account.id(),
data_store.reference_header.block_num(),
notes,
TransactionArgs::default(),
Arc::new(DefaultSourceManager::default()),
))
.await
.map_err(NtxError::Execution)
}

/// Delegates the transaction proof to the remote prover if configured, otherwise performs the
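This file also drops the `futures::TryFutureExt` import: the `and_then` combinator chain is rewritten as sequential `?`-propagated awaits, which is equivalent for fallible stages and lets each stage's future be boxed individually. A rough equivalence sketch with hypothetical stage functions (standing in for filter/execute/prove/submit; the driver below exists only to run the sketch without an async runtime):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical stages; the real ones are filter_notes / execute / prove / submit.
async fn stage_a(x: u32) -> Result<u32, String> { Ok(x + 1) }
async fn stage_b(x: u32) -> Result<u32, String> { Ok(x * 2) }

// The PR's new shape: sequential `?`-propagated awaits, each stage boxed
// so the combined state machine stays small on the stack.
async fn pipeline(x: u32) -> Result<u32, String> {
    let a = Box::pin(stage_a(x)).await?;
    let b = Box::pin(stage_b(a)).await?;
    Ok(b)
}

// Minimal driver for futures that never pend (no async runtime needed).
fn block_on_ready<F: Future>(fut: F) -> F::Output {
    fn raw_noop_waker() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker { raw_noop_waker() }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    let waker = unsafe { Waker::from_raw(raw_noop_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("sketch futures resolve immediately"),
    }
}

fn main() {
    assert_eq!(block_on_ready(pipeline(3)), Ok(8));
}
```

Sequential awaits also make early returns and per-stage error mapping (`map_err`) easier to read than a nested `and_then` chain.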
2 changes: 1 addition & 1 deletion crates/remote-prover-client/Cargo.toml
@@ -39,7 +39,7 @@ tonic-web = { optional = true, version = "0.13" }
workspace = true

[dependencies]
async-trait = { version = "0.1" }
async-trait = { workspace = true }
miden-objects = { optional = true, workspace = true }
miden-tx = { optional = true, workspace = true }
prost = { default-features = false, features = ["derive"], version = "0.13" }
8 changes: 4 additions & 4 deletions crates/store/src/db/mod.rs
@@ -268,7 +268,7 @@ impl Db {
.await
}

/// Search for a [BlockHeader] from the database by its `block_num`.
/// Search for a [`BlockHeader`] from the database by its `block_num`.
///
/// When `block_number` is [None], the latest block header is returned.
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
@@ -371,7 +371,7 @@ impl Db {
.await
}

/// Loads all the Note's matching a certain NoteId from the database.
/// Loads all the Note's matching a certain `NoteId` from the database.
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_notes_by_id(&self, note_ids: Vec<NoteId>) -> Result<Vec<NoteRecord>> {
self.transact("note by id", move |conn| {
@@ -392,7 +392,7 @@
.await
}

/// Loads all note IDs matching a certain NoteId from the database.
/// Loads all note IDs matching a certain `NoteId` from the database.
#[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)]
pub async fn select_note_ids(&self, note_ids: Vec<NoteId>) -> Result<HashSet<NoteId>> {
self.select_notes_by_id(note_ids)
@@ -403,7 +403,7 @@
/// Inserts the data of a new block into the DB.
///
/// `allow_acquire` and `acquire_done` are used to synchronize writes to the DB with writes to
/// the in-memory trees. Further details available on [super::state::State::apply_block].
/// the in-memory trees. Further details available on [`super::state::State::apply_block`].
// TODO: This span is logged in a root span, we should connect it to the parent one.
#[instrument(target = COMPONENT, skip_all, err)]
pub async fn apply_block(