Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/unstable' into integrate-tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
ThreeHrSleep committed Aug 10, 2024
2 parents fb2eb8a + 3913ea4 commit 235a484
Show file tree
Hide file tree
Showing 24 changed files with 1,151 additions and 130 deletions.
15 changes: 14 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1351,14 +1351,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<(), Error> {
self.light_client_server_cache.recompute_and_cache_updates(
self.store.clone(),
&parent_root,
slot,
&parent_root,
&sync_aggregate,
&self.log,
&self.spec,
)
}

pub fn get_light_client_updates(
&self,
sync_committee_period: u64,
count: u64,
) -> Result<Vec<LightClientUpdate<T::EthSpec>>, Error> {
self.light_client_server_cache.get_light_client_updates(
&self.store,
sync_committee_period,
count,
&self.spec,
)
}

/// Returns the current heads of the `BeaconChain`. For the canonical head, see `Self::head`.
///
/// Returns `(block_root, block_slot)`.
Expand Down
223 changes: 173 additions & 50 deletions beacon_node/beacon_chain/src/light_client_server_cache.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
use crate::errors::BeaconChainError;
use crate::{metrics, BeaconChainTypes, BeaconStore};
use parking_lot::{Mutex, RwLock};
use safe_arith::SafeArith;
use slog::{debug, Logger};
use ssz::Decode;
use ssz::Encode;
use ssz_types::FixedVector;
use std::num::NonZeroUsize;
use types::light_client_update::{FinalizedRootProofLen, FINALIZED_ROOT_INDEX};
use std::sync::Arc;
use store::DBColumn;
use store::KeyValueStore;
use types::light_client_update::{
FinalizedRootProofLen, NextSyncCommitteeProofLen, FINALIZED_ROOT_INDEX,
NEXT_SYNC_COMMITTEE_INDEX,
};
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconBlockRef, BeaconState, ChainSpec, EthSpec, ForkName, Hash256, LightClientFinalityUpdate,
LightClientOptimisticUpdate, Slot, SyncAggregate,
LightClientOptimisticUpdate, LightClientUpdate, Slot, SyncAggregate, SyncCommittee,
};

/// A prev block cache miss requires to re-generate the state of the post-parent block. Items in the
Expand All @@ -30,22 +39,25 @@ pub struct LightClientServerCache<T: BeaconChainTypes> {
latest_finality_update: RwLock<Option<LightClientFinalityUpdate<T::EthSpec>>>,
/// Tracks a single global latest optimistic update out of all imported blocks.
latest_optimistic_update: RwLock<Option<LightClientOptimisticUpdate<T::EthSpec>>>,
/// Caches the most recent light client update
latest_light_client_update: RwLock<Option<LightClientUpdate<T::EthSpec>>>,
/// Caches state proofs by block root
prev_block_cache: Mutex<lru::LruCache<Hash256, LightClientCachedData>>,
prev_block_cache: Mutex<lru::LruCache<Hash256, LightClientCachedData<T::EthSpec>>>,
}

impl<T: BeaconChainTypes> LightClientServerCache<T> {
pub fn new() -> Self {
Self {
latest_finality_update: None.into(),
latest_optimistic_update: None.into(),
latest_light_client_update: None.into(),
prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(),
}
}

/// Compute and cache state proofs for latter production of light-client messages. Does not
/// trigger block replay.
pub fn cache_state_data(
pub(crate) fn cache_state_data(
&self,
spec: &ChainSpec,
block: BeaconBlockRef<T::EthSpec>,
Expand All @@ -67,13 +79,13 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
Ok(())
}

/// Given a block with a SyncAggregte computes better or more recent light client updates. The
/// Given a block with a SyncAggregate computes better or more recent light client updates. The
/// results are cached either on disk or memory to be served via p2p and rest API
pub fn recompute_and_cache_updates(
&self,
store: BeaconStore<T>,
block_parent_root: &Hash256,
block_slot: Slot,
block_parent_root: &Hash256,
sync_aggregate: &SyncAggregate<T::EthSpec>,
log: &Logger,
chain_spec: &ChainSpec,
Expand All @@ -100,11 +112,17 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {

let attested_slot = attested_block.slot();

let maybe_finalized_block = store.get_blinded_block(&cached_parts.finalized_block_root)?;

let sync_period = block_slot
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;

// Spec: Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest
// attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice
let is_latest_optimistic = match &self.latest_optimistic_update.read().clone() {
Some(latest_optimistic_update) => {
is_latest_optimistic_update(latest_optimistic_update, attested_slot, signature_slot)
latest_optimistic_update.is_latest(attested_slot, signature_slot)
}
None => true,
};
Expand All @@ -122,18 +140,17 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
// attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice
let is_latest_finality = match &self.latest_finality_update.read().clone() {
Some(latest_finality_update) => {
is_latest_finality_update(latest_finality_update, attested_slot, signature_slot)
latest_finality_update.is_latest(attested_slot, signature_slot)
}
None => true,
};

if is_latest_finality & !cached_parts.finalized_block_root.is_zero() {
// Immediately after checkpoint sync the finalized block may not be available yet.
if let Some(finalized_block) =
store.get_blinded_block(&cached_parts.finalized_block_root)?
{
if let Some(finalized_block) = maybe_finalized_block.as_ref() {
*self.latest_finality_update.write() = Some(LightClientFinalityUpdate::new(
&attested_block,
&finalized_block,
finalized_block,
cached_parts.finality_branch.clone(),
sync_aggregate.clone(),
signature_slot,
Expand All @@ -148,9 +165,142 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
}
}

let new_light_client_update = LightClientUpdate::new(
sync_aggregate,
block_slot,
cached_parts.next_sync_committee,
cached_parts.next_sync_committee_branch,
cached_parts.finality_branch,
&attested_block,
maybe_finalized_block.as_ref(),
chain_spec,
)?;

// Spec: Full nodes SHOULD provide the best derivable LightClientUpdate (according to is_better_update)
// for each sync committee period
let prev_light_client_update = match &self.latest_light_client_update.read().clone() {
Some(prev_light_client_update) => Some(prev_light_client_update.clone()),
None => self.get_light_client_update(&store, sync_period, chain_spec)?,
};

let should_persist_light_client_update =
if let Some(prev_light_client_update) = prev_light_client_update {
let prev_sync_period = prev_light_client_update
.signature_slot()
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;

if sync_period != prev_sync_period {
true
} else {
prev_light_client_update
.is_better_light_client_update(&new_light_client_update, chain_spec)?
}
} else {
true
};

if should_persist_light_client_update {
self.store_light_client_update(&store, sync_period, &new_light_client_update)?;
}

Ok(())
}

fn store_light_client_update(
&self,
store: &BeaconStore<T>,
sync_committee_period: u64,
light_client_update: &LightClientUpdate<T::EthSpec>,
) -> Result<(), BeaconChainError> {
let column = DBColumn::LightClientUpdate;

store.hot_db.put_bytes(
column.into(),
&sync_committee_period.to_le_bytes(),
&light_client_update.as_ssz_bytes(),
)?;

*self.latest_light_client_update.write() = Some(light_client_update.clone());

Ok(())
}

// Used to fetch the most recently persisted "best" light client update.
// Should not be used outside the light client server, as it also caches the fetched
// light client update.
fn get_light_client_update(
&self,
store: &BeaconStore<T>,
sync_committee_period: u64,
chain_spec: &ChainSpec,
) -> Result<Option<LightClientUpdate<T::EthSpec>>, BeaconChainError> {
if let Some(latest_light_client_update) = self.latest_light_client_update.read().clone() {
let latest_lc_update_sync_committee_period = latest_light_client_update
.signature_slot()
.epoch(T::EthSpec::slots_per_epoch())
.sync_committee_period(chain_spec)?;
if latest_lc_update_sync_committee_period == sync_committee_period {
return Ok(Some(latest_light_client_update));
}
}

let column = DBColumn::LightClientUpdate;
let res = store
.hot_db
.get_bytes(column.into(), &sync_committee_period.to_le_bytes())?;

if let Some(light_client_update_bytes) = res {
let epoch = sync_committee_period
.safe_mul(chain_spec.epochs_per_sync_committee_period.into())?;

let fork_name = chain_spec.fork_name_at_epoch(epoch.into());

let light_client_update =
LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name)
.map_err(store::errors::Error::SszDecodeError)?;

*self.latest_light_client_update.write() = Some(light_client_update.clone());
return Ok(Some(light_client_update));
}

Ok(None)
}

pub fn get_light_client_updates(
&self,
store: &BeaconStore<T>,
start_period: u64,
count: u64,
chain_spec: &ChainSpec,
) -> Result<Vec<LightClientUpdate<T::EthSpec>>, BeaconChainError> {
let column = DBColumn::LightClientUpdate;
let mut light_client_updates = vec![];
for res in store
.hot_db
.iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())
{
let (sync_committee_bytes, light_client_update_bytes) = res?;
let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)
.map_err(store::errors::Error::SszDecodeError)?;
let epoch = sync_committee_period
.safe_mul(chain_spec.epochs_per_sync_committee_period.into())?;

let fork_name = chain_spec.fork_name_at_epoch(epoch.into());

let light_client_update =
LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name)
.map_err(store::errors::Error::SszDecodeError)?;

light_client_updates.push(light_client_update);

if sync_committee_period >= start_period + count {
break;
}
}
Ok(light_client_updates)
}

/// Retrieves prev block cached data from cache. If not present re-computes by retrieving the
/// parent state, and inserts an entry to the cache.
///
Expand All @@ -161,7 +311,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
block_root: &Hash256,
block_state_root: &Hash256,
block_slot: Slot,
) -> Result<LightClientCachedData, BeaconChainError> {
) -> Result<LightClientCachedData<T::EthSpec>, BeaconChainError> {
// Attempt to get the value from the cache first.
if let Some(cached_parts) = self.prev_block_cache.lock().get(block_root) {
return Ok(cached_parts.clone());
Expand Down Expand Up @@ -199,52 +349,25 @@ impl<T: BeaconChainTypes> Default for LightClientServerCache<T> {
}

type FinalityBranch = FixedVector<Hash256, FinalizedRootProofLen>;
type NextSyncCommitteeBranch = FixedVector<Hash256, NextSyncCommitteeProofLen>;

#[derive(Clone)]
struct LightClientCachedData {
struct LightClientCachedData<E: EthSpec> {
finality_branch: FinalityBranch,
next_sync_committee_branch: NextSyncCommitteeBranch,
next_sync_committee: Arc<SyncCommittee<E>>,
finalized_block_root: Hash256,
}

impl LightClientCachedData {
fn from_state<E: EthSpec>(state: &mut BeaconState<E>) -> Result<Self, BeaconChainError> {
impl<E: EthSpec> LightClientCachedData<E> {
fn from_state(state: &mut BeaconState<E>) -> Result<Self, BeaconChainError> {
Ok(Self {
finality_branch: state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?.into(),
next_sync_committee: state.next_sync_committee()?.clone(),
next_sync_committee_branch: state
.compute_merkle_proof(NEXT_SYNC_COMMITTEE_INDEX)?
.into(),
finalized_block_root: state.finalized_checkpoint().root,
})
}
}

// Implements spec prioritization rules:
// > Full nodes SHOULD provide the LightClientFinalityUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot)
//
// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_finality_update
fn is_latest_finality_update<E: EthSpec>(
prev: &LightClientFinalityUpdate<E>,
attested_slot: Slot,
signature_slot: Slot,
) -> bool {
let prev_slot = prev.get_attested_header_slot();
if attested_slot > prev_slot {
true
} else {
attested_slot == prev_slot && signature_slot > *prev.signature_slot()
}
}

// Implements spec prioritization rules:
// > Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot)
//
// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_optimistic_update
fn is_latest_optimistic_update<E: EthSpec>(
prev: &LightClientOptimisticUpdate<E>,
attested_slot: Slot,
signature_slot: Slot,
) -> bool {
let prev_slot = prev.get_slot();
if attested_slot > prev_slot {
true
} else {
attested_slot == prev_slot && signature_slot > *prev.signature_slot()
}
}
Loading

0 comments on commit 235a484

Please sign in to comment.