Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions node/rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub struct Rest<N: Network, C: ConsensusStorage<N>, R: Routing<N>> {
num_verifying_deploys: Arc<AtomicUsize>,
/// The number of ongoing execute transaction verifications via REST.
num_verifying_executions: Arc<AtomicUsize>,
/// The number of ongoing solution verifications via REST.
num_verifying_solutions: Arc<AtomicUsize>,
}

impl<N: Network, C: 'static + ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
Expand All @@ -113,6 +115,7 @@ impl<N: Network, C: 'static + ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R>
handles: Default::default(),
num_verifying_deploys: Default::default(),
num_verifying_executions: Default::default(),
num_verifying_solutions: Default::default(),
};
// Spawn the server.
server.spawn_server(rest_ip, rest_rps).await?;
Expand Down
125 changes: 88 additions & 37 deletions node/rest/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub(crate) struct CheckTransaction {
check_transaction: Option<bool>,
}

/// The query object for `solution_broadcast`.
#[derive(Copy, Clone, Deserialize, Serialize)]
pub(crate) struct CheckSolution {
check_solution: Option<bool>,
}

/// The query object for `get_state_paths_for_commitments`.
#[derive(Clone, Deserialize, Serialize)]
pub(crate) struct Commitments {
Expand Down Expand Up @@ -737,50 +743,80 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
}

// POST /<network>/solution/broadcast
// POST /<network>/solution/broadcast?check_solution={true}
//
// Solution Broadcast Flow
//
// /solution/broadcast
// |
// +----+---------------------------+
// | |
// v v
// Without Query Params With Query Param
// check_solution=true
// | |
// +---------+ +---------+
// | | | |
// v v v v
// Synced Not Synced Synced Not Synced
// | | | |
// v v v v
// 200 200 check_solution check_solution
// +---------+ +---------+
// | | | |
// v v v v
// 200 422 203 503
pub(crate) async fn solution_broadcast(
State(rest): State<Self>,
check_solution: Query<CheckSolution>,
Json(solution): Json<Solution<N>>,
) -> Result<ErasedJson, RestError> {
) -> Result<impl axum::response::IntoResponse, RestError> {
// Check if the node is within sync leniency.
let is_within_sync_leniency = rest.routing.is_within_sync_leniency();
// Determine if we need to check the solution.
let check_solution = check_solution.check_solution.unwrap_or(false);

// If the consensus module is enabled, add the unconfirmed solution to the memory pool.
// Otherwise, verify it prior to broadcasting.
match rest.consensus {
// Add the unconfirmed solution to the memory pool.
Some(consensus) => {
// Do not process the solution if the node is too far behind.
if !is_within_sync_leniency {
return Err(RestError::service_unavailable(anyhow!(
"Unable to broadcast solution '{}' (node is syncing)",
fmt_id(solution.id())
)));
}
consensus.add_unconfirmed_solution(solution).await?
if check_solution {
// Select counter and limit.
let (counter, limit, err_msg) =
(&rest.num_verifying_solutions, N::MAX_SOLUTIONS, "Too many solution verifications in progress");

// Try to acquire a slot.
if counter
.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|val| {
if val < limit { Some(val + 1) } else { None }
},
)
.is_err()
{
return Err(RestError::too_many_requests(anyhow!("{}", err_msg)));
}
// Verify the solution.
None => {
// Compute the current epoch hash.
let epoch_hash = rest.ledger.latest_epoch_hash()?;
// Retrieve the current proof target.
let proof_target = rest.ledger.latest_proof_target();
// Ensure that the solution is valid for the given epoch.
let puzzle = rest.ledger.puzzle().clone();
// Check if the prover has reached their solution limit.
// While snarkVM will ultimately abort any excess solutions for safety, performing this check
// here prevents the to-be aborted solutions from propagating through the network.
let prover_address = solution.address();
if rest.ledger.is_solution_limit_reached(&prover_address, 0) {
return Err(RestError::unprocessable_entity(anyhow!(
"Invalid solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch",
fmt_id(solution.id())
)));
}
// Verify the solution in a blocking task.

// Compute the current epoch hash.
let epoch_hash = rest.ledger.latest_epoch_hash()?;
// Retrieve the current proof target.
let proof_target = rest.ledger.latest_proof_target();
// Ensure that the solution is valid for the given epoch.
let puzzle = rest.ledger.puzzle().clone();
// Check if the prover has reached their solution limit.
// While snarkVM will ultimately abort any excess solutions for safety, performing this check
// here prevents the to-be aborted solutions from propagating through the network.
let prover_address = solution.address();
if rest.ledger.is_solution_limit_reached(&prover_address, 0) {
return Err(RestError::unprocessable_entity(anyhow!(
"Invalid solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch",
fmt_id(solution.id())
)));
}
// Verify the solution in a blocking task.
let res: Result<(), anyhow::Error> =
match tokio::task::spawn_blocking(move || puzzle.check_solution(&solution, epoch_hash, proof_target))
.await
{
Ok(Ok(())) => {}
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => {
return match is_within_sync_leniency {
// The solution failed to verify.
Expand All @@ -797,8 +833,17 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
Err(err) => {
return Err(RestError::internal_server_error(anyhow!("Tokio error: {err}")));
}
}
}
};
// Release the slot.
counter.fetch_sub(1, Ordering::Relaxed);
// Propagate error if any.
res?;
}

// If the consensus module is enabled, add the unconfirmed solution to the memory pool.
if let Some(consensus) = rest.consensus {
// Add the unconfirmed solution to the memory pool.
let _ = consensus.add_unconfirmed_solution(solution).await;
}

let solution_id = solution.id();
Expand All @@ -809,7 +854,13 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
// Broadcast the unconfirmed solution message.
rest.routing.propagate(message, &[]);

Ok(ErasedJson::pretty(solution_id))
// Determine if the node is synced and if the solution was checked.
match !is_within_sync_leniency && check_solution {
// If the node is not synced and we validated the solution, return a 203.
true => Ok((StatusCode::NON_AUTHORITATIVE_INFORMATION, ErasedJson::pretty(solution_id))),
// Otherwise, return a 200.
false => Ok((StatusCode::OK, ErasedJson::pretty(solution_id))),
}
}

// POST /{network}/db_backup?path=new_fs_path
Expand Down