
Commit

Remove first_sector_index and sector_offset from implementation as unnecessary, simplifying logic
nazar-pc committed Jul 6, 2023
1 parent e25420a commit 5de2ca1
Showing 14 changed files with 118 additions and 203 deletions.
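Taken together, the hunks below drop the caller-maintained `sector_offset` (a plain position counter) everywhere and keep only `sector_index` as the sector's identity. A hypothetical, heavily simplified sketch of that shape — the real `plot_sector` takes many more arguments than the excerpts show, and nothing beyond the parameter removal is implied:

```rust
// Hypothetical sketch only: names mirror the diff, signatures do not.
fn plot_sector_before(_public_key: &[u8; 32], _sector_offset: usize, _sector_index: u64) {}
fn plot_sector_after(_public_key: &[u8; 32], _sector_index: u64) {}

fn main() {
    let public_key = [0u8; 32];

    // Before: callers threaded a positional offset alongside the index.
    for (sector_offset, sector_index) in (0u64..3).enumerate() {
        plot_sector_before(&public_key, sector_offset, sector_index);
    }

    // After: the sector index alone identifies the sector.
    for sector_index in 0u64..3 {
        plot_sector_after(&public_key, sector_index);
    }
}
```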
3 changes: 1 addition & 2 deletions crates/pallet-subspace/src/mock.rs
@@ -410,13 +410,12 @@ pub fn create_signed_vote(
let pieces_in_sector = farmer_protocol_info.max_pieces_in_sector;
let sector_size = sector_size(pieces_in_sector);

- for (sector_offset, sector_index) in iter::from_fn(|| Some(rand::random())).enumerate() {
+ for sector_index in iter::from_fn(|| Some(rand::random())) {
let mut plotted_sector_bytes = vec![0; sector_size];
let mut plotted_sector_metadata_bytes = vec![0; SectorMetadata::encoded_size()];

let plotted_sector = block_on(plot_sector::<_, PosTable>(
&public_key,
- sector_offset,
sector_index,
archived_history_segment,
PieceGetterRetryPolicy::default(),
3 changes: 1 addition & 2 deletions crates/sp-lightclient/src/tests.rs
@@ -153,13 +153,12 @@ fn valid_header(
let pieces_in_sector = farmer_parameters.farmer_protocol_info.max_pieces_in_sector;
let sector_size = sector_size(pieces_in_sector);

- for (sector_offset, sector_index) in iter::from_fn(|| Some(rand::random())).enumerate() {
+ for sector_index in iter::from_fn(|| Some(rand::random())) {
let mut plotted_sector_bytes = vec![0; sector_size];
let mut plotted_sector_metadata_bytes = vec![0; SectorMetadata::encoded_size()];

let plotted_sector = block_on(plot_sector::<_, PosTable>(
&public_key,
- sector_offset,
sector_index,
&farmer_parameters.archived_segment.pieces,
PieceGetterRetryPolicy::default(),
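Both test helpers above previously called `enumerate()` on an endless stream of random sector indices only to obtain the now-removed offset; with it gone, the loop iterates over the indices directly. A standalone sketch of the two loop shapes, using a deterministic counter in place of the `rand` crate so it runs as-is:

```rust
use std::iter;

// Deterministic stand-in for `iter::from_fn(|| Some(rand::random()))` from the
// test helpers above, so this sketch needs no external crates.
fn sector_indices() -> impl Iterator<Item = u64> {
    let mut state = 1u64;
    iter::from_fn(move || {
        state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
        Some(state)
    })
}

fn main() {
    // Old shape: `enumerate()` existed only to produce a usize `sector_offset`.
    for (sector_offset, sector_index) in sector_indices().enumerate().take(3) {
        println!("offset {sector_offset}: index {sector_index}");
    }

    // New shape: iterate over the (random, in the real tests) sector indices directly.
    for sector_index in sector_indices().take(3) {
        println!("index {sector_index}");
    }
}
```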
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/auditing.rs
@@ -42,7 +42,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
.unwrap_or(10);

let public_key = PublicKey::default();
- let sector_offset = 0;
let sector_index = 0;
let mut input = RecordedHistorySegment::new_boxed();
StdRng::seed_from_u64(42).fill(AsMut::<[u8]>::as_mut(input.as_mut()));
@@ -116,7 +115,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {

let plotted_sector = block_on(plot_sector::<_, PosTable>(
&public_key,
- sector_offset,
sector_index,
&archived_history_segment,
PieceGetterRetryPolicy::default(),
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/plotting.rs
@@ -26,7 +26,6 @@ fn criterion_benchmark(c: &mut Criterion) {
.unwrap_or_else(|_error| MAX_PIECES_IN_SECTOR);

let public_key = PublicKey::default();
- let sector_offset = 0;
let sector_index = 0;
let mut input = RecordedHistorySegment::new_boxed();
StdRng::seed_from_u64(42).fill(AsMut::<[u8]>::as_mut(input.as_mut()));
@@ -67,7 +66,6 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
block_on(plot_sector::<_, PosTable>(
black_box(&public_key),
- black_box(sector_offset),
black_box(sector_index),
black_box(&archived_history_segment),
black_box(PieceGetterRetryPolicy::default()),
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/proving.rs
@@ -44,7 +44,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {

let keypair = Keypair::from_bytes(&[0; 96]).unwrap();
let public_key = PublicKey::from(keypair.public.to_bytes());
- let sector_offset = 0;
let sector_index = 0;
let mut input = RecordedHistorySegment::new_boxed();
let mut rng = StdRng::seed_from_u64(42);
@@ -119,7 +118,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {

let plotted_sector = block_on(plot_sector::<_, PosTable>(
&public_key,
- sector_offset,
sector_index,
&archived_history_segment,
PieceGetterRetryPolicy::default(),
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/reading.rs
@@ -41,7 +41,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
.unwrap_or(10);

let public_key = PublicKey::default();
- let sector_offset = 0;
let sector_index = 0;
let mut input = RecordedHistorySegment::new_boxed();
StdRng::seed_from_u64(42).fill(AsMut::<[u8]>::as_mut(input.as_mut()));
@@ -113,7 +112,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {

let plotted_sector = block_on(plot_sector::<_, PosTable>(
&public_key,
- sector_offset,
sector_index,
&archived_history_segment,
PieceGetterRetryPolicy::default(),
4 changes: 1 addition & 3 deletions crates/subspace-farmer-components/src/plotting.rs
@@ -160,7 +160,6 @@ pub enum PlottingError {
#[allow(clippy::too_many_arguments)]
pub async fn plot_sector<PG, PosTable>(
public_key: &PublicKey,
- sector_offset: usize,
sector_index: u64,
piece_getter: &PG,
piece_getter_retry_policy: PieceGetterRetryPolicy,
@@ -230,7 +229,6 @@ where
.await
{
warn!(
- %sector_offset,
%sector_index,
%error,
"Sector plotting attempt failed, will retry later"
@@ -239,7 +237,7 @@
return Err(BackoffError::transient(error));
}

- debug!(%sector_offset, %sector_index, "Sector downloaded successfully");
+ debug!(%sector_index, "Sector downloaded successfully");

Ok(())
})
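For context on the logging lines touched above: `warn!` and `debug!` here are `tracing` macros, and the leading `%` records a variable as a Display-formatted field on the event rather than interpolating it into the message. A minimal sketch of that field syntax, assuming the `tracing` and `tracing-subscriber` crates as dependencies:

```rust
use tracing::{debug, warn, Level};

fn main() {
    // Print events to stdout so the fields are visible (assumes the
    // `tracing-subscriber` crate with its default `fmt` feature).
    tracing_subscriber::fmt().with_max_level(Level::DEBUG).init();

    let sector_index: u64 = 42;
    let error = "piece getter timed out";

    // `%value` attaches the variable as a field via Display; `?value` would use Debug.
    warn!(%sector_index, %error, "Sector plotting attempt failed, will retry later");
    debug!(%sector_index, "Sector downloaded successfully");
}
```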
174 changes: 83 additions & 91 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -17,7 +17,8 @@ use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{
- ArchivedHistorySegment, Piece, PieceIndex, PieceIndexHash, PieceOffset, Record, SegmentIndex,
+ ArchivedHistorySegment, Piece, PieceIndex, PieceIndexHash, PieceOffset, Record, SectorIndex,
+ SegmentIndex,
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::single_disk_plot::{
@@ -253,23 +254,22 @@ where
.iter()
.enumerate()
.flat_map(|(disk_farm_index, single_disk_plot)| {
- single_disk_plot
- .plotted_sectors()
- .enumerate()
- .filter_map(move |(sector_offset, plotted_sector_result)| {
- match plotted_sector_result {
+ (0 as SectorIndex..)
+ .zip(single_disk_plot.plotted_sectors())
+ .filter_map(
+ move |(sector_index, plotted_sector_result)| match plotted_sector_result {
Ok(plotted_sector) => Some(plotted_sector),
Err(error) => {
error!(
%error,
%disk_farm_index,
- %sector_offset,
+ %sector_index,
"Failed reading plotted sector on startup, skipping"
);
None
}
- }
- })
+ },
+ )
.flat_map(move |plotted_sector| {
(PieceOffset::ZERO..).zip(plotted_sector.piece_indexes).map(
move |(piece_offset, piece_index)| {
@@ -310,95 +310,87 @@ where
// Collect newly plotted pieces
// TODO: Once we have replotting, this will have to be updated
single_disk_plot
.on_sector_plotted(Arc::new(
move |(sector_offset, plotted_sector, plotting_permit)| {
let _span_guard = span.enter();
let plotting_permit = Arc::clone(plotting_permit);
let node = node.clone();
let sector_offset = *sector_offset;
let sector_index = plotted_sector.sector_index;

let mut dropped_receiver = dropped_sender.subscribe();

let new_pieces = {
let mut readers_and_pieces = readers_and_pieces.lock();
let readers_and_pieces = readers_and_pieces
.as_mut()
.expect("Initial value was populated above; qed");

let new_pieces = plotted_sector
.piece_indexes
.iter()
.filter(|&&piece_index| {
// Skip pieces that are already plotted and thus were announced
// before
!readers_and_pieces.contains_piece(&piece_index.hash())
})
.copied()
.collect::<Vec<_>>();

readers_and_pieces.add_pieces(
(PieceOffset::ZERO..)
.zip(plotted_sector.piece_indexes.iter().copied())
.map(|(piece_offset, piece_index)| {
(
piece_index.hash(),
PieceDetails {
disk_farm_index,
sector_index,
piece_offset,
},
)
}),
);

new_pieces
};
.on_sector_plotted(Arc::new(move |(plotted_sector, plotting_permit)| {
let _span_guard = span.enter();
let plotting_permit = Arc::clone(plotting_permit);
let node = node.clone();
let sector_index = plotted_sector.sector_index;

let mut dropped_receiver = dropped_sender.subscribe();

let new_pieces = {
let mut readers_and_pieces = readers_and_pieces.lock();
let readers_and_pieces = readers_and_pieces
.as_mut()
.expect("Initial value was populated above; qed");

let new_pieces = plotted_sector
.piece_indexes
.iter()
.filter(|&&piece_index| {
// Skip pieces that are already plotted and thus were announced
// before
!readers_and_pieces.contains_piece(&piece_index.hash())
})
.copied()
.collect::<Vec<_>>();

readers_and_pieces.add_pieces(
(PieceOffset::ZERO..)
.zip(plotted_sector.piece_indexes.iter().copied())
.map(|(piece_offset, piece_index)| {
(
piece_index.hash(),
PieceDetails {
disk_farm_index,
sector_index,
piece_offset,
},
)
}),
);

if new_pieces.is_empty() {
// None of the pieces are new, nothing left to do here
return;
}
new_pieces
};

archival_storage_pieces.add_pieces(&new_pieces);
if new_pieces.is_empty() {
// None of the pieces are new, nothing left to do here
return;
}

// TODO: Skip those that were already announced (because they cached)
let publish_fut = async move {
let mut pieces_publishing_futures = new_pieces
.into_iter()
.map(|piece_index| {
announce_single_piece_index_hash_with_backoff(
piece_index.hash(),
&node,
)
})
.collect::<FuturesUnordered<_>>();
archival_storage_pieces.add_pieces(&new_pieces);

// TODO: Skip those that were already announced (because they cached)
let publish_fut = async move {
let mut pieces_publishing_futures = new_pieces
.into_iter()
.map(|piece_index| {
announce_single_piece_index_hash_with_backoff(
piece_index.hash(),
&node,
)
})
.collect::<FuturesUnordered<_>>();

while pieces_publishing_futures.next().await.is_some() {
// Nothing is needed here, just driving all futures to completion
}

while pieces_publishing_futures.next().await.is_some() {
// Nothing is needed here, just driving all futures to completion
}
info!(?sector_index, "Sector publishing was successful.");

info!(
%sector_offset,
?sector_index,
"Sector publishing was successful."
);
// Release only after publishing is finished
drop(plotting_permit);
}
.in_current_span();

// Release only after publishing is finished
drop(plotting_permit);
tokio::spawn(async move {
let result =
select(Box::pin(publish_fut), Box::pin(dropped_receiver.recv())).await;
if matches!(result, Either::Right(_)) {
debug!("Piece publishing was cancelled due to shutdown.");
}
.in_current_span();

tokio::spawn(async move {
let result =
select(Box::pin(publish_fut), Box::pin(dropped_receiver.recv()))
.await;
if matches!(result, Either::Right(_)) {
debug!("Piece publishing was cancelled due to shutdown.");
}
});
},
))
});
}))
.detach();

single_disk_plot.run()
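On the farm.rs side, the startup enumeration now zips `(0 as SectorIndex..)` with `plotted_sectors()` instead of calling `.enumerate()`, so the index comes out already typed as `SectorIndex` rather than as a `usize` offset, and the `on_sector_plotted` callback shrinks to `(plotted_sector, plotting_permit)`. A standalone sketch of the range-zip pattern; `SectorIndex` is assumed here to be a `u64` alias, matching the `sector_index: u64` parameter in the plotting.rs hunk above:

```rust
// `SectorIndex` is assumed to be a u64 alias, matching the
// `sector_index: u64` parameter shown in the plotting.rs hunk.
type SectorIndex = u64;

struct PlottedSector {
    sector_index: SectorIndex,
}

fn main() {
    // Stand-in for `single_disk_plot.plotted_sectors()`: any iterator of results.
    let plotted_sectors = vec![
        Ok(PlottedSector { sector_index: 10 }),
        Err("failed reading plotted sector"),
        Ok(PlottedSector { sector_index: 12 }),
    ];

    // Zipping a typed, unbounded range yields SectorIndex values directly,
    // where `.enumerate()` would have produced usize offsets needing a cast.
    let sectors: Vec<PlottedSector> = (0 as SectorIndex..)
        .zip(plotted_sectors)
        .filter_map(|(sector_index, result)| match result {
            Ok(plotted_sector) => Some(plotted_sector),
            Err(error) => {
                eprintln!("sector {sector_index}: {error}, skipping");
                None
            }
        })
        .collect();

    let kept: Vec<SectorIndex> = sectors.iter().map(|s| s.sector_index).collect();
    println!("kept sectors: {kept:?}");
}
```

Keeping the index typed at the source avoids scattering `as SectorIndex` casts through the callback chain, which is consistent with the commit's goal of removing redundant bookkeeping.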
@@ -8,7 +8,6 @@ pub(crate) fn print_disk_farm_info(directory: PathBuf, disk_farm_index: usize) {
println!(" ID: {}", info.id());
println!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash()));
println!(" Public key: 0x{}", hex::encode(info.public_key()));
println!(" First sector index: {}", info.first_sector_index());
println!(
" Allocated space: {} ({})",
bytesize::to_string(info.allocated_space(), true),