Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
execution_payload_gossip_tx: Sender<EventKind<E>>,
execution_payload_available_tx: Sender<EventKind<E>>,
execution_payload_bid_tx: Sender<EventKind<E>>,
proposer_preferences_tx: Sender<EventKind<E>>,
payload_attestation_message_tx: Sender<EventKind<E>>,
}

Expand Down Expand Up @@ -60,6 +61,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (execution_payload_gossip_tx, _) = broadcast::channel(capacity);
let (execution_payload_available_tx, _) = broadcast::channel(capacity);
let (execution_payload_bid_tx, _) = broadcast::channel(capacity);
let (proposer_preferences_tx, _) = broadcast::channel(capacity);
let (payload_attestation_message_tx, _) = broadcast::channel(capacity);

Self {
Expand All @@ -85,6 +87,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
execution_payload_gossip_tx,
execution_payload_available_tx,
execution_payload_bid_tx,
proposer_preferences_tx,
payload_attestation_message_tx,
}
}
Expand Down Expand Up @@ -186,6 +189,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.execution_payload_bid_tx
.send(kind)
.map(|count| log_count("execution payload bid", count)),
EventKind::ProposerPreferences(_) => self
.proposer_preferences_tx
.send(kind)
.map(|count| log_count("proposer preferences", count)),
EventKind::PayloadAttestationMessage(_) => self
.payload_attestation_message_tx
.send(kind)
Expand Down Expand Up @@ -284,6 +291,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.execution_payload_bid_tx.subscribe()
}

pub fn subscribe_proposer_preferences(&self) -> Receiver<EventKind<E>> {
self.proposer_preferences_tx.subscribe()
}

pub fn subscribe_payload_attestation_message(&self) -> Receiver<EventKind<E>> {
self.payload_attestation_message_tx.subscribe()
}
Expand Down Expand Up @@ -368,6 +379,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.execution_payload_bid_tx.receiver_count() > 0
}

pub fn has_proposer_preferences_subscribers(&self) -> bool {
self.proposer_preferences_tx.receiver_count() > 0
}

pub fn has_payload_attestation_message_subscribers(&self) -> bool {
self.payload_attestation_message_tx.receiver_count() > 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
ProposerPreferencesError, proposer_preference_cache::GossipVerifiedProposerPreferenceCache,
},
};
use eth2::types::{EventKind, ForkVersionedResponse};
use slot_clock::SlotClock;
use state_processing::signature_sets::{get_pubkey_from_state, proposer_preferences_signature_set};
use tracing::debug;
Expand Down Expand Up @@ -137,6 +138,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
%validator_index,
"Successfully verified gossip proposer preferences"
);

if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_proposer_preferences_subscribers()
{
event_handler.register(EventKind::ProposerPreferences(Box::new(
ForkVersionedResponse {
version: self.spec.fork_name_at_slot::<T::EthSpec>(proposal_slot),
metadata: Default::default(),
data: (*verified.signed_preferences).clone(),
},
)));
}

Ok(verified)
}
Err(e) => {
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3241,6 +3241,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::ExecutionPayloadBid => {
event_handler.subscribe_execution_payload_bid()
}
api_types::EventTopic::ProposerPreferences => {
event_handler.subscribe_proposer_preferences()
}
api_types::EventTopic::PayloadAttestationMessage => {
event_handler.subscribe_payload_attestation_message()
}
Expand Down
11 changes: 11 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ pub struct SseExtendedPayloadAttributesGeneric<T> {
pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric<SsePayloadAttributes>;
pub type VersionedSsePayloadAttributes = ForkVersionedResponse<SseExtendedPayloadAttributes>;
pub type VersionedSseExecutionPayloadBid<E> = ForkVersionedResponse<SignedExecutionPayloadBid<E>>;
pub type VersionedSseProposerPreferences = ForkVersionedResponse<SignedProposerPreferences>;
pub type VersionedSsePayloadAttestationMessage = ForkVersionedResponse<PayloadAttestationMessage>;

impl<'de> ContextDeserialize<'de, ForkName> for SsePayloadAttributes {
Expand Down Expand Up @@ -1245,6 +1246,7 @@ pub enum EventKind<E: EthSpec> {
ExecutionPayloadGossip(SseExecutionPayloadGossip),
ExecutionPayloadAvailable(SseExecutionPayloadAvailable),
ExecutionPayloadBid(Box<VersionedSseExecutionPayloadBid<E>>),
ProposerPreferences(Box<VersionedSseProposerPreferences>),
PayloadAttestationMessage(Box<VersionedSsePayloadAttestationMessage>),
}

Expand Down Expand Up @@ -1273,6 +1275,7 @@ impl<E: EthSpec> EventKind<E> {
EventKind::ExecutionPayloadGossip(_) => "execution_payload_gossip",
EventKind::ExecutionPayloadAvailable(_) => "execution_payload_available",
EventKind::ExecutionPayloadBid(_) => "execution_payload_bid",
EventKind::ProposerPreferences(_) => "proposer_preferences",
EventKind::PayloadAttestationMessage(_) => "payload_attestation_message",
}
}
Expand Down Expand Up @@ -1389,6 +1392,11 @@ impl<E: EthSpec> EventKind<E> {
ServerError::InvalidServerSentEvent(format!("Execution Payload Bid: {:?}", e))
})?,
))),
"proposer_preferences" => Ok(EventKind::ProposerPreferences(Box::new(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Proposer Preferences: {:?}", e))
})?,
))),
"payload_attestation_message" => Ok(EventKind::PayloadAttestationMessage(Box::new(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!(
Expand Down Expand Up @@ -1436,6 +1444,7 @@ pub enum EventTopic {
ExecutionPayloadGossip,
ExecutionPayloadAvailable,
ExecutionPayloadBid,
ProposerPreferences,
PayloadAttestationMessage,
}

Expand Down Expand Up @@ -1466,6 +1475,7 @@ impl FromStr for EventTopic {
"execution_payload_gossip" => Ok(EventTopic::ExecutionPayloadGossip),
"execution_payload_available" => Ok(EventTopic::ExecutionPayloadAvailable),
"execution_payload_bid" => Ok(EventTopic::ExecutionPayloadBid),
"proposer_preferences" => Ok(EventTopic::ProposerPreferences),
"payload_attestation_message" => Ok(EventTopic::PayloadAttestationMessage),
_ => Err("event topic cannot be parsed.".to_string()),
}
Expand Down Expand Up @@ -1499,6 +1509,7 @@ impl fmt::Display for EventTopic {
write!(f, "execution_payload_available")
}
EventTopic::ExecutionPayloadBid => write!(f, "execution_payload_bid"),
EventTopic::ProposerPreferences => write!(f, "proposer_preferences"),
EventTopic::PayloadAttestationMessage => {
write!(f, "payload_attestation_message")
}
Expand Down
2 changes: 2 additions & 0 deletions consensus/types/src/builder/proposer_preferences.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use tree_hash_derive::TreeHash;
pub struct ProposerPreferences {
pub dependent_root: Hash256,
pub proposal_slot: Slot,
#[serde(with = "serde_utils::quoted_u64")]
pub validator_index: u64,
pub fee_recipient: Address,
#[serde(with = "serde_utils::quoted_u64")]
pub gas_limit: u64,
}

Expand Down
Loading