Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc): Check for panics in the transaction queue #4046

Merged
merged 3 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Check for panics in the RPC transaction queue
  • Loading branch information
teor2345 committed Apr 6, 2022
commit 36200c486902fb5da46120e8ff59f372b300e5c6
25 changes: 9 additions & 16 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use hex::{FromHex, ToHex};
use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::sync::broadcast::Sender;
use tokio::{sync::broadcast::Sender, task::JoinHandle};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument;

use zebra_chain::{
block::{self, Height, SerializedBlock},
Expand Down Expand Up @@ -188,7 +189,7 @@ where
state: State,
latest_chain_tip: Tip,
network: Network,
) -> Self
) -> (Self, JoinHandle<()>)
where
Version: ToString,
<Mempool as Service<mempool::Request>>::Future: Send,
Expand All @@ -206,21 +207,13 @@ where
};

// run the process queue
let mut queue_task_handler = tokio::spawn(async move {
runner.run(mempool, state, latest_chain_tip, network).await;
});

// queue panic checker
tokio::spawn(async move {
loop {
let queue_task_handler = &mut queue_task_handler;
if queue_task_handler.await.is_err() {
panic!("Unexpected panic in the RPC queue");
}
}
});
let rpc_tx_queue_task_handle = tokio::spawn(
runner
.run(mempool, state, latest_chain_tip, network)
.in_current_span(),
);

rpc_impl
(rpc_impl, rpc_tx_queue_task_handle)
}
}

Expand Down
24 changes: 12 additions & 12 deletions zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ proptest! {
runtime.block_on(async move {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -82,7 +82,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -135,7 +135,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -196,7 +196,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -246,7 +246,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -295,7 +295,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -343,7 +343,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -393,7 +393,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -432,7 +432,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
// look for an error with a `NoChainTip`
let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -471,7 +471,7 @@ proptest! {
mock_chain_tip_sender.send_best_tip_block_time(block_time);

// Start RPC with the mocked `ChainTip`
let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -528,7 +528,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -611,7 +611,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down
10 changes: 5 additions & 5 deletions zebra-rpc/src/methods/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn rpc_getinfo() {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn rpc_getblock() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;

// Init RPC
let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn rpc_getblock_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();

// Init RPC
let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Expand Down Expand Up @@ -132,7 +132,7 @@ async fn rpc_getbestblockhash() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;

// Init RPC
let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn rpc_getrawtransaction() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;

// Init RPC
let rpc = RpcImpl::new(
let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
Expand Down
18 changes: 13 additions & 5 deletions zebra-rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use jsonrpc_core;
use jsonrpc_http_server::ServerBuilder;
use tokio::task::JoinHandle;
use tower::{buffer::Buffer, Service};
use tracing::*;
use tracing_futures::Instrument;
Expand Down Expand Up @@ -37,7 +38,7 @@ impl RpcServer {
state: State,
latest_chain_tip: Tip,
network: Network,
) -> tokio::task::JoinHandle<()>
) -> (JoinHandle<()>, JoinHandle<()>)
where
Version: ToString,
Mempool: tower::Service<mempool::Request, Response = mempool::Response, Error = BoxError>
Expand All @@ -58,7 +59,8 @@ impl RpcServer {
info!("Trying to open RPC endpoint at {}...", listen_addr,);

// Initialize the rpc methods with the zebra version
let rpc_impl = RpcImpl::new(app_version, mempool, state, latest_chain_tip, network);
let (rpc_impl, rpc_tx_queue_task_handle) =
RpcImpl::new(app_version, mempool, state, latest_chain_tip, network);

// Create handler compatible with V1 and V2 RPC protocols
let mut io =
Expand Down Expand Up @@ -87,10 +89,16 @@ impl RpcServer {
})
};

tokio::task::spawn_blocking(server)
(
tokio::task::spawn_blocking(server),
rpc_tx_queue_task_handle,
)
} else {
// There is no RPC port, so the RPC task does nothing.
tokio::task::spawn(futures::future::pending().in_current_span())
// There is no RPC port, so the RPC tasks do nothing.
(
tokio::task::spawn(futures::future::pending().in_current_span()),
tokio::task::spawn(futures::future::pending().in_current_span()),
)
}
}
}
13 changes: 11 additions & 2 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl StartCmd {
.service(mempool);

// Launch RPC server
let rpc_task_handle = RpcServer::spawn(
let (rpc_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
config.rpc,
app_version(),
mempool.clone(),
Expand Down Expand Up @@ -218,6 +218,7 @@ impl StartCmd {

// ongoing tasks
pin!(rpc_task_handle);
pin!(rpc_tx_queue_task_handle);
pin!(syncer_task_handle);
pin!(mempool_crawler_task_handle);
pin!(mempool_queue_checker_task_handle);
Expand All @@ -240,6 +241,13 @@ impl StartCmd {
Ok(())
}

rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => {
rpc_tx_queue_result
.expect("unexpected panic in the rpc transaction queue task");
info!("rpc transaction queue task exited");
Ok(())
}

sync_result = &mut syncer_task_handle => sync_result
.expect("unexpected panic in the syncer task")
.map(|_| info!("syncer task exited")),
Expand Down Expand Up @@ -298,12 +306,13 @@ impl StartCmd {
info!("exiting Zebra because an ongoing task exited: stopping other tasks");

// ongoing tasks
rpc_task_handle.abort();
rpc_tx_queue_task_handle.abort();
syncer_task_handle.abort();
block_gossip_task_handle.abort();
mempool_crawler_task_handle.abort();
mempool_queue_checker_task_handle.abort();
tx_gossip_task_handle.abort();
rpc_task_handle.abort();

// startup tasks
groth16_download_handle.abort();
Expand Down