diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6029ee1e73d..fbd8d248c21 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -987,9 +987,19 @@ impl BeaconChain { &self, block_root: &Hash256, ) -> Result>, Error> { - self.early_attester_cache - .get_blobs(*block_root) - .map_or_else(|| self.get_blobs(block_root), |blobs| Ok(Some(blobs))) + // If there is no data availability boundary, the Eip4844 fork is disabled. + if let Some(finalized_data_availability_boundary) = + self.finalized_data_availability_boundary() + { + self.early_attester_cache + .get_blobs(*block_root) + .map_or_else( + || self.get_blobs(block_root, finalized_data_availability_boundary), + |blobs| Ok(Some(blobs)), + ) + } else { + Ok(None) + } } /// Returns the block at the given root, if any. @@ -2657,11 +2667,11 @@ impl BeaconChain { pub async fn process_blob( self: &Arc, - blob: Arc>, + blob: BlobSidecar, count_unrealized: CountUnrealized, ) -> Result> { self.check_availability_and_maybe_import( - |chain| chain.data_availability_checker.put_blob(blob), + |chain| chain.data_availability_checker.put_blob(Arc::new(blob)), count_unrealized, ) .await diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index d2d8faa1a97..9dae0e1a218 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -9,6 +9,7 @@ use crate::gossip_blob_cache::AvailabilityCheckError; use crate::BeaconChainError; use derivative::Derivative; use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; +use types::blob_sidecar::BlobSidecarList; use types::{ BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, @@ -130,11 +131,11 @@ impl From for BlobError { /// the p2p network. #[derive(Debug)] pub struct GossipVerifiedBlob { - blob: Arc>, + blob: BlobSidecar, } impl GossipVerifiedBlob { - pub fn to_blob(self) -> Arc> { + pub fn to_blob(self) -> BlobSidecar { self.blob } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 03a5dcb652f..6ffc0427824 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -12,6 +12,7 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; +use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; use task_executor::TaskExecutor; use types::blob_sidecar::BlobIdentifier; @@ -225,45 +226,36 @@ impl Worker { executor.spawn( async move { let requested_blobs = request.blob_ids.len(); - let send_blob_count = 0; + let mut send_blob_count = 0; let mut send_response = true; - for BlobIdentifier{ block_root: root, index: _index } in request.blob_ids.into_iter() { - match self - .chain - .get_blobs_checking_early_attester_cache(&root) - .await - { - Ok(Some(_blob_sidecar_list)) => { - todo!(); - // // - // // TODO: HORRIBLE NSFW CODE AHEAD - // // - // let types::SignedBeaconBlockAndBlobsSidecar {beacon_block, blobs_sidecar} = block_and_blobs; - // let types::BlobsSidecar{ beacon_block_root, beacon_block_slot, blobs: blob_bundle, kzg_aggregated_proof }: types::BlobsSidecar<_> = blobs_sidecar.as_ref().clone(); - // // TODO: this should be unreachable after this is addressed seriously, - // // so for now let's be ok with a panic in the expect. - // let block = beacon_block.message_eip4844().expect("We fucked up the block blob stuff"); - // // Intentionally not accessing the list directly - // for (known_index, blob) in blob_bundle.into_iter().enumerate() { - // if (known_index as u64) == index { - // let blob_sidecar = types::BlobSidecar{ - // block_root: beacon_block_root, - // index, - // slot: beacon_block_slot, - // block_parent_root: block.parent_root, - // proposer_index: block.proposer_index, - // blob, - // kzg_commitment: block.body.blob_kzg_commitments[known_index], // TODO: needs to be stored in a more logical way so that this won't panic. - // kzg_proof: kzg_aggregated_proof // TODO: yeah - // }; - // self.send_response( - // peer_id, - // Response::BlobsByRoot(Some(Arc::new(blob_sidecar))), - // request_id, - // ); - // send_block_count += 1; - // } - // } + + let mut blob_list_results = HashMap::new(); + for BlobIdentifier{ block_root: root, index } in request.blob_ids.into_iter() { + let blob_list_result = match blob_list_results.entry(root) { + Entry::Vacant(entry) => { + entry.insert(self + .chain + .get_blobs_checking_early_attester_cache(&root) + .await) + } + Entry::Occupied(entry) => { + entry.into_mut() + } + }; + + match blob_list_result.as_ref() { + Ok(Some(blobs_sidecar_list)) => { + for blob_sidecar in blobs_sidecar_list.iter() { + if blob_sidecar.index == index { + self.send_response( + peer_id, + Response::BlobsByRoot(Some(blob_sidecar.clone())), + request_id, + ); + send_blob_count += 1; + break; + } + } } Ok(None) => { debug!( @@ -837,36 +829,16 @@ impl Worker { let mut send_response = true; for root in block_roots { - match self.chain.get_blobs(&root) { - Ok(Some(_blobs)) => { - todo!(); - // // TODO: more GROSS code ahead. Reader beware - // let types::BlobsSidecar { - // beacon_block_root, - // beacon_block_slot, - // blobs: blob_bundle, - // kzg_aggregated_proof: _, - // }: types::BlobsSidecar<_> = blobs; - // - // for (blob_index, blob) in blob_bundle.into_iter().enumerate() { - // let blob_sidecar = types::BlobSidecar { - // block_root: beacon_block_root, - // index: blob_index as u64, - // slot: beacon_block_slot, - // block_parent_root: Hash256::zero(), - // proposer_index: 0, - // blob, - // kzg_commitment: types::KzgCommitment::default(), - // kzg_proof: types::KzgProof::default(), - // }; - // - // blobs_sent += 1; - // self.send_network_message(NetworkMessage::SendResponse { - // peer_id, - // response: Response::BlobsByRange(Some(Arc::new(blob_sidecar))), - // id: request_id, - // }); - // } + match self.chain.get_blobs(&root, data_availability_boundary) { + Ok(Some(blob_sidecar_list)) => { + for blob_sidecar in blob_sidecar_list.iter() { + blobs_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlobsByRange(Some(blob_sidecar.clone())), + id: request_id, + }); + } } Ok(None) => { error!( diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 12f8afd9c73..3bc6559a015 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -47,7 +47,8 @@ pub struct BlobSidecar { pub kzg_proof: KzgProof, } -pub type BlobSidecarList = VariableList>, ::MaxBlobsPerBlock>; +pub type BlobSidecarList = + VariableList>, ::MaxBlobsPerBlock>; pub type Blobs = VariableList, ::MaxExtraDataBytes>; impl SignedRoot for BlobSidecar {}