Skip to content

Commit

Permalink
fix(prover): Revert use of spawn_blocking in LWG/NWG (#2682)
Browse files Browse the repository at this point in the history
## What ❔

<!-- What are the changes this PR brings about? -->
<!-- Example: This PR adds a PR template to the repo. -->
<!-- (For bigger PRs adding more context is appreciated) -->

## Why ❔

<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
0xVolosnikov authored Aug 20, 2024
1 parent f84aaaf commit edfcc7d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 90 deletions.
80 changes: 38 additions & 42 deletions prover/crates/bin/witness_generator/src/leaf_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant};
use anyhow::Context as _;
use async_trait::async_trait;
use circuit_definitions::circuit_definitions::recursion_layer::base_circuit_type_into_recursive_leaf_circuit_type;
use tokio::{runtime::Handle, sync::Semaphore};
use tokio::sync::Semaphore;
use zkevm_test_harness::{
witness::recursive_aggregation::{
compute_leaf_params, create_leaf_witness, split_recursion_queue,
Expand Down Expand Up @@ -298,48 +298,44 @@ pub async fn process_leaf_aggregation_job(
let base_vk = job.base_vk.clone();
let leaf_params = (circuit_id, job.leaf_params.clone());

let handle = tokio::task::spawn_blocking(move || {
let async_task = async {
let _permit = semaphore
.acquire()
.await
.expect("failed to get permit to process queues chunk");

let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await;
let base_proofs = proofs
.into_iter()
.map(|wrapper| match wrapper {
FriProofWrapper::Base(base_proof) => base_proof,
FriProofWrapper::Recursive(_) => {
panic!(
"Expected only base proofs for leaf agg {} {}",
job.circuit_id, job.block_number
);
}
})
.collect();

let (_, circuit) = create_leaf_witness(
circuit_id.into(),
queue,
base_proofs,
&base_vk,
&leaf_params,
);

save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![circuit],
AggregationRound::LeafAggregation,
0,
&*object_store,
None,
)
let handle = tokio::task::spawn(async move {
let _permit = semaphore
.acquire()
.await
};

Handle::current().block_on(async_task)
.expect("failed to get permit to process queues chunk");

let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await;
let base_proofs = proofs
.into_iter()
.map(|wrapper| match wrapper {
FriProofWrapper::Base(base_proof) => base_proof,
FriProofWrapper::Recursive(_) => {
panic!(
"Expected only base proofs for leaf agg {} {}",
job.circuit_id, job.block_number
);
}
})
.collect();

let (_, circuit) = create_leaf_witness(
circuit_id.into(),
queue,
base_proofs,
&base_vk,
&leaf_params,
);

save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![circuit],
AggregationRound::LeafAggregation,
0,
&*object_store,
None,
)
.await
});

handles.push(handle);
Expand Down
91 changes: 43 additions & 48 deletions prover/crates/bin/witness_generator/src/node_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant};
use anyhow::Context as _;
use async_trait::async_trait;
use circuit_definitions::circuit_definitions::recursion_layer::RECURSION_ARITY;
use tokio::{runtime::Handle, sync::Semaphore};
use tokio::sync::Semaphore;
use zkevm_test_harness::witness::recursive_aggregation::{
compute_node_vk_commitment, create_node_witness,
};
Expand Down Expand Up @@ -138,56 +138,51 @@ impl NodeAggregationWitnessGenerator {
let vk = vk.clone();
let all_leafs_layer_params = job.all_leafs_layer_params.clone();

let handle = tokio::task::spawn_blocking(move || {
let async_task = async {
let _permit = semaphore
.acquire()
.await
.expect("failed to get permit to process queues chunk");

let proofs =
load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await;
let mut recursive_proofs = vec![];
for wrapper in proofs {
match wrapper {
FriProofWrapper::Base(_) => {
panic!(
"Expected only recursive proofs for node agg {} {}",
job.circuit_id, job.block_number
);
}
FriProofWrapper::Recursive(recursive_proof) => {
recursive_proofs.push(recursive_proof)
}
let handle = tokio::task::spawn(async move {
let _permit = semaphore
.acquire()
.await
.expect("failed to get permit to process queues chunk");

let proofs = load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await;
let mut recursive_proofs = vec![];
for wrapper in proofs {
match wrapper {
FriProofWrapper::Base(_) => {
panic!(
"Expected only recursive proofs for node agg {} {}",
job.circuit_id, job.block_number
);
}
FriProofWrapper::Recursive(recursive_proof) => {
recursive_proofs.push(recursive_proof)
}
}
}

let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness(
&chunk,
recursive_proofs,
&vk,
node_vk_commitment,
&all_leafs_layer_params,
);

let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![recursive_circuit],
AggregationRound::NodeAggregation,
job.depth + 1,
&*object_store,
Some(job.circuit_id),
)
.await;

let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness(
&chunk,
recursive_proofs,
&vk,
node_vk_commitment,
&all_leafs_layer_params,
);

let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![recursive_circuit],
AggregationRound::NodeAggregation,
job.depth + 1,
&*object_store,
Some(job.circuit_id),
)
.await;

(
(result_circuit_id, input_queue),
recursive_circuit_id_and_url,
)
};

Handle::current().block_on(async_task)
(
(result_circuit_id, input_queue),
recursive_circuit_id_and_url,
)
});

handles.push(handle);
Expand Down

0 comments on commit edfcc7d

Please sign in to comment.