Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ pub enum Work<E: EthSpec> {
DataColumnsByRootsRequest(BlockingFn),
DataColumnsByRangeRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
GossipExecutionPayload(AsyncFn),
GossipExecutionPayloadBid(BlockingFn),
GossipPayloadAttestation(BlockingFn),
GossipProposerPreferences(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
LightClientOptimisticUpdateRequest(BlockingFn),
LightClientFinalityUpdateRequest(BlockingFn),
Expand Down Expand Up @@ -461,6 +465,10 @@ pub enum WorkType {
DataColumnsByRootsRequest,
DataColumnsByRangeRequest,
GossipBlsToExecutionChange,
GossipExecutionPayload,
GossipExecutionPayloadBid,
GossipPayloadAttestation,
GossipProposerPreferences,
LightClientBootstrapRequest,
LightClientOptimisticUpdateRequest,
LightClientFinalityUpdateRequest,
Expand Down Expand Up @@ -496,6 +504,10 @@ impl<E: EthSpec> Work<E> {
WorkType::GossipLightClientOptimisticUpdate
}
Work::GossipBlsToExecutionChange(_) => WorkType::GossipBlsToExecutionChange,
Work::GossipExecutionPayload(_) => WorkType::GossipExecutionPayload,
Work::GossipExecutionPayloadBid(_) => WorkType::GossipExecutionPayloadBid,
Work::GossipPayloadAttestation(_) => WorkType::GossipPayloadAttestation,
Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences,
Work::RpcBlock { .. } => WorkType::RpcBlock,
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Expand Down Expand Up @@ -777,10 +789,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
// on the delayed ones.
} else if let Some(item) = work_queues.delayed_block_queue.pop() {
Some(item)
// Check gossip blocks before gossip attestations, since a block might be
// Check gossip blocks and payloads before gossip attestations, since a block might be
// required to verify some attestations.
} else if let Some(item) = work_queues.gossip_block_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.gossip_execution_payload_queue.pop()
{
Some(item)
} else if let Some(item) = work_queues.gossip_blob_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.gossip_data_column_queue.pop() {
Expand Down Expand Up @@ -903,6 +918,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
// Convert any gossip attestations that need to be converted.
} else if let Some(item) = work_queues.attestation_to_convert_queue.pop() {
Some(item)
// Check payload attestation messages after attestations. They dont give rewards
// but they influence fork choice.
} else if let Some(item) =
work_queues.gossip_payload_attestation_queue.pop()
{
Some(item)
// Check sync committee messages after attestations as their rewards are lesser
// and they don't influence fork choice.
} else if let Some(item) = work_queues.sync_contribution_queue.pop() {
Expand All @@ -914,6 +935,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.unknown_block_attestation_queue.pop()
{
Some(item)
// Check execution payload bids. Most proposers will request bids directly from builders
// instead of receiving them over gossip.
} else if let Some(item) =
work_queues.gossip_execution_payload_bid_queue.pop()
{
Some(item)
// Check proposer preferences.
} else if let Some(item) =
work_queues.gossip_proposer_preferences_queue.pop()
{
Some(item)
// Check RPC methods next. Status messages are needed for sync so
Expand Down Expand Up @@ -1143,6 +1175,18 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::GossipBlsToExecutionChange { .. } => work_queues
.gossip_bls_to_execution_change_queue
.push(work, work_id),
Work::GossipExecutionPayload { .. } => work_queues
.gossip_execution_payload_queue
.push(work, work_id),
Work::GossipExecutionPayloadBid { .. } => work_queues
.gossip_execution_payload_bid_queue
.push(work, work_id),
Work::GossipPayloadAttestation { .. } => work_queues
.gossip_payload_attestation_queue
.push(work, work_id),
Work::GossipProposerPreferences { .. } => work_queues
.gossip_proposer_preferences_queue
.push(work, work_id),
Work::BlobsByRootsRequest { .. } => {
work_queues.blob_broots_queue.push(work, work_id)
}
Expand Down Expand Up @@ -1229,6 +1273,18 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::GossipBlsToExecutionChange => {
work_queues.gossip_bls_to_execution_change_queue.len()
}
WorkType::GossipExecutionPayload => {
work_queues.gossip_execution_payload_queue.len()
}
WorkType::GossipExecutionPayloadBid => {
work_queues.gossip_execution_payload_bid_queue.len()
}
WorkType::GossipPayloadAttestation => {
work_queues.gossip_payload_attestation_queue.len()
}
WorkType::GossipProposerPreferences => {
work_queues.gossip_proposer_preferences_queue.len()
}
WorkType::LightClientBootstrapRequest => {
work_queues.lc_bootstrap_queue.len()
}
Expand Down Expand Up @@ -1383,7 +1439,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
| Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move {
| Work::GossipDataColumnSidecar(work)
| Work::GossipExecutionPayload(work) => task_spawner.spawn_async(async move {
work.await;
}),
Work::BlobsByRangeRequest(process_fn)
Expand Down Expand Up @@ -1416,6 +1473,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::GossipLightClientOptimisticUpdate(process_fn)
| Work::Status(process_fn)
| Work::GossipBlsToExecutionChange(process_fn)
| Work::GossipExecutionPayloadBid(process_fn)
| Work::GossipPayloadAttestation(process_fn)
| Work::GossipProposerPreferences(process_fn)
| Work::LightClientBootstrapRequest(process_fn)
| Work::LightClientOptimisticUpdateRequest(process_fn)
| Work::LightClientFinalityUpdateRequest(process_fn)
Expand Down
30 changes: 30 additions & 0 deletions beacon_node/beacon_processor/src/scheduler/work_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ pub struct BeaconProcessorQueueLengths {
dcbroots_queue: usize,
dcbrange_queue: usize,
gossip_bls_to_execution_change_queue: usize,
gossip_execution_payload_queue: usize,
gossip_execution_payload_bid_queue: usize,
gossip_payload_attestation_queue: usize,
gossip_proposer_preferences_queue: usize,
lc_bootstrap_queue: usize,
lc_rpc_optimistic_update_queue: usize,
lc_rpc_finality_update_queue: usize,
Expand Down Expand Up @@ -201,6 +205,15 @@ impl BeaconProcessorQueueLengths {
dcbroots_queue: 1024,
dcbrange_queue: 1024,
gossip_bls_to_execution_change_queue: 16384,
// TODO(EIP-7732): verify 1024 is preferable. I used same value as `gossip_block_queue` and `gossip_blob_queue`
gossip_execution_payload_queue: 1024,
// TODO(EIP-7732) how big should this queue be?
gossip_execution_payload_bid_queue: 1024,
// PTC size ~512 per slot, buffer 2-3 slots for reorgs and processing delays (512 * 3 = 1536)
// TODO(EIP-7732): verify if this is preferable queue length or otherwise
gossip_payload_attestation_queue: 1536,
// TODO(EIP-7732): verify if this is preferable queue length
gossip_proposer_preferences_queue: 1024,
lc_gossip_finality_update_queue: 1024,
lc_gossip_optimistic_update_queue: 1024,
lc_bootstrap_queue: 1024,
Expand Down Expand Up @@ -245,6 +258,10 @@ pub struct WorkQueues<E: EthSpec> {
pub dcbroots_queue: FifoQueue<Work<E>>,
pub dcbrange_queue: FifoQueue<Work<E>>,
pub gossip_bls_to_execution_change_queue: FifoQueue<Work<E>>,
pub gossip_execution_payload_queue: FifoQueue<Work<E>>,
pub gossip_execution_payload_bid_queue: FifoQueue<Work<E>>,
pub gossip_payload_attestation_queue: FifoQueue<Work<E>>,
pub gossip_proposer_preferences_queue: FifoQueue<Work<E>>,
pub lc_gossip_finality_update_queue: FifoQueue<Work<E>>,
pub lc_gossip_optimistic_update_queue: FifoQueue<Work<E>>,
pub lc_bootstrap_queue: FifoQueue<Work<E>>,
Expand Down Expand Up @@ -310,6 +327,15 @@ impl<E: EthSpec> WorkQueues<E> {
let gossip_bls_to_execution_change_queue =
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);

let gossip_execution_payload_queue =
FifoQueue::new(queue_lengths.gossip_execution_payload_queue);
let gossip_execution_payload_bid_queue =
FifoQueue::new(queue_lengths.gossip_execution_payload_bid_queue);
let gossip_payload_attestation_queue =
FifoQueue::new(queue_lengths.gossip_payload_attestation_queue);
let gossip_proposer_preferences_queue =
FifoQueue::new(queue_lengths.gossip_proposer_preferences_queue);

let lc_gossip_optimistic_update_queue =
FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue);
let lc_gossip_finality_update_queue =
Expand Down Expand Up @@ -357,6 +383,10 @@ impl<E: EthSpec> WorkQueues<E> {
dcbroots_queue,
dcbrange_queue,
gossip_bls_to_execution_change_queue,
gossip_execution_payload_queue,
gossip_execution_payload_bid_queue,
gossip_payload_attestation_queue,
gossip_proposer_preferences_queue,
lc_gossip_optimistic_update_queue,
lc_gossip_finality_update_queue,
lc_bootstrap_queue,
Expand Down
52 changes: 52 additions & 0 deletions beacon_node/lighthouse_network/src/service/gossip_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ pub struct GossipCache {
sync_committee_message: Option<Duration>,
/// Timeout for signed BLS to execution changes.
bls_to_execution_change: Option<Duration>,
/// Timeout for signed execution payload envelope.
execution_payload: Option<Duration>,
/// Timeout for execution payload bid.
execution_payload_bid: Option<Duration>,
/// Timeout for payload attestation message.
payload_attestation: Option<Duration>,
/// Timeout for proposer preferences.
proposer_preferences: Option<Duration>,
/// Timeout for light client finality updates.
light_client_finality_update: Option<Duration>,
/// Timeout for light client optimistic updates.
Expand Down Expand Up @@ -71,6 +79,14 @@ pub struct GossipCacheBuilder {
sync_committee_message: Option<Duration>,
/// Timeout for signed BLS to execution changes.
bls_to_execution_change: Option<Duration>,
/// Timeout for signed execution payload envelope.
execution_payload: Option<Duration>,
/// Timeout for execution payload bid.
execution_payload_bid: Option<Duration>,
/// Timeout for payload attestation message.
payload_attestation: Option<Duration>,
/// Timeout for proposer preferences.
proposer_preferences: Option<Duration>,
/// Timeout for light client finality updates.
light_client_finality_update: Option<Duration>,
/// Timeout for light client optimistic updates.
Expand Down Expand Up @@ -139,6 +155,30 @@ impl GossipCacheBuilder {
self
}

/// Timeout for signed execution payload envelope.
pub fn execution_payload_timeout(mut self, timeout: Duration) -> Self {
self.execution_payload = Some(timeout);
self
}

/// Timeout for execution payload bid.
pub fn execution_payload_bid_timeout(mut self, timeout: Duration) -> Self {
self.execution_payload_bid = Some(timeout);
self
}

/// Timeout for payload attestation message.
pub fn payload_attestation_timeout(mut self, timeout: Duration) -> Self {
self.payload_attestation = Some(timeout);
self
}

/// Timeout for proposer preferences.
pub fn proposer_preferences_timeout(mut self, timeout: Duration) -> Self {
self.proposer_preferences = Some(timeout);
self
}

/// Timeout for light client finality update messages.
pub fn light_client_finality_update_timeout(mut self, timeout: Duration) -> Self {
self.light_client_finality_update = Some(timeout);
Expand All @@ -165,6 +205,10 @@ impl GossipCacheBuilder {
signed_contribution_and_proof,
sync_committee_message,
bls_to_execution_change,
execution_payload,
execution_payload_bid,
payload_attestation,
proposer_preferences,
light_client_finality_update,
light_client_optimistic_update,
} = self;
Expand All @@ -182,6 +226,10 @@ impl GossipCacheBuilder {
signed_contribution_and_proof: signed_contribution_and_proof.or(default_timeout),
sync_committee_message: sync_committee_message.or(default_timeout),
bls_to_execution_change: bls_to_execution_change.or(default_timeout),
execution_payload: execution_payload.or(default_timeout),
execution_payload_bid: execution_payload_bid.or(default_timeout),
payload_attestation: payload_attestation.or(default_timeout),
proposer_preferences: proposer_preferences.or(default_timeout),
light_client_finality_update: light_client_finality_update.or(default_timeout),
light_client_optimistic_update: light_client_optimistic_update.or(default_timeout),
}
Expand Down Expand Up @@ -209,6 +257,10 @@ impl GossipCache {
GossipKind::SignedContributionAndProof => self.signed_contribution_and_proof,
GossipKind::SyncCommitteeMessage(_) => self.sync_committee_message,
GossipKind::BlsToExecutionChange => self.bls_to_execution_change,
GossipKind::ExecutionPayload => self.execution_payload,
GossipKind::ExecutionPayloadBid => self.execution_payload_bid,
GossipKind::PayloadAttestation => self.payload_attestation,
GossipKind::ProposerPreferences => self.proposer_preferences,
GossipKind::LightClientFinalityUpdate => self.light_client_finality_update,
GossipKind::LightClientOptimisticUpdate => self.light_client_optimistic_update,
};
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/lighthouse_network/src/service/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ pub(crate) fn create_whitelist_filter(
add(AttesterSlashing);
add(SignedContributionAndProof);
add(BlsToExecutionChange);
add(ExecutionPayload);
add(ExecutionPayloadBid);
add(PayloadAttestation);
add(ProposerPreferences);
add(LightClientFinalityUpdate);
add(LightClientOptimisticUpdate);
for id in 0..spec.attestation_subnet_count {
Expand Down
Loading