Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
removes SHRED_PAYLOAD_SIZE from shred public interface (#24806)
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored Apr 28, 2022
1 parent 2e617ba commit 008860b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 70 deletions.
23 changes: 8 additions & 15 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use {
blockstore_meta::*,
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{
self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredId, ShredType, Shredder,
SHRED_PAYLOAD_SIZE,
},
shred::{self, max_ticks_per_n_shreds, ErasureSetId, Shred, ShredId, ShredType, Shredder},
slot_stats::{ShredSource, SlotsStats},
},
bincode::deserialize,
Expand Down Expand Up @@ -1637,14 +1634,12 @@ impl Blockstore {
}

pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
self.data_shred_cf.get_bytes((slot, index)).map(|data| {
data.map(|mut d| {
// Only data_header.size bytes stored in the blockstore so
// pad the payload out to SHRED_PAYLOAD_SIZE so that the
// erasure recovery works properly.
d.resize(cmp::max(d.len(), SHRED_PAYLOAD_SIZE), 0);
d
})
let shred = self.data_shred_cf.get_bytes((slot, index))?;
let shred = shred.map(Shred::resize_stored_shred).transpose();
shred.map_err(|err| {
let err = format!("Invalid stored shred: {}", err);
let err = Box::new(bincode::ErrorKind::Custom(err));
BlockstoreError::InvalidShredData(err)
})
}

Expand Down Expand Up @@ -3107,15 +3102,13 @@ impl Blockstore {
// Returns the existing shred if `new_shred` is not equal to the existing shred at the
// given slot and index as this implies the leader generated two different shreds with
// the same slot and index
pub fn is_shred_duplicate(&self, shred: ShredId, mut payload: Vec<u8>) -> Option<Vec<u8>> {
pub fn is_shred_duplicate(&self, shred: ShredId, payload: Vec<u8>) -> Option<Vec<u8>> {
let (slot, index, shred_type) = shred.unwrap();
let existing_shred = match shred_type {
ShredType::Data => self.get_data_shred(slot, index as u64),
ShredType::Code => self.get_coding_shred(slot, index as u64),
}
.expect("fetch from DuplicateSlots column family failed")?;
let size = payload.len().max(SHRED_PAYLOAD_SIZE);
payload.resize(size, 0u8);
let new_shred = Shred::new_from_serialized_shred(payload).unwrap();
(existing_shred != *new_shred.payload()).then(|| existing_shred)
}
Expand Down
63 changes: 47 additions & 16 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ const DATA_SHRED_SIZE_RANGE: RangeInclusive<usize> =
const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
pub const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;
const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;
// SIZE_OF_CODING_SHRED_HEADERS bytes at the end of data shreds
// is never used and is not part of erasure coding.
const ENCODED_PAYLOAD_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
Expand Down Expand Up @@ -133,8 +133,8 @@ pub enum Error {
InvalidParentSlot { slot: Slot, parent_slot: Slot },
#[error("Invalid payload size: {size}")]
InvalidPayloadSize { size: usize },
#[error("Invalid shred type: {0:?}")]
InvalidShredType(ShredType),
#[error("Invalid shred type")]
InvalidShredType,
}

#[repr(u8)]
Expand Down Expand Up @@ -432,7 +432,7 @@ impl Shred {
parent_offset,
})
}
ShredType::Code => Err(Error::InvalidShredType(ShredType::Code)),
ShredType::Code => Err(Error::InvalidShredType),
}
}

Expand All @@ -442,7 +442,7 @@ impl Shred {

pub(crate) fn data(&self) -> Result<&[u8], Error> {
match self.shred_type() {
ShredType::Code => Err(Error::InvalidShredType(ShredType::Code)),
ShredType::Code => Err(Error::InvalidShredType),
ShredType::Data => {
let size = usize::from(self.data_header.size);
if size > self.payload.len() || !DATA_SHRED_SIZE_RANGE.contains(&size) {
Expand Down Expand Up @@ -474,6 +474,27 @@ impl Shred {
}
}

// Possibly zero pads bytes stored in blockstore.
pub(crate) fn resize_stored_shred(mut shred: Vec<u8>) -> Result<Vec<u8>, Error> {
let shred_type = match shred.get(OFFSET_OF_SHRED_TYPE) {
None => return Err(Error::InvalidPayloadSize { size: shred.len() }),
Some(shred_type) => match ShredType::try_from(*shred_type) {
Err(_) => return Err(Error::InvalidShredType),
Ok(shred_type) => shred_type,
},
};
match shred_type {
ShredType::Code => Ok(shred),
ShredType::Data => {
if shred.len() > SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize { size: shred.len() });
}
shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
Ok(shred)
}
}
}

pub fn into_payload(self) -> Vec<u8> {
self.payload
}
Expand All @@ -494,7 +515,7 @@ impl Shred {

// Returns true if the shred passes sanity checks.
pub fn sanitize(&self) -> Result<(), Error> {
if self.payload().len() > SHRED_PAYLOAD_SIZE {
if self.payload().len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize {
size: self.payload.len(),
});
Expand Down Expand Up @@ -570,7 +591,12 @@ impl Shred {
}

// Returns the portion of the shred's payload which is erasure coded.
pub(crate) fn erasure_block(self) -> Vec<u8> {
pub(crate) fn erasure_block(self) -> Result<Vec<u8>, Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize {
size: self.payload.len(),
});
}
let shred_type = self.shred_type();
let mut block = self.payload;
match shred_type {
Expand All @@ -581,19 +607,23 @@ impl Shred {
// SIZE_OF_CODING_SHRED_HEADERS bytes at the beginning of the
// coding shreds contains the header and is not part of erasure
// coding.
let offset = SIZE_OF_CODING_SHRED_HEADERS.min(block.len());
block.drain(..offset);
block.drain(..SIZE_OF_CODING_SHRED_HEADERS);
}
}
block
Ok(block)
}

// Like Shred::erasure_block but returning a slice
pub(crate) fn erasure_block_as_slice(&self) -> &[u8] {
match self.shred_type() {
pub(crate) fn erasure_block_as_slice(&self) -> Result<&[u8], Error> {
if self.payload.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize {
size: self.payload.len(),
});
}
Ok(match self.shred_type() {
ShredType::Data => &self.payload[..ENCODED_PAYLOAD_SIZE],
ShredType::Code => &self.payload[SIZE_OF_CODING_SHRED_HEADERS..],
}
})
}

pub fn set_index(&mut self, index: u32) {
Expand Down Expand Up @@ -737,20 +767,20 @@ impl Shred {
|| num_data_shreds != other.coding_header.num_data_shreds
|| self.first_coding_index() != other.first_coding_index())
}
_ => Err(Error::InvalidShredType(ShredType::Data)),
_ => Err(Error::InvalidShredType),
}
}

pub(crate) fn num_data_shreds(self: &Shred) -> Result<u16, Error> {
match self.shred_type() {
ShredType::Data => Err(Error::InvalidShredType(ShredType::Data)),
ShredType::Data => Err(Error::InvalidShredType),
ShredType::Code => Ok(self.coding_header.num_data_shreds),
}
}

pub(crate) fn num_coding_shreds(self: &Shred) -> Result<u16, Error> {
match self.shred_type() {
ShredType::Data => Err(Error::InvalidShredType(ShredType::Data)),
ShredType::Data => Err(Error::InvalidShredType),
ShredType::Code => Ok(self.coding_header.num_coding_shreds),
}
}
Expand Down Expand Up @@ -833,6 +863,7 @@ pub fn verify_test_data_shred(
is_last_in_slot: bool,
is_last_data: bool,
) {
shred.sanitize().unwrap();
assert_eq!(shred.payload.len(), SHRED_PAYLOAD_SIZE);
assert!(shred.is_data());
assert_eq!(shred.index(), index);
Expand Down
47 changes: 8 additions & 39 deletions ledger/src/shredder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use {
crate::{
erasure::Session,
shred::{
Error, Shred, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE,
SIZE_OF_DATA_SHRED_PAYLOAD,
},
shred::{Error, Shred, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD},
shred_stats::ProcessShredsStats,
},
rayon::{prelude::*, ThreadPool},
reed_solomon_erasure::Error::{InvalidIndex, TooFewDataShards, TooFewShardsPresent},
solana_entry::entry::Entry,
solana_measure::measure::Measure,
solana_rayon_threadlimit::get_thread_count,
Expand Down Expand Up @@ -234,7 +232,8 @@ impl Shredder {
} else {
num_data
};
let data: Vec<_> = data.iter().map(Shred::erasure_block_as_slice).collect();
let data = data.iter().map(Shred::erasure_block_as_slice);
let data: Vec<_> = data.collect::<Result<_, _>>().unwrap();
let mut parity = vec![vec![0u8; data[0].len()]; num_coding];
Session::new(num_data, num_coding)
.unwrap()
Expand Down Expand Up @@ -262,10 +261,8 @@ impl Shredder {
}

pub fn try_recovery(shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
use reed_solomon_erasure::Error::InvalidIndex;
Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?;
let (slot, fec_set_index) = match shreds.first() {
None => return Ok(Vec::default()),
None => return Err(Error::from(TooFewShardsPresent)),
Some(shred) => (shred.slot(), shred.fec_set_index()),
};
let (num_data_shreds, num_coding_shreds) = match shreds.iter().find(|shred| shred.is_code())
Expand Down Expand Up @@ -298,7 +295,7 @@ impl Shredder {
Some(index) if index < fec_set_size => index,
_ => return Err(Error::from(InvalidIndex)),
};
blocks[index] = Some(shred.erasure_block());
blocks[index] = Some(shred.erasure_block()?);
if index < num_data_shreds {
mask[index] = true;
}
Expand All @@ -323,8 +320,6 @@ impl Shredder {

/// Combines all shreds to recreate the original buffer
pub fn deshred(shreds: &[Shred]) -> Result<Vec<u8>, Error> {
use reed_solomon_erasure::Error::TooFewDataShards;
Self::verify_consistent_shred_payload_sizes("deshred()", shreds)?;
let index = shreds.first().ok_or(TooFewDataShards)?.index();
let aligned = shreds.iter().zip(index..).all(|(s, i)| s.index() == i);
let data_complete = {
Expand All @@ -345,30 +340,6 @@ impl Shredder {
Ok(data)
}
}

fn verify_consistent_shred_payload_sizes(
caller: &str,
shreds: &[Shred],
) -> Result<(), reed_solomon_erasure::Error> {
if shreds.is_empty() {
return Err(reed_solomon_erasure::Error::TooFewShardsPresent);
}
let slot = shreds[0].slot();
for shred in shreds {
if shred.payload().len() != SHRED_PAYLOAD_SIZE {
error!(
"{} Shreds for slot: {} are inconsistent sizes. Expected: {} actual: {}",
caller,
slot,
SHRED_PAYLOAD_SIZE,
shred.payload().len()
);
return Err(reed_solomon_erasure::Error::IncorrectShardSize);
}
}

Ok(())
}
}

#[cfg(test)]
Expand All @@ -393,7 +364,7 @@ mod tests {
};

fn verify_test_code_shred(shred: &Shred, index: u32, slot: Slot, pk: &Pubkey, verify: bool) {
assert_eq!(shred.payload().len(), SHRED_PAYLOAD_SIZE);
assert_matches!(shred.sanitize(), Ok(()));
assert!(!shred.is_data());
assert_eq!(shred.index(), index);
assert_eq!(shred.slot(), slot);
Expand Down Expand Up @@ -783,9 +754,7 @@ mod tests {
assert_eq!(shreds.len(), 3);
assert_matches!(
Shredder::deshred(&shreds),
Err(Error::ErasureError(
reed_solomon_erasure::Error::TooFewDataShards
))
Err(Error::ErasureError(TooFewDataShards))
);

// Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds
Expand Down

0 comments on commit 008860b

Please sign in to comment.