Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store changes to persist data columns #6073

Merged
merged 8 commits into from
Aug 2, 2024
Merged
2 changes: 2 additions & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {
AnchorInfoConcurrentMutation,
/// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
BlobInfoConcurrentMutation,
/// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied.
DataColumnInfoConcurrentMutation,
/// The block or state is unavailable due to weak subjectivity sync.
HistoryUnavailable,
/// State reconstruction cannot commence because not all historic blocks are known.
Expand Down
278 changes: 264 additions & 14 deletions beacon_node/store/src/hot_cold_store.rs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ impl db_key::Key for BytesKey {
}

impl BytesKey {
pub fn starts_with(&self, prefix: &Self) -> bool {
self.key.starts_with(&prefix.key)
}

/// Return `true` iff this `BytesKey` was created with the given `column`.
pub fn matches_column(&self, column: DBColumn) -> bool {
self.key.starts_with(column.as_bytes())
Expand Down
32 changes: 29 additions & 3 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub use metrics::scrape_for_metrics;
use parking_lot::MutexGuard;
use std::sync::Arc;
use strum::{EnumString, IntoStaticStr};
use types::data_column_sidecar::{ColumnIndex, DataColumnSidecarList};
pub use types::*;

pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
Expand Down Expand Up @@ -109,9 +110,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
Box::new(std::iter::empty())
}

fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter {
Box::new(std::iter::empty())
}
fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter;

/// Iterate through all keys in a particular column.
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K>;
Expand Down Expand Up @@ -143,6 +142,28 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
result
}

pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) -> Vec<u8> {
let mut result = block_root.as_bytes().to_vec();
result.extend_from_slice(&column_index.to_le_bytes());
result
}

pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
if data.len() != 32 + 8 {
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
return Err(Error::InvalidKey);
}
// split_at panics if 32 < 40 which will never happen after the length check above
let (block_root_bytes, column_index_bytes) = data.split_at(32);
let block_root = Hash256::from_slice(block_root_bytes);
// column_index_bytes is asserted to be 8 bytes after the length check above
let column_index = ColumnIndex::from_le_bytes(
column_index_bytes
.try_into()
.expect("slice with incorrect length"),
);
Ok((block_root, column_index))
}

#[must_use]
#[derive(Clone)]
pub enum KeyValueStoreOp {
Expand Down Expand Up @@ -203,11 +224,13 @@ pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, BlobSidecarList<E>),
PutDataColumns(Hash256, DataColumnSidecarList<E>),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteBlobs(Hash256),
DeleteDataColumns(Hash256, Vec<ColumnIndex>),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
KeyValueOp(KeyValueStoreOp),
Expand All @@ -223,6 +246,8 @@ pub enum DBColumn {
BeaconBlock,
#[strum(serialize = "blb")]
BeaconBlob,
#[strum(serialize = "bdc")]
BeaconDataColumn,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
#[strum(serialize = "ste")]
BeaconState,
Expand Down Expand Up @@ -310,6 +335,7 @@ impl DBColumn {
| Self::BeaconHistoricalRoots
| Self::BeaconHistoricalSummaries
| Self::BeaconRandaoMixes => 8,
Self::BeaconDataColumn => 32 + 8,
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion beacon_node/store/src/memory_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error,
ItemStore, Key, KeyValueStore, KeyValueStoreOp,
ItemStore, Key, KeyValueStore, KeyValueStoreOp, RawKeyIter,
};
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -100,6 +100,18 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
}))
}

fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter {
let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), prefix));
let keys = self
.db
.read()
.range(start_key.clone()..)
.take_while(|(k, _)| k.starts_with(&start_key))
.filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
.collect::<Vec<_>>();
Box::new(keys.into_iter().map(Ok))
}

fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
}
Expand Down
28 changes: 28 additions & 0 deletions beacon_node/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);

/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
Expand Down Expand Up @@ -152,3 +153,30 @@ impl StoreItem for BlobInfo {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {
/// The slot after which data columns are or *will be* available (>=).
///
/// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which
/// data columns will be available.
///
/// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is
/// not yet known.
pub oldest_data_column_slot: Option<Slot>,
}

impl StoreItem for DataColumnInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
4 changes: 4 additions & 0 deletions beacon_node/store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ lazy_static! {
"store_beacon_blobs_cache_hit_total",
"Number of hits to the store's blob cache"
);
pub static ref BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_data_columns_cache_hit_total",
"Number of hits to the store's data column cache"
);
}

/// Updates the global metrics registry with store-related information.
Expand Down
20 changes: 20 additions & 0 deletions consensus/types/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub struct ChainSpec {
/*
* DAS params
*/
pub eip7594_fork_epoch: Option<Epoch>,
pub number_of_columns: usize,

/*
Expand Down Expand Up @@ -392,6 +393,13 @@ impl ChainSpec {
}
}

/// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`.
pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool {
self.eip7594_fork_epoch.map_or(false, |eip7594_fork_epoch| {
block_epoch >= eip7594_fork_epoch
})
}

/// For a given `BeaconState`, return the whistleblower reward quotient associated with its variant.
pub fn whistleblower_reward_quotient_for_state<E: EthSpec>(
&self,
Expand Down Expand Up @@ -777,6 +785,10 @@ impl ChainSpec {
})
.expect("calculation does not overflow"),

/*
* DAS params
*/
eip7594_fork_epoch: None,
number_of_columns: 128,

/*
Expand Down Expand Up @@ -880,6 +892,10 @@ impl ChainSpec {
electra_fork_epoch: None,
max_pending_partials_per_withdrawals_sweep: u64::checked_pow(2, 0)
.expect("pow does not overflow"),
/*
* DAS params
*/
eip7594_fork_epoch: None,
// Other
network_id: 2, // lighthouse testnet network id
deposit_chain_id: 5,
Expand Down Expand Up @@ -1081,6 +1097,10 @@ impl ChainSpec {
})
.expect("calculation does not overflow"),

/*
* DAS params
*/
eip7594_fork_epoch: None,
number_of_columns: 128,

/*
Expand Down