Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Blob Storage Structure #4104

Merged
merged 2 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Initial Changes to Blob Storage
  • Loading branch information
ethDreamer committed Mar 21, 2023
commit 0080df9fd23c86278b17f77b2137508306228e7c
65 changes: 17 additions & 48 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,35 +959,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.get_block(block_root).await?.map(Arc::new))
}

pub async fn get_block_and_blobs_checking_early_attester_cache(
pub async fn get_blobs_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
// If there is no data availability boundary, the Eip4844 fork is disabled.
if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary()
{
// Only use the attester cache if we can find both the block and blob
if let (Some(block), Some(blobs)) = (
self.early_attester_cache.get_block(*block_root),
self.early_attester_cache.get_blobs(*block_root),
) {
Ok(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
// Attempt to get the block and blobs from the database
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
let blobs = self
.get_blobs(block_root, finalized_data_availability_boundary)?
.map(Arc::new);
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
} else {
Ok(None)
}
self.early_attester_cache
.get_blobs(*block_root)
.map_or_else(
|| self.get_blobs(block_root, finalized_data_availability_boundary),
|blobs| Ok(Some(blobs)),
)
} else {
Ok(None)
}
Expand Down Expand Up @@ -1057,23 +1042,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(Some)
}

// FIXME(jimmy): temporary method added to unblock API work. This method will be replaced by
// the `get_blobs` method below once the new blob sidecar structure (`BlobSidecarList`) is
// implemented in that method.
#[allow(clippy::type_complexity)] // FIXME: this will be fixed by the `BlobSidecarList` alias in Sean's PR
pub fn get_blob_sidecar_list(
&self,
_block_root: &Hash256,
_data_availability_boundary: Epoch,
) -> Result<
Option<
VariableList<Arc<BlobSidecar<T::EthSpec>>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
>,
Error,
> {
unimplemented!("update to use the updated `get_blobs` method instead once this PR is merged: https://github.com/sigp/lighthouse/pull/4104")
}

/// Returns the blobs at the given root, if any.
///
/// Returns `Ok(None)` if the blobs and associated block are not found.
Expand All @@ -1091,9 +1059,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
data_availability_boundary: Epoch,
) -> Result<Option<BlobsSidecar<T::EthSpec>>, Error> {
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
match self.store.get_blobs(block_root)? {
Some(blobs) => Ok(Some(blobs)),
Some(blob_sidecar_list) => Ok(Some(blob_sidecar_list)),
None => {
// Check for the corresponding block to understand whether we *should* have blobs.
self.get_blinded_block(block_root)?
Expand All @@ -1106,7 +1074,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock),
};
if expected_kzg_commitments.is_empty() {
Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot()))
// TODO (mark): verify that an empty `BlobSidecarList` is the correct result for a block with no expected KZG commitments
Ok(BlobSidecarList::empty())
} else if data_availability_boundary <= block.epoch() {
// We should have blobs for all blocks younger than the boundary.
Err(Error::BlobsUnavailable)
Expand Down Expand Up @@ -3052,7 +3021,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// margin, or younger (of higher epoch number).
if block_epoch >= import_boundary {
if let Some(blobs) = blobs {
if !blobs.blobs.is_empty() {
if !blobs.is_empty() {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
Expand Down Expand Up @@ -4814,7 +4783,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
.map_err(BlockProductionError::KzgError)?;

let blob_sidecars = VariableList::from(
let blob_sidecars = BlobSidecarList::from(
blobs
.into_iter()
.enumerate()
Expand All @@ -4827,7 +4796,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get(blob_index)
.expect("KZG proof should exist for blob");

Ok(BlobSidecar {
Ok(Arc::new(BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot,
Expand All @@ -4836,9 +4805,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
blob,
kzg_commitment: *kzg_commitment,
kzg_proof: *kzg_proof,
})
}))
})
.collect::<Result<Vec<BlobSidecar<T::EthSpec>>, BlockProductionError>>()?,
.collect::<Result<Vec<_>, BlockProductionError>>()?,
);

self.blob_cache.put(beacon_block_root, blob_sidecars);
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/early_attester_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct CacheItem<E: EthSpec> {
* Values used to make the block available.
*/
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<Arc<BlobsSidecar<E>>>,
blobs: Option<BlobSidecarList<E>>,
proto_block: ProtoBlock,
}

Expand Down Expand Up @@ -160,7 +160,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
}

/// Returns the blobs, if `block_root` matches the cached item.
pub fn get_blobs(&self, block_root: Hash256) -> Option<Arc<BlobsSidecar<E>>> {
pub fn get_blobs(&self, block_root: Hash256) -> Option<BlobSidecarList<E>> {
self.item
.read()
.as_ref()
Expand Down
11 changes: 4 additions & 7 deletions beacon_node/http_api/src/block_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use eth2::types::{BlockId as CoreBlockId, VariableList};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};
use types::{BlobSidecarList, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};

/// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given
/// `BlockId`.
Expand Down Expand Up @@ -216,16 +216,13 @@ impl BlockId {
pub async fn blob_sidecar_list<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
) -> Result<
VariableList<Arc<BlobSidecar<T::EthSpec>>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
warp::Rejection,
> {
) -> Result<BlobSidecarList<T::EthSpec>, warp::Rejection> {
let root = self.root(chain)?.0;
let Some(data_availability_boundary) = chain.data_availability_boundary() else {
return Err(warp_utils::reject::custom_not_found("Deneb fork disabled".into()));
};
match chain.get_blob_sidecar_list(&root, data_availability_boundary) {
Ok(Some(blobs)) => Ok(blobs),
match chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blob_sidecar_list)) => Ok(blob_sidecar_list),
Ok(None) => Err(warp_utils::reject::custom_not_found(format!(
"No blobs with block root {} found in the store",
root
Expand Down
80 changes: 27 additions & 53 deletions beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::blob_sidecar::BlobIdentifier;
Expand Down Expand Up @@ -225,42 +226,34 @@ impl<T: BeaconChainTypes> Worker<T> {
executor.spawn(
async move {
let requested_blobs = request.blob_ids.len();
let mut send_block_count = 0;
let mut send_blob_count = 0;
let mut send_response = true;

let mut blob_list_results = HashMap::new();
for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() {
match self
.chain
.get_block_and_blobs_checking_early_attester_cache(&root)
.await
{
Ok(Some(block_and_blobs)) => {
//
// TODO: HORRIBLE NSFW CODE AHEAD
//
let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs;
let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone();
// TODO: this should be unreachable after this is addressed seriously,
// so for now let's be ok with a panic in the expect.
let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff");
// Intentionally not accessing the list directly
for (known_index, blob) in blob_bundle.into_iter().enumerate() {
if (known_index as u64) == index {
let blob_sidecar = types::BlobSidecar{
block_root: beacon_block_root,
index,
slot: beacon_block_slot,
block_parent_root: block.parent_root,
proposer_index: block.proposer_index,
blob,
kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic.
kzg_proof: kzg_aggregated_proof // TODO: yeah
};
let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self
.chain
.get_blobs_checking_early_attester_cache(&root)
.await)
}
Entry::Occupied(entry) => {
entry.into_mut()
}
};

match blob_list_result.as_ref() {
Ok(Some(blobs_sidecar_list)) => {
for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == index {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(Arc::new(blob_sidecar))),
Response::BlobsByRoot(Some(blob_sidecar.clone())),
request_id,
);
send_block_count += 1;
send_blob_count += 1;
break;
}
}
}
Expand Down Expand Up @@ -355,7 +348,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlobsByRoot Request";
"peer" => %peer_id,
"requested" => requested_blobs,
"returned" => send_block_count
"returned" => send_blob_count
);

// send stream termination
Expand Down Expand Up @@ -837,31 +830,12 @@ impl<T: BeaconChainTypes> Worker<T> {

for root in block_roots {
match self.chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blobs)) => {
// TODO: more GROSS code ahead. Reader beware
let types::BlobsSidecar {
beacon_block_root,
beacon_block_slot,
blobs: blob_bundle,
kzg_aggregated_proof: _,
}: types::BlobsSidecar<_> = blobs;

for (blob_index, blob) in blob_bundle.into_iter().enumerate() {
let blob_sidecar = types::BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot: beacon_block_slot,
block_parent_root: Hash256::zero(),
proposer_index: 0,
blob,
kzg_commitment: types::KzgCommitment::default(),
kzg_proof: types::KzgProof::default(),
};

Ok(Some(blob_sidecar_list)) => {
for blob_sidecar in blob_sidecar_list.iter() {
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))),
response: Response::BlobsByRange(Some(blob_sidecar.clone())),
id: request_id,
});
}
Expand Down
20 changes: 10 additions & 10 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The hot database also contains all blocks.
pub hot_db: Hot,
/// LRU cache of deserialized blobs. Updated whenever a blob is loaded.
blob_cache: Mutex<LruCache<Hash256, BlobsSidecar<E>>>,
blob_cache: Mutex<LruCache<Hash256, BlobSidecarList<E>>>,
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
/// Chain spec.
Expand Down Expand Up @@ -568,7 +568,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
}

pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar<E>) -> Result<(), Error> {
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList<E>) -> Result<(), Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
blobs_db.put_bytes(
DBColumn::BeaconBlob.into(),
Expand All @@ -582,7 +582,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn blobs_as_kv_store_ops(
&self,
key: &Hash256,
blobs: &BlobsSidecar<E>,
blobs: BlobSidecarList<E>,
ops: &mut Vec<KeyValueStoreOp>,
) {
let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes());
Expand Down Expand Up @@ -817,7 +817,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

StoreOp::PutBlobs(block_root, blobs) => {
self.blobs_as_kv_store_ops(&block_root, &blobs, &mut key_value_batch);
self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch);
}

StoreOp::PutStateSummary(state_root, summary) => {
Expand Down Expand Up @@ -885,8 +885,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutBlobs(_, _) => true,
StoreOp::DeleteBlobs(block_root) => {
match self.get_blobs(block_root) {
Ok(Some(blobs_sidecar)) => {
blobs_to_delete.push(blobs_sidecar);
Ok(Some(blobs_sidecar_list)) => {
blobs_to_delete.push((*block_root, blobs_sidecar_list));
}
Err(e) => {
error!(
Expand Down Expand Up @@ -926,7 +926,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let reverse_op = match op {
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)),
Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
None => return Err(HotColdDBError::Rollback.into()),
},
_ => return Err(HotColdDBError::Rollback.into()),
Expand Down Expand Up @@ -972,7 +972,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in blob_cache_ops {
match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard_blob.put(block_root, (*blobs).clone());
guard_blob.put(block_root, blobs);
}

StoreOp::DeleteBlobs(block_root) => {
Expand Down Expand Up @@ -1320,12 +1320,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

/// Fetch the blob sidecar list stored under `block_root`, if any.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobSidecarList<E>>, Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);

match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
Some(ref blobs_bytes) => {
let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?;
let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?;
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
// may want to attempt to use one again
self.blob_cache.lock().put(*block_root, blobs.clone());
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, Arc<BlobsSidecar<E>>),
// TODO (mark): space can be optimized here by de-duplicating data
PutBlobs(Hash256, BlobSidecarList<E>),
PutOrphanedBlobsKey(Hash256),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
Expand Down
Loading