Initial Changes to Blob Storage
ethDreamer committed Mar 20, 2023
1 parent 7841433 commit 0184c51
Showing 7 changed files with 68 additions and 100 deletions.
38 changes: 12 additions & 26 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -959,35 +959,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.get_block(block_root).await?.map(Arc::new))
}

pub async fn get_block_and_blobs_checking_early_attester_cache(
pub async fn get_blobs_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
) -> Result<Option<Arc<BlobSidecarList<T::EthSpec>>>, Error> {
// If there is no data availability boundary, the Eip4844 fork is disabled.
if let Some(finalized_data_availability_boundary) =
self.finalized_data_availability_boundary()
{
// Only use the attester cache if we can find both the block and blob
if let (Some(block), Some(blobs)) = (
self.early_attester_cache.get_block(*block_root),
self.early_attester_cache.get_blobs(*block_root),
) {
Ok(Some(SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
// Attempt to get the block and blobs from the database
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
let blobs = self
.get_blobs(block_root, finalized_data_availability_boundary)?
.map(Arc::new);
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: blobs,
}))
} else {
Ok(None)
}
self.early_attester_cache
.get_blobs(*block_root)
.map_or_else(
|| self.get_blobs(block_root, finalized_data_availability_boundary),
|blobs| Ok(Some(blobs)),
)
} else {
Ok(None)
}
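
This hunk replaces the combined block-and-blobs lookup with a blobs-only lookup that tries the early attester cache before the database. A minimal sketch of that fallback, using simplified stand-in types rather than the real Lighthouse ones:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-ins: the real code uses Hash256 and BlobSidecarList from `types`.
type BlockRoot = [u8; 32];
type BlobSidecarList = Vec<Vec<u8>>;

struct EarlyAttesterCache(HashMap<BlockRoot, Arc<BlobSidecarList>>);
struct Store(HashMap<BlockRoot, Arc<BlobSidecarList>>);

impl Store {
    // Fallible database lookup, mirroring the shape of `get_blobs`.
    fn get_blobs(&self, root: &BlockRoot) -> Result<Option<Arc<BlobSidecarList>>, String> {
        Ok(self.0.get(root).cloned())
    }
}

fn get_blobs_checking_cache(
    cache: &EarlyAttesterCache,
    store: &Store,
    root: &BlockRoot,
) -> Result<Option<Arc<BlobSidecarList>>, String> {
    // Prefer the in-memory cache; fall back to the database only on a miss.
    cache
        .0
        .get(root)
        .cloned()
        .map_or_else(|| store.get_blobs(root), |blobs| Ok(Some(blobs)))
}
```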
@@ -1074,7 +1059,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
data_availability_boundary: Epoch,
) -> Result<Option<BlobsSidecar<T::EthSpec>>, Error> {
) -> Result<Option<Arc<BlobSidecarList<T::EthSpec>>>, Error> {
match self.store.get_blobs(block_root)? {
Some(blobs) => Ok(Some(blobs)),
None => {
@@ -1089,7 +1074,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock),
};
if expected_kzg_commitments.is_empty() {
Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot()))
// TODO (mark): verify this
Ok(Arc::new(BlobSidecarList::empty()))
} else if data_availability_boundary <= block.epoch() {
// We should have blobs for all blocks younger than the boundary.
Err(Error::BlobsUnavailable)
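
The new branch returns an empty `BlobSidecarList` when the block commits to no blobs. Sketched below is the overall decision when the store has no blobs for a block; the final `Ok(None)` arm is inferred from the comment about the availability boundary and is not shown in this hunk, and all types are illustrative stand-ins:

```rust
#[derive(Debug)]
enum BlobFetchError {
    NoKzgCommitmentsFieldOnBlock,
    BlobsUnavailable,
}

type BlobSidecarList = Vec<Vec<u8>>;

fn blobs_when_missing_from_store(
    kzg_commitments: Option<&[Vec<u8>]>,
    block_epoch: u64,
    data_availability_boundary: u64,
) -> Result<Option<BlobSidecarList>, BlobFetchError> {
    let commitments = kzg_commitments.ok_or(BlobFetchError::NoKzgCommitmentsFieldOnBlock)?;
    if commitments.is_empty() {
        // The block commits to no blobs, so an empty list is a complete answer.
        Ok(Some(BlobSidecarList::new()))
    } else if data_availability_boundary <= block_epoch {
        // Blocks inside the availability window must have blobs; absence is an error.
        Err(BlobFetchError::BlobsUnavailable)
    } else {
        // Older blocks may legitimately have had their blobs pruned (assumed arm).
        Ok(None)
    }
}
```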
@@ -3035,7 +3021,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// margin, or younger (of higher epoch number).
if block_epoch >= import_boundary {
if let Some(blobs) = blobs {
if !blobs.blobs.is_empty() {
if !blobs.is_empty() {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/early_attester_cache.rs
@@ -21,7 +21,7 @@ pub struct CacheItem<E: EthSpec> {
* Values used to make the block available.
*/
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<Arc<BlobsSidecar<E>>>,
blobs: Option<Arc<BlobSidecarList<E>>>,
proto_block: ProtoBlock,
}

@@ -160,7 +160,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
}

/// Returns the blobs, if `block_root` matches the cached item.
pub fn get_blobs(&self, block_root: Hash256) -> Option<Arc<BlobsSidecar<E>>> {
pub fn get_blobs(&self, block_root: Hash256) -> Option<Arc<BlobSidecarList<E>>> {
self.item
.read()
.as_ref()
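
For context, the early attester cache holds at most one item behind a lock, and `get_blobs` only returns the blobs if the requested root matches that item. A simplified sketch of the lookup pattern (field and type names are stand-ins for the real ones):

```rust
use std::sync::{Arc, RwLock};

type Hash256 = [u8; 32];
type BlobSidecarList = Vec<Vec<u8>>;

// Simplified cache item: the real one also carries the block and proto-block.
struct CacheItem {
    beacon_block_root: Hash256,
    blobs: Option<Arc<BlobSidecarList>>,
}

struct EarlyAttesterCache {
    item: RwLock<Option<CacheItem>>,
}

impl EarlyAttesterCache {
    /// Returns the blobs only if `block_root` matches the single cached item.
    fn get_blobs(&self, block_root: Hash256) -> Option<Arc<BlobSidecarList>> {
        self.item
            .read()
            .unwrap()
            .as_ref()
            .filter(|item| item.beacon_block_root == block_root)
            .and_then(|item| item.blobs.clone())
    }
}
```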
8 changes: 4 additions & 4 deletions beacon_node/http_api/src/block_id.rs
@@ -4,7 +4,7 @@ use eth2::types::BlockId as CoreBlockId;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use types::{BlobsSidecar, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};
use types::{BlobSidecarList, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot};

/// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given
/// `BlockId`.
@@ -213,16 +213,16 @@ impl BlockId {
}

/// Return the `BlobsSidecar` identified by `self`.
pub async fn blobs_sidecar<T: BeaconChainTypes>(
pub async fn blobs_sidecar_list<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
) -> Result<Arc<BlobsSidecar<T::EthSpec>>, warp::Rejection> {
) -> Result<Arc<BlobSidecarList<T::EthSpec>>, warp::Rejection> {
let root = self.root(chain)?.0;
let Some(data_availability_boundary) = chain.data_availability_boundary() else {
return Err(warp_utils::reject::custom_not_found("Eip4844 fork disabled".into()));
};
match chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blob)) => Ok(Arc::new(blob)),
Ok(Some(blob)) => Ok(blob),
Ok(None) => Err(warp_utils::reject::custom_not_found(format!(
"Blob with block root {} is not in the store",
root
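
A sketch of how the three lookup outcomes map onto HTTP-style errors in this handler; the server-error arm is an assumption, since the hunk ends before showing it, and the error type is a stand-in for warp rejections:

```rust
enum ApiError {
    NotFound(String),
    ServerError(String),
}

fn blobs_or_api_error<T>(
    lookup: Result<Option<T>, String>,
    root_hex: &str,
) -> Result<T, ApiError> {
    match lookup {
        // Found in the store: hand the list straight back.
        Ok(Some(blobs)) => Ok(blobs),
        // Known-missing: a 404-style rejection with the block root.
        Ok(None) => Err(ApiError::NotFound(format!(
            "Blob with block root {} is not in the store",
            root_hex
        ))),
        // Assumed arm: any store error becomes a server error.
        Err(e) => Err(ApiError::ServerError(format!("error fetching blobs: {:?}", e))),
    }
}
```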
6 changes: 3 additions & 3 deletions beacon_node/http_api/src/lib.rs
@@ -3511,21 +3511,21 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
accept_header: Option<api_types::Accept>| {
async move {
let blobs_sidecar = block_id.blobs_sidecar(&chain).await?;
let blob_sidecar_list = block_id.blobs_sidecar_list(&chain).await?;

match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.body(blobs_sidecar.as_ssz_bytes().into())
.body(blob_sidecar_list.as_ssz_bytes().into())
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => Ok(warp::reply::json(&api_types::GenericResponse::from(
blobs_sidecar,
blob_sidecar_list,
))
.into_response()),
}
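
A simplified sketch of the content negotiation in this handler: raw SSZ bytes for `Accept: application/octet-stream`, JSON otherwise. The response and serialization types below are stand-ins for the warp/serde machinery used in the real code:

```rust
enum Accept {
    Ssz,
    Json,
}

struct HttpResponse {
    status: u16,
    content_type: &'static str,
    body: Vec<u8>,
}

fn respond(accept: Option<Accept>, ssz_bytes: Vec<u8>, json_bytes: Vec<u8>) -> HttpResponse {
    match accept {
        // SSZ requested: return the raw encoding as an octet stream.
        Some(Accept::Ssz) => HttpResponse {
            status: 200,
            content_type: "application/octet-stream",
            body: ssz_bytes,
        },
        // Anything else falls through to the JSON representation.
        _ => HttpResponse {
            status: 200,
            content_type: "application/json",
            body: json_bytes,
        },
    }
}
```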
80 changes: 27 additions & 53 deletions beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -12,6 +12,7 @@ use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo};
use slog::{debug, error, warn};
use slot_clock::SlotClock;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::blob_sidecar::BlobIdentifier;
@@ -225,42 +226,34 @@ impl<T: BeaconChainTypes> Worker<T> {
executor.spawn(
async move {
let requested_blobs = request.blob_ids.len();
let mut send_block_count = 0;
let mut send_blob_count = 0;
let mut send_response = true;

let mut blob_list_results = HashMap::new();
for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() {
match self
.chain
.get_block_and_blobs_checking_early_attester_cache(&root)
.await
{
Ok(Some(block_and_blobs)) => {
//
// TODO: HORRIBLE NSFW CODE AHEAD
//
let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs;
let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone();
// TODO: this should be unreachable after this is addressed seriously,
// so for now let's be ok with a panic in the expect.
let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff");
// Intentionally not accessing the list directly
for (known_index, blob) in blob_bundle.into_iter().enumerate() {
if (known_index as u64) == index {
let blob_sidecar = types::BlobSidecar{
block_root: beacon_block_root,
index,
slot: beacon_block_slot,
block_parent_root: block.parent_root,
proposer_index: block.proposer_index,
blob,
kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic.
kzg_proof: kzg_aggregated_proof // TODO: yeah
};
let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self
.chain
.get_blobs_checking_early_attester_cache(&root)
.await)
}
Entry::Occupied(entry) => {
entry.into_mut()
}
};

match blob_list_result.as_ref() {
Ok(Some(blobs_sidecar_list)) => {
for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == index {
self.send_response(
peer_id,
Response::BlobsByRoot(Some(Arc::new(blob_sidecar))),
Response::BlobsByRoot(Some(Arc::new(blob_sidecar.clone()))),
request_id,
);
send_block_count += 1;
send_blob_count += 1;
break;
}
}
}
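
The per-request memoization introduced here avoids re-fetching a block's blob list when several requested blob IDs share a block root. A simplified sketch of the `Entry`-based pattern, with stand-in types and a stand-in fetch function:

```rust
use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;

type BlockRoot = [u8; 32];
type BlobSidecarList = Vec<u8>; // placeholder for the real list of blob sidecars

struct BlobIdentifier {
    block_root: BlockRoot,
    index: u64,
}

// Stand-in for `get_blobs_checking_early_attester_cache`.
fn fetch_blobs(_root: &BlockRoot) -> Result<Option<Arc<BlobSidecarList>>, String> {
    Ok(None)
}

fn serve_blobs_by_root(requests: &[BlobIdentifier]) {
    // Cache the per-block fetch result so each block root is looked up once,
    // even when multiple indices from the same block are requested.
    let mut blob_list_results: HashMap<BlockRoot, Result<Option<Arc<BlobSidecarList>>, String>> =
        HashMap::new();

    for BlobIdentifier { block_root, index } in requests {
        let result = match blob_list_results.entry(*block_root) {
            Entry::Vacant(entry) => entry.insert(fetch_blobs(block_root)),
            Entry::Occupied(entry) => entry.into_mut(),
        };
        if let Ok(Some(list)) = result.as_ref() {
            // In the real worker, the sidecar whose `index` matches is sent to the peer.
            let _ = (list, index);
        }
    }
}
```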
@@ -355,7 +348,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"Received BlobsByRoot Request";
"peer" => %peer_id,
"requested" => requested_blobs,
"returned" => send_block_count
"returned" => send_blob_count
);

// send stream termination
@@ -837,31 +830,12 @@ impl<T: BeaconChainTypes> Worker<T> {

for root in block_roots {
match self.chain.get_blobs(&root, data_availability_boundary) {
Ok(Some(blobs)) => {
// TODO: more GROSS code ahead. Reader beware
let types::BlobsSidecar {
beacon_block_root,
beacon_block_slot,
blobs: blob_bundle,
kzg_aggregated_proof: _,
}: types::BlobsSidecar<_> = blobs;

for (blob_index, blob) in blob_bundle.into_iter().enumerate() {
let blob_sidecar = types::BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot: beacon_block_slot,
block_parent_root: Hash256::zero(),
proposer_index: 0,
blob,
kzg_commitment: types::KzgCommitment::default(),
kzg_proof: types::KzgProof::default(),
};

Ok(Some(blob_sidecar_list)) => {
for blob_sidecar in blob_sidecar_list.iter() {
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))),
response: Response::BlobsByRange(Some(Arc::new(blob_sidecar.clone()))),
id: request_id,
});
}
29 changes: 18 additions & 11 deletions beacon_node/store/src/hot_cold_store.rs
@@ -66,7 +66,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The hot database also contains all blocks.
pub hot_db: Hot,
/// LRU cache of deserialized blobs. Updated whenever a blob is loaded.
blob_cache: Mutex<LruCache<Hash256, BlobsSidecar<E>>>,
blob_cache: Mutex<LruCache<Hash256, Arc<BlobSidecarList<E>>>>,
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
/// Chain spec.
@@ -568,12 +568,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blobs_db.key_delete(DBColumn::BeaconBlob.into(), block_root.as_bytes())
}

pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobsSidecar<E>) -> Result<(), Error> {
pub fn put_blobs(
&self,
block_root: &Hash256,
blobs: Arc<BlobSidecarList<E>>,
) -> Result<(), Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
blobs_db.put_bytes(
DBColumn::BeaconBlob.into(),
block_root.as_bytes(),
&blobs.as_ssz_bytes(),
&blobs.as_ref().as_ssz_bytes(),
)?;
self.blob_cache.lock().push(*block_root, blobs);
Ok(())
@@ -582,7 +586,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn blobs_as_kv_store_ops(
&self,
key: &Hash256,
blobs: &BlobsSidecar<E>,
blobs: &BlobSidecarList<E>,
ops: &mut Vec<KeyValueStoreOp>,
) {
let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_bytes());
@@ -817,7 +821,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

StoreOp::PutBlobs(block_root, blobs) => {
self.blobs_as_kv_store_ops(&block_root, &blobs, &mut key_value_batch);
self.blobs_as_kv_store_ops(&block_root, blobs.as_ref(), &mut key_value_batch);
}

StoreOp::PutStateSummary(state_root, summary) => {
@@ -885,8 +889,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutBlobs(_, _) => true,
StoreOp::DeleteBlobs(block_root) => {
match self.get_blobs(block_root) {
Ok(Some(blobs_sidecar)) => {
blobs_to_delete.push(blobs_sidecar);
Ok(Some(blobs_sidecar_list)) => {
blobs_to_delete.push((*block_root, blobs_sidecar_list));
}
Err(e) => {
error!(
@@ -926,7 +930,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let reverse_op = match op {
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
Some(blobs) => StoreOp::PutBlobs(blobs.beacon_block_root, Arc::new(blobs)),
Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
None => return Err(HotColdDBError::Rollback.into()),
},
_ => return Err(HotColdDBError::Rollback.into()),
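
Because a `BlobSidecarList` no longer records its own block root, the rollback bookkeeping has to keep the root alongside the blobs. A minimal sketch of that inverse-operation construction, with illustrative types only:

```rust
use std::sync::Arc;

type Hash256 = [u8; 32];
type BlobSidecarList = Vec<Vec<u8>>;

enum StoreOp {
    PutBlobs(Hash256, Arc<BlobSidecarList>),
    DeleteBlobs(Hash256),
}

fn reverse_delete(
    blobs_to_delete: &mut Vec<(Hash256, Arc<BlobSidecarList>)>,
) -> Option<StoreOp> {
    // Popping the saved (root, blobs) pair rebuilds the inverse PutBlobs op;
    // before this change the root could be read off the sidecar itself.
    blobs_to_delete
        .pop()
        .map(|(block_root, blobs)| StoreOp::PutBlobs(block_root, blobs))
}
```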
@@ -972,7 +976,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in blob_cache_ops {
match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard_blob.put(block_root, (*blobs).clone());
guard_blob.put(block_root, blobs);
}

StoreOp::DeleteBlobs(block_root) => {
@@ -1320,12 +1324,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

/// Fetch a blobs sidecar from the store.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
pub fn get_blobs(
&self,
block_root: &Hash256,
) -> Result<Option<Arc<BlobSidecarList<E>>>, Error> {
let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);

match blobs_db.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {
Some(ref blobs_bytes) => {
let blobs = BlobsSidecar::from_ssz_bytes(blobs_bytes)?;
let blobs = Arc::new(BlobSidecarList::from_ssz_bytes(blobs_bytes)?);
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
// may want to attempt to use one again
self.blob_cache.lock().put(*block_root, blobs.clone());
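
A condensed sketch of the store round trip after this change, covering both `put_blobs` and `get_blobs`: the list is persisted as bytes and the `Arc` is kept in a cache, so a cache hit returns a cheap pointer clone instead of re-decoding. A plain `HashMap` and byte helpers stand in for the real `LruCache` and SSZ encoding:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Hash256 = [u8; 32];
type BlobSidecarList = Vec<Vec<u8>>;

struct BlobStore {
    db: Mutex<HashMap<Hash256, Vec<u8>>>,
    cache: Mutex<HashMap<Hash256, Arc<BlobSidecarList>>>,
}

fn to_bytes(blobs: &BlobSidecarList) -> Vec<u8> {
    blobs.concat() // placeholder for `as_ssz_bytes`
}

fn from_bytes(_bytes: &[u8]) -> BlobSidecarList {
    Vec::new() // placeholder for `from_ssz_bytes`
}

impl BlobStore {
    fn put_blobs(&self, block_root: &Hash256, blobs: Arc<BlobSidecarList>) {
        // Persist the serialized list, then keep the Arc so later reads are cheap.
        self.db.lock().unwrap().insert(*block_root, to_bytes(blobs.as_ref()));
        self.cache.lock().unwrap().insert(*block_root, blobs);
    }

    fn get_blobs(&self, block_root: &Hash256) -> Option<Arc<BlobSidecarList>> {
        if let Some(blobs) = self.cache.lock().unwrap().get(block_root) {
            return Some(blobs.clone());
        }
        // Cache miss: decode from the database and repopulate the cache.
        let bytes = self.db.lock().unwrap().get(block_root).cloned()?;
        let blobs = Arc::new(from_bytes(&bytes));
        self.cache.lock().unwrap().insert(*block_root, blobs.clone());
        Some(blobs)
    }
}
```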
3 changes: 2 additions & 1 deletion beacon_node/store/src/lib.rs
@@ -159,7 +159,8 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, Arc<BlobsSidecar<E>>),
// TODO (mark): space can be optimized here by de-duplicating data
PutBlobs(Hash256, Arc<BlobSidecarList<E>>),
PutOrphanedBlobsKey(Hash256),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
