HDiff in the hot DB - squashed
dapplion committed Dec 24, 2024
1 parent 7e0cdde commit fbc62f2
Showing 22 changed files with 1,492 additions and 927 deletions.
52 changes: 17 additions & 35 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -33,7 +33,6 @@ use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::graffiti_calculator::GraffitiCalculator;
use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker};
use crate::light_client_finality_update_verification::{
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
};
@@ -456,8 +455,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// A handler for events generated by the beacon chain. This is only initialized when the
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: RwLock<ShufflingCache>,
/// A cache of eth1 deposit data at epoch boundaries for deposit finalization
@@ -620,48 +617,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);

// Hold a lock to head_tracker until it has been persisted to disk. Otherwise there's a race
// condition with the pruning thread which can result in a block present in the head tracker
// but absent in the DB. This inconsistency halts pruning and dramatically increases disk
// size. Ref: https://github.com/sigp/lighthouse/issues/4773
let head_tracker = self.head_tracker.0.read();
batch.push(self.persist_head_in_batch(&head_tracker));

let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
batch.push(self.persist_fork_choice_in_batch());

self.store.hot_db.do_atomically(batch)?;
drop(head_tracker);

Ok(())
}

/// Return a `PersistedBeaconChain` without reference to a `BeaconChain`.
pub fn make_persisted_head(
genesis_block_root: Hash256,
head_tracker_reader: &HeadTrackerReader,
) -> PersistedBeaconChain {
pub fn make_persisted_head(genesis_block_root: Hash256) -> PersistedBeaconChain {
PersistedBeaconChain {
_canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
genesis_block_root,
ssz_head_tracker: SszHeadTracker::from_map(head_tracker_reader),
ssz_head_tracker: <_>::default(),
}
}

/// Return a database operation for writing the beacon chain head to disk.
pub fn persist_head_in_batch(
&self,
head_tracker_reader: &HeadTrackerReader,
) -> KeyValueStoreOp {
Self::persist_head_in_batch_standalone(self.genesis_block_root, head_tracker_reader)
pub fn persist_head_in_batch(&self) -> KeyValueStoreOp {
Self::persist_head_in_batch_standalone(self.genesis_block_root)
}

pub fn persist_head_in_batch_standalone(
genesis_block_root: Hash256,
head_tracker_reader: &HeadTrackerReader,
) -> KeyValueStoreOp {
Self::make_persisted_head(genesis_block_root, head_tracker_reader)
.as_kv_store_op(BEACON_CHAIN_DB_KEY)
pub fn persist_head_in_batch_standalone(genesis_block_root: Hash256) -> KeyValueStoreOp {
Self::make_persisted_head(genesis_block_root).as_kv_store_op(BEACON_CHAIN_DB_KEY)
}

/// Load fork choice from disk, returning `None` if it isn't found.
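Note on the hunk above: the deleted persistence code held the head-tracker read lock until the atomic batch had been committed, closing the pruning race described in the removed comment (sigp/lighthouse#4773); with the head tracker gone, persist_head_in_batch no longer needs a lock at all. For context, a minimal self-contained sketch of that hold-the-lock-across-the-write pattern, with toy types standing in for the real tracker and store (all names here are illustrative, not Lighthouse's API):

// Sketch of the removed pattern: hold a read lock on a shared structure
// until the batch that persists it has been committed, so a concurrent
// pruner cannot observe the tracker and the database out of sync.
use std::sync::RwLock;

struct HeadTracker(RwLock<Vec<u64>>);
struct Db(RwLock<Vec<u64>>);

impl Db {
    // Stand-in for an atomic key-value batch write.
    fn do_atomically(&self, batch: Vec<u64>) {
        *self.0.write().unwrap() = batch;
    }
}

fn persist_heads(tracker: &HeadTracker, db: &Db) {
    // Keep the read guard alive until the write has landed, so no head can
    // be added or pruned between "snapshot" and "persist".
    let heads = tracker.0.read().unwrap();
    let batch: Vec<u64> = (*heads).clone();
    db.do_atomically(batch);
    drop(heads);
}

fn main() {
    let tracker = HeadTracker(RwLock::new(vec![1, 2, 3]));
    let db = Db(RwLock::new(Vec::new()));
    persist_heads(&tracker, &db);
    assert_eq!(*db.0.read().unwrap(), vec![1, 2, 3]);
}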
@@ -1405,12 +1384,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns `(block_root, block_slot)`.
pub fn heads(&self) -> Vec<(Hash256, Slot)> {
self.head_tracker.heads()
let head_slot = self.canonical_head.cached_head().head_slot();
self.canonical_head
.fork_choice_read_lock()
.proto_array()
.viable_heads::<T::EthSpec>(head_slot)
.iter()
.map(|node| (node.root, node.slot))
.collect()
}

/// Only used in tests.
pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool {
self.head_tracker.contains_head((*block_hash).into())
self.heads()
.iter()
.any(|head| head.0 == Into::<Hash256>::into(*block_hash))
}

/// Returns the `BeaconState` at the given slot.
@@ -3989,9 +3977,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// about it.
let block_time_imported = timestamp_now();

let parent_root = block.parent_root();
let slot = block.slot();

let current_eth1_finalization_data = Eth1FinalizationData {
eth1_data: state.eth1_data().clone(),
eth1_deposit_index: state.eth1_deposit_index(),
@@ -4012,9 +3997,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
}

self.head_tracker
.register_block(block_root, parent_root, slot);

metrics::stop_timer(db_write_timer);

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
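Note on this file's heads() rewrite above: chain heads are now derived from fork choice's proto-array (via viable_heads) instead of the removed HeadTracker. A minimal, self-contained sketch of the underlying idea, collecting the leaves of a block tree as heads, follows; Root, ProtoNode, and heads are illustrative stand-ins rather than Lighthouse's actual types, and the real implementation additionally filters by viability at the current head slot.

// Simplified model: a "head" is any node that no other node builds on
// (a leaf of the block tree). Illustrative only, not Lighthouse's API.
use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct Root(u64);

struct ProtoNode {
    root: Root,
    slot: u64,
    parent: Option<Root>,
}

fn heads(nodes: &[ProtoNode]) -> Vec<(Root, u64)> {
    // Any root that appears as a parent has a descendant, so it is not a head.
    let parents: HashSet<Root> = nodes.iter().filter_map(|n| n.parent).collect();
    nodes
        .iter()
        .filter(|n| !parents.contains(&n.root))
        .map(|n| (n.root, n.slot))
        .collect()
}

fn main() {
    // Genesis <- A <- B, plus a fork A <- C: the heads are B and C.
    let nodes = vec![
        ProtoNode { root: Root(0), slot: 0, parent: None },
        ProtoNode { root: Root(1), slot: 1, parent: Some(Root(0)) },
        ProtoNode { root: Root(2), slot: 2, parent: Some(Root(1)) },
        ProtoNode { root: Root(3), slot: 2, parent: Some(Root(1)) },
    ];
    for (root, slot) in heads(&nodes) {
        println!("head: root={} slot={}", root.0, slot);
    }
}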
85 changes: 41 additions & 44 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -91,7 +91,7 @@ use std::fmt::Debug;
use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use store::{Error as DBError, KeyValueStore, StoreOp};
use strum::AsRefStr;
use task_executor::JoinHandle;
use types::{
@@ -1455,52 +1455,49 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

let distance = block.slot().as_u64().saturating_sub(state.slot().as_u64());
for _ in 0..distance {
let state_root = if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

let state_batch = if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
vec![]
let state_root =
if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
vec![
if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else {
StoreOp::PutStateSummary(
state_root,
HotStateSummary::new(&state_root, &state)?,
)
},
StoreOp::PutStateTemporaryFlag(state_root),
]
};
chain
.store
.do_atomically_with_block_and_blobs_cache(state_batch)?;
drop(txn_lock);
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
// TODO(hdiff): Is it necessary to do this read tx now? Also why is it necessary to
// check that the summary exists at all? Are double writes common? Can this txn
// lock deadlock with the `do_atomically` call?
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
} else {
let mut ops = vec![];
// Recycle store codepath to create a state summary and store the state / diff
chain.store.store_hot_state(&state_root, &state, &mut ops)?;
// Additionally write a temporary flag as part of the atomic write
ops.extend(chain.store.convert_to_kv_batch(vec![
StoreOp::PutStateTemporaryFlag(state_root),
])?);
chain.store.hot_db.do_atomically(ops)?;
}
drop(txn_lock);

confirmed_state_roots.push(state_root);
confirmed_state_roots.push(state_root);

state_root
};
state_root
};

if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? {
// Expose Prometheus metrics.
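Note on the hunk above: a newly reached state is now written through store_hot_state (which picks between a full state and a diff) and flagged as temporary in the same atomic batch, so it can be pruned if the block import never completes; the flag is deleted later as part of the import's own atomic write. A toy, self-contained sketch of that temporary-flag pattern, with a hypothetical in-memory store standing in for the real hot_db:

// Toy store illustrating the temporary-state-flag pattern: the state and its
// "temporary" marker are written in one atomic batch, and the marker is
// deleted only when the enclosing block import commits. All names here are
// hypothetical stand-ins for the real store API.
use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct Store {
    states: HashMap<u64, Vec<u8>>,
    temporary: HashSet<u64>,
}

enum Op {
    PutState(u64, Vec<u8>),
    PutTemporaryFlag(u64),
    DeleteTemporaryFlag(u64),
}

impl Store {
    // Apply a batch of ops "atomically" (trivially so for an in-memory map).
    fn do_atomically(&mut self, ops: Vec<Op>) {
        for op in ops {
            match op {
                Op::PutState(root, bytes) => { self.states.insert(root, bytes); }
                Op::PutTemporaryFlag(root) => { self.temporary.insert(root); }
                Op::DeleteTemporaryFlag(root) => { self.temporary.remove(&root); }
            }
        }
    }
}

fn main() {
    let mut store = Store::default();
    let state_root = 42u64;

    // While importing a block: store the advanced state together with a
    // temporary flag, so it can be garbage-collected if the import fails.
    if !store.states.contains_key(&state_root) {
        store.do_atomically(vec![
            Op::PutState(state_root, vec![0xde, 0xad]),
            Op::PutTemporaryFlag(state_root),
        ]);
    }

    // When the block import commits, delete the flag in the commit batch,
    // promoting the state to a permanent entry.
    store.do_atomically(vec![Op::DeleteTemporaryFlag(state_root)]);
    assert!(store.states.contains_key(&state_root) && store.temporary.is_empty());
}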
