Skip to content

Commit

Permalink
Merge pull request #3149 from autonomys/batches-in-cluster-piece-getter
Browse files Browse the repository at this point in the history
Batches in `ClusterPieceGetter`
  • Loading branch information
nazar-pc authored Oct 18, 2024
2 parents ebd4a0a + d9d2cc0 commit 64df00e
Show file tree
Hide file tree
Showing 12 changed files with 452 additions and 46 deletions.
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 1 addition & 13 deletions crates/subspace-farmer-components/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,7 @@ pub trait PieceGetter {
Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
>
where
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
{
// TODO: Remove default impl here
Ok(Box::new(
piece_indices
.into_iter()
.map(|piece_index| async move {
let result = self.get_piece(piece_index).await;
(piece_index, result)
})
.collect::<FuturesUnordered<_>>(),
) as Box<_>)
}
PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a;
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ hwlocality = { version = "1.0.0-alpha.6", features = ["vendored"], optional = tr
jsonrpsee = { version = "0.24.5", features = ["ws-client"] }
mimalloc = { version = "0.1.43", optional = true }
num_cpus = "1.16.0"
ouroboros = "0.18.4"
parity-scale-codec = "3.6.12"
parking_lot = "0.12.2"
pin-project = "1.1.5"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ struct RocmPlottingOptions {
pub(super) struct PlotterArgs {
/// Piece getter concurrency.
///
/// Increasing this value can cause NATS communication issues if too many messages arrive via NATS, but
/// are not processed quickly enough.
/// Increasing this value can cause NATS communication issues if too many messages arrive via
/// NATS, but are not processed quickly enough.
#[arg(long, default_value = "32")]
piece_getter_concurrency: NonZeroUsize,
/// Plotting options only used by CPU plotter
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-farmer/src/cluster/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl GenericStreamRequest for ClusterCacheReadPiecesRequest {
type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>;
}

/// Request plotted from farmer, request
/// Collect plotted pieces from farmer
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheContentsRequest;

Expand Down Expand Up @@ -132,7 +132,7 @@ impl PieceCache for ClusterPieceCache {
> {
Ok(Box::new(
self.nats_client
.stream_request(ClusterCacheContentsRequest, Some(&self.cache_id_string))
.stream_request(&ClusterCacheContentsRequest, Some(&self.cache_id_string))
.await?
.map(|response| response.map_err(FarmError::from)),
))
Expand Down Expand Up @@ -200,7 +200,7 @@ impl PieceCache for ClusterPieceCache {
let mut stream = self
.nats_client
.stream_request(
ClusterCacheReadPiecesRequest { offsets },
&ClusterCacheReadPiecesRequest { offsets },
Some(&self.cache_id_string),
)
.await?
Expand Down
Loading

0 comments on commit 64df00e

Please sign in to comment.