Skip to content

Commit

Permalink
explorer-api: handle SIGTERM (#1482)
Browse files Browse the repository at this point in the history
* explorer-api: handle SIGTERM

* Update CHANGELOG
  • Loading branch information
neacsu authored Aug 1, 2022
1 parent 207c6cf commit e631219
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
- native & socks5 clients: rerun init will now reuse previous gateway configuration instead of failing ([#1353])
- native & socks5 clients: deduplicate big chunks of init logic
- validator: fixed local docker-compose setup to work on Apple M1 ([#1329])
- explorer-api: listen out for SIGTERM and SIGQUIT too, making it play nicely as a system service ([#1482]).

### Changed

Expand Down Expand Up @@ -73,6 +74,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
[#1457]: https://github.com/nymtech/nym/pull/1457
[#1463]: https://github.com/nymtech/nym/pull/1463
[#1478]: https://github.com/nymtech/nym/pull/1478
[#1482]: https://github.com/nymtech/nym/pull/1482

## [nym-connect-v1.0.1](https://github.com/nymtech/nym/tree/nym-connect-v1.0.1) (2022-07-22)

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions explorer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ tokio = {version = "1.19.1", features = ["full"] }

mixnet-contract-common = { path = "../common/cosmwasm-smart-contracts/mixnet-contract" }
network-defaults = { path = "../common/network-defaults" }
task = { path = "../common/task" }
validator-client = { path = "../common/client-libs/validator-client", features=["nymd-client"] }
19 changes: 13 additions & 6 deletions explorer-api/src/country_statistics/distribution.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use log::info;
use task::ShutdownListener;

use crate::country_statistics::country_nodes_distribution::CountryNodesDistribution;
use crate::COUNTRY_DATA_REFRESH_INTERVAL;
Expand All @@ -7,11 +8,12 @@ use crate::state::ExplorerApiStateContext;

pub(crate) struct CountryStatisticsDistributionTask {
state: ExplorerApiStateContext,
shutdown: ShutdownListener,
}

impl CountryStatisticsDistributionTask {
pub(crate) fn new(state: ExplorerApiStateContext) -> Self {
CountryStatisticsDistributionTask { state }
pub(crate) fn new(state: ExplorerApiStateContext, shutdown: ShutdownListener) -> Self {
CountryStatisticsDistributionTask { state, shutdown }
}

pub(crate) fn start(mut self) {
Expand All @@ -20,10 +22,15 @@ impl CountryStatisticsDistributionTask {
let mut interval_timer = tokio::time::interval(std::time::Duration::from_secs(
COUNTRY_DATA_REFRESH_INTERVAL,
));
loop {
// wait for the next interval tick
interval_timer.tick().await;
self.calculate_nodes_per_country().await;
while !self.shutdown.is_shutdown() {
tokio::select! {
_ = interval_timer.tick() => {
self.calculate_nodes_per_country().await;
}
_ = self.shutdown.recv() => {
trace!("Listener: Received shutdown");
}
}
}
});
}
Expand Down
19 changes: 13 additions & 6 deletions explorer-api/src/country_statistics/geolocate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ use crate::mix_nodes::location::{GeoLocation, Location};
use crate::state::ExplorerApiStateContext;
use log::{info, warn};
use reqwest::Error as ReqwestError;
use task::ShutdownListener;
use thiserror::Error;

pub(crate) struct GeoLocateTask {
state: ExplorerApiStateContext,
shutdown: ShutdownListener,
}

impl GeoLocateTask {
pub(crate) fn new(state: ExplorerApiStateContext) -> Self {
GeoLocateTask { state }
pub(crate) fn new(state: ExplorerApiStateContext, shutdown: ShutdownListener) -> Self {
GeoLocateTask { state, shutdown }
}

pub(crate) fn start(mut self) {
Expand All @@ -27,10 +29,15 @@ impl GeoLocateTask {
info!("Spawning mix node locator task runner...");
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(std::time::Duration::from_millis(50));
loop {
// wait for the next interval tick
interval_timer.tick().await;
self.locate_mix_nodes().await;
while !self.shutdown.is_shutdown() {
tokio::select! {
_ = interval_timer.tick() => {
self.locate_mix_nodes().await;
}
_ = self.shutdown.recv() => {
trace!("Listener: Received shutdown");
}
}
}
});
}
Expand Down
59 changes: 47 additions & 12 deletions explorer-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extern crate rocket_okapi;
use clap::Parser;
use log::info;
use network_defaults::setup_env;
use task::ShutdownNotifier;

pub(crate) mod cache;
mod client;
Expand Down Expand Up @@ -50,29 +51,63 @@ impl ExplorerApi {
let validator_api_url = self.state.inner.validator_client.api_endpoint();
info!("Using validator API - {}", validator_api_url);

let shutdown = ShutdownNotifier::default();

// spawn concurrent tasks
crate::tasks::ExplorerApiTasks::new(self.state.clone()).start();
crate::tasks::ExplorerApiTasks::new(self.state.clone(), shutdown.subscribe()).start();
country_statistics::distribution::CountryStatisticsDistributionTask::new(
self.state.clone(),
shutdown.subscribe(),
)
.start();
country_statistics::geolocate::GeoLocateTask::new(self.state.clone()).start();
country_statistics::geolocate::GeoLocateTask::new(self.state.clone(), shutdown.subscribe())
.start();

// Rocket handles shutdown on it's own, but its shutdown handling should be incorporated
// with that of the rest of the tasks.
// Currently it's runtime is forcefully terminated once the explorer-api exits.
http::start(self.state.clone());

// wait for user to press ctrl+C
self.wait_for_interrupt().await
self.wait_for_interrupt(shutdown).await
}

async fn wait_for_interrupt(&self, mut shutdown: ShutdownNotifier) {
wait_for_signal().await;

log::info!("Sending shutdown");
shutdown.signal_shutdown().ok();
log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
shutdown.wait_for_shutdown().await;
log::info!("Stopping explorer API");
}
}

async fn wait_for_interrupt(&self) {
if let Err(e) = tokio::signal::ctrl_c().await {
error!(
"There was an error while capturing SIGINT - {:?}. We will terminate regardless",
e
);
#[cfg(unix)]
async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");

tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
},
_ = sigterm.recv() => {
log::info!("Received SIGTERM");
}
_ = sigquit.recv() => {
log::info!("Received SIGQUIT");
}
info!(
"Received SIGINT - the mixnode will terminate now (threads are not yet nicely stopped, if you see stack traces that's alright)."
);
}
}

#[cfg(not(unix))]
async fn wait_for_signal() {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
},
}
}

Expand Down
40 changes: 23 additions & 17 deletions explorer-api/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::future::Future;

use mixnet_contract_common::GatewayBond;
use task::ShutdownListener;
use validator_client::models::MixNodeBondAnnotated;
use validator_client::nymd::error::NymdError;
use validator_client::nymd::{Paging, QueryNymdClient, ValidatorResponse};
Expand All @@ -14,11 +15,12 @@ use crate::state::ExplorerApiStateContext;

pub(crate) struct ExplorerApiTasks {
state: ExplorerApiStateContext,
shutdown: ShutdownListener,
}

impl ExplorerApiTasks {
pub(crate) fn new(state: ExplorerApiStateContext) -> Self {
ExplorerApiTasks { state }
pub(crate) fn new(state: ExplorerApiStateContext, shutdown: ShutdownListener) -> Self {
ExplorerApiTasks { state, shutdown }
}

// a helper to remove duplicate code when grabbing active/rewarded/all mixnodes
Expand Down Expand Up @@ -128,24 +130,28 @@ impl ExplorerApiTasks {
}
}

pub(crate) fn start(self) {
pub(crate) fn start(mut self) {
info!("Spawning mix nodes task runner...");
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(CACHE_REFRESH_RATE);
loop {
// wait for the next interval tick
interval_timer.tick().await;

info!("Updating validator cache...");
self.update_validators_cache().await;
info!("Done");

info!("Updating gateway cache...");
self.update_gateways_cache().await;
info!("Done");

info!("Updating mix node cache...");
self.update_mixnode_cache().await;
while !self.shutdown.is_shutdown() {
tokio::select! {
_ = interval_timer.tick() => {
info!("Updating validator cache...");
self.update_validators_cache().await;
info!("Done");

info!("Updating gateway cache...");
self.update_gateways_cache().await;
info!("Done");

info!("Updating mix node cache...");
self.update_mixnode_cache().await;
}
_ = self.shutdown.recv() => {
trace!("Listener: Received shutdown");
}
}
}
});
}
Expand Down

0 comments on commit e631219

Please sign in to comment.