Skip to content

Commit

Permalink
fix(rpc): Check for panics in the transaction queue (#4046)
Browse files Browse the repository at this point in the history
* Check for panics in the RPC transaction queue

* Add missing pin! and abort in the start task

* Check for transaction queue panics in tests
  • Loading branch information
teor2345 authored Apr 6, 2022
1 parent 4e3229b commit ecfb7ed
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 41 deletions.
25 changes: 9 additions & 16 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use hex::{FromHex, ToHex};
use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::sync::broadcast::Sender;
use tokio::{sync::broadcast::Sender, task::JoinHandle};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument;

use zebra_chain::{
block::{self, Height, SerializedBlock},
Expand Down Expand Up @@ -188,7 +189,7 @@ where
state: State,
latest_chain_tip: Tip,
network: Network,
) -> Self
) -> (Self, JoinHandle<()>)
where
Version: ToString,
<Mempool as Service<mempool::Request>>::Future: Send,
Expand All @@ -206,21 +207,13 @@ where
};

// run the process queue
let mut queue_task_handler = tokio::spawn(async move {
runner.run(mempool, state, latest_chain_tip, network).await;
});

// queue panic checker
tokio::spawn(async move {
loop {
let queue_task_handler = &mut queue_task_handler;
if queue_task_handler.await.is_err() {
panic!("Unexpected panic in the RPC queue");
}
}
});
let rpc_tx_queue_task_handle = tokio::spawn(
runner
.run(mempool, state, latest_chain_tip, network)
.in_current_span(),
);

rpc_impl
(rpc_impl, rpc_tx_queue_task_handle)
}
}

Expand Down
77 changes: 65 additions & 12 deletions zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::collections::HashSet;

use futures::FutureExt;
use hex::ToHex;
use jsonrpc_core::{Error, ErrorCode};
use proptest::prelude::*;
Expand Down Expand Up @@ -34,7 +35,7 @@ proptest! {
runtime.block_on(async move {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -67,6 +68,10 @@ proptest! {

prop_assert_eq!(result, Ok(hash));

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -82,7 +87,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -122,6 +127,10 @@ proptest! {
"Result is not a server error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -135,7 +144,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -176,6 +185,10 @@ proptest! {
"Result is not a server error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -196,7 +209,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -224,6 +237,10 @@ proptest! {
"Result is not an invalid parameters error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -246,7 +263,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -274,6 +291,10 @@ proptest! {
"Result is not an invalid parameters error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -295,7 +316,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -323,6 +344,10 @@ proptest! {

prop_assert_eq!(result, Ok(expected_response));

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -343,7 +368,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -371,6 +396,10 @@ proptest! {
"Result is not an invalid parameters error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -393,7 +422,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand All @@ -420,6 +449,11 @@ proptest! {
),
"Result is not an invalid parameters error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -431,16 +465,23 @@ proptest! {
let _guard = runtime.enter();
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

// look for an error with a `NoChainTip`
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
NoChainTip,
network,
);

let response = rpc.get_blockchain_info();
prop_assert_eq!(&response.err().unwrap().message, "No Chain tip available yet");

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

runtime.block_on(async move {
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
Expand Down Expand Up @@ -471,7 +512,7 @@ proptest! {
mock_chain_tip_sender.send_best_tip_block_time(block_time);

// Start RPC with the mocked `ChainTip`
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -504,6 +545,10 @@ proptest! {
},
};

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

// check no requests were made during this test
runtime.block_on(async move {
mempool.expect_no_requests().await?;
Expand All @@ -528,7 +573,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -594,6 +639,10 @@ proptest! {
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand All @@ -611,7 +660,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -686,6 +735,10 @@ proptest! {
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));

Ok::<_, TestCaseError>(())
})?;
}
Expand Down
30 changes: 25 additions & 5 deletions zebra-rpc/src/methods/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn rpc_getinfo() {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand All @@ -45,6 +45,10 @@ async fn rpc_getinfo() {

mempool.expect_no_requests().await;
state.expect_no_requests().await;

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}

#[tokio::test]
Expand All @@ -63,7 +67,7 @@ async fn rpc_getblock() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;

// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
Expand All @@ -82,6 +86,10 @@ async fn rpc_getblock() {
}

mempool.expect_no_requests().await;

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}

#[tokio::test]
Expand All @@ -92,7 +100,7 @@ async fn rpc_getblock_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand All @@ -108,6 +116,10 @@ async fn rpc_getblock_error() {

mempool.expect_no_requests().await;
state.expect_no_requests().await;

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}

#[tokio::test]
Expand All @@ -132,7 +144,7 @@ async fn rpc_getbestblockhash() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;

// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
Expand All @@ -150,6 +162,10 @@ async fn rpc_getbestblockhash() {
assert_eq!(response_hash, tip_block_hash);

mempool.expect_no_requests().await;

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}

#[tokio::test]
Expand All @@ -168,7 +184,7 @@ async fn rpc_getrawtransaction() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;

// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
Expand Down Expand Up @@ -231,4 +247,8 @@ async fn rpc_getrawtransaction() {
}
}
}

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}
Loading

0 comments on commit ecfb7ed

Please sign in to comment.