From ea34e52733fd0ac139f0fec0ca1b4505be0532ba Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 6 Jul 2022 06:34:42 +1000 Subject: [PATCH] Run all verifier cryptography on a blocking thread Also use a new verifier channel for each batch. --- zebra-chain/src/primitives/redpallas/batch.rs | 7 +- zebra-consensus/Cargo.toml | 2 +- zebra-consensus/src/primitives.rs | 7 - zebra-consensus/src/primitives/ed25519.rs | 28 ++- .../src/primitives/ed25519/tests.rs | 4 +- zebra-consensus/src/primitives/groth16.rs | 28 ++- .../src/primitives/groth16/tests.rs | 10 +- zebra-consensus/src/primitives/halo2.rs | 17 +- zebra-consensus/src/primitives/redjubjub.rs | 28 ++- .../src/primitives/redjubjub/tests.rs | 4 +- zebra-consensus/src/primitives/redpallas.rs | 11 +- zebra-consensus/src/transaction/tests.rs | 173 +++++++++--------- 12 files changed, 191 insertions(+), 128 deletions(-) diff --git a/zebra-chain/src/primitives/redpallas/batch.rs b/zebra-chain/src/primitives/redpallas/batch.rs index df0fd98039b..a8bb9175989 100644 --- a/zebra-chain/src/primitives/redpallas/batch.rs +++ b/zebra-chain/src/primitives/redpallas/batch.rs @@ -137,10 +137,7 @@ impl Item { /// [`VerificationKey::verify`], which requires borrowing the message data, /// the `Item` type is unlinked from the lifetime of the message. pub fn verify_single(self) -> Result<(), Error> { - // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - // - // TODO: use spawn_blocking to avoid blocking code running concurrently in this task - tokio::task::block_in_place(|| match self.inner { + match self.inner { Inner::Binding { vk_bytes, sig, c } => VerificationKey::::try_from(vk_bytes) .and_then(|vk| vk.verify_prehashed(&sig, c)), Inner::SpendAuth { vk_bytes, sig, c } => { @@ -155,7 +152,7 @@ impl Item { VerificationKey::::try_from(vk_bytes) .and_then(|vk| vk.verify_prehashed(&sig, c)) } - }) + } } } diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 99ab5d3e467..abda323c059 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -29,7 +29,7 @@ futures = "0.3.21" futures-util = "0.3.21" metrics = "0.18.1" thiserror = "1.0.31" -tokio = { version = "1.19.2", features = ["time", "sync", "tracing"] } +tokio = { version = "1.19.2", features = ["time", "sync", "tracing", "rt-multi-thread"] } tower = { version = "0.4.13", features = ["timeout", "util", "buffer"] } tracing = "0.1.31" tracing-futures = "0.2.5" diff --git a/zebra-consensus/src/primitives.rs b/zebra-consensus/src/primitives.rs index ee25e718029..333ff1156f9 100644 --- a/zebra-consensus/src/primitives.rs +++ b/zebra-consensus/src/primitives.rs @@ -11,10 +11,3 @@ const MAX_BATCH_SIZE: usize = 64; /// The maximum latency bound for any of the batch verifiers. const MAX_BATCH_LATENCY: std::time::Duration = std::time::Duration::from_millis(100); - -/// The size of the buffer in the broadcast channels used by batch verifiers. -/// -/// This bound limits the number of concurrent batches for each verifier. -/// If tasks delay checking for verifier results, and the bound is too small, -/// new batches will be rejected with `RecvError`s. -const BROADCAST_BUFFER_SIZE: usize = 512; diff --git a/zebra-consensus/src/primitives/ed25519.rs b/zebra-consensus/src/primitives/ed25519.rs index 82f7b988630..50af27d10e5 100644 --- a/zebra-consensus/src/primitives/ed25519.rs +++ b/zebra-consensus/src/primitives/ed25519.rs @@ -45,7 +45,16 @@ pub static VERIFIER: Lazy< // blocks have eldritch types whose names cannot be written. So instead, // we use a Ready to avoid an async block and cast the closure to a // function (which is possible because it doesn't capture any state). - tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _), + tower::service_fn( + (|item: Item| { + ready( + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: use spawn_blocking to avoid blocking code running concurrently in this task + tokio::task::block_in_place(|| item.verify_single()), + ) + }) as fn(_) -> _, + ), ) }); @@ -62,7 +71,7 @@ pub struct Verifier { impl Default for Verifier { fn default() -> Self { let batch = batch::Verifier::default(); - let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); + let (tx, _) = channel(1); Self { batch, tx } } } @@ -113,7 +122,20 @@ impl Service> for Verifier { BatchControl::Flush => { tracing::trace!("got ed25519 flush command"); let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng())); + + // # Correctness + // + // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: use spawn_blocking to avoid blocking code running concurrently in this task + let result = tokio::task::block_in_place(|| batch.verify(thread_rng())); + let _ = self.tx.send(result); + + // Use a new channel for each batch. + // TODO: replace with a watch channel (#4729) + let (tx, _) = channel(1); + let _ = mem::replace(&mut self.tx, tx); + Box::pin(async { Ok(()) }) } } diff --git a/zebra-consensus/src/primitives/ed25519/tests.rs b/zebra-consensus/src/primitives/ed25519/tests.rs index a95fd3bcaf0..ff68311a50e 100644 --- a/zebra-consensus/src/primitives/ed25519/tests.rs +++ b/zebra-consensus/src/primitives/ed25519/tests.rs @@ -33,7 +33,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_items_test() -> Result<()> { batch_flushes_on_max_items().await } @@ -52,7 +52,7 @@ async fn batch_flushes_on_max_items() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_latency_test() -> Result<()> { batch_flushes_on_max_latency().await } diff --git a/zebra-consensus/src/primitives/groth16.rs b/zebra-consensus/src/primitives/groth16.rs index 6a5d294bc3c..d53a81de313 100644 --- a/zebra-consensus/src/primitives/groth16.rs +++ b/zebra-consensus/src/primitives/groth16.rs @@ -129,9 +129,16 @@ pub static JOINSPLIT_VERIFIER: Lazy Ready _, ) @@ -345,7 +352,7 @@ pub struct Verifier { impl Verifier { fn new(vk: &'static VerifyingKey) -> Self { let batch = batch::Verifier::default(); - let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); + let (tx, _) = channel(1); Self { batch, vk, tx } } } @@ -403,7 +410,20 @@ impl Service> for Verifier { BatchControl::Flush => { tracing::trace!("got flush command"); let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng(), self.vk)); + + // # Correctness + // + // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: use spawn_blocking to avoid blocking code running concurrently in this task + let result = tokio::task::block_in_place(|| batch.verify(thread_rng(), self.vk)); + let _ = self.tx.send(result); + + // Use a new channel for each batch. + // TODO: replace with a watch channel (#4729) + let (tx, _) = channel(1); + let _ = mem::replace(&mut self.tx, tx); + Box::pin(async { Ok(()) }) } } diff --git a/zebra-consensus/src/primitives/groth16/tests.rs b/zebra-consensus/src/primitives/groth16/tests.rs index 665df39ec5e..16f84fed1c2 100644 --- a/zebra-consensus/src/primitives/groth16/tests.rs +++ b/zebra-consensus/src/primitives/groth16/tests.rs @@ -67,7 +67,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn verify_sapling_groth16() { // Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390) let mut spend_verifier = Fallback::new( @@ -170,7 +170,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn correctly_err_on_invalid_output_proof() { // Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390). // Also, since we expect these to fail, we don't want to slow down the communal verifiers. @@ -246,7 +246,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn verify_sprout_groth16() { let mut verifier = tower::service_fn( (|item: Item| { @@ -309,7 +309,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn verify_sprout_groth16_vector() { let mut verifier = tower::service_fn( (|item: Item| { @@ -431,7 +431,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn correctly_err_on_invalid_joinsplit_proof() { // Use separate verifiers so shared batch tasks aren't killed when the test ends (#2390). // Also, since we expect these to fail, we don't want to slow down the communal verifiers. diff --git a/zebra-consensus/src/primitives/halo2.rs b/zebra-consensus/src/primitives/halo2.rs index ec02c7686b3..df578cfff83 100644 --- a/zebra-consensus/src/primitives/halo2.rs +++ b/zebra-consensus/src/primitives/halo2.rs @@ -46,10 +46,7 @@ impl Item { /// This is useful (in combination with `Item::clone`) for implementing /// fallback logic when batch verification fails. pub fn verify_single(&self, vk: &VerifyingKey) -> Result<(), halo2::plonk::Error> { - // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. - // - // TODO: use spawn_blocking to avoid blocking code running concurrently in this task - tokio::task::block_in_place(|| self.proof.verify(vk, &self.instances[..])) + self.proof.verify(vk, &self.instances[..]) } } @@ -169,8 +166,16 @@ pub static VERIFIER: Lazy< // we use a Ready to avoid an async block and cast the closure to a // function (which is possible because it doesn't capture any state). tower::service_fn( - (|item: Item| ready(item.verify_single(&VERIFYING_KEY).map_err(Halo2Error::from))) - as fn(_) -> _, + (|item: Item| { + ready( + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: use spawn_blocking to avoid blocking code running concurrently in this task + tokio::task::block_in_place(|| { + item.verify_single(&VERIFYING_KEY).map_err(Halo2Error::from) + }), + ) + }) as fn(_) -> _, ), ) }); diff --git a/zebra-consensus/src/primitives/redjubjub.rs b/zebra-consensus/src/primitives/redjubjub.rs index d965ecb8887..1fef0185f9f 100644 --- a/zebra-consensus/src/primitives/redjubjub.rs +++ b/zebra-consensus/src/primitives/redjubjub.rs @@ -45,7 +45,16 @@ pub static VERIFIER: Lazy< // blocks have eldritch types whose names cannot be written. So instead, // we use a Ready to avoid an async block and cast the closure to a // function (which is possible because it doesn't capture any state). - tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _), + tower::service_fn( + (|item: Item| { + ready( + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: use spawn_blocking to avoid blocking code running concurrently in this task + tokio::task::block_in_place(|| item.verify_single()), + ) + }) as fn(_) -> _, + ), ) }); @@ -62,7 +71,7 @@ pub struct Verifier { impl Default for Verifier { fn default() -> Self { let batch = batch::Verifier::default(); - let (tx, _) = channel(super::BROADCAST_BUFFER_SIZE); + let (tx, _) = channel(1); Self { batch, tx } } } @@ -112,7 +121,20 @@ impl Service> for Verifier { BatchControl::Flush => { tracing::trace!("got flush command"); let batch = mem::take(&mut self.batch); - let _ = self.tx.send(batch.verify(thread_rng())); + + // # Correctness + // + // Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: use spawn_blocking to avoid blocking code running concurrently in this task + let result = tokio::task::block_in_place(|| batch.verify(thread_rng())); + let _ = self.tx.send(result); + + // Use a new channel for each batch. + // TODO: replace with a watch channel (#4729) + let (tx, _) = channel(1); + let _ = mem::replace(&mut self.tx, tx); + Box::pin(async { Ok(()) }) } } diff --git a/zebra-consensus/src/primitives/redjubjub/tests.rs b/zebra-consensus/src/primitives/redjubjub/tests.rs index 0f0700094cf..0314c458ef1 100644 --- a/zebra-consensus/src/primitives/redjubjub/tests.rs +++ b/zebra-consensus/src/primitives/redjubjub/tests.rs @@ -45,7 +45,7 @@ where Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_items_test() -> Result<()> { batch_flushes_on_max_items().await } @@ -64,7 +64,7 @@ async fn batch_flushes_on_max_items() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn batch_flushes_on_max_latency_test() -> Result<()> { batch_flushes_on_max_latency().await } diff --git a/zebra-consensus/src/primitives/redpallas.rs b/zebra-consensus/src/primitives/redpallas.rs index 7fdb0d6719c..7acdbb82721 100644 --- a/zebra-consensus/src/primitives/redpallas.rs +++ b/zebra-consensus/src/primitives/redpallas.rs @@ -46,7 +46,16 @@ pub static VERIFIER: Lazy< // blocks have eldritch types whose names cannot be written. So instead, // we use a Ready to avoid an async block and cast the closure to a // function (which is possible because it doesn't capture any state). - tower::service_fn((|item: Item| ready(item.verify_single())) as fn(_) -> _), + tower::service_fn( + (|item: Item| { + ready( + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // TODO: use spawn_blocking to avoid blocking code running concurrently in this task + tokio::task::block_in_place(|| item.verify_single()), + ) + }) as fn(_) -> _, + ), ) }); diff --git a/zebra-consensus/src/transaction/tests.rs b/zebra-consensus/src/transaction/tests.rs index db7c3eb8c35..92d67ea7a25 100644 --- a/zebra-consensus/src/transaction/tests.rs +++ b/zebra-consensus/src/transaction/tests.rs @@ -1360,74 +1360,72 @@ async fn v5_transaction_with_conflicting_transparent_spend_is_rejected() { /// - Test if an unsigned V4 transaction with a dummy [`sprout::JoinSplit`] is rejected. /// /// This test verifies if the transaction verifier correctly accepts a signed transaction. -#[test] -fn v4_with_signed_sprout_transfer_is_accepted() { +#[tokio::test(flavor = "multi_thread")] +async fn v4_with_signed_sprout_transfer_is_accepted() { zebra_test::init(); - zebra_test::RUNTIME.block_on(async { - let network = Network::Mainnet; - let (height, transaction) = test_transactions(network) - .rev() - .filter(|(_, transaction)| { - !transaction.is_coinbase() && transaction.inputs().is_empty() - }) - .find(|(_, transaction)| transaction.sprout_groth16_joinsplits().next().is_some()) - .expect("No transaction found with Groth16 JoinSplits"); + let network = Network::Mainnet; - let expected_hash = transaction.unmined_id(); + let (height, transaction) = test_transactions(network) + .rev() + .filter(|(_, transaction)| !transaction.is_coinbase() && transaction.inputs().is_empty()) + .find(|(_, transaction)| transaction.sprout_groth16_joinsplits().next().is_some()) + .expect("No transaction found with Groth16 JoinSplits"); - // Initialize the verifier - let state_service = - service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(network, state_service); + let expected_hash = transaction.unmined_id(); - // Test the transaction verifier - let result = verifier - .clone() - .oneshot(Request::Block { - transaction, - known_utxos: Arc::new(HashMap::new()), - height, - time: chrono::MAX_DATETIME, - }) - .await; + // Initialize the verifier + let state_service = + service_fn(|_| async { unreachable!("State service should not be called") }); + let verifier = Verifier::new(network, state_service); - assert_eq!( - result.expect("unexpected error response").tx_id(), - expected_hash - ); - }); + // Test the transaction verifier + let result = verifier + .clone() + .oneshot(Request::Block { + transaction, + known_utxos: Arc::new(HashMap::new()), + height, + time: chrono::MAX_DATETIME, + }) + .await; + + assert_eq!( + result.expect("unexpected error response").tx_id(), + expected_hash + ); } /// Test if an V4 transaction with a modified [`sprout::JoinSplit`] is rejected. /// /// This test verifies if the transaction verifier correctly rejects the transaction because of the /// invalid JoinSplit. -#[test] -fn v4_with_modified_joinsplit_is_rejected() { +#[tokio::test(flavor = "multi_thread")] +async fn v4_with_modified_joinsplit_is_rejected() { zebra_test::init(); - zebra_test::RUNTIME.block_on(async { - v4_with_joinsplit_is_rejected_for_modification( - JoinSplitModification::CorruptSignature, - // TODO: Fix error downcast - // Err(TransactionError::Ed25519(ed25519::Error::InvalidSignature)) - TransactionError::InternalDowncastError( - "downcast to known transaction error type failed, original error: InvalidSignature" - .to_string(), - ), - ) - .await; - v4_with_joinsplit_is_rejected_for_modification( - JoinSplitModification::CorruptProof, - TransactionError::Groth16("proof verification failed".to_string()), - ) - .await; - v4_with_joinsplit_is_rejected_for_modification( - JoinSplitModification::ZeroProof, - TransactionError::MalformedGroth16("invalid G1".to_string()), - ) - .await; - }); + + v4_with_joinsplit_is_rejected_for_modification( + JoinSplitModification::CorruptSignature, + // TODO: Fix error downcast + // Err(TransactionError::Ed25519(ed25519::Error::InvalidSignature)) + TransactionError::InternalDowncastError( + "downcast to known transaction error type failed, original error: InvalidSignature" + .to_string(), + ), + ) + .await; + + v4_with_joinsplit_is_rejected_for_modification( + JoinSplitModification::CorruptProof, + TransactionError::Groth16("proof verification failed".to_string()), + ) + .await; + + v4_with_joinsplit_is_rejected_for_modification( + JoinSplitModification::ZeroProof, + TransactionError::MalformedGroth16("invalid G1".to_string()), + ) + .await; } async fn v4_with_joinsplit_is_rejected_for_modification( @@ -1552,46 +1550,43 @@ fn v4_with_duplicate_sapling_spends() { } /// Test if a V4 transaction with Sapling outputs but no spends is accepted by the verifier. -#[test] -fn v4_with_sapling_outputs_and_no_spends() { +#[tokio::test(flavor = "multi_thread")] +async fn v4_with_sapling_outputs_and_no_spends() { zebra_test::init(); - zebra_test::RUNTIME.block_on(async { - let network = Network::Mainnet; - let (height, transaction) = test_transactions(network) - .rev() - .filter(|(_, transaction)| { - !transaction.is_coinbase() && transaction.inputs().is_empty() - }) - .find(|(_, transaction)| { - transaction.sapling_spends_per_anchor().next().is_none() - && transaction.sapling_outputs().next().is_some() - }) - .expect("No transaction found with Sapling outputs and no Sapling spends"); + let network = Network::Mainnet; - let expected_hash = transaction.unmined_id(); + let (height, transaction) = test_transactions(network) + .rev() + .filter(|(_, transaction)| !transaction.is_coinbase() && transaction.inputs().is_empty()) + .find(|(_, transaction)| { + transaction.sapling_spends_per_anchor().next().is_none() + && transaction.sapling_outputs().next().is_some() + }) + .expect("No transaction found with Sapling outputs and no Sapling spends"); - // Initialize the verifier - let state_service = - service_fn(|_| async { unreachable!("State service should not be called") }); - let verifier = Verifier::new(network, state_service); + let expected_hash = transaction.unmined_id(); - // Test the transaction verifier - let result = verifier - .clone() - .oneshot(Request::Block { - transaction, - known_utxos: Arc::new(HashMap::new()), - height, - time: chrono::MAX_DATETIME, - }) - .await; + // Initialize the verifier + let state_service = + service_fn(|_| async { unreachable!("State service should not be called") }); + let verifier = Verifier::new(network, state_service); - assert_eq!( - result.expect("unexpected error response").tx_id(), - expected_hash - ); - }); + // Test the transaction verifier + let result = verifier + .clone() + .oneshot(Request::Block { + transaction, + known_utxos: Arc::new(HashMap::new()), + height, + time: chrono::MAX_DATETIME, + }) + .await; + + assert_eq!( + result.expect("unexpected error response").tx_id(), + expected_hash + ); } /// Test if a V5 transaction with Sapling spends is accepted by the verifier.