From c0450629a29a481b4eb2eb6a337edc30c36e641e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 16 Oct 2024 06:56:43 +0300 Subject: [PATCH 01/14] Replace `tokio::sync::mpsc` with `futures::channel::mpsc` in farmer cache --- crates/subspace-farmer/src/farmer_cache.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 025736858a..99888878f1 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -15,8 +15,9 @@ use crate::node_client::NodeClient; use crate::utils::run_future_in_dedicated_thread; use async_lock::RwLock as AsyncRwLock; use event_listener_primitives::{Bag, HandlerId}; +use futures::channel::mpsc; use futures::stream::{FuturesOrdered, FuturesUnordered}; -use futures::{select, FutureExt, StreamExt}; +use futures::{select, FutureExt, SinkExt, StreamExt}; use prometheus_client::registry::Registry; use rayon::prelude::*; use std::collections::{HashMap, HashSet}; @@ -33,7 +34,6 @@ use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::{KeyWithDistance, LocalRecordProvider}; use tokio::runtime::Handle; -use tokio::sync::mpsc; use tokio::task::{block_in_place, yield_now}; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; @@ -170,7 +170,7 @@ where .expect("Always set during worker instantiation"); if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) = - worker_receiver.recv().await + worker_receiver.next().await { self.initialize( &piece_getter, @@ -200,7 +200,7 @@ where loop { select! { - maybe_command = worker_receiver.recv().fuse() => { + maybe_command = worker_receiver.next() => { let Some(command) = maybe_command else { // Nothing else left to do return; @@ -995,7 +995,7 @@ pub struct FarmerCache { plot_caches: Arc, handlers: Arc, // We do not want to increase capacity unnecessarily on clone - worker_sender: Arc>, + worker_sender: mpsc::Sender, metrics: Option>, } @@ -1032,7 +1032,7 @@ where piece_caches: Arc::clone(&caches), plot_caches: Arc::clone(&plot_caches), handlers: Arc::clone(&handlers), - worker_sender: Arc::new(worker_sender), + worker_sender, metrics: metrics.clone(), }; let worker = FarmerCacheWorker { @@ -1098,6 +1098,7 @@ where if let Err(error) = self .worker_sender + .clone() .send(WorkerCommand::ForgetKey { key }) .await { @@ -1236,6 +1237,7 @@ where ) { if let Err(error) = self .worker_sender + .clone() .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) .await { From bfe4033c389534765e2cf5c286cfc635f73994a6 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 16 Oct 2024 07:56:05 +0300 Subject: [PATCH 02/14] Add `FarmerCache::get_pieces()` method that can get many pieces concurrently from different caches --- crates/subspace-farmer/src/farmer_cache.rs | 188 +++++++++++++++++- crates/subspace-farmer/src/lib.rs | 1 + .../src/utils/piece_provider.rs | 6 +- 3 files changed, 192 insertions(+), 3 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 99888878f1..86409eceeb 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -17,13 +17,16 @@ use async_lock::RwLock as AsyncRwLock; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; use futures::stream::{FuturesOrdered, FuturesUnordered}; -use futures::{select, FutureExt, SinkExt, StreamExt}; +use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt}; use prometheus_client::registry::Registry; use rayon::prelude::*; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; +use std::future::join; use std::hash::Hash; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::task::Poll; use std::time::Duration; use std::{fmt, mem}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; @@ -1128,6 +1131,189 @@ where None } + /// Get pieces from cache. + /// + /// Number of elements in returned stream is the same as in `piece_indices`. + pub async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> impl Stream)> + Send + Unpin + 'a + where + PieceIndices: IntoIterator + Send + 'a, + { + let mut pieces_to_get_from_plot_cache = Vec::new(); + + let reading_from_piece_cache = { + let caches = self.piece_caches.read().await; + // Pieces to read from piece cache grouped by backend for efficiency reasons + let mut reading_from_piece_cache = HashMap::)>::new(); + + for piece_index in piece_indices { + let key = RecordKey::from(piece_index.to_multihash()); + + let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key( + self.peer_id, + key.clone(), + )) { + Some(offset) => offset, + None => { + pieces_to_get_from_plot_cache.push((piece_index, key)); + continue; + } + }; + + let cache_index = offset.cache_index; + let piece_offset = offset.piece_offset; + + match reading_from_piece_cache.entry(cache_index) { + Entry::Occupied(mut entry) => { + let (_backend, pieces) = entry.get_mut(); + pieces.push((piece_index, piece_offset, key)); + } + Entry::Vacant(entry) => { + let backend = match caches.get_backend(cache_index) { + Some(backend) => backend.clone(), + None => { + pieces_to_get_from_plot_cache.push((piece_index, key)); + continue; + } + }; + entry.insert((backend, vec![(piece_index, piece_offset, key)])); + } + } + } + + reading_from_piece_cache + }; + + let (tx, mut rx) = mpsc::unbounded(); + + let mut fut = Box::pin(async move { + let tx = &tx; + + let mut reading_from_piece_cache = reading_from_piece_cache + .into_iter() + .map(|(cache_index, (backend, pieces))| async move { + // TODO: Read a stream of pieces from each backend rather than read + // individual pieces sequentially, but `PieceCache` API ideally needs to be + // extended first to support this more efficiently, especially over the + // network + for (piece_index, piece_offset, key) in pieces { + let maybe_piece = match backend.read_piece(piece_offset).await { + Ok(Some((_piece_index, piece))) => { + if let Some(metrics) = &self.metrics { + metrics.cache_get_hit.inc(); + } + Some(piece) + } + Ok(None) => { + if let Some(metrics) = &self.metrics { + metrics.cache_get_miss.inc(); + } + None + } + Err(error) => { + error!( + %error, + %cache_index, + %piece_index, + %piece_offset, + ?key, + "Error while reading piece from cache, might be a disk \ + corruption" + ); + + if let Err(error) = self + .worker_sender + .clone() + .send(WorkerCommand::ForgetKey { key }) + .await + { + trace!( + %error, + "Failed to send ForgetKey command to worker" + ); + } + + if let Some(metrics) = &self.metrics { + metrics.cache_get_error.inc(); + } + None + } + }; + + tx.unbounded_send((piece_index, maybe_piece)) + .expect("This future isn't polled after receiver is dropped; qed"); + } + }) + .collect::>(); + // TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650 + // Simply drain everything + // .for_each(|()| async {}) + + // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved + let reading_from_piece_cache_fut = async move { + while let Some(()) = reading_from_piece_cache.next().await { + // Simply drain everything + } + }; + + let reading_from_plot_cache_fut = async { + if pieces_to_get_from_plot_cache.is_empty() { + return; + } + + for cache in self.plot_caches.caches.read().await.iter() { + // Iterating over offsets in reverse order to both traverse elements in async code + // and being able to efficiently remove entries without extra allocations + for offset in (0..pieces_to_get_from_plot_cache.len()).rev() { + let (piece_index, key) = &pieces_to_get_from_plot_cache[offset]; + + if let Ok(Some(piece)) = cache.read_piece(key).await { + if let Some(metrics) = &self.metrics { + metrics.cache_get_hit.inc(); + } + tx.unbounded_send((*piece_index, Some(piece))) + .expect("This future isn't polled after receiver is dropped; qed"); + + // Due to iteration in reverse order and swapping using elements at the end, + // this doesn't affect processing of the elements + pieces_to_get_from_plot_cache.swap_remove(offset); + } + } + + if pieces_to_get_from_plot_cache.is_empty() { + return; + } + } + + if let Some(metrics) = &self.metrics { + metrics + .cache_get_miss + .inc_by(pieces_to_get_from_plot_cache.len() as u64); + } + + for (piece_index, _key) in pieces_to_get_from_plot_cache { + tx.unbounded_send((piece_index, None)) + .expect("This future isn't polled after receiver is dropped; qed"); + } + }; + + join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await + }); + + // Drive above future and stream back any pieces that were downloaded so far + stream::poll_fn(move |cx| { + let end_result = fut.poll_unpin(cx); + + if let Ok(maybe_result) = rx.try_next() { + return Poll::Ready(maybe_result); + } + + end_result.map(|((), ())| None) + }) + } + /// Returns a filtered list of pieces that were found in farmer cache, order is not guaranteed pub async fn has_pieces(&self, mut piece_indices: Vec) -> Vec { let mut pieces_to_find = HashMap::::from_iter( diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 0415c4bf3c..54921112ea 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -7,6 +7,7 @@ duration_constructors, exact_size_is_empty, fmt_helpers_for_derive, + future_join, hash_extract_if, impl_trait_in_assoc_type, int_roundings, diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 5a32620cbd..e7ce87a2c5 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -78,11 +78,13 @@ where } } - /// Get pieces with provided indices from cache + /// Get pieces with provided indices from cache. + /// + /// Number of elements in returned stream is the same as in `piece_indices`. pub async fn get_from_cache<'a, PieceIndices>( &'a self, piece_indices: PieceIndices, - ) -> impl Stream)> + 'a + ) -> impl Stream)> + Unpin + 'a where PieceIndices: IntoIterator + 'a, { From a224edcc6e94ca31543d848ac6a1fd586aab4d56 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 16 Oct 2024 11:10:07 +0300 Subject: [PATCH 03/14] Implement `PieceGetter::get_pieces()` in `FarmerPieceGetter` --- .../src/farmer_piece_getter.rs | 130 ++++++++++++++++++ .../src/utils/piece_provider.rs | 8 +- 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs index dd6f8ad606..14b5fc49ac 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter.rs @@ -8,11 +8,15 @@ use async_trait::async_trait; use backoff::backoff::Backoff; use backoff::future::retry; use backoff::ExponentialBackoff; +use futures::channel::mpsc; +use futures::stream::FuturesUnordered; +use futures::{stream, FutureExt, Stream, StreamExt}; use std::fmt; use std::hash::Hash; use std::num::NonZeroUsize; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Weak}; +use std::task::Poll; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use subspace_farmer_components::PieceGetter; use subspace_networking::utils::multihash::ToMultihash; @@ -272,6 +276,132 @@ where ); Ok(None) } + + async fn get_pieces<'a, PieceIndices>( + &'a self, + piece_indices: PieceIndices, + ) -> anyhow::Result< + Box>)> + Send + Unpin + 'a>, + > + where + PieceIndices: IntoIterator + Send + 'a, + { + let (tx, mut rx) = mpsc::unbounded(); + + let mut fut = Box::pin(async move { + let tx = &tx; + + debug!("Getting pieces from farmer cache"); + let mut pieces_not_found_in_farmer_cache = Vec::new(); + let mut pieces_in_farmer_cache = + self.inner.farmer_cache.get_pieces(piece_indices).await; + + while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await { + let Some(piece) = maybe_piece else { + pieces_not_found_in_farmer_cache.push(piece_index); + continue; + }; + tx.unbounded_send((piece_index, Ok(Some(piece)))) + .expect("This future isn't polled after receiver is dropped; qed"); + } + + if pieces_not_found_in_farmer_cache.is_empty() { + return; + } + + debug!( + remaining_piece_count = %pieces_not_found_in_farmer_cache.len(), + "Getting pieces from DSN cache" + ); + let mut pieces_not_found_in_dsn_cache = Vec::new(); + let mut pieces_in_dsn_cache = self + .inner + .piece_provider + .get_from_cache(pieces_not_found_in_farmer_cache) + .await; + + while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await { + let Some(piece) = maybe_piece else { + pieces_not_found_in_dsn_cache.push(piece_index); + continue; + }; + // TODO: Would be nice to have concurrency here + self.inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; + tx.unbounded_send((piece_index, Ok(Some(piece)))) + .expect("This future isn't polled after receiver is dropped; qed"); + } + + if pieces_not_found_in_dsn_cache.is_empty() { + return; + } + + debug!( + remaining_piece_count = %pieces_not_found_in_dsn_cache.len(), + "Getting pieces from node" + ); + let pieces_not_found_on_node = pieces_not_found_in_dsn_cache + .into_iter() + .map(|piece_index| async move { + match self.inner.node_client.piece(piece_index).await { + Ok(Some(piece)) => { + trace!(%piece_index, "Got piece from node successfully"); + self.inner + .farmer_cache + .maybe_store_additional_piece(piece_index, &piece) + .await; + + tx.unbounded_send((piece_index, Ok(Some(piece)))) + .expect("This future isn't polled after receiver is dropped; qed"); + None + } + Ok(None) => Some(piece_index), + Err(error) => { + error!( + %error, + %piece_index, + "Failed to retrieve first segment piece from node" + ); + Some(piece_index) + } + } + }) + .collect::>() + .filter_map(|maybe_piece_index| async move { maybe_piece_index }) + .collect::>() + .await; + + debug!( + remaining_piece_count = %pieces_not_found_on_node.len(), + "Some pieces were not easily reachable" + ); + pieces_not_found_on_node + .into_iter() + .map(|piece_index| async move { + let maybe_piece = self.get_piece_slow_internal(piece_index).await; + + tx.unbounded_send((piece_index, Ok(maybe_piece))) + .expect("This future isn't polled after receiver is dropped; qed"); + }) + .collect::>() + // Simply drain everything + .for_each(|()| async {}) + .await; + }); + + // Drive above future and stream back any pieces that were downloaded so far + Ok(Box::new(stream::poll_fn(move |cx| { + let end_result = fut.poll_unpin(cx); + + if let Ok(maybe_result) = rx.try_next() { + return Poll::Ready(maybe_result); + } + + end_result.map(|()| None) + }))) + } } /// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`] diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index e7ce87a2c5..e0cf6d2166 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -428,7 +428,6 @@ impl RecordStore for DummyRecordStore { struct KademliaWrapper { local_peer_id: PeerId, kademlia: Kademlia, - noop_context: Context<'static>, } impl KademliaWrapper { @@ -436,7 +435,6 @@ impl KademliaWrapper { Self { local_peer_id, kademlia: Kademlia::new(local_peer_id, DummyRecordStore), - noop_context: Context::from_waker(noop_waker_ref()), } } @@ -444,7 +442,11 @@ impl KademliaWrapper { for address in addresses { self.kademlia.add_address(peer_id, address); } - while self.kademlia.poll(&mut self.noop_context).is_ready() { + while self + .kademlia + .poll(&mut Context::from_waker(noop_waker_ref())) + .is_ready() + { // Simply drain useless events generated by above calls } } From 5f711a48cdd45215bfcbf1af8f2a97a4dde5c692 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 16 Oct 2024 09:10:10 +1000 Subject: [PATCH 04/14] Initial subspace-gateway binary which does nothing --- Cargo.lock | 16 ++++ README.md | 3 +- crates/subspace-gateway/Cargo.toml | 29 +++++++ crates/subspace-gateway/README.md | 74 +++++++++++++++++ crates/subspace-gateway/src/commands.rs | 92 +++++++++++++++++++++ crates/subspace-gateway/src/commands/run.rs | 66 +++++++++++++++ crates/subspace-gateway/src/main.rs | 45 ++++++++++ 7 files changed, 324 insertions(+), 1 deletion(-) create mode 100644 crates/subspace-gateway/Cargo.toml create mode 100644 crates/subspace-gateway/README.md create mode 100644 crates/subspace-gateway/src/commands.rs create mode 100644 crates/subspace-gateway/src/commands/run.rs create mode 100644 crates/subspace-gateway/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 10818cb057..5303a9e6b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12574,6 +12574,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "subspace-gateway" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "fdlimit", + "futures", + "mimalloc", + "supports-color", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "subspace-kzg" version = "0.1.0" diff --git a/README.md b/README.md index 0a21038b52..fe7fed60c3 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,12 @@ The structure of this repository is the following: - `crates` contains Subspace-specific Rust crates used to build node and farmer, most are following Substrate naming conventions - `subspace-node` is an implementation of the node for Subspace protocol - `subspace-farmer` is a CLI farmer app + - `subspace-gateway` is a Subspace Distributed Storage Network gateway - `domains` contains client and runtime code for decoupled execution and domains - `shared` contains low-level primitives used by the node, farmer, and other applications ## How to run -Please refer to [farming.md](/docs/farming.md) on how to run farmer. +Please refer to [farming.md](/docs/farming.md) for how to run the farmer. If you are looking to farm offline, or build from source for development purposes please refer to [development.md](/docs/development.md). diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml new file mode 100644 index 0000000000..739eca1990 --- /dev/null +++ b/crates/subspace-gateway/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "subspace-gateway" +version = "0.1.0" +authors = ["Teor "] +description = "A Subspace Network data gateway." +edition = "2021" +license = "MIT OR Apache-2.0" +homepage = "https://subspace.network" +repository = "https://github.com/autonomys/subspace" +include = [ + "/src", + "/Cargo.toml", + "/README.md" +] + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +anyhow = "1.0.89" +clap = { version = "4.5.18", features = ["derive"] } +fdlimit = "0.3.0" +futures = "0.3.30" +mimalloc = "0.1.43" +supports-color = "3.0.1" +thiserror = "1.0.64" +tokio = { version = "1.40.0", features = ["macros"] } +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } diff --git a/crates/subspace-gateway/README.md b/crates/subspace-gateway/README.md new file mode 100644 index 0000000000..4f6869d7a0 --- /dev/null +++ b/crates/subspace-gateway/README.md @@ -0,0 +1,74 @@ +# Subspace Gateway + +Data Gateway implementation for Subspace Network Blockchain using [Substrate](https://docs.substrate.io/) framework. + +## Getting Started + +Follow these steps to get started with the Subspace Gateway :hammer_and_wrench: + +## Running + +It is recommended to follow general farming instructions that explain how to run both farmer and node together. + +## Build from source + +A Rust toolchain is required to compile this repository, but there are some extra dependencies for the gateway. + +`protoc` is required for `libp2p`. + +### Ubuntu + +LLVM/Clang and `make` are necessary: +```bash +sudo apt-get install llvm clang cmake make protobuf-compiler +``` + +### macOS + +1. Install via Homebrew: + +```bash +brew install llvm cmake make protobuf +``` + +2. Add `llvm` to your `~/.zshrc` or `~/.bashrc`: + +```bash +export PATH="/opt/homebrew/opt/llvm/bin:$PATH" +``` + +3. Activate the changes: + +```bash +source ~/.zshrc +``` + +4. Verify that `llvm` is installed: + +```bash +llvm-config --version +``` + +### Build + +Then build the gateway using Cargo: +``` +cargo build --profile production --bin subspace-gateway +target/production/subspace-gateway --version +``` + +#### Start the gateway + +Start a gateway connected to a single node development chain: +```bash +target/production/subspace-gateway run \ + --dev +``` + +### Embedded Docs + +Once the project has been built, the following command can be used to explore all parameters and subcommands: + +```bash +target/production/subspace-gateway --help +``` diff --git a/crates/subspace-gateway/src/commands.rs b/crates/subspace-gateway/src/commands.rs new file mode 100644 index 0000000000..cbdf1190c6 --- /dev/null +++ b/crates/subspace-gateway/src/commands.rs @@ -0,0 +1,92 @@ +//! Gateway subcommands. + +pub(crate) mod run; + +use crate::commands::run::RunOptions; +use clap::Parser; +use tokio::signal; +use tracing::level_filters::LevelFilter; +use tracing::{debug, warn}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{fmt, EnvFilter, Layer}; + +/// Commands for working with a gateway. +#[derive(Debug, Parser)] +#[clap(about, version)] +pub enum Command { + /// Run data gateway + Run(RunOptions), + // TODO: subcommand to run various benchmarks +} + +pub(crate) fn init_logger() { + // TODO: Workaround for https://github.com/tokio-rs/tracing/issues/2214, also on + // Windows terminal doesn't support the same colors as bash does + let enable_color = if cfg!(windows) { + false + } else { + supports_color::on(supports_color::Stream::Stderr).is_some() + }; + tracing_subscriber::registry() + .with( + fmt::layer().with_ansi(enable_color).with_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ), + ) + .init(); +} + +pub(crate) fn raise_fd_limit() { + match fdlimit::raise_fd_limit() { + Ok(fdlimit::Outcome::LimitRaised { from, to }) => { + debug!( + "Increased file descriptor limit from previous (most likely soft) limit {} to \ + new (most likely hard) limit {}", + from, to + ); + } + Ok(fdlimit::Outcome::Unsupported) => { + // Unsupported platform (a platform other than Linux or macOS) + } + Err(error) => { + warn!( + "Failed to increase file descriptor limit for the process due to an error: {}.", + error + ); + } + } +} + +#[cfg(unix)] +pub(crate) async fn shutdown_signal() { + use futures::FutureExt; + use std::pin::pin; + + futures::future::select( + pin!(signal::unix::signal(signal::unix::SignalKind::interrupt()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGINT, shutting down gateway..."); + }),), + pin!(signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGTERM, shutting down gateway..."); + }),), + ) + .await; +} + +#[cfg(not(unix))] +pub(crate) async fn shutdown_signal() { + signal::ctrl_c() + .await + .expect("Setting signal handlers must never fail"); + + tracing::info!("Received Ctrl+C, shutting down gateway..."); +} diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs new file mode 100644 index 0000000000..1733699a1e --- /dev/null +++ b/crates/subspace-gateway/src/commands/run.rs @@ -0,0 +1,66 @@ +//! Gateway run command. +//! This is the primary command for the gateway. + +use crate::commands::shutdown_signal; +use clap::Parser; +use futures::{select, FutureExt}; +use std::pin::pin; +use std::{env, future}; +use tracing::info; + +/// Options for running a node +#[derive(Debug, Parser)] +pub struct RunOptions { + #[clap(flatten)] + gateway: GatewayOptions, +} + +/// Options for running a gateway +#[derive(Debug, Parser)] +pub(super) struct GatewayOptions { + /// Enable development mode. + #[arg(long)] + dev: bool, +} + +/// Default run command for gateway +#[expect(clippy::redundant_locals, reason = "code is incomplete")] +pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { + let signal = shutdown_signal(); + + let RunOptions { + gateway: GatewayOptions { dev: _ }, + } = run_options; + + info!("Subspace Gateway"); + info!("✌️ version {}", env!("CARGO_PKG_VERSION")); + info!("❤️ by {}", env!("CARGO_PKG_AUTHORS")); + + let dsn_fut = future::pending::<()>(); + let rpc_fut = future::pending::<()>(); + + // This defines order in which things are dropped + let dsn_fut = dsn_fut; + let rpc_fut = rpc_fut; + + let dsn_fut = pin!(dsn_fut); + let rpc_fut = pin!(rpc_fut); + + select! { + // Signal future + () = signal.fuse() => {}, + + // Networking future + () = dsn_fut.fuse() => { + info!("DSN network runner exited."); + }, + + // RPC service future + () = rpc_fut.fuse() => { + info!("RPC server exited."); + }, + + } + + anyhow::Ok(()) +} diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs new file mode 100644 index 0000000000..0e30450697 --- /dev/null +++ b/crates/subspace-gateway/src/main.rs @@ -0,0 +1,45 @@ +//! Subspace gateway implementation. + +// TODO: Remove +#![allow( + clippy::needless_return, + reason = "https://github.com/rust-lang/rust-clippy/issues/13458" +)] + +mod commands; + +use crate::commands::{init_logger, raise_fd_limit, Command}; +use clap::Parser; + +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + +/// Subspace gateway error. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Other kind of error. + #[error("Other: {0}")] + Other(String), +} + +impl From for Error { + #[inline] + fn from(s: String) -> Self { + Self::Other(s) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + init_logger(); + raise_fd_limit(); + + let command = Command::parse(); + + match command { + Command::Run(run_options) => { + commands::run::run(run_options).await?; + } + } + Ok(()) +} From ba7edfc8ba60003ac9b8a8defda9e282bb8f7d6c Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 17 Oct 2024 11:08:57 +1000 Subject: [PATCH 05/14] Simplify CLI doc phrasing --- .../bin/subspace-farmer/commands/benchmark.rs | 12 +-- .../bin/subspace-farmer/commands/cluster.rs | 2 +- .../commands/cluster/controller.rs | 2 +- .../commands/cluster/farmer.rs | 20 ++--- .../commands/cluster/plotter.rs | 58 +++++++------- .../src/bin/subspace-farmer/commands/farm.rs | 76 +++++++++---------- .../commands/shared/network.rs | 16 ++-- .../subspace-networking/examples/benchmark.rs | 4 +- .../examples/random-walker.rs | 4 +- .../src/bin/subspace-bootstrap-node/main.rs | 17 +++-- crates/subspace-networking/src/node.rs | 2 +- .../src/commands/run/consensus.rs | 41 +++++----- crates/subspace-service/src/dsn.rs | 8 +- 13 files changed, 131 insertions(+), 131 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index d68143975f..0c534f8033 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -34,9 +34,9 @@ pub(crate) struct AuditOptions { #[arg(long)] with_single: bool, /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some - /// compute-intensive operations during proving), defaults to number of logical CPUs - /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but - /// not more than 32 threads + /// compute-intensive operations during proving). Defaults to the number of logical CPUs + /// on UMA systems, or the number of logical CPUs in the first NUMA node on NUMA systems, but + /// limited to 32 threads. #[arg(long)] farming_thread_pool_size: Option, /// Disk farm to audit @@ -57,9 +57,9 @@ pub(crate) struct ProveOptions { #[arg(long)] with_single: bool, /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some - /// compute-intensive operations during proving), defaults to number of logical CPUs - /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but - /// not more than 32 threads + /// compute-intensive operations during proving). Defaults to the number of logical CPUs + /// on UMA systems, or the number of logical CPUs in the first NUMA node on NUMA systems, but + /// limited to 32 threads. #[arg(long)] farming_thread_pool_size: Option, /// Disk farm to prove diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs index a713149993..ba042d06f5 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs @@ -52,7 +52,7 @@ struct SharedArgs { /// which can be done by starting NATS server with config file containing `max_payload = 2MB`. #[arg(long = "nats-server", required = true)] nats_servers: Vec, - /// Defines endpoints for the prometheus metrics server. It doesn't start without at least + /// Endpoints for the prometheus metrics server. It doesn't start without at least /// one specified endpoint. Format: 127.0.0.1:8080 #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])] prometheus_listen_on: Vec, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index 9457134395..71fda172ea 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -45,7 +45,7 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40); pub(super) struct ControllerArgs { /// Piece getter concurrency. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long, default_value = "128")] piece_getter_concurrency: NonZeroUsize, /// Base path where to store P2P network identity diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs index 9ca65ddbee..81f37d5fb3 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -46,13 +46,13 @@ pub(super) const FARMER_IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration:: /// Arguments for farmer #[derive(Debug, Parser)] pub(super) struct FarmerArgs { - /// One or more farm located at specified path, each with its own allocated space. + /// One or more farms located at specified paths, each with its own allocated space. /// /// In case of multiple disks, it is recommended to specify them individually rather than using /// RAID 0, that way farmer will be able to better take advantage of concurrency of individual /// drives. /// - /// Format for each farm is coma-separated list of strings like this: + /// The format for each farm is a coma-separated list of strings like this: /// /// path=/path/to/directory,size=5T /// @@ -65,33 +65,33 @@ pub(super) struct FarmerArgs { /// Address for farming rewards #[arg(long, value_parser = parse_ss58_reward_address)] reward_address: PublicKey, - /// Run temporary farmer with specified farm size in human-readable format (e.g. 10GB, 2TiB) or + /// Run a temporary farmer with a farm size in human-readable format (e.g. 10GB, 2TiB) or /// just bytes (e.g. 4096), this will create a temporary directory that will be deleted at the /// end of the process. #[arg(long, conflicts_with = "disk_farms")] tmp: Option, - /// Maximum number of pieces in sector (can override protocol value to something lower). + /// Maximum number of pieces in a sector (can override protocol value to something lower). /// /// This will make plotting of individual sectors faster, decrease load on CPU proving, but also /// proportionally increase amount of disk reads during audits since every sector needs to be /// audited and there will be more of them. /// - /// This is primarily for development and not recommended to use by regular users. + /// This is primarily for development and not recommended for regular users. #[arg(long)] max_pieces_in_sector: Option, /// Do not print info about configured farms on startup #[arg(long)] no_info: bool, - /// Defines max number sectors farmer will encode concurrently, defaults to 50. Might be limited + /// The maximum number sectors a farmer will encode concurrently, defaults to 50. Might be limited /// by plotting capacity available in the cluster. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long, default_value = "50")] sector_encoding_concurrency: NonZeroUsize, /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some - /// compute-intensive operations during proving), defaults to number of logical CPUs - /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but - /// not more than 32 threads + /// compute-intensive operations during proving). Defaults to the number of logical CPUs + /// on UMA systems, or the number of logical CPUs in the first NUMA node on NUMA systems, but + /// limited to 32 threads. #[arg(long)] farming_thread_pool_size: Option, /// Disable farm locking, for example if file system doesn't support it diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs index bb2e9a89c6..f10b5269dc 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -35,14 +35,14 @@ const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5); #[derive(Debug, Parser)] struct CpuPlottingOptions { - /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of - /// the plotting process, defaults to `--cpu-sector-encoding-concurrency` + 1 to download future + /// How many sectors the farmer will download concurrently. Limits the memory usage of + /// the plotting process. Defaults to `--cpu-sector-encoding-concurrency` + 1 to download future /// sector ahead of time. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long)] cpu_sector_downloading_concurrency: Option, - /// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and + /// How many sectors the farmer will encode concurrently. Defaults to 1 on UMA system and the /// number of NUMA nodes on NUMA system or L3 cache groups on large CPUs. It is further /// restricted by /// `--cpu-sector-downloading-concurrency` and setting this option higher than @@ -50,27 +50,27 @@ struct CpuPlottingOptions { /// /// CPU plotting is disabled by default if GPU plotting is detected. /// - /// Increase will result in higher memory usage, setting to 0 will disable CPU plotting. + /// Increasing this value will cause higher memory usage. Set to 0 to disable CPU plotting. #[arg(long)] cpu_sector_encoding_concurrency: Option, - /// Defines how many records farmer will encode in a single sector concurrently, defaults to one + /// How many records the farmer will encode in a single sector concurrently. Defaults to one /// record per 2 cores, but not more than 8 in total. Higher concurrency means higher memory - /// usage and typically more efficient CPU utilization. + /// usage, and typically more efficient CPU utilization. #[arg(long)] cpu_record_encoding_concurrency: Option, - /// Size of one thread pool used for plotting, defaults to number of logical CPUs available + /// Size of one thread pool used for plotting. Defaults to number of logical CPUs available /// on UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache /// groups on large CPUs. /// - /// Number of thread pools is defined by `--cpu-sector-encoding-concurrency` option, different - /// thread pools might have different number of threads if NUMA nodes do not have the same size. + /// The number of thread pools is defined by `--cpu-sector-encoding-concurrency` option. Different + /// thread pools might have different numbers of threads if NUMA nodes do not have the same size. /// /// Threads will be pinned to corresponding CPU cores at creation. #[arg(long)] cpu_plotting_thread_pool_size: Option, - /// Specify exact CPU cores to be used for plotting bypassing any custom logic farmer might use - /// otherwise. It replaces both `--cpu-sector-encoding-concurrency` and - /// `--cpu-plotting-thread-pool-size` options if specified. + /// Set the exact CPU cores to be used for plotting, bypassing any custom logic in the farmer. + /// Replaces both `--cpu-sector-encoding-concurrency` and + /// `--cpu-plotting-thread-pool-size` options. /// /// Cores are coma-separated, with whitespace separating different thread pools/encoding /// instances. For example "0,1 2,3" will result in two sectors being encoded at the same time, @@ -87,17 +87,17 @@ struct CpuPlottingOptions { #[cfg(feature = "cuda")] #[derive(Debug, Parser)] struct CudaPlottingOptions { - /// Defines how many sectors farmer will download concurrently during plotting with CUDA GPU, - /// allows to limit memory usage of the plotting process, defaults to number of CUDA GPUs found - /// + 1 to download future sector ahead of time. + /// How many sectors farmer will download concurrently during plotting with CUDA GPUs. + /// Limits memory usage of the plotting process. Defaults to the number of CUDA GPUs + 1, + /// to download future sectors ahead of time. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long)] cuda_sector_downloading_concurrency: Option, - /// Specify exact GPUs to be used for plotting instead of using all GPUs (default behavior). + /// Set the exact GPUs to be used for plotting, instead of using all GPUs (default behavior). /// - /// GPUs are coma-separated: `--cuda-gpus 0,1,3`. Empty string can be specified to disable CUDA - /// GPU usage. + /// GPUs are coma-separated: `--cuda-gpus 0,1,3`. Use an empty string to disable CUDA + /// GPUs. #[arg(long)] cuda_gpus: Option, } @@ -105,17 +105,17 @@ struct CudaPlottingOptions { #[cfg(feature = "rocm")] #[derive(Debug, Parser)] struct RocmPlottingOptions { - /// Defines how many sectors farmer will download concurrently during plotting with ROCm GPU, - /// allows to limit memory usage of the plotting process, defaults to number of ROCm GPUs found - /// + 1 to download future sector ahead of time. + /// How many sectors the farmer will download concurrently during plotting with ROCm GPUs. + /// Limits memory usage of the plotting process. Defaults to number of ROCm GPUs + 1, + /// to download future sectors ahead of time. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long)] rocm_sector_downloading_concurrency: Option, - /// Specify exact GPUs to be used for plotting instead of using all GPUs (default behavior). + /// Set the exact GPUs to be used for plotting, instead of using all GPUs (default behavior). /// - /// GPUs are coma-separated: `--rocm-gpus 0,1,3`. Empty string can be specified to disable ROCm - /// GPU usage. + /// GPUs are coma-separated: `--rocm-gpus 0,1,3`. Use an empty string to disable ROCm + /// GPUs. #[arg(long)] rocm_gpus: Option, } @@ -125,8 +125,8 @@ struct RocmPlottingOptions { pub(super) struct PlotterArgs { /// Piece getter concurrency. /// - /// Increase can result in NATS communication issues if too many messages arrive via NATS, but - /// are not processed quickly enough for some reason. + /// Increasing this value can cause NATS communication issues if too many messages arrive via NATS, but + /// are not processed quickly enough. #[arg(long, default_value = "32")] piece_getter_concurrency: NonZeroUsize, /// Plotting options only used by CPU plotter diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 844fbeb518..d4c6047292 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -73,14 +73,14 @@ type CacheIndex = u8; #[derive(Debug, Parser)] struct CpuPlottingOptions { - /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of - /// the plotting process, defaults to `--cpu-sector-encoding-concurrency` + 1 to download future + /// How many sectors a farmer will download concurrently. Limits memory usage of + /// the plotting process. Defaults to `--cpu-sector-encoding-concurrency` + 1 to download future /// sector ahead of time. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long)] cpu_sector_downloading_concurrency: Option, - /// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and + /// How many sectors a farmer will encode concurrently. Defaults to 1 on UMA system and /// number of NUMA nodes on NUMA system or L3 cache groups on large CPUs. It is further /// restricted by /// `--cpu-sector-downloading-concurrency` and setting this option higher than @@ -88,15 +88,15 @@ struct CpuPlottingOptions { /// /// CPU plotting is disabled by default if GPU plotting is detected. /// - /// Increase will result in higher memory usage, setting to 0 will disable CPU plotting. + /// Increasing this value will cause higher memory usage. Set to 0 to disable CPU plotting. #[arg(long)] cpu_sector_encoding_concurrency: Option, - /// Defines how many records farmer will encode in a single sector concurrently, defaults to one + /// How many records a farmer will encode in a single sector concurrently. Defaults to one /// record per 2 cores, but not more than 8 in total. Higher concurrency means higher memory /// usage and typically more efficient CPU utilization. #[arg(long)] cpu_record_encoding_concurrency: Option, - /// Size of one thread pool used for plotting, defaults to number of logical CPUs available + /// Size of one thread pool used for plotting. Defaults to the number of logical CPUs available /// on UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache /// groups on large CPUs. /// @@ -106,9 +106,9 @@ struct CpuPlottingOptions { /// Threads will be pinned to corresponding CPU cores at creation. #[arg(long)] cpu_plotting_thread_pool_size: Option, - /// Specify exact CPU cores to be used for plotting bypassing any custom logic farmer might use - /// otherwise. It replaces both `--cpu-sector-encoding-concurrency` and - /// `--cpu-plotting-thread-pool-size` options if specified. Requires `--cpu-replotting-cores` to + /// Set the exact CPU cores to be used for plotting bypassing any custom farmer logic. + /// Replaces both `--cpu-sector-encoding-concurrency` and + /// `--cpu-plotting-thread-pool-size` options. Requires `--cpu-replotting-cores` to /// be specified with the same number of CPU cores groups (or not specified at all, in which /// case it'll use the same thread pool as plotting). /// @@ -118,9 +118,9 @@ struct CpuPlottingOptions { #[arg(long, conflicts_with_all = & ["cpu_sector_encoding_concurrency", "cpu_plotting_thread_pool_size"])] cpu_plotting_cores: Option, /// Size of one thread pool used for replotting, typically smaller pool than for plotting - /// to not affect farming as much, defaults to half of the number of logical CPUs available on - /// UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache - /// groups on large CPUs. + /// to not affect farming as much. Defaults to half the number of logical CPUs on UMA systems, + /// half the number of logical CPUs in the local NUMA node on NUMA systems, or half the L3 + /// cache group on large CPUs. /// /// Number of thread pools is defined by `--cpu-sector-encoding-concurrency` option, different /// thread pools might have different number of threads if NUMA nodes do not have the same size. @@ -128,8 +128,8 @@ struct CpuPlottingOptions { /// Threads will be pinned to corresponding CPU cores at creation. #[arg(long)] cpu_replotting_thread_pool_size: Option, - /// Specify exact CPU cores to be used for replotting bypassing any custom logic farmer might - /// use otherwise. It replaces `--cpu-replotting-thread-pool-size` options if specified. + /// Set the exact CPU cores to be used for replotting, bypassing any custom farmer logic. + /// Replaces `--cpu-replotting-thread-pool-size` option if specified. /// Requires `--cpu-plotting-cores` to be specified with the same number of CPU cores groups. /// /// Cores are coma-separated, with whitespace separating different thread pools/encoding @@ -147,17 +147,17 @@ struct CpuPlottingOptions { #[cfg(feature = "cuda")] #[derive(Debug, Parser)] struct CudaPlottingOptions { - /// Defines how many sectors farmer will download concurrently during plotting with CUDA GPU, - /// allows to limit memory usage of the plotting process, defaults to number of CUDA GPUs found + /// How many sectors a farmer will download concurrently during plotting with a CUDA GPU. + /// Limits memory usage of the plotting process. Defaults to the number of CUDA GPUs found /// + 1 to download future sector ahead of time. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long)] cuda_sector_downloading_concurrency: Option, - /// Specify exact GPUs to be used for plotting instead of using all GPUs (default behavior). + /// Set the exact GPUs to be used for plotting instead of using all GPUs (default behavior). /// - /// GPUs are coma-separated: `--cuda-gpus 0,1,3`. Empty string can be specified to disable CUDA - /// GPU usage. + /// GPUs are coma-separated: `--cuda-gpus 0,1,3`. Use an empty string to disable CUDA + /// GPUs. #[arg(long)] cuda_gpus: Option, } @@ -165,17 +165,17 @@ struct CudaPlottingOptions { #[cfg(feature = "rocm")] #[derive(Debug, Parser)] struct RocmPlottingOptions { - /// Defines how many sectors farmer will download concurrently during plotting with ROCm GPU, - /// allows to limit memory usage of the plotting process, defaults to number of ROCm GPUs found + /// How many sectors a farmer will download concurrently during plotting with a ROCm GPU. + /// Limits memory usage of the plotting process. Defaults to the number of ROCm GPUs found /// + 1 to download future sector ahead of time. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long)] rocm_sector_downloading_concurrency: Option, - /// Specify exact GPUs to be used for plotting instead of using all GPUs (default behavior). + /// Set the exact GPUs to be used for plotting instead of using all GPUs (default behavior). /// - /// GPUs are coma-separated: `--rocm-gpus 0,1,3`. Empty string can be specified to disable ROCm - /// GPU usage. + /// GPUs are coma-separated: `--rocm-gpus 0,1,3`. Use an empty string to disable ROCm + /// GPUs. #[arg(long)] rocm_gpus: Option, } @@ -183,13 +183,13 @@ struct RocmPlottingOptions { /// Arguments for farmer #[derive(Debug, Parser)] pub(crate) struct FarmingArgs { - /// One or more farm located at specified path, each with its own allocated space. + /// One or more farms located at specified paths, each with its own allocated space. /// /// In case of multiple disks, it is recommended to specify them individually rather than using /// RAID 0, that way farmer will be able to better take advantage of concurrency of individual /// drives. /// - /// Format for each farm is coma-separated list of strings like this: + /// The format for each farm is coma-separated list of strings like this: /// /// path=/path/to/directory,size=5T /// @@ -211,18 +211,18 @@ pub(crate) struct FarmingArgs { /// Sets some flags that are convenient during development, currently `--allow-private-ips` #[arg(long)] dev: bool, - /// Run temporary farmer with specified plot size in human-readable format (e.g. 10GB, 2TiB) or - /// just bytes (e.g. 4096), this will create a temporary directory that will be deleted at the + /// Run a temporary farmer with a plot size in human-readable format (e.g. 10GB, 2TiB) or + /// just bytes (e.g. 4096). This will create a temporary directory that will be deleted at the /// end of the process. #[arg(long, conflicts_with = "disk_farms")] tmp: Option, - /// Maximum number of pieces in sector (can override protocol value to something lower). + /// Maximum number of pieces in a sector (can override protocol value to something lower). /// /// This will make plotting of individual sectors faster, decrease load on CPU proving, but also /// proportionally increase amount of disk reads during audits since every sector needs to be /// audited and there will be more of them. /// - /// This is primarily for development and not recommended to use by regular users. + /// This is primarily for development and not recommended for regular users. #[arg(long)] max_pieces_in_sector: Option, /// Network parameters @@ -231,19 +231,19 @@ pub(crate) struct FarmingArgs { /// Do not print info about configured farms on startup #[arg(long)] no_info: bool, - /// Defines endpoints for the prometheus metrics server. It doesn't start without at least + /// Endpoints for the prometheus metrics server. It doesn't start without at least /// one specified endpoint. Format: 127.0.0.1:8080 #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])] prometheus_listen_on: Vec, /// Piece getter concurrency. /// - /// Increase will result in higher memory usage. + /// Increasing this value will cause higher memory usage. #[arg(long, default_value = "128")] piece_getter_concurrency: NonZeroUsize, /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some - /// compute-intensive operations during proving), defaults to number of logical CPUs - /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but - /// not more than 32 threads + /// compute-intensive operations during proving). Defaults to the number of logical CPUs + /// on UMA systems, or the number of logical CPUs in first NUMA node on NUMA systems, but + /// limited to 32 threads. #[arg(long)] farming_thread_pool_size: Option, /// Plotting options only used by CPU plotter diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs index 69fdab4d41..6a7595376e 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs @@ -45,7 +45,7 @@ pub(in super::super) struct NetworkArgs { /// Multiaddrs of bootstrap nodes to connect to on startup, multiple are supported #[arg(long)] pub(in super::super) bootstrap_nodes: Vec, - /// Multiaddr to listen on for subspace networking, for instance `/ip4/0.0.0.0/tcp/0`, + /// Multiaddrs to listen on for subspace networking, for instance `/ip4/0.0.0.0/tcp/0`, /// multiple are supported. #[arg(long, default_values_t = [ Multiaddr::from(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) @@ -54,26 +54,26 @@ pub(in super::super) struct NetworkArgs { .with(Protocol::Tcp(30533)) ])] pub(in super::super) listen_on: Vec, - /// Determines whether we allow keeping non-global (private, shared, loopback..) addresses in - /// Kademlia DHT. + /// Enable non-global (private, shared, loopback..) addresses in Kademlia DHT. + /// By default, these addresses are excluded from the DHT. #[arg(long, default_value_t = false)] pub(in super::super) allow_private_ips: bool, /// Multiaddrs of reserved nodes to maintain a connection to, multiple are supported #[arg(long)] pub(in super::super) reserved_peers: Vec, - /// Defines max established incoming connection limit. + /// Maximum established incoming connection limit. #[arg(long, default_value_t = 300)] pub(in super::super) in_connections: u32, - /// Defines max established outgoing swarm connection limit. + /// Maximum established outgoing swarm connection limit. #[arg(long, default_value_t = 100)] pub(in super::super) out_connections: u32, - /// Defines max pending incoming connection limit. + /// Maximum pending incoming connection limit. #[arg(long, default_value_t = 100)] pub(in super::super) pending_in_connections: u32, - /// Defines max pending outgoing swarm connection limit. + /// Maximum pending outgoing swarm connection limit. #[arg(long, default_value_t = 100)] pub(in super::super) pending_out_connections: u32, - /// Known external addresses + /// Known external addresses. #[arg(long = "external-address")] pub(in super::super) external_addresses: Vec, } diff --git a/crates/subspace-networking/examples/benchmark.rs b/crates/subspace-networking/examples/benchmark.rs index 135b8545b2..e02212cf63 100644 --- a/crates/subspace-networking/examples/benchmark.rs +++ b/crates/subspace-networking/examples/benchmark.rs @@ -98,10 +98,10 @@ struct Args { /// production use. #[arg(long, required = true)] protocol_version: String, - /// Defines max established outgoing connections limit for the peer. + /// Maximum established outgoing connections limit for the peer. #[arg(long, default_value_t = 100)] out_peers: u32, - /// Defines max pending outgoing connections limit for the peer. + /// Maximum pending outgoing connections limit for the peer. #[arg(long, default_value_t = 100)] pending_out_peers: u32, #[clap(subcommand)] diff --git a/crates/subspace-networking/examples/random-walker.rs b/crates/subspace-networking/examples/random-walker.rs index 7d6a3def0b..f56ff14dc3 100644 --- a/crates/subspace-networking/examples/random-walker.rs +++ b/crates/subspace-networking/examples/random-walker.rs @@ -37,10 +37,10 @@ struct Args { /// production use. #[arg(long, required = true)] protocol_version: String, - /// Defines max established outgoing connections limit for the peer. + /// Maximum established outgoing connections limit for the peer. #[arg(long, default_value_t = 100)] out_peers: u32, - /// Defines max pending outgoing connections limit for the peer. + /// Maximum pending outgoing connections limit for the peer. #[arg(long, default_value_t = 100)] pending_out_peers: u32, /// Enable piece retrieval retries on unsuccessful requests. diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 997e144e95..ae40681de0 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -52,19 +52,20 @@ enum Command { /// Multiaddresses of reserved peers to maintain connections to, multiple are supported #[arg(long = "reserved-peer")] reserved_peers: Vec, - /// Defines max established incoming connections limit for the peer. + /// Maximum established incoming connections limit for the peer. #[arg(long, default_value_t = 300)] in_peers: u32, - /// Defines max established outgoing connections limit for the peer. + /// Maximum established outgoing connections limit for the peer. #[arg(long, default_value_t = 300)] out_peers: u32, - /// Defines max pending incoming connections limit for the peer. + /// Maximum pending incoming connections limit for the peer. #[arg(long, default_value_t = 300)] pending_in_peers: u32, - /// Defines max pending outgoing connections limit for the peer. + /// Maximum pending outgoing connections limit for the peer. #[arg(long, default_value_t = 300)] pending_out_peers: u32, - /// Determines whether we allow keeping non-global (private, shared, loopback..) addresses in Kademlia DHT. + /// Enable non-global (private, shared, loopback..) addresses in the Kademlia DHT. + /// By default these addresses are excluded from the DHT. #[arg(long, default_value_t = false)] allow_private_ips: bool, /// Protocol version for libp2p stack, should be set as genesis hash of the blockchain for @@ -74,14 +75,14 @@ enum Command { /// Known external addresses #[arg(long = "external-address")] external_addresses: Vec, - /// Defines endpoints for the prometheus metrics server. It doesn't start without at least - /// one specified endpoint. Format: 127.0.0.1:8080 + /// Endpoints for the prometheus metrics server. It doesn't start without at least one + /// specified endpoint. Format: 127.0.0.1:8080 #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])] prometheus_listen_on: Vec, }, /// Generate a new keypair GenerateKeypair { - /// Produce an output in JSON format when enabled. + /// Produce output in JSON format. #[arg(long, default_value_t = false)] json: bool, }, diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index dbd9cb7832..be429d5a73 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -431,7 +431,7 @@ impl Node { /// Get closest peers by multihash key using Kademlia DHT's local view without any network /// requests. /// - /// Optional `source` is peer for which results will be sent as a response, defaults to local + /// Optional `source` is peer for which results will be sent as a response. Defaults to local /// peer ID. pub async fn get_closest_local_peers( &self, diff --git a/crates/subspace-node/src/commands/run/consensus.rs b/crates/subspace-node/src/commands/run/consensus.rs index f589d5a6f4..1d7f724d94 100644 --- a/crates/subspace-node/src/commands/run/consensus.rs +++ b/crates/subspace-node/src/commands/run/consensus.rs @@ -60,15 +60,15 @@ fn parse_timekeeper_cpu_cores( /// Options for Substrate networking #[derive(Debug, Parser)] struct SubstrateNetworkOptions { - /// Specify a list of bootstrap nodes for Substrate networking stack. + /// A list of bootstrap nodes for the Substrate networking stack. #[arg(long)] bootstrap_nodes: Vec, - /// Specify a list of reserved node addresses. + /// A list of reserved node addresses, which are prioritised for connections. #[arg(long)] reserved_nodes: Vec, - /// Whether to only synchronize the chain with reserved nodes. + /// Only synchronize the chain with reserved nodes. /// /// TCP connections might still be established with non-reserved nodes. /// In particular, if you are a farmer your node might still connect to other farmer nodes @@ -76,14 +76,14 @@ struct SubstrateNetworkOptions { #[arg(long)] reserved_only: bool, - /// The public address that other nodes will use to connect to it. + /// The public address that other nodes will use to connect to this node. /// /// This can be used if there's a proxy in front of this node or if address is known beforehand /// and less reliable auto-discovery can be avoided. #[arg(long)] public_addr: Vec, - /// Listen on this multiaddress + /// Listen for incoming Substrate connections on these multiaddresses. #[arg(long, default_values_t = [ sc_network::Multiaddr::from(sc_network::multiaddr::Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) .with(sc_network::multiaddr::Protocol::Tcp(30333)), @@ -92,12 +92,12 @@ struct SubstrateNetworkOptions { ])] listen_on: Vec, - /// Determines whether we allow keeping non-global (private, shared, loopback..) addresses - /// in Kademlia DHT. + /// Enable non-global (private, shared, loopback..) addresses in the Kademlia DHT. + /// By default these addresses are excluded from the DHT. #[arg(long, default_value_t = false)] allow_private_ips: bool, - /// Specify the number of outgoing connections we're trying to maintain. + /// The number of outgoing connections we will try to maintain. #[arg(long, default_value_t = 8)] out_peers: u32, @@ -119,7 +119,7 @@ struct SubstrateNetworkOptions { /// Options for DSN #[derive(Debug, Parser)] struct DsnOptions { - /// Where local DSN node will listen for incoming connections. + /// Listen for incoming DSN connections on these multiaddresses. #[arg(long, default_values_t = [ Multiaddr::from(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) .with(Protocol::Tcp(30433)), @@ -136,23 +136,23 @@ struct DsnOptions { #[arg(long)] dsn_reserved_peers: Vec, - /// Defines max established incoming connection limit for DSN. + /// Maximum established incoming connection limit for DSN. #[arg(long, default_value_t = 50)] dsn_in_connections: u32, - /// Defines max established outgoing swarm connection limit for DSN. + /// Maximum established outgoing swarm connection limit for DSN. #[arg(long, default_value_t = 150)] dsn_out_connections: u32, - /// Defines max pending incoming connection limit for DSN. + /// Maximum pending incoming connection limit for DSN. #[arg(long, default_value_t = 100)] dsn_pending_in_connections: u32, - /// Defines max pending outgoing swarm connection limit for DSN. + /// Maximum pending outgoing swarm connection limit for DSN. #[arg(long, default_value_t = 150)] dsn_pending_out_connections: u32, - /// Known external addresses + /// Known external addresses. #[arg(long = "dsn-external-address")] dsn_external_addresses: Vec, } @@ -234,7 +234,7 @@ impl fmt::Display for BlocksPruningMode { /// Parameters to define the pruning mode #[derive(Debug, Clone, Parser)] struct PruningOptions { - /// Specify the state pruning mode. + /// The state pruning mode. /// /// This mode specifies when the block's state (ie, storage) should be pruned (ie, removed) /// from the database. @@ -247,7 +247,7 @@ struct PruningOptions { #[arg(long, default_value_t = StatePruningMode::Number(MIN_STATE_PRUNING))] state_pruning: StatePruningMode, - /// Specify the blocks pruning mode. + /// The blocks pruning mode. /// /// This mode specifies when the block's body (including justifications) /// should be pruned (ie, removed) from the database. @@ -305,13 +305,13 @@ struct TimekeeperOptions { /// Options for running a node #[derive(Debug, Parser)] pub(super) struct ConsensusChainOptions { - /// Base path where to store node files. + /// Base path to store node files. /// /// Required unless --dev mode is used. #[arg(long)] base_path: Option, - /// Specify the chain specification. + /// The chain specification. /// /// It can be one of the predefined ones (dev) or it can be a path to a file with the chainspec /// (such as one exported by the `build-spec` subcommand). @@ -378,9 +378,8 @@ pub(super) struct ConsensusChainOptions { #[clap(flatten)] pool_config: TransactionPoolParams, - /// Parameter that allows node to forcefully assume it is synced, needed for network - /// bootstrapping only, as long as two synced nodes remain on the network at any time, this - /// doesn't need to be used. + /// Make the node forcefully assume it is synced, needed for network bootstrapping only. As + /// long as two synced nodes remain on the network at any time, this doesn't need to be used. /// /// --dev mode enables this option automatically. #[clap(long)] diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 14e9a9c67f..3260c2008e 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -50,16 +50,16 @@ pub struct DsnConfig { /// System base path. pub network_path: PathBuf, - /// Defines max established incoming swarm connection limit. + /// Maximum established incoming swarm connection limit. pub max_in_connections: u32, - /// Defines max established outgoing swarm connection limit. + /// Maximum established outgoing swarm connection limit. pub max_out_connections: u32, - /// Defines max pending incoming swarm connection limit. + /// Maximum pending incoming swarm connection limit. pub max_pending_in_connections: u32, - /// Defines max pending outgoing swarm connection limit. + /// Maximum pending outgoing swarm connection limit. pub max_pending_out_connections: u32, /// Known external addresses From f7c24dab08c105215e333bdc0ce2d9f6e84c85ec Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 10:13:46 +0300 Subject: [PATCH 06/14] Make sure sector metadata is stored in memory in the correct order during initial plotting --- .../src/single_disk_farm/plotting.rs | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 56d3778520..82aef75203 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -165,6 +165,8 @@ where maybe_sector_plotting_result = maybe_wait_futures_ordered(&mut sectors_being_plotted).fuse() => { process_plotting_result( maybe_sector_plotting_result?, + sectors_metadata, + sectors_being_modified, &mut metadata_header, Arc::clone(§or_plotting_options.metadata_file) ).await?; @@ -175,6 +177,8 @@ where maybe_sector_plotting_result = maybe_wait_futures_ordered(&mut sectors_being_plotted).fuse() => { process_plotting_result( maybe_sector_plotting_result?, + sectors_metadata, + sectors_being_modified, &mut metadata_header, Arc::clone(§or_plotting_options.metadata_file) ).await?; @@ -187,15 +191,32 @@ where async fn process_plotting_result( sector_plotting_result: SectorPlottingResult, + sectors_metadata: &AsyncRwLock>, + sectors_being_modified: &AsyncRwLock>, metadata_header: &mut PlotMetadataHeader, metadata_file: Arc, ) -> Result<(), PlottingError> { let SectorPlottingResult { - sector_index, + sector_metadata, replotting, last_queued, } = sector_plotting_result; + let sector_index = sector_metadata.sector_index; + + { + let mut sectors_metadata = sectors_metadata.write().await; + // If exists then we're replotting, otherwise we create sector for the first time + if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize) { + *existing_sector_metadata = sector_metadata; + } else { + sectors_metadata.push(sector_metadata); + } + } + + // Inform others that this sector is no longer being modified + sectors_being_modified.write().await.remove(§or_index); + if sector_index + 1 > metadata_header.plotted_sector_count { metadata_header.plotted_sector_count = sector_index + 1; @@ -238,7 +259,7 @@ enum PlotSingleSectorResult { } struct SectorPlottingResult { - sector_index: SectorIndex, + sector_metadata: SectorMetadataChecksummed, replotting: bool, last_queued: bool, } @@ -454,22 +475,8 @@ where .sector_update .call_simple(&(sector_index, sector_state)); - { - let mut sectors_metadata = sectors_metadata.write().await; - // If exists then we're replotting, otherwise we create sector for the first time - if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize) - { - *existing_sector_metadata = sector_metadata; - } else { - sectors_metadata.push(sector_metadata); - } - } - - // Inform others that this sector is no longer being modified - sectors_being_modified.write().await.remove(§or_index); - Ok(SectorPlottingResult { - sector_index, + sector_metadata, replotting, last_queued, }) From 73895eb7f6eb0ca7cb482975977d9b6a58c6f514 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 14:45:33 +0300 Subject: [PATCH 07/14] Minor simplification of `FuturesUnordered` usage --- .../commands/cluster/controller/farms.rs | 10 ++++----- .../src/cluster/nats_client.rs | 21 ++++++++----------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index 94c496a533..8f211e5046 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -14,7 +14,7 @@ use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; -use std::future::{pending, ready, Future}; +use std::future::{ready, Future}; use std::mem; use std::pin::{pin, Pin}; use std::sync::Arc; @@ -139,9 +139,7 @@ pub(super) async fn maintain_farms( // Farm that is being added/removed right now (if any) let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture).fuse(); // Initialize with pending future so it never ends - let mut farms = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin)>>> - ]); + let mut farms = FuturesUnordered::new(); let farmer_identify_subscription = pin!(nats_client .subscribe_to_broadcasts::(None, None) @@ -253,7 +251,7 @@ pub(super) async fn maintain_farms( } result = farm_add_remove_in_progress => { if let Some((farm_index, expired_receiver, farm)) = result { - farms.push(Box::pin(async move { + farms.push(async move { select! { result = farm.run().fuse() => { (farm_index, result) @@ -263,7 +261,7 @@ pub(super) async fn maintain_farms( (farm_index, Ok(())) } } - })); + }); } } } diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index 7619894f4e..00af3caa8b 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -28,7 +28,7 @@ use futures::{select, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; use std::any::type_name; use std::collections::VecDeque; -use std::future::{pending, Future}; +use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; @@ -550,9 +550,7 @@ impl NatsClient { OP: Fn(Request) -> F + Send + Sync, { // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); + let mut processing = FuturesUnordered::new(); let subject = subject_with_instance(Request::SUBJECT, instance); let subscription = if let Some(queue_group) = queue_group { @@ -579,19 +577,18 @@ impl NatsClient { loop { select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; - + message = subscription.select_next_some() => { // Create background task for concurrent processing - processing.push(Box::pin(self.process_request( + processing.push(self.process_request( message, &process, - ))); - } + )); + }, _ = processing.next() => { // Nothing to do here + }, + complete => { + break; } } } From 9f960f2358b1d3e8bbde76e08b9fa9744c59069b Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 15:19:20 +0300 Subject: [PATCH 08/14] Unify subscriptions a bit in `NatsClient` --- .../src/cluster/nats_client.rs | 59 +++++++++++-------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index 00af3caa8b..e169808a71 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -552,21 +552,15 @@ impl NatsClient { // Initialize with pending future so it never ends let mut processing = FuturesUnordered::new(); - let subject = subject_with_instance(Request::SUBJECT, instance); - let subscription = if let Some(queue_group) = queue_group { - self.inner - .client - .queue_subscribe(subject, queue_group) - .await - } else { - self.inner.client.subscribe(subject).await - } - .map_err(|error| { - anyhow!( - "Failed to subscribe to {} requests for {instance:?}: {error}", - type_name::(), - ) - })?; + let subscription = self + .common_subscribe(Request::SUBJECT, instance, queue_group) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to {} requests for {instance:?}: {error}", + type_name::(), + ) + })?; debug!( request_type = %type_name::(), @@ -1025,6 +1019,30 @@ impl NatsClient { where Message: Decode, { + let subscriber = self + .common_subscribe(subject, instance, queue_group) + .await?; + debug!( + %subject, + message_type = %type_name::(), + ?subscriber, + "Simple subscription" + ); + + Ok(SubscriberWrapper { + subscriber, + _phantom: PhantomData, + }) + } + + /// Simple subscription that will produce decoded messages, while skipping messages that fail to + /// decode + async fn common_subscribe( + &self, + subject: &'static str, + instance: Option<&str>, + queue_group: Option, + ) -> Result { let subscriber = if let Some(queue_group) = queue_group { self.inner .client @@ -1036,17 +1054,8 @@ impl NatsClient { .subscribe(subject_with_instance(subject, instance)) .await? }; - debug!( - %subject, - message_type = %type_name::(), - ?subscriber, - "Simple subscription" - ); - Ok(SubscriberWrapper { - subscriber, - _phantom: PhantomData, - }) + Ok(subscriber) } } From 64490b02eda82415b7a70ce202f0fc9f5c84c4f1 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 16:55:37 +0300 Subject: [PATCH 09/14] Introduce `NatsClient::stream_request_responder()` and hide `NatsClient::stream_response()` --- crates/subspace-farmer/src/cluster/cache.rs | 108 +++----------- crates/subspace-farmer/src/cluster/farmer.rs | 103 ++++--------- .../src/cluster/nats_client.rs | 140 +++++++++++++++++- crates/subspace-farmer/src/cluster/plotter.rs | 111 ++++++-------- 4 files changed, 229 insertions(+), 233 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index a5bb7574e6..e8239c174e 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -9,7 +9,7 @@ use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; use crate::cluster::nats_client::{ - GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, + GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset}; use anyhow::anyhow; @@ -17,8 +17,7 @@ use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::{select, stream, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; -use std::future::{pending, Future}; -use std::pin::{pin, Pin}; +use std::pin::Pin; use std::time::{Duration, Instant}; use subspace_core_primitives::pieces::{Piece, PieceIndex}; use tokio::time::MissedTickBehavior; @@ -377,6 +376,7 @@ where ) }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await }) .collect::>() @@ -409,6 +409,7 @@ where ) }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await }) .collect::>() @@ -441,6 +442,7 @@ where ) }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await }) .collect::>() @@ -458,93 +460,31 @@ where { caches_details .iter() - .enumerate() - .map(|(cache_index, cache_details)| async move { - // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); - let mut subscription = nats_client - .subscribe_to_stream_requests( - Some(&cache_details.cache_id_string), + .map(|cache_details| async move { + nats_client + .stream_request_responder::<_, _, Pin + Send>>, _>( + Some(cache_details.cache_id_string.as_str()), Some(cache_details.cache_id_string.clone()), + |_request: ClusterCacheContentsRequest| async move { + Some(match cache_details.cache.contents().await { + Ok(contents) => Box::pin(contents.map(|maybe_cache_element| { + maybe_cache_element.map_err(|error| error.to_string()) + })) as _, + Err(error) => { + error!(%error, "Failed to get contents"); + + Box::pin(stream::once(async move { + Err(format!("Failed to get contents: {error}")) + })) as _ + } + }) + }, ) + .instrument(info_span!("", cache_id = %cache_details.cache_id)) .await - .map_err(|error| { - anyhow!( - "Failed to subscribe to contents requests for cache {}: {}", - cache_details.cache_id, - error - ) - })? - .fuse(); - - loop { - select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; - - // Create background task for concurrent processing - processing.push(Box::pin( - process_contents_request( - nats_client, - cache_details, - message, - ) - .instrument(info_span!("", %cache_index)) - )); - } - _ = processing.next() => { - // Nothing to do here - } - } - } - - Ok(()) }) .collect::>() .next() .await .ok_or_else(|| anyhow!("No caches"))? } - -async fn process_contents_request( - nats_client: &NatsClient, - cache_details: &CacheDetails<'_, C>, - request: StreamRequest, -) where - C: PieceCache, -{ - trace!(?request, "Contents request"); - - match cache_details.cache.contents().await { - Ok(contents) => { - nats_client - .stream_response::( - request.response_subject, - contents.map(|maybe_cache_element| { - maybe_cache_element.map_err(|error| error.to_string()) - }), - ) - .await; - } - Err(error) => { - error!( - %error, - cache_id = %cache_details.cache_id, - "Failed to get contents" - ); - - nats_client - .stream_response::( - request.response_subject, - pin!(stream::once(async move { - Err(format!("Failed to get contents: {error}")) - })), - ) - .await; - } - } -} diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index dcc81ba237..4c184c1b2f 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -9,7 +9,7 @@ use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; use crate::cluster::nats_client::{ - GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, + GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, }; use crate::farm::{ Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader, @@ -23,7 +23,7 @@ use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::{select, stream, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; -use std::future::{pending, Future}; +use std::future::Future; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -33,7 +33,7 @@ use subspace_core_primitives::sectors::SectorIndex; use subspace_farmer_components::plotting::PlottedSector; use subspace_rpc_primitives::SolutionResponse; use tokio::time::MissedTickBehavior; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, info_span, trace, warn, Instrument}; const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000; const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); @@ -591,46 +591,33 @@ async fn plotted_sectors_responder( farms_details .iter() .map(|farm_details| async move { - // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); - let mut subscription = nats_client - .subscribe_to_stream_requests( + nats_client + .stream_request_responder::<_, _, Pin + Send>>, _>( Some(&farm_details.farm_id_string), Some(farm_details.farm_id_string.clone()), + |_request: ClusterFarmerPlottedSectorsRequest| async move { + Some(match farm_details.plotted_sectors.get().await { + Ok(plotted_sectors) => { + Box::pin(plotted_sectors.map(|maybe_plotted_sector| { + maybe_plotted_sector.map_err(|error| error.to_string()) + })) as _ + } + Err(error) => { + error!( + %error, + farm_id = %farm_details.farm_id, + "Failed to get plotted sectors" + ); + + Box::pin(stream::once(async move { + Err(format!("Failed to get plotted sectors: {error}")) + })) as _ + } + }) + }, ) + .instrument(info_span!("", cache_id = %farm_details.farm_id)) .await - .map_err(|error| { - anyhow!( - "Failed to subscribe to plotted sectors requests for farm {}: {}", - farm_details.farm_id, - error - ) - })? - .fuse(); - - loop { - select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; - - // Create background task for concurrent processing - processing.push(Box::pin(process_plotted_sectors_request( - nats_client, - farm_details, - message, - ))); - } - _ = processing.next() => { - // Nothing to do here - } - } - } - - Ok(()) }) .collect::>() .next() @@ -638,43 +625,6 @@ async fn plotted_sectors_responder( .ok_or_else(|| anyhow!("No farms"))? } -async fn process_plotted_sectors_request( - nats_client: &NatsClient, - farm_details: &FarmDetails, - request: StreamRequest, -) { - trace!(?request, "Plotted sectors request"); - - match farm_details.plotted_sectors.get().await { - Ok(plotted_sectors) => { - nats_client - .stream_response::( - request.response_subject, - plotted_sectors.map(|maybe_plotted_sector| { - maybe_plotted_sector.map_err(|error| error.to_string()) - }), - ) - .await; - } - Err(error) => { - error!( - %error, - farm_id = %farm_details.farm_id, - "Failed to get plotted sectors" - ); - - nats_client - .stream_response::( - request.response_subject, - pin!(stream::once(async move { - Err(format!("Failed to get plotted sectors: {error}")) - })), - ) - .await; - } - } -} - async fn read_piece_responder( nats_client: &NatsClient, farms_details: &[FarmDetails], @@ -696,6 +646,7 @@ async fn read_piece_responder( ) }, ) + .instrument(info_span!("", cache_id = %farm_details.farm_id)) .await }) .collect::>() diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index e169808a71..a80f326d6f 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -573,10 +573,14 @@ impl NatsClient { select! { message = subscription.select_next_some() => { // Create background task for concurrent processing - processing.push(self.process_request( - message, - &process, - )); + processing.push( + self + .process_request( + message, + &process, + ) + .in_current_span(), + ); }, _ = processing.next() => { // Nothing to do here @@ -682,8 +686,134 @@ impl NatsClient { )) } + /// Responds to stream requests from the given subject using the provided processing function. + /// + /// This will create a subscription on the subject for the given instance (if provided) and + /// queue group. Incoming messages will be deserialized as the request type `Request` and passed + /// to the `process` function to produce a stream response of type `Request::Response`. The + /// stream response will then be sent back on the reply subject from the original request. + /// + /// Each request is processed in a newly created async tokio task. + /// + /// # Arguments + /// + /// * `instance` - Optional instance name to use in place of the `*` in the subject + /// * `group` - The queue group name for the subscription + /// * `process` - The function to call with the decoded request to produce a response + pub async fn stream_request_responder( + &self, + instance: Option<&str>, + queue_group: Option, + process: OP, + ) -> anyhow::Result<()> + where + Request: GenericStreamRequest, + F: Future> + Send, + S: Stream + Unpin, + OP: Fn(Request) -> F + Send + Sync, + { + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::new(); + + let subscription = self + .common_subscribe(Request::SUBJECT, instance, queue_group) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to {} stream requests for {instance:?}: {error}", + type_name::(), + ) + })?; + + debug!( + request_type = %type_name::(), + ?subscription, + "Stream requests subscription" + ); + let mut subscription = subscription.fuse(); + + loop { + select! { + message = subscription.select_next_some() => { + // Create background task for concurrent processing + processing.push( + self + .process_stream_request( + message, + &process, + ) + .in_current_span(), + ); + }, + _ = processing.next() => { + // Nothing to do here + }, + complete => { + break; + } + } + } + + Ok(()) + } + + async fn process_stream_request(&self, message: Message, process: OP) + where + Request: GenericStreamRequest, + F: Future> + Send, + S: Stream + Unpin, + OP: Fn(Request) -> F + Send + Sync, + { + // TODO: Un-comment once there is no `StreamRequest` + // let Some(reply_subject) = message.reply else { + // return; + // }; + + let message_payload_size = message.payload.len(); + let request = match StreamRequest::::decode(&mut message.payload.as_ref()) { + Ok(request) => { + // Free allocation early + drop(message.payload); + request + } + Err(error) => { + warn!( + request_type = %type_name::(), + %error, + message = %hex::encode(message.payload), + "Failed to decode request" + ); + return; + } + }; + // TODO: Remove two lines once there is no `StreamRequest` + let reply_subject = request.response_subject; + let request = request.request; + + // Avoid printing large messages in logs + if message_payload_size > 1024 { + trace!( + request_type = %type_name::(), + %reply_subject, + "Processing request" + ); + } else { + trace!( + request_type = %type_name::(), + ?request, + %reply_subject, + "Processing request" + ); + } + + if let Some(stream) = process(request).await { + self.stream_response::(reply_subject, stream) + .await; + } + } + /// Helper method to send responses to requests initiated with [`Self::stream_request`] - pub async fn stream_response(&self, response_subject: String, response_stream: S) + async fn stream_response(&self, response_subject: String, response_stream: S) where Request: GenericStreamRequest, S: Stream + Unpin, diff --git a/crates/subspace-farmer/src/cluster/plotter.rs b/crates/subspace-farmer/src/cluster/plotter.rs index 1b112e4033..230bb91853 100644 --- a/crates/subspace-farmer/src/cluster/plotter.rs +++ b/crates/subspace-farmer/src/cluster/plotter.rs @@ -6,9 +6,7 @@ //! implementation designed to work with cluster plotter and a service function to drive the backend //! part of the plotter. -use crate::cluster::nats_client::{ - GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, -}; +use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient}; use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::utils::AsyncJoinOnDrop; use anyhow::anyhow; @@ -23,10 +21,11 @@ use futures::stream::FuturesUnordered; use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; use std::error::Error; -use std::future::{pending, Future}; +use std::future::pending; use std::num::NonZeroUsize; -use std::pin::{pin, Pin}; +use std::pin::pin; use std::sync::Arc; +use std::task::Poll; use std::time::{Duration, Instant}; use subspace_core_primitives::sectors::SectorIndex; use subspace_core_primitives::PublicKey; @@ -754,56 +753,50 @@ where { let plotter_id_string = plotter_id.to_string(); - // Initialize with pending future so it never ends - let mut processing = FuturesUnordered::from_iter([ - Box::pin(pending()) as Pin + Send>> - ]); - let subscription = nats_client - .subscribe_to_stream_requests(Some(&plotter_id_string), Some(plotter_id_string.clone())) - .await - .map_err(|error| anyhow!("Failed to subscribe to plot sector requests: {}", error))?; - debug!(?subscription, "Plot sector subscription"); - let mut subscription = subscription.fuse(); - - loop { - select! { - maybe_message = subscription.next() => { - let Some(message) = maybe_message else { - break; - }; + nats_client + .stream_request_responder( + Some(&plotter_id_string), + Some(plotter_id_string.clone()), + |request| async move { + let (progress_sender, mut progress_receiver) = mpsc::channel(10); - // Create background task for concurrent processing - processing.push(Box::pin(process_plot_sector_request( + let mut fut = Box::pin(process_plot_sector_request( nats_client, plotter, - message, - ))); - } - _ = processing.next() => { - // Nothing to do here - } - } - } + request, + progress_sender, + )); + + Some( + // Drive above future and stream back any pieces that were downloaded so far + stream::poll_fn(move |cx| { + let end_result = fut.poll_unpin(cx); + + if let Ok(maybe_result) = progress_receiver.try_next() { + return Poll::Ready(maybe_result); + } - Ok(()) + end_result.map(|()| None) + }), + ) + }, + ) + .await } async fn process_plot_sector_request

( nats_client: &NatsClient, plotter: &P, - request: StreamRequest, + request: ClusterPlotterPlotSectorRequest, + mut response_proxy_sender: mpsc::Sender, ) where P: Plotter, { - let StreamRequest { - request: - ClusterPlotterPlotSectorRequest { - public_key, - sector_index, - farmer_protocol_info, - pieces_in_sector, - }, - response_subject, + let ClusterPlotterPlotSectorRequest { + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, } = request; // Wrapper future just for instrumentation below @@ -825,26 +818,16 @@ async fn process_plot_sector_request

( { debug!("Plotter is currently occupied and can't plot more sectors"); - nats_client - .stream_response::( - response_subject, - pin!(stream::once(async move { - ClusterSectorPlottingProgress::Occupied - })), - ) - .await; + if let Err(error) = response_proxy_sender + .send(ClusterSectorPlottingProgress::Occupied) + .await + { + warn!(%error, "Failed to send plotting progress"); + return; + } return; } - let (mut response_proxy_sender, response_proxy_receiver) = mpsc::channel(10); - - let response_streaming_fut = nats_client - .stream_response::( - response_subject, - response_proxy_receiver, - ) - .fuse(); - let mut response_streaming_fut = pin!(response_streaming_fut); let progress_proxy_fut = { let mut response_proxy_sender = response_proxy_sender.clone(); let approximate_max_message_size = nats_client.approximate_max_message_size(); @@ -877,11 +860,6 @@ async fn process_plot_sector_request

( }; select! { - _ = response_streaming_fut => { - warn!("Response sending ended early"); - - return; - } _ = progress_proxy_fut.fuse() => { // Done } @@ -890,9 +868,6 @@ async fn process_plot_sector_request

( } } - // Drain remaining progress messages - response_streaming_fut.await; - info!("Finished plotting sector successfully"); }; From 48618d11c7f34f320016fec6c3dcfae3e96a0543 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 16:59:15 +0300 Subject: [PATCH 10/14] Remove `StreamRequest` and hide stream request/response internals from external users --- .../src/cluster/nats_client.rs | 76 +++++-------------- 1 file changed, 17 insertions(+), 59 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index a80f326d6f..5683529d32 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -70,11 +70,11 @@ pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'st type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; } -/// Messages sent in response to [`StreamRequest`]. +/// Messages sent in response to [`GenericStreamRequest`]. /// /// Empty list of responses means the end of the stream. #[derive(Debug, Encode, Decode)] -pub enum GenericStreamResponses { +enum GenericStreamResponses { /// Some responses, but the stream didn't end yet Continue { /// Monotonically increasing index of responses in a stream @@ -132,35 +132,6 @@ impl GenericStreamResponses { } } -/// Generic stream request that expects a stream of responses. -/// -/// Internally it is expected that [`GenericStreamResponses`] messages will be -/// sent to auto-generated subject specified in `response_subject` field. -#[derive(Debug, Encode, Decode)] -#[non_exhaustive] -pub struct StreamRequest -where - Request: GenericStreamRequest, -{ - /// Request - pub request: Request, - /// Topic to send a stream of [`GenericStreamResponses`]s to - pub response_subject: String, -} - -impl StreamRequest -where - Request: GenericStreamRequest, -{ - /// Create new stream request - pub fn new(request: Request) -> Self { - Self { - request, - response_subject: format!("stream-response.{}", Ulid::new()), - } - } -} - /// Stream request error #[derive(Debug, Error)] pub enum StreamRequestError { @@ -658,30 +629,35 @@ impl NatsClient { where Request: GenericStreamRequest, { - let stream_request = StreamRequest::new(request); + let stream_request_subject = subject_with_instance(Request::SUBJECT, instance); + let stream_response_subject = format!("stream-response.{}", Ulid::new()); let subscriber = self .inner .client - .subscribe(stream_request.response_subject.clone()) + .subscribe(stream_response_subject.clone()) .await?; - let stream_request_subject = subject_with_instance(Request::SUBJECT, instance); debug!( request_type = %type_name::(), %stream_request_subject, + %stream_response_subject, ?subscriber, "Stream request subscription" ); self.inner .client - .publish(stream_request_subject, stream_request.encode().into()) + .publish_with_reply( + stream_request_subject, + stream_response_subject.clone(), + request.encode().into(), + ) .await?; Ok(StreamResponseSubscriber::new( subscriber, - stream_request.response_subject, + stream_response_subject, self.clone(), )) } @@ -764,13 +740,12 @@ impl NatsClient { S: Stream + Unpin, OP: Fn(Request) -> F + Send + Sync, { - // TODO: Un-comment once there is no `StreamRequest` - // let Some(reply_subject) = message.reply else { - // return; - // }; + let Some(reply_subject) = message.reply else { + return; + }; let message_payload_size = message.payload.len(); - let request = match StreamRequest::::decode(&mut message.payload.as_ref()) { + let request = match Request::decode(&mut message.payload.as_ref()) { Ok(request) => { // Free allocation early drop(message.payload); @@ -786,9 +761,6 @@ impl NatsClient { return; } }; - // TODO: Remove two lines once there is no `StreamRequest` - let reply_subject = request.response_subject; - let request = request.request; // Avoid printing large messages in logs if message_payload_size > 1024 { @@ -813,7 +785,7 @@ impl NatsClient { } /// Helper method to send responses to requests initiated with [`Self::stream_request`] - async fn stream_response(&self, response_subject: String, response_stream: S) + async fn stream_response(&self, response_subject: Subject, response_stream: S) where Request: GenericStreamRequest, S: Stream + Unpin, @@ -1096,20 +1068,6 @@ impl NatsClient { .await } - /// Simple subscription that will produce decoded stream requests, while skipping messages that - /// fail to decode - pub async fn subscribe_to_stream_requests( - &self, - instance: Option<&str>, - queue_group: Option, - ) -> Result>, SubscribeError> - where - Request: GenericStreamRequest, - { - self.simple_subscribe(Request::SUBJECT, instance, queue_group) - .await - } - /// Simple subscription that will produce decoded notifications, while skipping messages that /// fail to decode pub async fn subscribe_to_notifications( From 20cf307cb866199a7d1f33cc9f1af663f2eb94f7 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 17:29:44 +0300 Subject: [PATCH 11/14] Fix resuming future after completion --- crates/subspace-farmer/src/farmer_cache.rs | 6 ++---- crates/subspace-farmer/src/farmer_piece_getter.rs | 6 ++---- crates/subspace-networking/src/utils/piece_provider.rs | 6 ++---- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 86409eceeb..d909f1eb41 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -1304,13 +1304,11 @@ where // Drive above future and stream back any pieces that were downloaded so far stream::poll_fn(move |cx| { - let end_result = fut.poll_unpin(cx); - - if let Ok(maybe_result) = rx.try_next() { + if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) { return Poll::Ready(maybe_result); } - end_result.map(|((), ())| None) + fut.poll_unpin(cx).map(|((), ())| None) }) } diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs index 14b5fc49ac..1c36064fb9 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter.rs @@ -393,13 +393,11 @@ where // Drive above future and stream back any pieces that were downloaded so far Ok(Box::new(stream::poll_fn(move |cx| { - let end_result = fut.poll_unpin(cx); - - if let Ok(maybe_result) = rx.try_next() { + if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) { return Poll::Ready(maybe_result); } - end_result.map(|()| None) + fut.poll_unpin(cx).map(|()| None) }))) } } diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index e0cf6d2166..2d48621967 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -98,13 +98,11 @@ where // Drive above future and stream back any pieces that were downloaded so far stream::poll_fn(move |cx| { - let end_result = fut.poll_unpin(cx); - - if let Ok(maybe_result) = rx.try_next() { + if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) { return Poll::Ready(maybe_result); } - end_result.map(|()| None) + fut.poll_unpin(cx).map(|()| None) }) } From 08b4e10d37ffbbc17f218eb72119eca32d18a64a Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 21:54:58 +0300 Subject: [PATCH 12/14] Fix docs links to private items --- crates/subspace-farmer/src/cluster/nats_client.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index 5683529d32..be2d53b508 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -57,7 +57,8 @@ pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; } -/// Generic stream request where response is streamed using [`NatsClient::stream_response`]. +/// Generic stream request where response is streamed using +/// [`NatsClient::stream_request_responder`]. /// /// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or /// there is a very large number of messages to send. For simple request/response patten @@ -66,7 +67,7 @@ pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'st /// Request subject with optional `*` in place of application instance to receive the request const SUBJECT: &'static str; /// Response type that corresponds to this stream request. These responses are send as a stream - /// of [`GenericStreamResponses`] messages. + /// of messages. type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; } @@ -143,8 +144,8 @@ pub enum StreamRequestError { Publish(#[from] PublishError), } -/// Wrapper around subscription that transforms [`GenericStreamResponses`] messages into a -/// normal `Response` stream. +/// Wrapper around subscription that transforms stream of wrapped response messages into a normal +/// `Response` stream. #[derive(Debug, Deref, DerefMut)] #[pin_project::pin_project] pub struct StreamResponseSubscriber { From d3a8ef08513862f35b29dbc58ccd0456c822ad52 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 21:55:57 +0300 Subject: [PATCH 13/14] Avoid division by zero by skipping empty list of peers pieces --- crates/subspace-networking/src/utils/piece_provider.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 2d48621967..68e64e2561 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -527,6 +527,10 @@ where break; }; + if connected_peers.is_empty() || pieces_to_download.is_empty() { + break; + } + let num_pieces = pieces_to_download.len(); let step = num_pieces / connected_peers.len().min(num_pieces); From ca3288cb2b923cc9243c0a48bd791e4312fd2b3b Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 17 Oct 2024 21:56:34 +0300 Subject: [PATCH 14/14] Update `futures` crate --- Cargo.lock | 36 +++++++++---------- crates/pallet-subspace/Cargo.toml | 2 +- crates/sc-consensus-subspace-rpc/Cargo.toml | 2 +- crates/sc-consensus-subspace/Cargo.toml | 2 +- crates/sc-proof-of-time/Cargo.toml | 2 +- crates/sc-subspace-block-relay/Cargo.toml | 2 +- crates/sp-domains-fraud-proof/Cargo.toml | 2 +- crates/subspace-farmer-components/Cargo.toml | 4 +-- crates/subspace-farmer/Cargo.toml | 2 +- crates/subspace-gateway/Cargo.toml | 2 +- crates/subspace-malicious-operator/Cargo.toml | 2 +- crates/subspace-networking/Cargo.toml | 2 +- crates/subspace-node/Cargo.toml | 2 +- crates/subspace-service/Cargo.toml | 2 +- .../cross-domain-message-gossip/Cargo.toml | 2 +- domains/client/domain-operator/Cargo.toml | 2 +- domains/client/eth-service/Cargo.toml | 2 +- domains/client/relayer/Cargo.toml | 2 +- domains/service/Cargo.toml | 2 +- shared/subspace-data-retrieval/Cargo.toml | 2 +- test/subspace-test-client/Cargo.toml | 2 +- test/subspace-test-service/Cargo.toml | 2 +- 22 files changed, 40 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 209d4d895a..7f05f68bcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4072,9 +4072,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -4107,9 +4107,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -4117,15 +4117,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -4135,9 +4135,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -4169,9 +4169,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -4210,15 +4210,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-ticker" @@ -4239,9 +4239,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", diff --git a/crates/pallet-subspace/Cargo.toml b/crates/pallet-subspace/Cargo.toml index 8b4bec2524..828ada518f 100644 --- a/crates/pallet-subspace/Cargo.toml +++ b/crates/pallet-subspace/Cargo.toml @@ -31,7 +31,7 @@ subspace-verification = { version = "0.1.0", path = "../subspace-verification", [dev-dependencies] env_logger = "0.11.5" -futures = "0.3.30" +futures = "0.3.31" pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } rand = { version = "0.8.5", features = ["min_const_gen"] } sp-io = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/crates/sc-consensus-subspace-rpc/Cargo.toml b/crates/sc-consensus-subspace-rpc/Cargo.toml index 17e245872a..701917d88a 100644 --- a/crates/sc-consensus-subspace-rpc/Cargo.toml +++ b/crates/sc-consensus-subspace-rpc/Cargo.toml @@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] async-oneshot = "0.5.9" -futures = "0.3.30" +futures = "0.3.31" futures-timer = "3.0.3" jsonrpsee = { version = "0.24.5", features = ["server", "macros"] } parking_lot = "0.12.2" diff --git a/crates/sc-consensus-subspace/Cargo.toml b/crates/sc-consensus-subspace/Cargo.toml index 4d6d650e60..6bb4fa38f6 100644 --- a/crates/sc-consensus-subspace/Cargo.toml +++ b/crates/sc-consensus-subspace/Cargo.toml @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] async-trait = "0.1.83" codec = { package = "parity-scale-codec", version = "3.6.12", features = ["derive"] } -futures = "0.3.30" +futures = "0.3.31" parking_lot = "0.12.2" rand = "0.8.5" rand_chacha = "0.3.1" diff --git a/crates/sc-proof-of-time/Cargo.toml b/crates/sc-proof-of-time/Cargo.toml index 21459d71f7..a350872dd0 100644 --- a/crates/sc-proof-of-time/Cargo.toml +++ b/crates/sc-proof-of-time/Cargo.toml @@ -13,7 +13,7 @@ include = [ [dependencies] core_affinity = "0.8.1" derive_more = { version = "1.0.0", features = ["full"] } -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } parking_lot = "0.12.2" rayon = "1.10.0" diff --git a/crates/sc-subspace-block-relay/Cargo.toml b/crates/sc-subspace-block-relay/Cargo.toml index df46c9629d..326ad1c4a9 100644 --- a/crates/sc-subspace-block-relay/Cargo.toml +++ b/crates/sc-subspace-block-relay/Cargo.toml @@ -15,7 +15,7 @@ async-channel = "1.9.0" async-trait = "0.1.83" codec = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["derive"] } derive_more = { version = "1.0.0", features = ["full"] } -futures = "0.3.30" +futures = "0.3.31" parking_lot = "0.12.2" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-network = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/crates/sp-domains-fraud-proof/Cargo.toml b/crates/sp-domains-fraud-proof/Cargo.toml index f2c456e12f..3a742235d1 100644 --- a/crates/sp-domains-fraud-proof/Cargo.toml +++ b/crates/sp-domains-fraud-proof/Cargo.toml @@ -49,7 +49,7 @@ fp-rpc = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", fp-self-contained = { version = "1.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] } frame-support = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } frame-system = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" libsecp256k1 = { version = "0.7.1", features = ["static-context", "hmac"] } pallet-balances = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } pallet-ethereum = { git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] } diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index 8edc69aa4e..5ca9c4ae55 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -23,7 +23,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] } bitvec = "1.0.1" # TODO: Switch to fs4 once https://github.com/al8n/fs4-rs/issues/15 is resolved fs2 = "0.4.3" -futures = "0.3.30" +futures = "0.3.31" hex = "0.4.3" libc = "0.2.159" parity-scale-codec = "3.6.12" @@ -48,7 +48,7 @@ winapi = "0.3.9" [dev-dependencies] criterion = "0.5.1" -futures = "0.3.30" +futures = "0.3.31" subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" } subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space" } diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 9b5008d6ac..cfe35a8588 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -33,7 +33,7 @@ event-listener = "5.3.1" event-listener-primitives = "2.0.1" fdlimit = { version = "0.3.0", optional = true } fs4 = "0.9.1" -futures = "0.3.30" +futures = "0.3.31" hex = { version = "0.4.3", features = ["serde"] } hwlocality = { version = "1.0.0-alpha.6", features = ["vendored"], optional = true } jsonrpsee = { version = "0.24.5", features = ["ws-client"] } diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index 739eca1990..2ee2195d1b 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -20,7 +20,7 @@ targets = ["x86_64-unknown-linux-gnu"] anyhow = "1.0.89" clap = { version = "4.5.18", features = ["derive"] } fdlimit = "0.3.0" -futures = "0.3.30" +futures = "0.3.31" mimalloc = "0.1.43" supports-color = "3.0.1" thiserror = "1.0.64" diff --git a/crates/subspace-malicious-operator/Cargo.toml b/crates/subspace-malicious-operator/Cargo.toml index 98c2f479fc..d696749f63 100644 --- a/crates/subspace-malicious-operator/Cargo.toml +++ b/crates/subspace-malicious-operator/Cargo.toml @@ -31,7 +31,7 @@ evm-domain-runtime = { version = "0.1.0", path = "../../domains/runtime/evm" } fp-evm = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } frame-system = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } frame-system-rpc-runtime-api = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" hex-literal = "0.4.1" log = "0.4.22" mimalloc = "0.1.43" diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index d329bec891..7626019bc9 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -26,7 +26,7 @@ either = "1.13.0" event-listener-primitives = "2.0.1" # TODO: Switch to fs4 once https://github.com/al8n/fs4-rs/issues/15 is resolved fs2 = "0.4.3" -futures = "0.3.30" +futures = "0.3.31" futures-timer = "3.0.3" hex = "0.4.3" memmap2 = "0.9.5" diff --git a/crates/subspace-node/Cargo.toml b/crates/subspace-node/Cargo.toml index 9d38936659..8b197a10ab 100644 --- a/crates/subspace-node/Cargo.toml +++ b/crates/subspace-node/Cargo.toml @@ -35,7 +35,7 @@ fp-evm = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", frame-benchmarking = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", default-features = false } frame-benchmarking-cli = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", default-features = false } frame-support = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" hex = "0.4.3" hex-literal = "0.4.1" mimalloc = "0.1.43" diff --git a/crates/subspace-service/Cargo.toml b/crates/subspace-service/Cargo.toml index 30479a35c1..ec9668805e 100644 --- a/crates/subspace-service/Cargo.toml +++ b/crates/subspace-service/Cargo.toml @@ -22,7 +22,7 @@ async-trait = "0.1.83" cross-domain-message-gossip = { version = "0.1.0", path = "../../domains/client/cross-domain-message-gossip" } domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" } frame-benchmarking = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", optional = true } -futures = "0.3.30" +futures = "0.3.31" hex = "0.4.3" jsonrpsee = { version = "0.24.5", features = ["server"] } mmr-gadget = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/client/cross-domain-message-gossip/Cargo.toml b/domains/client/cross-domain-message-gossip/Cargo.toml index 59a73f8148..59b7582874 100644 --- a/domains/client/cross-domain-message-gossip/Cargo.toml +++ b/domains/client/cross-domain-message-gossip/Cargo.toml @@ -14,7 +14,7 @@ include = [ [dependencies] domain-block-preprocessor = { version = "0.1.0", path = "../../client/block-preprocessor" } fp-account = { version = "1.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } parking_lot = "0.12.2" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/client/domain-operator/Cargo.toml b/domains/client/domain-operator/Cargo.toml index ac9544d3b1..6ccd3742f6 100644 --- a/domains/client/domain-operator/Cargo.toml +++ b/domains/client/domain-operator/Cargo.toml @@ -9,7 +9,7 @@ codec = { package = "parity-scale-codec", version = "3.6.12", features = ["deriv domain-block-builder = { version = "0.1.0", path = "../block-builder" } domain-block-preprocessor = { version = "0.1.0", path = "../block-preprocessor" } domain-runtime-primitives = { version = "0.1.0", path = "../../primitives/runtime" } -futures = "0.3.30" +futures = "0.3.31" futures-timer = "3.0.3" parking_lot = "0.12.2" sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/client/eth-service/Cargo.toml b/domains/client/eth-service/Cargo.toml index 046fae7ac3..263548ba41 100644 --- a/domains/client/eth-service/Cargo.toml +++ b/domains/client/eth-service/Cargo.toml @@ -22,7 +22,7 @@ fc-rpc = { version = "2.0.0-dev", git = "https://github.com/autonomys/frontier", fc-rpc-core = { version = "1.1.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } fc-storage = { version = "1.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } fp-rpc = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968", features = ['default'] } -futures = "0.3.30" +futures = "0.3.31" jsonrpsee = { version = "0.24.5", features = ["server"] } pallet-transaction-payment-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } parity-scale-codec = "3.6.12" diff --git a/domains/client/relayer/Cargo.toml b/domains/client/relayer/Cargo.toml index 4214a845fc..4b4bcb7106 100644 --- a/domains/client/relayer/Cargo.toml +++ b/domains/client/relayer/Cargo.toml @@ -14,7 +14,7 @@ include = [ [dependencies] async-channel = "1.9.0" cross-domain-message-gossip = { path = "../../client/cross-domain-message-gossip" } -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-state-db = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/domains/service/Cargo.toml b/domains/service/Cargo.toml index 536a1e6e9f..8196fa3e31 100644 --- a/domains/service/Cargo.toml +++ b/domains/service/Cargo.toml @@ -21,7 +21,7 @@ domain-client-message-relayer = { version = "0.1.0", path = "../client/relayer" domain-client-operator = { version = "0.1.0", path = "../client/domain-operator" } domain-runtime-primitives = { version = "0.1.0", path = "../primitives/runtime" } frame-benchmarking = { default-features = false, git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42", optional = true } -futures = "0.3.30" +futures = "0.3.31" jsonrpsee = { version = "0.24.5", features = ["server"] } log = "0.4.22" pallet-transaction-payment-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index 6fa44cb679..6c5ec9e6f9 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -14,7 +14,7 @@ include = [ [dependencies] async-lock = "3.4.0" async-trait = "0.1.83" -futures = "0.3.30" +futures = "0.3.31" parity-scale-codec = { version = "3.6.12", features = ["derive"] } subspace-archiving = { version = "0.1.0", path = "../../crates/subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../../crates/subspace-core-primitives" } diff --git a/test/subspace-test-client/Cargo.toml b/test/subspace-test-client/Cargo.toml index aea3ba2d82..901fe1beeb 100644 --- a/test/subspace-test-client/Cargo.toml +++ b/test/subspace-test-client/Cargo.toml @@ -20,7 +20,7 @@ codec = { package = "parity-scale-codec", version = "3.6.12", features = ["deriv domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" } evm-domain-test-runtime = { version = "0.1.0", path = "../../domains/test/runtime/evm" } fp-evm = { version = "3.0.0-dev", git = "https://github.com/autonomys/frontier", rev = "f80f9e2bad338f3bf3854b256b3c4edea23e5968" } -futures = "0.3.30" +futures = "0.3.31" schnorrkel = "0.11.4" sc-chain-spec = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/test/subspace-test-service/Cargo.toml b/test/subspace-test-service/Cargo.toml index 2fffe71ca5..77fd9ca44d 100644 --- a/test/subspace-test-service/Cargo.toml +++ b/test/subspace-test-service/Cargo.toml @@ -21,7 +21,7 @@ codec = { package = "parity-scale-codec", version = "3.6.12", features = ["deriv domain-client-message-relayer = { version = "0.1.0", path = "../../domains/client/relayer" } domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" } frame-system = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } -futures = "0.3.30" +futures = "0.3.31" jsonrpsee = { version = "0.24.5", features = ["server"] } pallet-transaction-payment = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } mmr-gadget = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }