From 3414e2abea16ff79a1150aa432c6563612735d79 Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Mon, 25 Mar 2024 12:13:09 +0800 Subject: [PATCH] dev: torrent repository tests --- Cargo.lock | 419 ++++++++++++++- packages/torrent-repository/Cargo.toml | 1 + .../torrent-repository/tests/common/mod.rs | 3 + .../torrent-repository/tests/common/repo.rs | 147 +++++ .../tests/common/torrent.rs | 89 ++++ .../tests/common/torrent_peer_builder.rs | 88 +++ .../torrent-repository/tests/entry/mod.rs | 433 +++++++++++++++ .../torrent-repository/tests/integration.rs | 22 + .../tests/repository/mod.rs | 504 ++++++++++++++++++ 9 files changed, 1700 insertions(+), 6 deletions(-) create mode 100644 packages/torrent-repository/tests/common/mod.rs create mode 100644 packages/torrent-repository/tests/common/repo.rs create mode 100644 packages/torrent-repository/tests/common/torrent.rs create mode 100644 packages/torrent-repository/tests/common/torrent_peer_builder.rs create mode 100644 packages/torrent-repository/tests/entry/mod.rs create mode 100644 packages/torrent-repository/tests/integration.rs create mode 100644 packages/torrent-repository/tests/repository/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e28278ab..0bdd83b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,6 +167,40 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" +dependencies = [ + "concurrent-queue", + "event-listener 5.2.0", + "event-listener-strategy 0.5.0", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.6" @@ -183,6 +217,128 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-executor" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +dependencies = [ + "async-lock 3.3.0", + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite 2.3.0", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.2.0", + "async-executor", + "async-io 2.3.2", + "async-lock 3.3.0", + "blocking", + "futures-lite 2.3.0", + "once_cell", + "tokio", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcccb0f599cfa2f8ace422d3555572f47424da5648a4382a9dd0310ff8210884" +dependencies = [ + "async-lock 3.3.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.3.0", + "parking", + "polling 3.6.0", + "rustix 0.38.32", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" +dependencies = [ + "event-listener 4.0.3", + "event-listener-strategy 0.4.0", + "pin-project-lite", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 2.8.0", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + [[package]] name = "async-trait" version = "0.1.78" @@ -194,6 +350,12 @@ dependencies = [ "syn 2.0.53", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.1.0" @@ -418,6 +580,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +dependencies = [ + "async-channel 2.2.0", + "async-lock 3.3.0", + "async-task", + "fastrand 2.0.1", + "futures-io", + "futures-lite 2.3.0", + "piper", + "tracing", +] + [[package]] name = "borsh" version = "1.3.1" @@ -662,6 +840,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.14.0" @@ -996,6 +1183,54 @@ dependencies = [ "version_check", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.3", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" +dependencies = [ + "event-listener 5.2.0", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -1008,6 +1243,15 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -1186,6 +1430,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "fastrand 2.0.1", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.30" @@ -1266,6 +1538,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.4.3" @@ -1458,7 +1742,7 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "socket2", + "socket2 0.5.6", "tokio", "tower", "tower-service", @@ -1526,6 +1810,15 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "io-enum" version = "1.1.3" @@ -1535,6 +1828,17 @@ dependencies = [ "derive_utils", ] +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -1605,6 +1909,15 @@ dependencies = [ "serde", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1734,6 +2047,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1767,6 +2086,9 @@ name = "log" version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +dependencies = [ + "value-bag", +] [[package]] name = "lru" @@ -1878,7 +2200,7 @@ dependencies = [ "percent-encoding", "serde", "serde_json", - "socket2", + "socket2 0.5.6", "twox-hash", "url", ] @@ -2127,6 +2449,12 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -2287,6 +2615,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.30" @@ -2321,6 +2660,37 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0c976a60b2d7e99d6f229e414670a9b85d13ac305cc6d1e9c134de58c5aaaf6" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix 0.38.32", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -2778,6 +3148,20 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.32" @@ -2787,7 +3171,7 @@ dependencies = [ "bitflags 2.5.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.13", "windows-sys 0.52.0", ] @@ -3133,6 +3517,16 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.6" @@ -3268,8 +3662,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand", - "rustix", + "fastrand 2.0.1", + "rustix 0.38.32", "windows-sys 0.52.0", ] @@ -3386,7 +3780,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.6", "tokio-macros", "windows-sys 0.48.0", ] @@ -3610,6 +4004,7 @@ dependencies = [ name = "torrust-tracker-torrent-repository" version = "3.0.0-alpha.12-develop" dependencies = [ + "async-std", "criterion", "futures", "rstest", @@ -3801,6 +4196,12 @@ dependencies = [ "rand", ] +[[package]] +name = "value-bag" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74797339c3b98616c009c7c3eb53a0ce41e85c8ec66bd3db96ed132d20cfdee8" + [[package]] name = "vcpkg" version = "0.2.15" @@ -3813,6 +4214,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "walkdir" version = "2.5.0" diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index c36ae144..4cea8767 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -25,6 +25,7 @@ torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" [dev-dependencies] criterion = { version = "0", features = ["async_tokio"] } rstest = "0" +async-std = {version = "1", features = ["attributes", "tokio1"] } [[bench]] harness = false diff --git a/packages/torrent-repository/tests/common/mod.rs b/packages/torrent-repository/tests/common/mod.rs new file mode 100644 index 00000000..efdf7f74 --- /dev/null +++ b/packages/torrent-repository/tests/common/mod.rs @@ -0,0 +1,3 @@ +pub mod repo; +pub mod torrent; +pub mod torrent_peer_builder; diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs new file mode 100644 index 00000000..3a4b53d2 --- /dev/null +++ b/packages/torrent-repository/tests/common/repo.rs @@ -0,0 +1,147 @@ +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::info_hash::InfoHash; +use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; +use torrust_tracker_torrent_repository::{ + EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, +}; + +#[derive(Debug)] +pub(crate) enum Repo { + Std(TorrentsRwLockStd), + StdMutexStd(TorrentsRwLockStdMutexStd), + StdMutexTokio(TorrentsRwLockStdMutexTokio), + Tokio(TorrentsRwLockTokio), + TokioMutexStd(TorrentsRwLockTokioMutexStd), + TokioMutexTokio(TorrentsRwLockTokioMutexTokio), +} + +impl Repo { + pub(crate) async fn get(&self, key: &InfoHash) -> Option { + match self { + Repo::Std(repo) => repo.get(key), + Repo::StdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Repo::StdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + Repo::Tokio(repo) => repo.get(key).await, + Repo::TokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), + Repo::TokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), + } + } + pub(crate) async fn get_metrics(&self) -> TorrentsMetrics { + match self { + Repo::Std(repo) => repo.get_metrics(), + Repo::StdMutexStd(repo) => repo.get_metrics(), + Repo::StdMutexTokio(repo) => repo.get_metrics().await, + Repo::Tokio(repo) => repo.get_metrics().await, + Repo::TokioMutexStd(repo) => repo.get_metrics().await, + Repo::TokioMutexTokio(repo) => repo.get_metrics().await, + } + } + pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + match self { + Repo::Std(repo) => repo.get_paginated(pagination), + Repo::StdMutexStd(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) + .collect(), + Repo::StdMutexTokio(repo) => { + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + + for (i, t) in repo.get_paginated(pagination).await { + v.push((i, t.lock().await.clone())); + } + v + } + Repo::Tokio(repo) => repo.get_paginated(pagination).await, + Repo::TokioMutexStd(repo) => repo + .get_paginated(pagination) + .await + .iter() + .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) + .collect(), + Repo::TokioMutexTokio(repo) => { + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + + for (i, t) in repo.get_paginated(pagination).await { + v.push((i, t.lock().await.clone())); + } + v + } + } + } + pub(crate) async fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + match self { + Repo::Std(repo) => repo.import_persistent(persistent_torrents), + Repo::StdMutexStd(repo) => repo.import_persistent(persistent_torrents), + Repo::StdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::Tokio(repo) => repo.import_persistent(persistent_torrents).await, + Repo::TokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, + Repo::TokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, + } + } + pub(crate) async fn remove(&self, key: &InfoHash) -> Option { + match self { + Repo::Std(repo) => repo.remove(key), + Repo::StdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + Repo::StdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + Repo::Tokio(repo) => repo.remove(key).await, + Repo::TokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), + Repo::TokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), + } + } + pub(crate) async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + match self { + Repo::Std(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::StdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::StdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::Tokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::TokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, + Repo::TokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, + } + } + pub(crate) async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + match self { + Repo::Std(repo) => repo.remove_peerless_torrents(policy), + Repo::StdMutexStd(repo) => repo.remove_peerless_torrents(policy), + Repo::StdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::Tokio(repo) => repo.remove_peerless_torrents(policy).await, + Repo::TokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, + Repo::TokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, + } + } + pub(crate) async fn update_torrent_with_peer_and_get_stats( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + ) -> (bool, SwarmMetadata) { + match self { + Repo::Std(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Repo::StdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Repo::StdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::Tokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::TokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + Repo::TokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, + } + } + pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { + match self { + Repo::Std(repo) => repo.write().insert(*info_hash, torrent), + Repo::StdMutexStd(repo) => Some(repo.write().insert(*info_hash, torrent.into())?.lock().unwrap().clone()), + Repo::StdMutexTokio(repo) => { + let r = repo.write().insert(*info_hash, torrent.into()); + match r { + Some(t) => Some(t.lock().await.clone()), + None => None, + } + } + Repo::Tokio(repo) => repo.write().await.insert(*info_hash, torrent), + Repo::TokioMutexStd(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().unwrap().clone()), + Repo::TokioMutexTokio(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().await.clone()), + } + } +} diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs new file mode 100644 index 00000000..33264c44 --- /dev/null +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -0,0 +1,89 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; +use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _}; +use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; + +#[derive(Debug, Clone)] +pub(crate) enum Torrent { + Single(EntrySingle), + MutexStd(EntryMutexStd), + MutexTokio(EntryMutexTokio), +} + +impl Torrent { + pub(crate) async fn get_stats(&self) -> SwarmMetadata { + match self { + Torrent::Single(entry) => entry.get_stats(), + Torrent::MutexStd(entry) => entry.get_stats(), + Torrent::MutexTokio(entry) => entry.clone().get_stats().await, + } + } + + pub(crate) async fn is_good(&self, policy: &TrackerPolicy) -> bool { + match self { + Torrent::Single(entry) => entry.is_good(policy), + Torrent::MutexStd(entry) => entry.is_good(policy), + Torrent::MutexTokio(entry) => entry.clone().check_good(policy).await, + } + } + + pub(crate) async fn peers_is_empty(&self) -> bool { + match self { + Torrent::Single(entry) => entry.peers_is_empty(), + Torrent::MutexStd(entry) => entry.peers_is_empty(), + Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await, + } + } + + pub(crate) async fn get_peers_len(&self) -> usize { + match self { + Torrent::Single(entry) => entry.get_peers_len(), + Torrent::MutexStd(entry) => entry.get_peers_len(), + Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await, + } + } + + pub(crate) async fn get_peers(&self, limit: Option) -> Vec> { + match self { + Torrent::Single(entry) => entry.get_peers(limit), + Torrent::MutexStd(entry) => entry.get_peers(limit), + Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await, + } + } + + pub(crate) async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + match self { + Torrent::Single(entry) => entry.get_peers_for_client(client, limit), + Torrent::MutexStd(entry) => entry.get_peers_for_client(client, limit), + Torrent::MutexTokio(entry) => entry.clone().get_peers_for_client(client, limit).await, + } + } + + pub(crate) async fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool { + match self { + Torrent::Single(entry) => entry.insert_or_update_peer(peer), + Torrent::MutexStd(entry) => entry.insert_or_update_peer(peer), + Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer(peer).await, + } + } + + pub(crate) async fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) { + match self { + Torrent::Single(entry) => entry.insert_or_update_peer_and_get_stats(peer), + Torrent::MutexStd(entry) => entry.insert_or_update_peer_and_get_stats(peer), + Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer_and_get_stats(peer).await, + } + } + + pub(crate) async fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { + match self { + Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff), + Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff), + Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await, + } + } +} diff --git a/packages/torrent-repository/tests/common/torrent_peer_builder.rs b/packages/torrent-repository/tests/common/torrent_peer_builder.rs new file mode 100644 index 00000000..3a4e61ed --- /dev/null +++ b/packages/torrent-repository/tests/common/torrent_peer_builder.rs @@ -0,0 +1,88 @@ +use std::net::SocketAddr; + +use torrust_tracker_clock::clock::Time; +use torrust_tracker_primitives::announce_event::AnnounceEvent; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, NumberOfBytes}; + +use crate::CurrentClock; + +#[derive(Debug, Default)] +struct TorrentPeerBuilder { + peer: peer::Peer, +} + +#[allow(dead_code)] +impl TorrentPeerBuilder { + #[must_use] + fn new() -> Self { + Self { + peer: peer::Peer { + updated: CurrentClock::now(), + ..Default::default() + }, + } + } + + #[must_use] + fn with_event_completed(mut self) -> Self { + self.peer.event = AnnounceEvent::Completed; + self + } + + #[must_use] + fn with_event_started(mut self) -> Self { + self.peer.event = AnnounceEvent::Started; + self + } + + #[must_use] + fn with_peer_address(mut self, peer_addr: SocketAddr) -> Self { + self.peer.peer_addr = peer_addr; + self + } + + #[must_use] + fn with_peer_id(mut self, peer_id: peer::Id) -> Self { + self.peer.peer_id = peer_id; + self + } + + #[must_use] + fn with_number_of_bytes_left(mut self, left: i64) -> Self { + self.peer.left = NumberOfBytes(left); + self + } + + #[must_use] + fn updated_at(mut self, updated: DurationSinceUnixEpoch) -> Self { + self.peer.updated = updated; + self + } + + #[must_use] + fn into(self) -> peer::Peer { + self.peer + } +} + +/// A torrent seeder is a peer with 0 bytes left to download which +/// has not announced it has stopped +#[must_use] +pub fn a_completed_peer(id: i32) -> peer::Peer { + TorrentPeerBuilder::new() + .with_number_of_bytes_left(0) + .with_event_completed() + .with_peer_id(id.into()) + .into() +} + +/// A torrent leecher is a peer that is not a seeder. +/// Leecher: left > 0 OR event = Stopped +#[must_use] +pub fn a_started_peer(id: i32) -> peer::Peer { + TorrentPeerBuilder::new() + .with_number_of_bytes_left(1) + .with_event_started() + .with_peer_id(id.into()) + .into() +} diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs new file mode 100644 index 00000000..c39bef63 --- /dev/null +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -0,0 +1,433 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::ops::Sub; +use std::time::Duration; + +use rstest::{fixture, rstest}; +use torrust_tracker_clock::clock::stopped::Stopped as _; +use torrust_tracker_clock::clock::{self, Time as _}; +use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; +use torrust_tracker_primitives::announce_event::AnnounceEvent; +use torrust_tracker_primitives::peer::Peer; +use torrust_tracker_primitives::{peer, NumberOfBytes}; +use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; + +use crate::common::torrent::Torrent; +use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; +use crate::CurrentClock; + +#[fixture] +fn single() -> Torrent { + Torrent::Single(EntrySingle::default()) +} +#[fixture] +fn standard_mutex() -> Torrent { + Torrent::MutexStd(EntryMutexStd::default()) +} + +#[fixture] +fn mutex_tokio() -> Torrent { + Torrent::MutexTokio(EntryMutexTokio::default()) +} + +#[fixture] +fn policy_none() -> TrackerPolicy { + TrackerPolicy::new(false, 0, false) +} + +#[fixture] +fn policy_persist() -> TrackerPolicy { + TrackerPolicy::new(false, 0, true) +} + +#[fixture] +fn policy_remove() -> TrackerPolicy { + TrackerPolicy::new(true, 0, false) +} + +#[fixture] +fn policy_remove_persist() -> TrackerPolicy { + TrackerPolicy::new(true, 0, true) +} + +pub enum Makes { + Empty, + Started, + Completed, + Downloaded, + Three, +} + +async fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { + match makes { + Makes::Empty => vec![], + Makes::Started => { + let peer = a_started_peer(1); + torrent.insert_or_update_peer(&peer).await; + vec![peer] + } + Makes::Completed => { + let peer = a_completed_peer(2); + torrent.insert_or_update_peer(&peer).await; + vec![peer] + } + Makes::Downloaded => { + let mut peer = a_started_peer(3); + torrent.insert_or_update_peer(&peer).await; + peer.event = AnnounceEvent::Completed; + peer.left = NumberOfBytes(0); + torrent.insert_or_update_peer(&peer).await; + vec![peer] + } + Makes::Three => { + let peer_1 = a_started_peer(1); + torrent.insert_or_update_peer(&peer_1).await; + + let peer_2 = a_completed_peer(2); + torrent.insert_or_update_peer(&peer_2).await; + + let mut peer_3 = a_started_peer(3); + torrent.insert_or_update_peer(&peer_3).await; + peer_3.event = AnnounceEvent::Completed; + peer_3.left = NumberOfBytes(0); + torrent.insert_or_update_peer(&peer_3).await; + vec![peer_1, peer_2, peer_3] + } + } +} + +#[rstest] +#[case::empty(&Makes::Empty)] +#[tokio::test] +async fn it_should_be_empty_by_default( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + make(&mut torrent, makes).await; + + assert_eq!(torrent.get_peers_len().await, 0); +} + +#[rstest] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_check_if_entry_is_good( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, + #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, +) { + make(&mut torrent, makes).await; + + let has_peers = !torrent.peers_is_empty().await; + let has_downloads = torrent.get_stats().await.downloaded != 0; + + match (policy.remove_peerless_torrents, policy.persistent_torrent_completed_stat) { + // remove torrents without peers, and keep completed download stats + (true, true) => match (has_peers, has_downloads) { + // no peers, but has downloads + // peers, with or without downloads + (false, true) | (true, true | false) => assert!(torrent.is_good(&policy).await), + // no peers and no downloads + (false, false) => assert!(!torrent.is_good(&policy).await), + }, + // remove torrents without peers and drop completed download stats + (true, false) => match (has_peers, has_downloads) { + // peers, with or without downloads + (true, true | false) => assert!(torrent.is_good(&policy).await), + // no peers and with or without downloads + (false, true | false) => assert!(!torrent.is_good(&policy).await), + }, + // keep torrents without peers, but keep or drop completed download stats + (false, true | false) => assert!(torrent.is_good(&policy).await), + } +} + +#[rstest] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_get_peers_for_torrent_entry( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + let peers = make(&mut torrent, makes).await; + + let torrent_peers = torrent.get_peers(None).await; + + assert_eq!(torrent_peers.len(), peers.len()); + + for peer in torrent_peers { + assert!(peers.contains(&peer)); + } +} + +#[rstest] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_update_a_peer( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + make(&mut torrent, makes).await; + + // Make and insert a new peer. + let mut peer = a_started_peer(-1); + torrent.insert_or_update_peer(&peer).await; + + // Get the Inserted Peer by Id. + let peers = torrent.get_peers(None).await; + let original = peers + .iter() + .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) + .expect("it should find peer by id"); + + assert_eq!(original.event, AnnounceEvent::Started, "it should be as created"); + + // Announce "Completed" torrent download event. + peer.event = AnnounceEvent::Completed; + torrent.insert_or_update_peer(&peer).await; + + // Get the Updated Peer by Id. + let peers = torrent.get_peers(None).await; + let updated = peers + .iter() + .find(|p| peer::ReadInfo::get_id(*p) == peer::ReadInfo::get_id(&peer)) + .expect("it should find peer by id"); + + assert_eq!(updated.event, AnnounceEvent::Completed, "it should be updated"); +} + +#[rstest] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_remove_a_peer_upon_stopped_announcement( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + use torrust_tracker_primitives::peer::ReadInfo as _; + + make(&mut torrent, makes).await; + + let mut peer = a_started_peer(-1); + + torrent.insert_or_update_peer(&peer).await; + + // The started peer should be inserted. + let peers = torrent.get_peers(None).await; + let original = peers + .iter() + .find(|p| p.get_id() == peer.get_id()) + .expect("it should find peer by id"); + + assert_eq!(original.event, AnnounceEvent::Started); + + // Change peer to "Stopped" and insert. + peer.event = AnnounceEvent::Stopped; + torrent.insert_or_update_peer(&peer).await; + + // It should be removed now. + let peers = torrent.get_peers(None).await; + + assert_eq!( + peers.iter().find(|p| p.get_id() == peer.get_id()), + None, + "it should be removed" + ); +} + +#[rstest] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + make(&mut torrent, makes).await; + let downloaded = torrent.get_stats().await.downloaded; + + let peers = torrent.get_peers(None).await; + let mut peer = **peers.first().expect("there should be a peer"); + + let is_already_completed = peer.event == AnnounceEvent::Completed; + + // Announce "Completed" torrent download event. + peer.event = AnnounceEvent::Completed; + + let (updated, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; + + if is_already_completed { + assert!(!updated); + assert_eq!(stats.downloaded, downloaded); + } else { + assert!(updated); + assert_eq!(stats.downloaded, downloaded + 1); + } +} + +#[rstest] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_update_a_peer_as_a_seeder( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + let peers = make(&mut torrent, makes).await; + let completed = u32::try_from(peers.iter().filter(|p| p.is_seeder()).count()).expect("it_should_not_be_so_many"); + + let peers = torrent.get_peers(None).await; + let mut peer = **peers.first().expect("there should be a peer"); + + let is_already_non_left = peer.left == NumberOfBytes(0); + + // Set Bytes Left to Zero + peer.left = NumberOfBytes(0); + let (_, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; // Add the peer + + if is_already_non_left { + // it was already complete + assert_eq!(stats.complete, completed); + } else { + // now it is complete + assert_eq!(stats.complete, completed + 1); + } +} + +#[rstest] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_update_a_peer_as_incomplete( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + let peers = make(&mut torrent, makes).await; + let incomplete = u32::try_from(peers.iter().filter(|p| !p.is_seeder()).count()).expect("it should not be so many"); + + let peers = torrent.get_peers(None).await; + let mut peer = **peers.first().expect("there should be a peer"); + + let completed_already = peer.left == NumberOfBytes(0); + + // Set Bytes Left to no Zero + peer.left = NumberOfBytes(1); + let (_, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; // Add the peer + + if completed_already { + // now it is incomplete + assert_eq!(stats.incomplete, incomplete + 1); + } else { + // was already incomplete + assert_eq!(stats.incomplete, incomplete); + } +} + +#[rstest] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_get_peers_excluding_the_client_socket( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + make(&mut torrent, makes).await; + + let peers = torrent.get_peers(None).await; + let mut peer = **peers.first().expect("there should be a peer"); + + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8081); + + // for this test, we should not already use this socket. + assert_ne!(peer.peer_addr, socket); + + // it should get the peer as it dose not share the socket. + assert!(torrent.get_peers_for_client(&socket, None).await.contains(&peer.into())); + + // set the address to the socket. + peer.peer_addr = socket; + torrent.insert_or_update_peer(&peer).await; // Add peer + + // It should not include the peer that has the same socket. + assert!(!torrent.get_peers_for_client(&socket, None).await.contains(&peer.into())); +} + +#[rstest] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_limit_the_number_of_peers_returned( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + make(&mut torrent, makes).await; + + // We add one more peer than the scrape limit + for peer_number in 1..=74 + 1 { + let mut peer = a_started_peer(1); + peer.peer_id = peer::Id::from(peer_number); + torrent.insert_or_update_peer(&peer).await; + } + + let peers = torrent.get_peers(Some(TORRENT_PEERS_LIMIT)).await; + + assert_eq!(peers.len(), 74); +} + +#[rstest] +#[case::empty(&Makes::Empty)] +#[case::started(&Makes::Started)] +#[case::completed(&Makes::Completed)] +#[case::downloaded(&Makes::Downloaded)] +#[case::three(&Makes::Three)] +#[tokio::test] +async fn it_should_remove_inactive_peers_beyond_cutoff( + #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[case] makes: &Makes, +) { + const TIMEOUT: Duration = Duration::from_secs(120); + const EXPIRE: Duration = Duration::from_secs(121); + + let peers = make(&mut torrent, makes).await; + + let mut peer = a_completed_peer(-1); + + let now = clock::Working::now(); + clock::Stopped::local_set(&now); + + peer.updated = now.sub(EXPIRE); + + torrent.insert_or_update_peer(&peer).await; + + assert_eq!(torrent.get_peers_len().await, peers.len() + 1); + + let current_cutoff = CurrentClock::now_sub(&TIMEOUT).unwrap_or_default(); + torrent.remove_inactive_peers(current_cutoff).await; + + assert_eq!(torrent.get_peers_len().await, peers.len()); +} diff --git a/packages/torrent-repository/tests/integration.rs b/packages/torrent-repository/tests/integration.rs new file mode 100644 index 00000000..5aab67b0 --- /dev/null +++ b/packages/torrent-repository/tests/integration.rs @@ -0,0 +1,22 @@ +//! Integration tests. +//! +//! ```text +//! cargo test --test integration +//! ``` + +use torrust_tracker_clock::clock; + +pub mod common; +mod entry; +mod repository; + +/// This code needs to be copied into each crate. +/// Working version, for production. +#[cfg(not(test))] +#[allow(dead_code)] +pub(crate) type CurrentClock = clock::Working; + +/// Stopped version, for testing. +#[cfg(test)] +#[allow(dead_code)] +pub(crate) type CurrentClock = clock::Stopped; diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs new file mode 100644 index 00000000..7ffe17dd --- /dev/null +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -0,0 +1,504 @@ +use std::collections::{BTreeMap, HashSet}; +use std::hash::{DefaultHasher, Hash, Hasher}; + +use rstest::{fixture, rstest}; +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::announce_event::AnnounceEvent; +use torrust_tracker_primitives::info_hash::InfoHash; +use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents}; +use torrust_tracker_torrent_repository::entry::Entry as _; +use torrust_tracker_torrent_repository::repository::{RwLockStd, RwLockTokio}; +use torrust_tracker_torrent_repository::EntrySingle; + +use crate::common::repo::Repo; +use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; + +#[fixture] +fn standard() -> Repo { + Repo::Std(RwLockStd::default()) +} +#[fixture] +fn standard_mutex() -> Repo { + Repo::StdMutexStd(RwLockStd::default()) +} + +#[fixture] +fn standard_tokio() -> Repo { + Repo::StdMutexTokio(RwLockStd::default()) +} + +#[fixture] +fn tokio_std() -> Repo { + Repo::Tokio(RwLockTokio::default()) +} +#[fixture] +fn tokio_mutex() -> Repo { + Repo::TokioMutexStd(RwLockTokio::default()) +} + +#[fixture] +fn tokio_tokio() -> Repo { + Repo::TokioMutexTokio(RwLockTokio::default()) +} + +type Entries = Vec<(InfoHash, EntrySingle)>; + +#[fixture] +fn empty() -> Entries { + vec![] +} + +#[fixture] +fn default() -> Entries { + vec![(InfoHash::default(), EntrySingle::default())] +} + +#[fixture] +fn started() -> Entries { + let mut torrent = EntrySingle::default(); + torrent.insert_or_update_peer(&a_started_peer(1)); + vec![(InfoHash::default(), torrent)] +} + +#[fixture] +fn completed() -> Entries { + let mut torrent = EntrySingle::default(); + torrent.insert_or_update_peer(&a_completed_peer(2)); + vec![(InfoHash::default(), torrent)] +} + +#[fixture] +fn downloaded() -> Entries { + let mut torrent = EntrySingle::default(); + let mut peer = a_started_peer(3); + torrent.insert_or_update_peer(&peer); + peer.event = AnnounceEvent::Completed; + peer.left = NumberOfBytes(0); + torrent.insert_or_update_peer(&peer); + vec![(InfoHash::default(), torrent)] +} + +#[fixture] +fn three() -> Entries { + let mut started = EntrySingle::default(); + let started_h = &mut DefaultHasher::default(); + started.insert_or_update_peer(&a_started_peer(1)); + started.hash(started_h); + + let mut completed = EntrySingle::default(); + let completed_h = &mut DefaultHasher::default(); + completed.insert_or_update_peer(&a_completed_peer(2)); + completed.hash(completed_h); + + let mut downloaded = EntrySingle::default(); + let downloaded_h = &mut DefaultHasher::default(); + let mut downloaded_peer = a_started_peer(3); + downloaded.insert_or_update_peer(&downloaded_peer); + downloaded_peer.event = AnnounceEvent::Completed; + downloaded_peer.left = NumberOfBytes(0); + downloaded.insert_or_update_peer(&downloaded_peer); + downloaded.hash(downloaded_h); + + vec![ + (InfoHash::from(&started_h.clone()), started), + (InfoHash::from(&completed_h.clone()), completed), + (InfoHash::from(&downloaded_h.clone()), downloaded), + ] +} + +#[fixture] +fn many_out_of_order() -> Entries { + let mut entries: HashSet<(InfoHash, EntrySingle)> = HashSet::default(); + + for i in 0..408 { + let mut entry = EntrySingle::default(); + entry.insert_or_update_peer(&a_started_peer(i)); + + entries.insert((InfoHash::from(&i), entry)); + } + + // we keep the random order from the hashed set for the vector. + entries.iter().map(|(i, e)| (*i, e.clone())).collect() +} + +#[fixture] +fn many_hashed_in_order() -> Entries { + let mut entries: BTreeMap = BTreeMap::default(); + + for i in 0..408 { + let mut entry = EntrySingle::default(); + entry.insert_or_update_peer(&a_started_peer(i)); + + let hash: &mut DefaultHasher = &mut DefaultHasher::default(); + hash.write_i32(i); + + entries.insert(InfoHash::from(&hash.clone()), entry); + } + + // We return the entries in-order from from the b-tree map. + entries.iter().map(|(i, e)| (*i, e.clone())).collect() +} + +#[fixture] +fn persistent_empty() -> PersistentTorrents { + PersistentTorrents::default() +} + +#[fixture] +fn persistent_single() -> PersistentTorrents { + let hash = &mut DefaultHasher::default(); + + hash.write_u8(1); + let t = [(InfoHash::from(&hash.clone()), 0_u32)]; + + t.iter().copied().collect() +} + +#[fixture] +fn persistent_three() -> PersistentTorrents { + let hash = &mut DefaultHasher::default(); + + hash.write_u8(1); + let info_1 = InfoHash::from(&hash.clone()); + hash.write_u8(2); + let info_2 = InfoHash::from(&hash.clone()); + hash.write_u8(3); + let info_3 = InfoHash::from(&hash.clone()); + + let t = [(info_1, 1_u32), (info_2, 2_u32), (info_3, 3_u32)]; + + t.iter().copied().collect() +} + +async fn make(repo: &Repo, entries: &Entries) { + for (info_hash, entry) in entries { + repo.insert(info_hash, entry.clone()).await; + } +} + +#[fixture] +fn paginated_limit_zero() -> Pagination { + Pagination::new(0, 0) +} + +#[fixture] +fn paginated_limit_one() -> Pagination { + Pagination::new(0, 1) +} + +#[fixture] +fn paginated_limit_one_offset_one() -> Pagination { + Pagination::new(1, 1) +} + +#[fixture] +fn policy_none() -> TrackerPolicy { + TrackerPolicy::new(false, 0, false) +} + +#[fixture] +fn policy_persist() -> TrackerPolicy { + TrackerPolicy::new(false, 0, true) +} + +#[fixture] +fn policy_remove() -> TrackerPolicy { + TrackerPolicy::new(true, 0, false) +} + +#[fixture] +fn policy_remove_persist() -> TrackerPolicy { + TrackerPolicy::new(true, 0, true) +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_get_a_torrent_entry( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, +) { + make(&repo, &entries).await; + + if let Some((info_hash, torrent)) = entries.first() { + assert_eq!(repo.get(info_hash).await, Some(torrent.clone())); + } else { + assert_eq!(repo.get(&InfoHash::default()).await, None); + } +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, + many_out_of_order: Entries, +) { + make(&repo, &entries).await; + + let entries_a = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::>(); + + make(&repo, &many_out_of_order).await; + + let entries_b = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::>(); + + let is_equal = entries_b.iter().take(entries_a.len()).copied().collect::>() == entries_a; + + let is_sorted = entries_b.windows(2).all(|w| w[0] <= w[1]); + + assert!( + is_equal || is_sorted, + "The order is unstable: {is_equal}, or is sorted {is_sorted}." + ); +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_get_paginated( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, + #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination, +) { + make(&repo, &entries).await; + + let mut info_hashes = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::>(); + info_hashes.sort(); + + match paginated { + // it should return empty if limit is zero. + Pagination { limit: 0, .. } => assert_eq!(repo.get_paginated(Some(&paginated)).await, vec![]), + + // it should return a single entry if the limit is one. + Pagination { limit: 1, offset: 0 } => { + if info_hashes.is_empty() { + assert_eq!(repo.get_paginated(Some(&paginated)).await.len(), 0); + } else { + let page = repo.get_paginated(Some(&paginated)).await; + assert_eq!(page.len(), 1); + assert_eq!(page.first().map(|(i, _)| i), info_hashes.first()); + } + } + + // it should return the only the second entry if both the limit and the offset are one. + Pagination { limit: 1, offset: 1 } => { + if info_hashes.len() > 1 { + let page = repo.get_paginated(Some(&paginated)).await; + assert_eq!(page.len(), 1); + assert_eq!(page[0].0, info_hashes[1]); + } + } + // the other cases are not yet tested. + _ => {} + } +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_get_metrics( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, +) { + use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; + + make(&repo, &entries).await; + + let mut metrics = TorrentsMetrics::default(); + + for (_, torrent) in entries { + let stats = torrent.get_stats(); + + metrics.torrents += 1; + metrics.incomplete += u64::from(stats.incomplete); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + } + + assert_eq!(repo.get_metrics().await, metrics); +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_import_persistent_torrents( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, + #[values(persistent_empty(), persistent_single(), persistent_three())] persistent_torrents: PersistentTorrents, +) { + make(&repo, &entries).await; + + let mut downloaded = repo.get_metrics().await.downloaded; + persistent_torrents.iter().for_each(|(_, d)| downloaded += u64::from(*d)); + + repo.import_persistent(&persistent_torrents).await; + + assert_eq!(repo.get_metrics().await.downloaded, downloaded); + + for (entry, _) in persistent_torrents { + assert!(repo.get(&entry).await.is_some()); + } +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_remove_an_entry( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, +) { + make(&repo, &entries).await; + + for (info_hash, torrent) in entries { + assert_eq!(repo.get(&info_hash).await, Some(torrent.clone())); + assert_eq!(repo.remove(&info_hash).await, Some(torrent)); + + assert_eq!(repo.get(&info_hash).await, None); + assert_eq!(repo.remove(&info_hash).await, None); + } + + assert_eq!(repo.get_metrics().await.torrents, 0); +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_remove_inactive_peers( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, +) { + use std::ops::Sub as _; + use std::time::Duration; + + use torrust_tracker_clock::clock::stopped::Stopped as _; + use torrust_tracker_clock::clock::{self, Time as _}; + use torrust_tracker_primitives::peer; + + use crate::CurrentClock; + + const TIMEOUT: Duration = Duration::from_secs(120); + const EXPIRE: Duration = Duration::from_secs(121); + + make(&repo, &entries).await; + + let info_hash: InfoHash; + let mut peer: peer::Peer; + + // Generate a new infohash and peer. + { + let hash = &mut DefaultHasher::default(); + hash.write_u8(255); + info_hash = InfoHash::from(&hash.clone()); + peer = a_completed_peer(-1); + } + + // Set the last updated time of the peer to be 121 seconds ago. + { + let now = clock::Working::now(); + clock::Stopped::local_set(&now); + + peer.updated = now.sub(EXPIRE); + } + + // Insert the infohash and peer into the repository + // and verify there is an extra torrent entry. + { + repo.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + assert_eq!(repo.get_metrics().await.torrents, entries.len() as u64 + 1); + } + + // Verify that this new peer was inserted into the repository. + { + let entry = repo.get(&info_hash).await.expect("it_should_get_some"); + assert!(entry.get_peers(None).contains(&peer.into())); + } + + // Remove peers that have not been updated since the timeout (120 seconds ago). + { + repo.remove_inactive_peers(CurrentClock::now_sub(&TIMEOUT).expect("it should get a time passed")) + .await; + } + + // Verify that the this peer was removed from the repository. + { + let entry = repo.get(&info_hash).await.expect("it_should_get_some"); + assert!(!entry.get_peers(None).contains(&peer.into())); + } +} + +#[rstest] +#[case::empty(empty())] +#[case::default(default())] +#[case::started(started())] +#[case::completed(completed())] +#[case::downloaded(downloaded())] +#[case::three(three())] +#[case::out_of_order(many_out_of_order())] +#[case::in_order(many_hashed_in_order())] +#[tokio::test] +async fn it_should_remove_peerless_torrents( + #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo, + #[case] entries: Entries, + #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, +) { + make(&repo, &entries).await; + + repo.remove_peerless_torrents(&policy).await; + + let torrents = repo.get_paginated(None).await; + + for (_, entry) in torrents { + assert!(entry.is_good(&policy)); + } +}