Skip to content

Commit d2e75e3

Browse files
committed
refactor: [#1405] gracefull shutdown for listeners
Events listeners listen for the cancelation request instead of directly for the CRTR+c signal. This will allow implementing centralized policies for shutdown and alternative conditions.
1 parent 1ad19e7 commit d2e75e3

File tree

33 files changed

+206
-84
lines changed

33 files changed

+206
-84
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ serde = { version = "1", features = ["derive"] }
4747
serde_json = { version = "1", features = ["preserve_order"] }
4848
thiserror = "2.0.12"
4949
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
50+
tokio-util = "0.7.15"
5051
torrust-axum-health-check-api-server = { version = "3.0.0-develop", path = "packages/axum-health-check-api-server" }
5152
torrust-axum-http-tracker-server = { version = "3.0.0-develop", path = "packages/axum-http-tracker-server" }
5253
torrust-axum-rest-tracker-api-server = { version = "3.0.0-develop", path = "packages/axum-rest-tracker-api-server" }

packages/axum-http-tracker-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ hyper = "1"
2828
reqwest = { version = "0", features = ["json"] }
2929
serde = { version = "1", features = ["derive"] }
3030
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
31+
tokio-util = "0.7.15"
3132
torrust-axum-server = { version = "3.0.0-develop", path = "../axum-server" }
3233
torrust-server-lib = { version = "3.0.0-develop", path = "../server-lib" }
3334
torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" }

packages/axum-http-tracker-server/src/environment.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use bittorrent_primitives::info_hash::InfoHash;
66
use bittorrent_tracker_core::container::TrackerCoreContainer;
77
use futures::executor::block_on;
88
use tokio::task::JoinHandle;
9+
use tokio_util::sync::CancellationToken;
910
use torrust_axum_server::tsl::make_rust_tls;
1011
use torrust_server_lib::registar::Registar;
1112
use torrust_tracker_configuration::{logging, Configuration};
@@ -21,6 +22,7 @@ pub struct Environment<S> {
2122
pub registar: Registar,
2223
pub server: HttpServer<S>,
2324
pub event_listener_job: Option<JoinHandle<()>>,
25+
pub cancellation_token: CancellationToken,
2426
}
2527

2628
impl<S> Environment<S> {
@@ -59,6 +61,7 @@ impl Environment<Stopped> {
5961
registar: Registar::default(),
6062
server,
6163
event_listener_job: None,
64+
cancellation_token: CancellationToken::new(),
6265
}
6366
}
6467

@@ -72,6 +75,7 @@ impl Environment<Stopped> {
7275
// Start the event listener
7376
let event_listener_job = run_event_listener(
7477
self.container.http_tracker_core_container.event_bus.receiver(),
78+
self.cancellation_token.clone(),
7579
&self.container.http_tracker_core_container.stats_repository,
7680
);
7781

@@ -87,6 +91,7 @@ impl Environment<Stopped> {
8791
registar: self.registar.clone(),
8892
server,
8993
event_listener_job: Some(event_listener_job),
94+
cancellation_token: self.cancellation_token,
9095
}
9196
}
9297
}
@@ -117,6 +122,7 @@ impl Environment<Running> {
117122
registar: Registar::default(),
118123
server,
119124
event_listener_job: None,
125+
cancellation_token: self.cancellation_token,
120126
}
121127
}
122128

packages/axum-http-tracker-server/src/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ mod tests {
256256
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
257257
use bittorrent_http_tracker_core::statistics::repository::Repository;
258258
use bittorrent_tracker_core::container::TrackerCoreContainer;
259+
use tokio_util::sync::CancellationToken;
259260
use torrust_axum_server::tsl::make_rust_tls;
260261
use torrust_server_lib::registar::Registar;
261262
use torrust_tracker_configuration::{logging, Configuration};
@@ -265,6 +266,8 @@ mod tests {
265266
use crate::server::{HttpServer, Launcher};
266267

267268
pub fn initialize_container(configuration: &Configuration) -> HttpTrackerCoreContainer {
269+
let cancellation_token = CancellationToken::new();
270+
268271
let core_config = Arc::new(configuration.core.clone());
269272

270273
let http_trackers = configuration
@@ -287,7 +290,7 @@ mod tests {
287290
let http_stats_event_sender = http_stats_event_bus.sender();
288291

289292
if configuration.core.tracker_usage_statistics {
290-
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
293+
let _unused = run_event_listener(http_stats_event_bus.receiver(), cancellation_token, &http_stats_repository);
291294
}
292295

293296
let swarm_coordination_registry_container = Arc::new(SwarmCoordinationRegistryContainer::initialize(

packages/axum-http-tracker-server/src/v1/handlers/announce.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ mod tests {
123123
use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository;
124124
use bittorrent_tracker_core::whitelist::authorization::WhitelistAuthorization;
125125
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
126+
use tokio_util::sync::CancellationToken;
126127
use torrust_tracker_configuration::Configuration;
127128
use torrust_tracker_test_helpers::configuration;
128129

@@ -149,6 +150,9 @@ mod tests {
149150
}
150151

151152
fn initialize_core_tracker_services(config: &Configuration) -> CoreHttpTrackerServices {
153+
let cancellation_token = CancellationToken::new();
154+
155+
// Initialize the core tracker services with the provided configuration.
152156
let core_config = Arc::new(config.core.clone());
153157
let database = initialize_database(&config.core);
154158
let in_memory_whitelist = Arc::new(InMemoryWhitelist::default());
@@ -175,7 +179,7 @@ mod tests {
175179
let http_stats_event_sender = http_stats_event_bus.sender();
176180

177181
if config.core.tracker_usage_statistics {
178-
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
182+
let _unused = run_event_listener(http_stats_event_bus.receiver(), cancellation_token, &http_stats_repository);
179183
}
180184

181185
let announce_service = Arc::new(AnnounceService::new(

packages/axum-http-tracker-server/src/v1/handlers/scrape.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ mod tests {
9797
use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository;
9898
use bittorrent_tracker_core::whitelist::authorization::WhitelistAuthorization;
9999
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
100+
use tokio_util::sync::CancellationToken;
100101
use torrust_tracker_configuration::{Configuration, Core};
101102
use torrust_tracker_test_helpers::configuration;
102103

@@ -127,6 +128,8 @@ mod tests {
127128
}
128129

129130
fn initialize_core_tracker_services(config: &Configuration) -> (CoreTrackerServices, CoreHttpTrackerServices) {
131+
let cancellation_token = CancellationToken::new();
132+
130133
let core_config = Arc::new(config.core.clone());
131134
let in_memory_whitelist = Arc::new(InMemoryWhitelist::default());
132135
let whitelist_authorization = Arc::new(WhitelistAuthorization::new(&config.core, &in_memory_whitelist.clone()));
@@ -146,7 +149,7 @@ mod tests {
146149
let http_stats_event_sender = http_stats_event_bus.sender();
147150

148151
if config.core.tracker_usage_statistics {
149-
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
152+
let _unused = run_event_listener(http_stats_event_bus.receiver(), cancellation_token, &http_stats_repository);
150153
}
151154

152155
(

packages/events/src/shutdown.rs

Whitespace-only changes.

packages/http-tracker-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ futures = "0"
2323
serde = "1.0.219"
2424
thiserror = "2"
2525
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
26+
tokio-util = "0.7.15"
2627
torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" }
2728
torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" }
2829
torrust-tracker-events = { version = "3.0.0-develop", path = "../events" }

packages/http-tracker-core/benches/helpers/util.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use bittorrent_tracker_core::whitelist::authorization::WhitelistAuthorization;
2020
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
2121
use futures::future::BoxFuture;
2222
use mockall::mock;
23+
use tokio_util::sync::CancellationToken;
2324
use torrust_tracker_configuration::{Configuration, Core};
2425
use torrust_tracker_events::sender::SendError;
2526
use torrust_tracker_primitives::peer::Peer;
@@ -42,6 +43,8 @@ pub fn initialize_core_tracker_services() -> (CoreTrackerServices, CoreHttpTrack
4243
}
4344

4445
pub fn initialize_core_tracker_services_with_config(config: &Configuration) -> (CoreTrackerServices, CoreHttpTrackerServices) {
46+
let cancellation_token = CancellationToken::new();
47+
4548
let core_config = Arc::new(config.core.clone());
4649
let database = initialize_database(&config.core);
4750
let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default());
@@ -69,7 +72,7 @@ pub fn initialize_core_tracker_services_with_config(config: &Configuration) -> (
6972
let http_stats_event_sender = http_stats_event_bus.sender();
7073

7174
if config.core.tracker_usage_statistics {
72-
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
75+
let _unused = run_event_listener(http_stats_event_bus.receiver(), cancellation_token, &http_stats_repository);
7376
}
7477

7578
(

0 commit comments

Comments
 (0)