From ecfb7edb05922ad2d77a13bc32a3d55f16b7377b Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 6 Apr 2022 23:19:13 +1000 Subject: [PATCH] fix(rpc): Check for panics in the transaction queue (#4046) * Check for panics in the RPC transaction queue * Add missing pin! and abort in the start task * Check for transaction queue panics in tests --- zebra-rpc/src/methods.rs | 25 +++------ zebra-rpc/src/methods/tests/prop.rs | 77 ++++++++++++++++++++++---- zebra-rpc/src/methods/tests/vectors.rs | 30 ++++++++-- zebra-rpc/src/server.rs | 18 ++++-- zebrad/src/commands/start.rs | 17 +++++- 5 files changed, 126 insertions(+), 41 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index e4477cc04eb..6959fcfc057 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -14,8 +14,9 @@ use hex::{FromHex, ToHex}; use indexmap::IndexMap; use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result}; use jsonrpc_derive::rpc; -use tokio::sync::broadcast::Sender; +use tokio::{sync::broadcast::Sender, task::JoinHandle}; use tower::{buffer::Buffer, Service, ServiceExt}; +use tracing::Instrument; use zebra_chain::{ block::{self, Height, SerializedBlock}, @@ -188,7 +189,7 @@ where state: State, latest_chain_tip: Tip, network: Network, - ) -> Self + ) -> (Self, JoinHandle<()>) where Version: ToString, >::Future: Send, @@ -206,21 +207,13 @@ where }; // run the process queue - let mut queue_task_handler = tokio::spawn(async move { - runner.run(mempool, state, latest_chain_tip, network).await; - }); - - // queue panic checker - tokio::spawn(async move { - loop { - let queue_task_handler = &mut queue_task_handler; - if queue_task_handler.await.is_err() { - panic!("Unexpected panic in the RPC queue"); - } - } - }); + let rpc_tx_queue_task_handle = tokio::spawn( + runner + .run(mempool, state, latest_chain_tip, network) + .in_current_span(), + ); - rpc_impl + (rpc_impl, rpc_tx_queue_task_handle) } } diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index 8544add0adf..2288ff9abfb 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; +use futures::FutureExt; use hex::ToHex; use jsonrpc_core::{Error, ErrorCode}; use proptest::prelude::*; @@ -34,7 +35,7 @@ proptest! { runtime.block_on(async move { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -67,6 +68,10 @@ proptest! { prop_assert_eq!(result, Ok(hash)); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -82,7 +87,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -122,6 +127,10 @@ proptest! { "Result is not a server error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -135,7 +144,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -176,6 +185,10 @@ proptest! { "Result is not a server error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -196,7 +209,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -224,6 +237,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -246,7 +263,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -274,6 +291,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -295,7 +316,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -323,6 +344,10 @@ proptest! { prop_assert_eq!(result, Ok(expected_response)); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -343,7 +368,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -371,6 +396,10 @@ proptest! { "Result is not an invalid parameters error: {result:?}" ); + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -393,7 +422,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -420,6 +449,11 @@ proptest! { ), "Result is not an invalid parameters error: {result:?}" ); + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -431,16 +465,23 @@ proptest! { let _guard = runtime.enter(); let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); + // look for an error with a `NoChainTip` - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, network, ); + let response = rpc.get_blockchain_info(); prop_assert_eq!(&response.err().unwrap().message, "No Chain tip available yet"); + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + runtime.block_on(async move { mempool.expect_no_requests().await?; state.expect_no_requests().await?; @@ -471,7 +512,7 @@ proptest! { mock_chain_tip_sender.send_best_tip_block_time(block_time); // Start RPC with the mocked `ChainTip` - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -504,6 +545,10 @@ proptest! { }, }; + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + // check no requests were made during this test runtime.block_on(async move { mempool.expect_no_requests().await?; @@ -528,7 +573,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -594,6 +639,10 @@ proptest! { mempool.expect_no_requests().await?; state.expect_no_requests().await?; + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } @@ -611,7 +660,7 @@ proptest! { let mut mempool = MockService::build().for_prop_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -686,6 +735,10 @@ proptest! { mempool.expect_no_requests().await?; state.expect_no_requests().await?; + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + prop_assert!(matches!(rpc_tx_queue_task_result, None)); + Ok::<_, TestCaseError>(()) })?; } diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 8301fa5fd5f..2e5a49e17e7 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -25,7 +25,7 @@ async fn rpc_getinfo() { let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -45,6 +45,10 @@ async fn rpc_getinfo() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -63,7 +67,7 @@ async fn rpc_getblock() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -82,6 +86,10 @@ async fn rpc_getblock() { } mempool.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -92,7 +100,7 @@ async fn rpc_getblock_error() { let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests(); // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), @@ -108,6 +116,10 @@ async fn rpc_getblock_error() { mempool.expect_no_requests().await; state.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -132,7 +144,7 @@ async fn rpc_getbestblockhash() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -150,6 +162,10 @@ async fn rpc_getbestblockhash() { assert_eq!(response_hash, tip_block_hash); mempool.expect_no_requests().await; + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } #[tokio::test] @@ -168,7 +184,7 @@ async fn rpc_getrawtransaction() { zebra_state::populated_state(blocks.clone(), Mainnet).await; // Init RPC - let rpc = RpcImpl::new( + let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", Buffer::new(mempool.clone(), 1), read_state, @@ -231,4 +247,8 @@ async fn rpc_getrawtransaction() { } } } + + // The queue task should continue without errors or panics + let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); + assert!(matches!(rpc_tx_queue_task_result, None)); } diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 75ce5402d48..c759e32cb6d 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -9,6 +9,7 @@ use jsonrpc_core; use jsonrpc_http_server::ServerBuilder; +use tokio::task::JoinHandle; use tower::{buffer::Buffer, Service}; use tracing::*; use tracing_futures::Instrument; @@ -37,7 +38,7 @@ impl RpcServer { state: State, latest_chain_tip: Tip, network: Network, - ) -> tokio::task::JoinHandle<()> + ) -> (JoinHandle<()>, JoinHandle<()>) where Version: ToString, Mempool: tower::Service @@ -58,7 +59,8 @@ impl RpcServer { info!("Trying to open RPC endpoint at {}...", listen_addr,); // Initialize the rpc methods with the zebra version - let rpc_impl = RpcImpl::new(app_version, mempool, state, latest_chain_tip, network); + let (rpc_impl, rpc_tx_queue_task_handle) = + RpcImpl::new(app_version, mempool, state, latest_chain_tip, network); // Create handler compatible with V1 and V2 RPC protocols let mut io = @@ -87,10 +89,16 @@ impl RpcServer { }) }; - tokio::task::spawn_blocking(server) + ( + tokio::task::spawn_blocking(server), + rpc_tx_queue_task_handle, + ) } else { - // There is no RPC port, so the RPC task does nothing. - tokio::task::spawn(futures::future::pending().in_current_span()) + // There is no RPC port, so the RPC tasks do nothing. + ( + tokio::task::spawn(futures::future::pending().in_current_span()), + tokio::task::spawn(futures::future::pending().in_current_span()), + ) } } } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 01e3d093775..05b4d726fcb 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -160,7 +160,7 @@ impl StartCmd { .service(mempool); // Launch RPC server - let rpc_task_handle = RpcServer::spawn( + let (rpc_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn( config.rpc, app_version(), mempool.clone(), @@ -183,7 +183,7 @@ impl StartCmd { let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); - let mut block_gossip_task_handle = tokio::spawn( + let block_gossip_task_handle = tokio::spawn( sync::gossip_best_tip_block_hashes( sync_status.clone(), chain_tip_change.clone(), @@ -218,7 +218,9 @@ impl StartCmd { // ongoing tasks pin!(rpc_task_handle); + pin!(rpc_tx_queue_task_handle); pin!(syncer_task_handle); + pin!(block_gossip_task_handle); pin!(mempool_crawler_task_handle); pin!(mempool_queue_checker_task_handle); pin!(tx_gossip_task_handle); @@ -240,6 +242,13 @@ impl StartCmd { Ok(()) } + rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => { + rpc_tx_queue_result + .expect("unexpected panic in the rpc transaction queue task"); + info!("rpc transaction queue task exited"); + Ok(()) + } + sync_result = &mut syncer_task_handle => sync_result .expect("unexpected panic in the syncer task") .map(|_| info!("syncer task exited")), @@ -298,12 +307,14 @@ impl StartCmd { info!("exiting Zebra because an ongoing task exited: stopping other tasks"); // ongoing tasks + rpc_task_handle.abort(); + rpc_tx_queue_task_handle.abort(); syncer_task_handle.abort(); block_gossip_task_handle.abort(); mempool_crawler_task_handle.abort(); mempool_queue_checker_task_handle.abort(); tx_gossip_task_handle.abort(); - rpc_task_handle.abort(); + progress_task_handle.abort(); // startup tasks groth16_download_handle.abort();