Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple gossip #3950

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 173 additions & 3 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use derivative::Derivative;
use slot_clock::SlotClock;
use std::sync::Arc;

use crate::beacon_chain::{BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
use crate::beacon_chain::{
BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
};
use crate::{kzg_utils, BeaconChainError};
use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions;
use types::signed_beacon_block::BlobReconstructionError;
use types::{
BeaconBlockRef, BeaconStateError, BlobsSidecar, EthSpec, Hash256, KzgCommitment,
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader, Slot,
Transactions,
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader,
SignedBlobSidecar, Slot, Transactions,
};
use types::{Epoch, ExecPayload};

Expand Down Expand Up @@ -62,6 +65,52 @@ pub enum BlobError {
UnavailableBlobs,
/// Blobs provided for a pre-Eip4844 fork.
InconsistentFork,

/// The `blobs_sidecar.message.beacon_block_root` block is unknown.
///
/// ## Peer scoring
///
/// The blob points to a block we have not yet imported. The blob cannot be imported
/// into fork choice yet
UnknownHeadBlock {
beacon_block_root: Hash256,
},

/// The `BlobSidecar` was gossiped over an incorrect subnet.
InvalidSubnet {
expected: u64,
received: u64,
},

/// The sidecar corresponds to a slot older than the finalized head slot.
PastFinalizedSlot {
blob_slot: Slot,
finalized_slot: Slot,
},

/// The proposer index specified in the sidecar does not match the locally computed
/// proposer index.
ProposerIndexMismatch {
sidecar: usize,
local: usize,
},

ProposerSignatureInvalid,

/// A sidecar with same slot, beacon_block_root and proposer_index but different blob is received for
/// the same blob index.
RepeatSidecar {
proposer: usize,
slot: Slot,
blob_index: usize,
},

/// The proposal_index corresponding to blob.beacon_block_root is not known.
///
/// ## Peer scoring
///
/// The block is invalid and the peer is faulty.
UnknownValidator(u64),
}

impl From<BlobReconstructionError> for BlobError {
Expand Down Expand Up @@ -115,6 +164,127 @@ pub fn validate_blob_for_gossip<T: BeaconChainTypes>(
block_wrapper.into_available_block(block_root, chain)
}

pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
blob_sidecar: SignedBlobSidecar<T::EthSpec>,
subnet: u64,
chain: &BeaconChain<T>,
) -> Result<(), BlobError> {
let blob_slot = blob_sidecar.message.slot;
let blob_index = blob_sidecar.message.index;
let block_root = blob_sidecar.message.block_root;

// Verify that the blob_sidecar was received on the correct subnet.
if blob_index != subnet {
return Err(BlobError::InvalidSubnet {
expected: blob_index,
received: subnet,
});
}

// Verify that the sidecar is not from a future slot.
let latest_permissible_slot = chain
.slot_clock
.now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY)
.ok_or(BeaconChainError::UnableToReadSlot)?;
if blob_slot > latest_permissible_slot {
return Err(BlobError::FutureSlot {
message_slot: blob_slot,
latest_permissible_slot,
});
}

// TODO(pawan): Verify not from a past slot?

// Verify that the sidecar slot is greater than the latest finalized slot
let latest_finalized_slot = chain
.head()
.finalized_checkpoint()
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
if blob_slot <= latest_finalized_slot {
return Err(BlobError::PastFinalizedSlot {
blob_slot,
finalized_slot: latest_finalized_slot,
});
}

// TODO(pawan): should we verify locally that the parent root is correct
// or just use whatever the proposer gives us?
let proposer_shuffling_root = blob_sidecar.message.block_parent_root;

let (proposer_index, fork) = match chain
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(proposer_shuffling_root, blob_slot)
{
Some(proposer) => (proposer.index, proposer.fork),
None => {
let state = &chain.canonical_head.cached_head().snapshot.beacon_state;
(
state.get_beacon_proposer_index(blob_slot, &chain.spec)?,
state.fork(),
)
}
};

let blob_proposer_index = blob_sidecar.message.proposer_index;
if proposer_index != blob_proposer_index {
return Err(BlobError::ProposerIndexMismatch {
sidecar: blob_proposer_index,
local: proposer_index,
});
}

let signature_is_valid = {
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)
.map_err(BlobError::BeaconChainError)?;

let pubkey = pubkey_cache
.get(proposer_index as usize)
.ok_or_else(|| BlobError::UnknownValidator(proposer_index as u64))?;

blob_sidecar.verify_signature(
None,
pubkey,
&fork,
chain.genesis_validators_root,
&chain.spec,
)
};

if !signature_is_valid {
return Err(BlobError::ProposerSignatureInvalid);
}

// TODO(pawan): kzg validations.

// TODO(pawan): Check if other blobs for the same proposer index and blob index have been
// received and drop if required.

// TODO(pawan): potentially add to a seen cache at this point.

// Verify if the corresponding block for this blob has been received.
// Note: this should be the last gossip check so that we can forward the blob
// over the gossip network even if we haven't received the corresponding block yet
// as all other validations have passed.
let block_opt = chain
.canonical_head
.fork_choice_read_lock()
.get_block(&block_root)
.or_else(|| chain.early_attester_cache.get_proto_block(block_root)); // TODO(pawan): should we be checking this cache?

if block_opt.is_none() {
return Err(BlobError::UnknownHeadBlock {
beacon_block_root: block_root,
});
}

Ok(())
}

fn verify_data_availability<T: BeaconChainTypes>(
blob_sidecar: &BlobsSidecar<T::EthSpec>,
kzg_commitments: &[KzgCommitment],
Expand Down
69 changes: 25 additions & 44 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::metrics;
use beacon_chain::blob_verification::{AsBlock, BlockWrapper, IntoAvailableBlock};
use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
use beacon_chain::NotifyExecutionLayer;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
Expand All @@ -12,7 +11,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, FullPayload,
Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar,
Hash256, SignedBeaconBlock,
};
use warp::Rejection;

Expand All @@ -32,51 +31,33 @@ pub async fn publish_block<T: BeaconChainTypes>(

// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
let wrapped_block: BlockWrapper<T::EthSpec> =
if matches!(block.as_ref(), &SignedBeaconBlock::Eip4844(_)) {
if let Some(sidecar) = chain.blob_cache.pop(&block_root) {
let block_and_blobs = SignedBeaconBlockAndBlobsSidecar {
beacon_block: block,
blobs_sidecar: Arc::new(sidecar),
};
crate::publish_pubsub_message(
network_tx,
PubsubMessage::BeaconBlockAndBlobsSidecars(block_and_blobs.clone()),
)?;
block_and_blobs.into()
} else {
//FIXME(sean): This should probably return a specific no-blob-cached error code, beacon API coordination required
return Err(warp_utils::reject::broadcast_without_import(format!(
"no blob cached for block"
)));
}
} else {
crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;
block.into()
};
crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?;

/* TODO: publish all blob sidecars associated with this block */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's an open discussion whether to do blob publishing separately
ethereum/beacon-APIs#300
PR: ethereum/beacon-APIs#302


// Determine the delay after the start of the slot, register it with metrics.
let block = wrapped_block.as_block();
let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock);
metrics::observe_duration(&metrics::HTTP_API_BLOCK_BROADCAST_DELAY_TIMES, delay);

let available_block = match wrapped_block.into_available_block(block_root, &chain) {
Ok(available_block) => available_block,
Err(e) => {
let msg = format!("{:?}", e);
error!(
log,
"Invalid block provided to HTTP API";
"reason" => &msg
);
return Err(warp_utils::reject::broadcast_without_import(msg));
}
};
/* TODO: check availability of block */

// let available_block = match wrapped_block.into_available_block(block_root, &chain) {
// Ok(available_block) => available_block,
// Err(e) => {
// let msg = format!("{:?}", e);
// error!(
// log,
// "Invalid block provided to HTTP API";
// "reason" => &msg
// );
// return Err(warp_utils::reject::broadcast_without_import(msg));
// }
// };

match chain
.process_block(
block_root,
available_block.clone(),
block.clone(),
CountUnrealized::True,
NotifyExecutionLayer::Yes,
)
Expand All @@ -88,14 +69,14 @@ pub async fn publish_block<T: BeaconChainTypes>(
"Valid block from HTTP API";
"block_delay" => ?delay,
"root" => format!("{}", root),
"proposer_index" => available_block.message().proposer_index(),
"slot" => available_block.slot(),
"proposer_index" => block.message().proposer_index(),
"slot" => block.slot(),
);

// Notify the validator monitor.
chain.validator_monitor.read().register_api_block(
seen_timestamp,
available_block.message(),
block.message(),
root,
&chain.slot_clock,
);
Expand All @@ -117,7 +98,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
"Block was broadcast too late";
"msg" => "system may be overloaded, block likely to be orphaned",
"delay_ms" => delay.as_millis(),
"slot" => available_block.slot(),
"slot" => block.slot(),
"root" => ?root,
)
} else if delay >= delayed_threshold {
Expand All @@ -126,7 +107,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
"Block broadcast was delayed";
"msg" => "system may be overloaded, block may be orphaned",
"delay_ms" => delay.as_millis(),
"slot" => available_block.slot(),
"slot" => block.slot(),
"root" => ?root,
)
}
Expand All @@ -138,7 +119,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
log,
"Block from HTTP API already known";
"block" => ?block_root,
"slot" => available_block.slot(),
"slot" => block.slot(),
);
Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/lighthouse_network/src/service/gossip_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct GossipCache {
/// Timeout for blocks.
beacon_block: Option<Duration>,
/// Timeout for blobs.
beacon_block_and_blobs_sidecar: Option<Duration>,
blob_sidecar: Option<Duration>,
/// Timeout for aggregate attestations.
aggregates: Option<Duration>,
/// Timeout for attestations.
Expand Down Expand Up @@ -50,7 +50,7 @@ pub struct GossipCacheBuilder {
/// Timeout for blocks.
beacon_block: Option<Duration>,
/// Timeout for blob sidecars.
beacon_block_and_blobs_sidecar: Option<Duration>,
blob_sidecar: Option<Duration>,
/// Timeout for aggregate attestations.
aggregates: Option<Duration>,
/// Timeout for attestations.
Expand Down Expand Up @@ -151,7 +151,7 @@ impl GossipCacheBuilder {
let GossipCacheBuilder {
default_timeout,
beacon_block,
beacon_block_and_blobs_sidecar,
blob_sidecar,
aggregates,
attestation,
voluntary_exit,
Expand All @@ -167,7 +167,7 @@ impl GossipCacheBuilder {
expirations: DelayQueue::default(),
topic_msgs: HashMap::default(),
beacon_block: beacon_block.or(default_timeout),
beacon_block_and_blobs_sidecar: beacon_block_and_blobs_sidecar.or(default_timeout),
blob_sidecar: blob_sidecar.or(default_timeout),
aggregates: aggregates.or(default_timeout),
attestation: attestation.or(default_timeout),
voluntary_exit: voluntary_exit.or(default_timeout),
Expand All @@ -193,7 +193,7 @@ impl GossipCache {
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
let expire_timeout = match topic.kind() {
GossipKind::BeaconBlock => self.beacon_block,
GossipKind::BeaconBlocksAndBlobsSidecar => self.beacon_block_and_blobs_sidecar,
GossipKind::BlobSidecar(_) => self.blob_sidecar,
GossipKind::BeaconAggregateAndProof => self.aggregates,
GossipKind::Attestation(_) => self.attestation,
GossipKind::VoluntaryExit => self.voluntary_exit,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
possible_fork_digests,
ctx.chain_spec.attestation_subnet_count,
SYNC_COMMITTEE_SUBNET_COUNT,
4, // TODO(pawan): get this from chainspec
),
max_subscribed_topics: 200,
max_subscriptions_per_request: 150, // 148 in theory = (64 attestation + 4 sync committee + 6 core topics) * 2
Expand Down
Loading