Skip to content

Commit

Permalink
Compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 6, 2024
1 parent 8959d00 commit e0b9b0c
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 149 deletions.
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Return the set of imported custody column indexes for `block_root`. Returns None if there is
/// no block component for `block_root`.
pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| {
components
.get_cached_data_columns()
.iter()
.map(|item| item.data_column_index())
.collect::<Vec<_>>()
})
})
}

/// Get a blob from the availability cache.
pub fn get_blob(
&self,
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ impl DataColumnsByRootRequest {
Self { data_column_ids }
}

pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
}

pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
for request_id in self.data_column_ids.as_slice() {
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
pub fn custody_columns(&self, _epoch: Epoch) -> Result<Vec<ColumnIndex>, &'static str> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
// TODO(das): cache this number at start-up to not make this fallible
let custody_subnet_count = enr.custody_subnet_count::<E>()?;
Ok(
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count)
Expand Down
79 changes: 28 additions & 51 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::SignedBeaconBlock;
use types::{DataColumnSidecar, SignedBeaconBlock};

use super::single_block_lookup::DownloadResult;
use super::SingleLookupId;
Expand All @@ -23,6 +23,7 @@ use super::single_block_lookup::CustodyRequestState;
pub enum ResponseType {
Block,
Blob,
CustodyColumn,
}

/// The maximum depth we will search for a parent block. In principle we should have sync'd any
Expand Down Expand Up @@ -234,75 +235,51 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
}
}

impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for CustodyRequestState<L> {
type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;

fn new_request(&self) -> Self::RequestType {
// Removed in https://github.com/sigp/lighthouse/pull/5655
todo!()
}
impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
type RequestType = ();
type VerifiedResponseType = Vec<Arc<DataColumnSidecar<T::EthSpec>>>;

fn make_request(
id: SingleLookupReqId,
&self,
id: Id,
peer_id: PeerId,
block_root: Hash256,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.custody_lookup_request(id, block_root)
) -> Result<bool, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs)
.map_err(LookupRequestError::SendFailed)
}

fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
verified_response
.into_iter()
.filter_map(|blob| blob.as_ref())
.map(|blob| blob.block_parent_root())
.next()
}

fn add_to_child_components(
verified_response: FixedBlobSidecarList<T::EthSpec>,
components: &mut ChildComponents<T::EthSpec>,
) {
components.merge_blobs(verified_response);
}

fn verified_to_reconstructed(
_block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> FixedBlobSidecarList<T::EthSpec> {
blobs
}

fn send_reconstructed_for_processing(
fn send_for_processing(
id: Id,
bl: &BlockLookups<T>,
block_root: Hash256,
verified: FixedBlobSidecarList<T::EthSpec>,
duration: Duration,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
bl.send_blobs_for_processing(
let DownloadResult {
value,
block_root,
verified,
duration,
BlockProcessType::SingleBlob { id },
cx,
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_custody_columns_for_processing(
block_root,
value,
seen_timestamp,
BlockProcessType::SingleCustodyColumn(id),
)
.map_err(LookupRequestError::SendFailed)
}

fn response_type() -> ResponseType {
ResponseType::Blob
ResponseType::CustodyColumn
}
fn request_state_mut(request: &mut SingleBlockLookup<L, T>) -> &mut Self {
&mut request.blob_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.custody_request_state
}
fn get_state(&self) -> &SingleLookupRequestState {
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
}
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
&mut self.state
}
}
8 changes: 7 additions & 1 deletion beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use common::RequestState;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState};
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
use slog::{debug, error, trace, warn, Logger};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -395,6 +395,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleBlob { id } => {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SingleCustodyColumn(id) => {
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
}
} {
self.on_lookup_request_error(process_type.id(), e, "processing_result");
}
Expand Down Expand Up @@ -514,6 +517,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
match R::response_type() {
ResponseType::Block => "lookup_block_processing_failure",
ResponseType::Blob => "lookup_blobs_processing_failure",
ResponseType::CustodyColumn => {
"lookup_custody_column_processing_failure"
}
},
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Duration;
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{EthSpec, SignedBeaconBlock};
use types::{DataColumnSidecar, EthSpec, SignedBeaconBlock};

#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupRequestError {
Expand All @@ -31,7 +31,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
pub custody_request_state: CustodyRequestState<L>,
pub custody_request_state: CustodyRequestState<T::EthSpec>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
}
Expand Down Expand Up @@ -198,12 +198,12 @@ impl<E: EthSpec> BlobRequestState<E> {
}

/// The state of the blob request component of a `SingleBlockLookup`.
pub struct CustodyRequestState {
pub struct CustodyRequestState<E: EthSpec> {
pub block_root: Hash256,
pub state: SingleLookupRequestState,
pub state: SingleLookupRequestState<Vec<Arc<DataColumnSidecar<E>>>>,
}

impl CustodyRequestState {
impl<E: EthSpec> CustodyRequestState<E> {
pub fn new(block_root: Hash256, peer_source: &[PeerId]) -> Self {
Self {
block_root,
Expand Down
12 changes: 8 additions & 4 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::network_beacon_processor::NetworkBeaconProcessor;

use crate::service::RequestId;
use crate::sync::manager::{RequestId as SyncRequestId, SingleLookupReqId, SyncManager};
use crate::sync::manager::{
DataColumnsByRootRequestId, DataColumnsByRootRequester, RequestId as SyncRequestId,
SingleLookupReqId, SyncManager,
};
use crate::sync::sampling::{SamplingConfig, SamplingRequester};
use crate::sync::{SamplingId, SyncMessage};
use crate::NetworkMessage;
Expand Down Expand Up @@ -82,7 +85,7 @@ const D: Duration = Duration::new(0, 0);
const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
const SAMPLING_REQUIRED_SUCCESSES: usize = 2;

type SamplingIds = Vec<(Id, ColumnIndex)>;
type SamplingIds = Vec<(DataColumnsByRootRequestId, ColumnIndex)>;

impl TestRig {
fn test_setup() -> Self {
Expand Down Expand Up @@ -495,7 +498,7 @@ impl TestRig {
}
}

fn return_empty_sampling_request(&mut self, id: Id) {
fn return_empty_sampling_request(&mut self, id: DataColumnsByRootRequestId) {
let peer_id = PeerId::random();
// Send stream termination
self.send_sync_message(SyncMessage::RpcDataColumn {
Expand All @@ -522,7 +525,7 @@ impl TestRig {

fn complete_valid_sampling_column_request(
&mut self,
id: Id,
id: DataColumnsByRootRequestId,
data_column: DataColumnSidecar<E>,
) {
let peer_id = PeerId::random();
Expand Down Expand Up @@ -752,6 +755,7 @@ impl TestRig {
}
other => panic!("Expected blob process, found {:?}", other),
},
ResponseType::CustodyColumn => todo!(),
}
}

Expand Down
24 changes: 18 additions & 6 deletions beacon_node/network/src/sync/block_sidecar_coupling.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use beacon_chain::block_verification_types::RpcBlock;
use ssz_types::VariableList;
use std::{collections::VecDeque, sync::Arc};
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};

use super::range_sync::ByRangeRequestType;

Expand All @@ -11,10 +11,12 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<E>>>,
/// Sidecars we have received awaiting for their corresponding block.
accumulated_sidecars: VecDeque<Arc<BlobSidecar<E>>>,
accumulated_custody_columns: VecDeque<Arc<DataColumnSidecar<E>>>,
/// Whether the individual RPC request for blocks is finished or not.
is_blocks_stream_terminated: bool,
/// Whether the individual RPC request for sidecars is finished or not.
is_sidecars_stream_terminated: bool,
is_custody_columns_stream_terminated: bool,
/// Used to determine if this accumulator should wait for a sidecars stream termination
request_type: ByRangeRequestType,
}
Expand All @@ -24,8 +26,10 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
Self {
accumulated_blocks: <_>::default(),
accumulated_sidecars: <_>::default(),
accumulated_custody_columns: <_>::default(),
is_blocks_stream_terminated: <_>::default(),
is_sidecars_stream_terminated: <_>::default(),
is_custody_columns_stream_terminated: <_>::default(),
request_type,
}
}
Expand All @@ -48,6 +52,13 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
}
}

pub fn add_custody_column(&mut self, column_opt: Option<Arc<DataColumnSidecar<E>>>) {
match column_opt {
Some(column) => self.accumulated_custody_columns.push_back(column),
None => self.is_custody_columns_stream_terminated = true,
}
}

pub fn into_responses(self) -> Result<Vec<RpcBlock<E>>, String> {
let BlocksAndBlobsRequestInfo {
accumulated_blocks,
Expand Down Expand Up @@ -96,11 +107,12 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
}

pub fn is_finished(&self) -> bool {
let blobs_requested = match self.request_type {
ByRangeRequestType::Blocks => false,
ByRangeRequestType::BlocksAndBlobs => true,
};
self.is_blocks_stream_terminated && (!blobs_requested || self.is_sidecars_stream_terminated)
let blobs_requested = matches!(self.request_type, ByRangeRequestType::BlocksAndBlobs);
let custody_columns_requested =
matches!(self.request_type, ByRangeRequestType::BlocksAndColumns);
self.is_blocks_stream_terminated
&& (!blobs_requested || self.is_sidecars_stream_terminated)
&& (!custody_columns_requested || self.is_custody_columns_stream_terminated)
}
}

Expand Down
Loading

0 comments on commit e0b9b0c

Please sign in to comment.