Skip to content

Commit

Permalink
fix(prover): speed up LWG and NWG (#2661)
Browse files Browse the repository at this point in the history
## What ❔

- Loading of proofs for each recursive circuit is now done
asynchronously
- Each recursive circuit is now processed in a blocking thread, which
speeds up serialization and other CPU-sensitive processing.

Locally, with additional artificial network delays (500 ms), a x5 speed
up is observed

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
0xVolosnikov authored Aug 16, 2024
1 parent a87358a commit 6243399
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 84 deletions.
80 changes: 42 additions & 38 deletions prover/crates/bin/witness_generator/src/leaf_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant};
use anyhow::Context as _;
use async_trait::async_trait;
use circuit_definitions::circuit_definitions::recursion_layer::base_circuit_type_into_recursive_leaf_circuit_type;
use tokio::sync::Semaphore;
use tokio::{runtime::Handle, sync::Semaphore};
use zkevm_test_harness::{
witness::recursive_aggregation::{
compute_leaf_params, create_leaf_witness, split_recursion_queue,
Expand Down Expand Up @@ -298,44 +298,48 @@ pub async fn process_leaf_aggregation_job(
let base_vk = job.base_vk.clone();
let leaf_params = (circuit_id, job.leaf_params.clone());

let handle = tokio::task::spawn(async move {
let _permit = semaphore
.acquire()
let handle = tokio::task::spawn_blocking(move || {
let async_task = async {
let _permit = semaphore
.acquire()
.await
.expect("failed to get permit to process queues chunk");

let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await;
let base_proofs = proofs
.into_iter()
.map(|wrapper| match wrapper {
FriProofWrapper::Base(base_proof) => base_proof,
FriProofWrapper::Recursive(_) => {
panic!(
"Expected only base proofs for leaf agg {} {}",
job.circuit_id, job.block_number
);
}
})
.collect();

let (_, circuit) = create_leaf_witness(
circuit_id.into(),
queue,
base_proofs,
&base_vk,
&leaf_params,
);

save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![circuit],
AggregationRound::LeafAggregation,
0,
&*object_store,
None,
)
.await
.expect("failed to get permit to process queues chunk");

let proofs = load_proofs_for_job_ids(&proofs_ids_for_queue, &*object_store).await;
let base_proofs = proofs
.into_iter()
.map(|wrapper| match wrapper {
FriProofWrapper::Base(base_proof) => base_proof,
FriProofWrapper::Recursive(_) => {
panic!(
"Expected only base proofs for leaf agg {} {}",
job.circuit_id, job.block_number
);
}
})
.collect();

let (_, circuit) = create_leaf_witness(
circuit_id.into(),
queue,
base_proofs,
&base_vk,
&leaf_params,
);

save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![circuit],
AggregationRound::LeafAggregation,
0,
&*object_store,
None,
)
.await
};

Handle::current().block_on(async_task)
});

handles.push(handle);
Expand Down
91 changes: 48 additions & 43 deletions prover/crates/bin/witness_generator/src/node_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Instant};
use anyhow::Context as _;
use async_trait::async_trait;
use circuit_definitions::circuit_definitions::recursion_layer::RECURSION_ARITY;
use tokio::sync::Semaphore;
use tokio::{runtime::Handle, sync::Semaphore};
use zkevm_test_harness::witness::recursive_aggregation::{
compute_node_vk_commitment, create_node_witness,
};
Expand Down Expand Up @@ -138,51 +138,56 @@ impl NodeAggregationWitnessGenerator {
let vk = vk.clone();
let all_leafs_layer_params = job.all_leafs_layer_params.clone();

let handle = tokio::task::spawn(async move {
let _permit = semaphore
.acquire()
.await
.expect("failed to get permit to process queues chunk");

let proofs = load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await;
let mut recursive_proofs = vec![];
for wrapper in proofs {
match wrapper {
FriProofWrapper::Base(_) => {
panic!(
"Expected only recursive proofs for node agg {} {}",
job.circuit_id, job.block_number
);
}
FriProofWrapper::Recursive(recursive_proof) => {
recursive_proofs.push(recursive_proof)
let handle = tokio::task::spawn_blocking(move || {
let async_task = async {
let _permit = semaphore
.acquire()
.await
.expect("failed to get permit to process queues chunk");

let proofs =
load_proofs_for_job_ids(&proofs_ids_for_chunk, &*object_store).await;
let mut recursive_proofs = vec![];
for wrapper in proofs {
match wrapper {
FriProofWrapper::Base(_) => {
panic!(
"Expected only recursive proofs for node agg {} {}",
job.circuit_id, job.block_number
);
}
FriProofWrapper::Recursive(recursive_proof) => {
recursive_proofs.push(recursive_proof)
}
}
}
}

let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness(
&chunk,
recursive_proofs,
&vk,
node_vk_commitment,
&all_leafs_layer_params,
);

let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![recursive_circuit],
AggregationRound::NodeAggregation,
job.depth + 1,
&*object_store,
Some(job.circuit_id),
)
.await;

(
(result_circuit_id, input_queue),
recursive_circuit_id_and_url,
)
let (result_circuit_id, recursive_circuit, input_queue) = create_node_witness(
&chunk,
recursive_proofs,
&vk,
node_vk_commitment,
&all_leafs_layer_params,
);

let recursive_circuit_id_and_url = save_recursive_layer_prover_input_artifacts(
job.block_number,
circuit_idx,
vec![recursive_circuit],
AggregationRound::NodeAggregation,
job.depth + 1,
&*object_store,
Some(job.circuit_id),
)
.await;

(
(result_circuit_id, input_queue),
recursive_circuit_id_and_url,
)
};

Handle::current().block_on(async_task)
});

handles.push(handle);
Expand Down
10 changes: 7 additions & 3 deletions prover/crates/bin/witness_generator/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,15 @@ pub async fn load_proofs_for_job_ids(
job_ids: &[u32],
object_store: &dyn ObjectStore,
) -> Vec<FriProofWrapper> {
let mut proofs = Vec::with_capacity(job_ids.len());
let mut handles = Vec::with_capacity(job_ids.len());
for job_id in job_ids {
proofs.push(object_store.get(*job_id).await.unwrap());
handles.push(object_store.get(*job_id));
}
proofs
futures::future::join_all(handles)
.await
.into_iter()
.map(|x| x.unwrap())
.collect()
}

/// Loads all proofs for a given recursion tip's job ids.
Expand Down

0 comments on commit 6243399

Please sign in to comment.