
1D PeerDAS prototype: Data format and Distribution #5050

Merged · 11 commits · Feb 5, 2024

31 changes: 30 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
@@ -7,7 +7,9 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::blob_verification::{
GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar,
};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
@@ -2070,6 +2072,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

pub fn verify_data_column_sidecar_for_gossip(
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS);

Review comment (Collaborator):
All checks in https://github.com/ethereum/consensus-specs/pull/3574/files#diff-bacd55427cc3606e97b0f11dada895de104a8f9c532aca270efae967199bf261R123 can be implemented except for the crypto, by aliasing `verify_data_column_sidecar_kzg_proof` to a no-op.
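
As a minimal sketch of that suggestion (assuming the module's existing imports of `DataColumnSidecar`, `EthSpec`, and `GossipBlobError`; the function name mirrors the spec, and nothing below is code from this PR):

```rust
/// Hypothetical stand-in until KZG cell-proof verification is available.
/// Accepting unconditionally keeps the remaining spec checks implementable
/// now, and lets the real crypto slot in behind the same signature later.
fn verify_data_column_sidecar_kzg_proof<E: EthSpec>(
    _sidecar: &DataColumnSidecar<E>,
) -> Result<(), GossipBlobError<E>> {
    Ok(())
}
```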

let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
GossipVerifiedDataColumnSidecar::new(data_column_sidecar, subnet_id, self).map(|v| {
metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES);
v
})
}

pub fn verify_blob_sidecar_for_gossip(
self: &Arc<Self>,
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
@@ -2885,6 +2900,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

pub fn process_gossip_data_column(
self: &Arc<Self>,
gossip_verified_data_column: GossipVerifiedDataColumnSidecar<T>,
) {
let data_column = gossip_verified_data_column.as_data_column();
// TODO(das) send to DA checker

Review comment (Collaborator):
Until the crypto is ready, a basic implementation could just check that > 50% of the columns in each row have been seen, without attempting reconstruction otherwise.
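
A self-contained sketch of that interim rule, where the set of seen column indices and the total column count are assumed inputs (this PR does not yet track either):

```rust
use std::collections::HashSet;

/// Interim availability rule: with 1D erasure coding, strictly more than
/// half of the extended columns is enough to reconstruct the data, so the
/// block's data can be treated as available without attempting reconstruction.
fn more_than_half_columns_seen(seen_columns: &HashSet<u64>, num_columns: usize) -> bool {
    2 * seen_columns.len() > num_columns
}
```

Anything at or below the threshold would simply stay pending in the availability cache until more columns arrive on gossip.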

info!(
self.log,
"Processed gossip data column";
"index" => data_column.index,
"slot" => data_column.slot().as_u64()
);
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_blobs(

41 changes: 40 additions & 1 deletion beacon_node/beacon_chain/src/blob_verification.rs
@@ -17,7 +17,8 @@ use ssz_types::VariableList;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconStateError, BlobSidecar, CloneConfig, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
BeaconStateError, BlobSidecar, CloneConfig, DataColumnSidecar, EthSpec, Hash256,
SignedBeaconBlockHeader, Slot,
};

/// An error occurred while validating a gossip blob.
@@ -184,6 +185,33 @@ pub type GossipVerifiedBlobList<T> = VariableList<
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
>;

#[derive(Debug)]
pub struct GossipVerifiedDataColumnSidecar<T: BeaconChainTypes> {
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
}

impl<T: BeaconChainTypes> GossipVerifiedDataColumnSidecar<T> {
pub fn new(
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
chain: &BeaconChain<T>,
) -> Result<Self, GossipBlobError<T::EthSpec>> {
let header = column_sidecar.signed_block_header.clone();
// We only process slashing info if the gossip verification failed
// since we do not process the blob any further in that case.
validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| {
process_block_slash_info::<_, GossipBlobError<T::EthSpec>>(
chain,
BlockSlashInfo::from_early_error_blob(header, e),
)
})
}

pub fn as_data_column(&self) -> &Arc<DataColumnSidecar<T::EthSpec>> {
&self.data_column_sidecar
}
}

/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Debug)]
@@ -647,6 +675,17 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
})
}

pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes>(
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
_subnet: u64,
_chain: &BeaconChain<T>,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
// TODO(das): validate kzg commitments, cell proofs etc
Ok(GossipVerifiedDataColumnSidecar {
data_column_sidecar: data_column_sidecar.clone(),
})
}

/// Returns the canonical root of the given `blob`.
///
/// Use this function to ensure that we report the blob hashing time Prometheus metric.

1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/errors.rs
@@ -220,6 +220,7 @@ pub enum BeaconChainError {
InconsistentFork(InconsistentFork),
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
UnableToPublish,
UnableToBuildColumnSidecar(String),
AvailabilityCheckError(AvailabilityCheckError),
LightClientError(LightClientError),
UnsupportedFork,

12 changes: 12 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
@@ -1014,14 +1014,26 @@ lazy_static! {
"beacon_blobs_sidecar_processing_requests_total",
"Count of all blob sidecars submitted for processing"
);
pub static ref BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_blobs_column_sidecar_processing_requests_total",
"Count of all data column sidecars submitted for processing"
);
pub static ref BLOBS_SIDECAR_PROCESSING_SUCCESSES: Result<IntCounter> = try_create_int_counter(
"beacon_blobs_sidecar_processing_successes_total",
"Number of blob sidecars verified for gossip"
);
pub static ref DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES: Result<IntCounter> = try_create_int_counter(
"beacon_blobs_column_sidecar_processing_successes_total",
"Number of data column sidecars verified for gossip"
);
pub static ref BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result<Histogram> = try_create_histogram(
"beacon_blobs_sidecar_gossip_verification_seconds",
"Full runtime of blob sidecars gossip verification"
);
pub static ref DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result<Histogram> = try_create_histogram(
"beacon_blobs_column_sidecar_gossip_verification_seconds",
"Full runtime of data column sidecars gossip verification"
);
pub static ref BLOB_SIDECAR_INCLUSION_PROOF_VERIFICATION: Result<Histogram> = try_create_histogram(
"blob_sidecar_inclusion_proof_verification_seconds",
"Time taken to verify blob sidecar inclusion proof"

27 changes: 22 additions & 5 deletions beacon_node/beacon_processor/src/lib.rs
@@ -109,6 +109,10 @@ const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
/// will be stored before we start dropping them.
const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `DataColumnSidecar` objects received on gossip that
/// will be stored before we start dropping them.
const MAX_GOSSIP_DATA_COL_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but
/// within acceptable clock disparity) that will be queued before we start dropping them.
const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024;
@@ -224,6 +228,7 @@ pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate";
pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch";
pub const GOSSIP_BLOCK: &str = "gossip_block";
pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar";
pub const GOSSIP_BLOBS_COLUMN_SIDECAR: &str = "gossip_blobs_column_sidecar";
pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block";
pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit";
pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing";
@@ -590,6 +595,7 @@ pub enum Work<E: EthSpec> {
},
GossipBlock(AsyncFn),
GossipBlobSidecar(AsyncFn),
GossipDataColumnSidecar(AsyncFn),
DelayedImportBlock {
beacon_block_slot: Slot,
beacon_block_root: Hash256,
@@ -640,6 +646,7 @@ impl<E: EthSpec> Work<E> {
Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH,
Work::GossipBlock(_) => GOSSIP_BLOCK,
Work::GossipBlobSidecar(_) => GOSSIP_BLOBS_SIDECAR,
Work::GossipDataColumnSidecar(_) => GOSSIP_BLOBS_COLUMN_SIDECAR,
Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK,
Work::GossipVoluntaryExit(_) => GOSSIP_VOLUNTARY_EXIT,
Work::GossipProposerSlashing(_) => GOSSIP_PROPOSER_SLASHING,
@@ -809,6 +816,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN);
let mut gossip_data_column_queue = FifoQueue::new(MAX_GOSSIP_DATA_COL_QUEUE_LEN);
let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN);

let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN);
@@ -964,6 +972,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_blob_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_data_column_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the priority 0 API requests after blocks and blobs, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
self.spawn_worker(item, idle_tx);
@@ -1206,6 +1216,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::GossipBlobSidecar { .. } => {
gossip_blob_queue.push(work, work_id, &self.log)
}
Work::GossipDataColumnSidecar { .. } => {
gossip_data_column_queue.push(work, work_id, &self.log)
}
Work::DelayedImportBlock { .. } => {
delayed_block_queue.push(work, work_id, &self.log)
}
@@ -1304,6 +1317,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
&metrics::BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL,
gossip_blob_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL,
gossip_data_column_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL,
rpc_block_queue.len() as i64,
@@ -1455,11 +1472,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_async(process_fn)
}
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work) | Work::GossipBlobSidecar(work) => {
task_spawner.spawn_async(async move {
work.await;
})
}
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
| Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move {
work.await;
}),
Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}

5 changes: 5 additions & 0 deletions beacon_node/beacon_processor/src/metrics.rs
@@ -51,6 +51,11 @@ lazy_static::lazy_static! {
"beacon_processor_gossip_blob_queue_total",
"Count of blobs from gossip waiting to be verified."
);
// Gossip data column sidecars.
pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_gossip_data_column_queue_total",
"Count of data column sidecars from gossip waiting to be verified."
);
// Gossip Exits.
pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_exit_queue_total",

24 changes: 21 additions & 3 deletions beacon_node/http_api/src/publish_blocks.rs
@@ -19,9 +19,9 @@ use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BeaconBlockRef, BlobSidecarList, EthSpec, ExecPayload, ExecutionBlockHash,
ForkName, FullPayload, FullPayloadMerge, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock,
VariableList,
AbstractExecPayload, BeaconBlockRef, BlobSidecarList, DataColumnSidecar, DataColumnSubnetId,
EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload, FullPayloadMerge, Hash256,
SignedBeaconBlock, SignedBlindedBeaconBlock, VariableList,
};
use warp::http::StatusCode;
use warp::{reply::Response, Rejection, Reply};
@@ -88,6 +88,24 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockContents
SignedBeaconBlock::Deneb(_) => {
let mut pubsub_messages = vec![PubsubMessage::BeaconBlock(block.clone())];
if let Some(blob_sidecars) = blobs_opt {
// Build and publish column sidecars
let col_sidecars = DataColumnSidecar::random_from_blob_sidecars(&blob_sidecars)
.map_err(|e| {
BeaconChainError::UnableToBuildColumnSidecar(format!("{e:?}"))
})?;

for (col_index, col_sidecar) in col_sidecars.into_iter().enumerate() {
let subnet_id =
DataColumnSubnetId::try_from_column_index::<T::EthSpec>(col_index)
.map_err(|e| {
BeaconChainError::UnableToBuildColumnSidecar(format!("{e:?}"))
})?;
pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new((
subnet_id,
Arc::new(col_sidecar),
))));
}
// Publish blob sidecars
for (blob_index, blob) in blob_sidecars.into_iter().enumerate() {
pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new((
blob_index as u64,
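
For context on the column loop above: `DataColumnSubnetId::try_from_column_index` routes each column index to a gossip subnet, but its implementation is not part of this diff. A plain modulo assignment like the sketch below is one plausible reading; the constant name and value are assumptions, not the PR's API:

```rust
/// Hypothetical column-index-to-subnet mapping; the real logic lives in
/// `DataColumnSubnetId` and may differ.
const DATA_COLUMN_SIDECAR_SUBNET_COUNT: u64 = 32; // illustrative value

fn subnet_for_column(column_index: u64) -> u64 {
    column_index % DATA_COLUMN_SIDECAR_SUBNET_COUNT
}
```
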
3 changes: 3 additions & 0 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
@@ -513,6 +513,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
)
.map_err(|e| format!("{:?}", e))?;
}
// TODO(das) discovery to be implemented at a later phase. Initially we just use a large peer count.
Subnet::DataColumn(_) => return Ok(()),
}

// replace the global version
@@ -832,6 +834,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let query_str = match query.subnet {
Subnet::Attestation(_) => "attestation",
Subnet::SyncCommittee(_) => "sync_committee",
Subnet::DataColumn(_) => "data_column",
};

if let Some(v) = metrics::get_int_counter(

beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs
@@ -33,6 +33,8 @@ where
Subnet::SyncCommittee(s) => sync_committee_bitfield
.as_ref()
.map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)),
// TODO(das) discovery to be implemented at a later phase. Initially we just use a large peer count.
Subnet::DataColumn(_) => false,
});

if !predicate {

4 changes: 4 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
@@ -1073,6 +1073,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.or_default()
.insert(id);
}
// TODO(das) to be implemented. We're not pruning data column peers yet
// because data column topics are subscribed as core topics until we
// implement recomputing data column subnets.
Subnet::DataColumn(_) => {}
}
}
}

beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
@@ -94,6 +94,8 @@ impl<T: EthSpec> PeerInfo<T> {
.syncnets()
.map_or(false, |s| s.get(**id as usize).unwrap_or(false))
}
// TODO(das) Add data column nets bitfield
Subnet::DataColumn(_) => return false,
}
}
false

1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/service/gossip_cache.rs
@@ -194,6 +194,7 @@ impl GossipCache {
let expire_timeout = match topic.kind() {
GossipKind::BeaconBlock => self.beacon_block,
GossipKind::BlobSidecar(_) => self.blob_sidecar,
GossipKind::DataColumnSidecar(_) => self.blob_sidecar,
GossipKind::BeaconAggregateAndProof => self.aggregates,
GossipKind::Attestation(_) => self.attestation,
GossipKind::VoluntaryExit => self.voluntary_exit,

2 changes: 2 additions & 0 deletions beacon_node/lighthouse_network/src/service/mod.rs
@@ -226,6 +226,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
let max_topics = ctx.chain_spec.attestation_subnet_count as usize
+ SYNC_COMMITTEE_SUBNET_COUNT as usize
+ ctx.chain_spec.blob_sidecar_subnet_count as usize
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize
+ BASE_CORE_TOPICS.len()
+ ALTAIR_CORE_TOPICS.len()
+ CAPELLA_CORE_TOPICS.len()
@@ -239,6 +240,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
ctx.chain_spec.attestation_subnet_count,
SYNC_COMMITTEE_SUBNET_COUNT,
ctx.chain_spec.blob_sidecar_subnet_count,
ctx.chain_spec.data_column_sidecar_subnet_count,
),
// during a fork we subscribe to both the old and new topics
max_subscribed_topics: max_topics * 4,

8 changes: 7 additions & 1 deletion beacon_node/lighthouse_network/src/service/utils.rs
@@ -20,7 +20,9 @@ use std::io::prelude::*;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, SubnetId, SyncSubnetId};
use types::{
ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, SubnetId, SyncSubnetId,
};

pub const NETWORK_KEY_FILENAME: &str = "key";
/// The maximum simultaneous libp2p connections per peer.
@@ -232,6 +234,7 @@ pub(crate) fn create_whitelist_filter(
attestation_subnet_count: u64,
sync_committee_subnet_count: u64,
blob_sidecar_subnet_count: u64,
data_column_subnet_count: u64,
) -> gossipsub::WhitelistSubscriptionFilter {
let mut possible_hashes = HashSet::new();
for fork_digest in possible_fork_digests {
@@ -260,6 +263,9 @@ pub(crate) fn create_whitelist_filter(
for id in 0..blob_sidecar_subnet_count {
add(BlobSidecar(id));
}
for id in 0..data_column_subnet_count {
add(DataColumnSidecar(DataColumnSubnetId::new(id)));
}
}
gossipsub::WhitelistSubscriptionFilter(possible_hashes)
}