diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation.rs index d8cad84e777..2cfae160028 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::base_circuit_type_into_recursive_leaf_circuit_type; -use tokio::sync::Semaphore; +use tokio::{runtime::Handle, sync::Semaphore}; use zkevm_test_harness::{ witness::recursive_aggregation::{ compute_leaf_params, create_leaf_witness, split_recursion_queue, @@ -298,44 +298,48 @@ pub async fn process_leaf_aggregation_job( let base_vk = job.base_vk.clone(); let leaf_params = (circuit_id, job.leaf_params.clone()); - let handle = tokio::task::spawn(async move { - let _permit = semaphore - .acquire() + let handle = tokio::task::spawn_blocking(move || { + let async_task = async { + let _permit = semaphore + .acquire() + .await + .expect("failed to get permit to process queues chunk"); + + let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await; + let base_proofs = proofs + .into_iter() + .map(|wrapper| match wrapper { + FriProofWrapper::Base(base_proof) => base_proof, + FriProofWrapper::Recursive(_) => { + panic!( + "Expected only base proofs for leaf agg {} {}", + job.circuit_id, job.block_number + ); + } + }) + .collect(); + + let (_, circuit) = create_leaf_witness( + circuit_id.into(), + queue, + base_proofs, + &base_vk, + &leaf_params, + ); + + save_recursive_layer_prover_input_artifacts( + job.block_number, + circuit_idx, + vec![circuit], + AggregationRound::LeafAggregation, + 0, + &*object_store, + None, + ) .await - .expect("failed to get permit to process queues chunk"); - - let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await; - let base_proofs = proofs - .into_iter() - .map(|wrapper| match wrapper { - FriProofWrapper::Base(base_proof) => base_proof, - FriProofWrapper::Recursive(_) => { - panic!( - "Expected only base proofs for leaf agg {} {}", - job.circuit_id, job.block_number - ); - } - }) - .collect(); - - let (_, circuit) = create_leaf_witness( - circuit_id.into(), - queue, - base_proofs, - &base_vk, - &leaf_params, - ); - - save_recursive_layer_prover_input_artifacts( - job.block_number, - circuit_idx, - vec![circuit], - AggregationRound::LeafAggregation, - 0, - &*object_store, - None, - ) - .await + }; + + Handle::current().block_on(async_task) }); handles.push(handle); diff --git a/prover/crates/bin/witness_generator/src/node_aggregation.rs b/prover/crates/bin/witness_generator/src/node_aggregation.rs index c9d5ab32bc5..2836d463cd4 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::RECURSION_ARITY; -use tokio::sync::Semaphore; +use tokio::{runtime::Handle, sync::Semaphore}; use zkevm_test_harness::witness::recursive_aggregation::{ compute_node_vk_commitment, create_node_witness, }; @@ -138,51 +138,56 @@ impl NodeAggregationWitnessGenerator { let vk = vk.clone(); let all_leafs_layer_params = job.all_leafs_layer_params.clone(); - let handle = tokio::task::spawn(async move { - let _permit = semaphore - .acquire() - .await - .expect("failed to get permit to process queues chunk"); - - let proofs = load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await; - let mut recursive_proofs = vec![]; - for wrapper in proofs { - match wrapper { - FriProofWrapper::Base(_) => { - panic!( - "Expected only recursive proofs for node agg {} {}", - job.circuit_id, job.block_number - ); - } - FriProofWrapper::Recursive(recursive_proof) => { - recursive_proofs.push(recursive_proof) + let handle = tokio::task::spawn_blocking(move || { + let async_task = async { + let _permit = semaphore + .acquire() + .await + .expect("failed to get permit to process queues chunk"); + + let proofs = + load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await; + let mut recursive_proofs = vec![]; + for wrapper in proofs { + match wrapper { + FriProofWrapper::Base(_) => { + panic!( + "Expected only recursive proofs for node agg {} {}", + job.circuit_id, job.block_number + ); + } + FriProofWrapper::Recursive(recursive_proof) => { + recursive_proofs.push(recursive_proof) + } } } - } - - let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness( - &chunk, - recursive_proofs, - &vk, - node_vk_commitment, - &all_leafs_layer_params, - ); - - let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts( - job.block_number, - circuit_idx, - vec![recursive_circuit], - AggregationRound::NodeAggregation, - job.depth + 1, - &*object_store, - Some(job.circuit_id), - ) - .await; - ( - (result_circuit_id, input_queue), - recursive_circuit_id_and_url, - ) + let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness( + &chunk, + recursive_proofs, + &vk, + node_vk_commitment, + &all_leafs_layer_params, + ); + + let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts( + job.block_number, + circuit_idx, + vec![recursive_circuit], + AggregationRound::NodeAggregation, + job.depth + 1, + &*object_store, + Some(job.circuit_id), + ) + .await; + + ( + (result_circuit_id, input_queue), + recursive_circuit_id_and_url, + ) + }; + + Handle::current().block_on(async_task) }); handles.push(handle); diff --git a/prover/crates/bin/witness_generator/src/utils.rs b/prover/crates/bin/witness_generator/src/utils.rs index a21aabc5d6d..624d8ec1b40 100644 --- a/prover/crates/bin/witness_generator/src/utils.rs +++ b/prover/crates/bin/witness_generator/src/utils.rs @@ -227,11 +227,15 @@ pub async fn load_proofs_for_job_ids( job_ids: &[u32], object_store: &dyn ObjectStore, ) -> Vec { - let mut proofs = Vec::with_capacity(job_ids.len()); + let mut handles = Vec::with_capacity(job_ids.len()); for job_id in job_ids { - proofs.push(object_store.get(*job_id).await.unwrap()); + handles.push(object_store.get(*job_id)); } - proofs + futures::future::join_all(handles) + .await + .into_iter() + .map(|x| x.unwrap()) + .collect() } /// Loads all proofs for a given recursion tip's job ids.