Skip to content

Commit

Permalink
Run all verifier cryptography on a blocking thread
Browse files Browse the repository at this point in the history
Also use a new verifier channel for each batch.
  • Loading branch information
teor2345 committed Jul 5, 2022
1 parent 16c8202 commit ea34e52
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 128 deletions.
7 changes: 2 additions & 5 deletions zebra-chain/src/primitives/redpallas/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ impl Item {
/// [`VerificationKey::verify`], which requires borrowing the message data,
/// the `Item` type is unlinked from the lifetime of the message.
pub fn verify_single(self) -> Result<(), Error> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
tokio::task::block_in_place(|| match self.inner {
match self.inner {
Inner::Binding { vk_bytes, sig, c } => VerificationKey::<Binding>::try_from(vk_bytes)
.and_then(|vk| vk.verify_prehashed(&sig, c)),
Inner::SpendAuth { vk_bytes, sig, c } => {
Expand All @@ -155,7 +152,7 @@ impl Item {
VerificationKey::<SpendAuth>::try_from(vk_bytes)
.and_then(|vk| vk.verify_prehashed(&sig, c))
}
})
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion zebra-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ futures = "0.3.21"
futures-util = "0.3.21"
metrics = "0.18.1"
thiserror = "1.0.31"
tokio = { version = "1.19.2", features = ["time", "sync", "tracing"] }
tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "rt-multi-thread"] }
tower = { version = "0.4.13", features = ["timeout", "util", "buffer"] }
tracing = "0.1.31"
tracing-futures = "0.2.5"
Expand Down
7 changes: 0 additions & 7 deletions zebra-consensus/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,3 @@ const MAX_BATCH_SIZE: usize = 64;

/// The maximum latency bound for any of the batch verifiers.
const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100);

/// The size of the buffer in the broadcast channels used by batch verifiers.
///
/// This bound limits the number of concurrent batches for each verifier.
/// If tasks delay checking for verifier results, and the bound is too small,
/// new batches will be rejected with `RecvError`s.
const BROADCAST_BUFFER_SIZE: usize = 512;
28 changes: 25 additions & 3 deletions zebra-consensus/src/primitives/ed25519.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,16 @@ pub static VERIFIER: Lazy<
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
tower::service_fn(
(|item: Item| {
ready(
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
tokio::task::block_in_place(|| item.verify_single()),
)
}) as fn(_) -> _,
),
)
});

Expand All @@ -62,7 +71,7 @@ pub struct Verifier {
impl Default for Verifier {
fn default() -> Self {
let batch = batch::Verifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
let (tx, _) = channel(1);
Self { batch, tx }
}
}
Expand Down Expand Up @@ -113,7 +122,20 @@ impl Service<BatchControl<Item>> for Verifier {
BatchControl::Flush => {
tracing::trace!("got ed25519 flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));

// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
let result = tokio::task::block_in_place(|| batch.verify(thread_rng()));
let _ = self.tx.send(result);

// Use a new channel for each batch.
// TODO: replace with a watch channel (#4729)
let (tx, _) = channel(1);
let _ = mem::replace(&mut self.tx, tx);

Box::pin(async { Ok(()) })
}
}
Expand Down
4 changes: 2 additions & 2 deletions zebra-consensus/src/primitives/ed25519/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_items_test() -> Result<()> {
batch_flushes_on_max_items().await
}
Expand All @@ -52,7 +52,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_latency_test() -> Result<()> {
batch_flushes_on_max_latency().await
}
Expand Down
28 changes: 24 additions & 4 deletions zebra-consensus/src/primitives/groth16.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,16 @@ pub static JOINSPLIT_VERIFIER: Lazy<ServiceFn<fn(Item) -> Ready<Result<(), Boxed
tower::service_fn(
(|item: Item| {
ready(
item.verify_single(&GROTH16_PARAMETERS.sprout.joinsplit_prepared_verifying_key)
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
tokio::task::block_in_place(|| {
item.verify_single(
&GROTH16_PARAMETERS.sprout.joinsplit_prepared_verifying_key,
)
.map_err(|e| TransactionError::Groth16(e.to_string()))
.map_err(tower_fallback::BoxedError::from),
.map_err(tower_fallback::BoxedError::from)
}),
)
}) as fn(_) -> _,
)
Expand Down Expand Up @@ -345,7 +352,7 @@ pub struct Verifier {
impl Verifier {
fn new(vk: &'static VerifyingKey<Bls12>) -> Self {
let batch = batch::Verifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
let (tx, _) = channel(1);
Self { batch, vk, tx }
}
}
Expand Down Expand Up @@ -403,7 +410,20 @@ impl Service<BatchControl<Item>> for Verifier {
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng(), self.vk));

// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
let result = tokio::task::block_in_place(|| batch.verify(thread_rng(), self.vk));
let _ = self.tx.send(result);

// Use a new channel for each batch.
// TODO: replace with a watch channel (#4729)
let (tx, _) = channel(1);
let _ = mem::replace(&mut self.tx, tx);

Box::pin(async { Ok(()) })
}
}
Expand Down
10 changes: 5 additions & 5 deletions zebra-consensus/src/primitives/groth16/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_sapling_groth16() {
// Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390)
let mut spend_verifier = Fallback::new(
Expand Down Expand Up @@ -170,7 +170,7 @@ where
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn correctly_err_on_invalid_output_proof() {
// Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390).
// Also, since we expect these to fail, we don't want to slow down the communal verifiers.
Expand Down Expand Up @@ -246,7 +246,7 @@ where
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_sprout_groth16() {
let mut verifier = tower::service_fn(
(|item: Item| {
Expand Down Expand Up @@ -309,7 +309,7 @@ where
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn verify_sprout_groth16_vector() {
let mut verifier = tower::service_fn(
(|item: Item| {
Expand Down Expand Up @@ -431,7 +431,7 @@ where
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn correctly_err_on_invalid_joinsplit_proof() {
// Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390).
// Also, since we expect these to fail, we don't want to slow down the communal verifiers.
Expand Down
17 changes: 11 additions & 6 deletions zebra-consensus/src/primitives/halo2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ impl Item {
/// This is useful (in combination with `Item::clone`) for implementing
/// fallback logic when batch verification fails.
pub fn verify_single(&self, vk: &VerifyingKey) -> Result<(), halo2::plonk::Error> {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
tokio::task::block_in_place(|| self.proof.verify(vk, &self.instances[..]))
self.proof.verify(vk, &self.instances[..])
}
}

Expand Down Expand Up @@ -169,8 +166,16 @@ pub static VERIFIER: Lazy<
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn(
(|item: Item| ready(item.verify_single(&VERIFYING_KEY).map_err(Halo2Error::from)))
as fn(_) -> _,
(|item: Item| {
ready(
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
tokio::task::block_in_place(|| {
item.verify_single(&VERIFYING_KEY).map_err(Halo2Error::from)
}),
)
}) as fn(_) -> _,
),
)
});
Expand Down
28 changes: 25 additions & 3 deletions zebra-consensus/src/primitives/redjubjub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,16 @@ pub static VERIFIER: Lazy<
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
tower::service_fn(
(|item: Item| {
ready(
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
tokio::task::block_in_place(|| item.verify_single()),
)
}) as fn(_) -> _,
),
)
});

Expand All @@ -62,7 +71,7 @@ pub struct Verifier {
impl Default for Verifier {
fn default() -> Self {
let batch = batch::Verifier::default();
let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE);
let (tx, _) = channel(1);
Self { batch, tx }
}
}
Expand Down Expand Up @@ -112,7 +121,20 @@ impl Service<BatchControl<Item>> for Verifier {
BatchControl::Flush => {
tracing::trace!("got flush command");
let batch = mem::take(&mut self.batch);
let _ = self.tx.send(batch.verify(thread_rng()));

// # Correctness
//
// Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
let result = tokio::task::block_in_place(|| batch.verify(thread_rng()));
let _ = self.tx.send(result);

// Use a new channel for each batch.
// TODO: replace with a watch channel (#4729)
let (tx, _) = channel(1);
let _ = mem::replace(&mut self.tx, tx);

Box::pin(async { Ok(()) })
}
}
Expand Down
4 changes: 2 additions & 2 deletions zebra-consensus/src/primitives/redjubjub/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_items_test() -> Result<()> {
batch_flushes_on_max_items().await
}
Expand All @@ -64,7 +64,7 @@ async fn batch_flushes_on_max_items() -> Result<()> {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn batch_flushes_on_max_latency_test() -> Result<()> {
batch_flushes_on_max_latency().await
}
Expand Down
11 changes: 10 additions & 1 deletion zebra-consensus/src/primitives/redpallas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,16 @@ pub static VERIFIER: Lazy<
// blocks have eldritch types whose names cannot be written. So instead,
// we use a Ready to avoid an async block and cast the closure to a
// function (which is possible because it doesn't capture any state).
tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _),
tower::service_fn(
(|item: Item| {
ready(
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// TODO: use spawn_blocking to avoid blocking code running concurrently in this task
tokio::task::block_in_place(|| item.verify_single()),
)
}) as fn(_) -> _,
),
)
});

Expand Down
Loading

0 comments on commit ea34e52

Please sign in to comment.