Skip to content

Commit

Permalink
fix(fork-network): fix delayed receipt handling (#12798)
Browse files Browse the repository at this point in the history
There are a few problems with the way the fork-network tool handles
delayed receipts that we want to write to the altered DB:

1. We were trying to parse them as a `Receipt` when now they're stored
as `ReceiptOrStateStoredReceipt`
2. We were deleting delayed receipts (to write them again modified) with
a `TrieKey::DelayedReceipt { index }` starting from 0 and incrementing
by 1 each time. But the first one we see probably doesn't start at index
0, since it'll be at wherever the indices are in the source chain
(probably mainnet)
3. We were assuming the trie iter will return the receipts in their
queued order, which is not the case. So we're storing them in the end
possibly in the wrong order

So here we fix all 3. We fix no. 2 by just removing trie items with the
raw key we already have. And we fix no. 3 by adding an "index" field to
the delayed receipt `StateRecord` in a backwards-compatible way, and
then we write the same indices we read, and make sure to write a correct
value for the `TrieKey::DelayedReceiptIndices` key at the end. While
we're at it, we also fix no. 3 for the genesis startup mechanism that
uses serialized `StateRecords` in `GenesisStateApplier::apply()`
  • Loading branch information
marcelo-gonzalez authored Jan 30, 2025
1 parent 7a4d954 commit 94513ac
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 112 deletions.
49 changes: 31 additions & 18 deletions core/primitives/src/state_record.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::account::{AccessKey, Account};
use crate::hash::{hash, CryptoHash};
use crate::receipt::{Receipt, ReceivedData};
use crate::receipt::{Receipt, ReceiptOrStateStoredReceipt, ReceivedData};
use crate::trie_key::trie_key_parsers::{
parse_account_id_from_access_key_key, parse_account_id_from_account_key,
parse_account_id_from_contract_code_key, parse_account_id_from_contract_data_key,
parse_account_id_from_received_data_key, parse_data_id_from_received_data_key,
parse_data_key_from_contract_data_key, parse_public_key_from_access_key_key,
parse_data_key_from_contract_data_key, parse_index_from_delayed_receipt_key,
parse_public_key_from_access_key_key,
};
use crate::trie_key::{col, TrieKey};
use crate::types::{AccountId, StoreKey, StoreValue};
Expand All @@ -15,6 +16,15 @@ use serde_with::base64::Base64;
use serde_with::serde_as;
use std::fmt::{Display, Formatter};

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct DelayedReceipt {
#[serde(skip)]
pub index: Option<u64>,

#[serde(flatten)]
pub receipt: Box<Receipt>,
}

/// Record in the state storage.
#[serde_as]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -42,50 +52,50 @@ pub enum StateRecord {
},
/// Delayed Receipt.
/// The receipt was delayed because the shard was overwhelmed.
DelayedReceipt(Box<Receipt>),
DelayedReceipt(DelayedReceipt),
}

impl StateRecord {
/// NOTE: This function is not safe to be running during block production. It contains a lot
/// of `unwrap` and should only be used during `state_dump`.
/// Most `unwrap()` here are because the implementation of columns and data are internal and
/// can't be influenced by external calls.
pub fn from_raw_key_value(key: Vec<u8>, value: Vec<u8>) -> Option<StateRecord> {
pub fn from_raw_key_value(key: &[u8], value: Vec<u8>) -> Option<StateRecord> {
Self::from_raw_key_value_impl(key, value).unwrap_or(None)
}

pub fn from_raw_key_value_impl(
key: Vec<u8>,
key: &[u8],
value: Vec<u8>,
) -> Result<Option<StateRecord>, std::io::Error> {
Ok(match key[0] {
col::ACCOUNT => Some(StateRecord::Account {
account_id: parse_account_id_from_account_key(&key)?,
account_id: parse_account_id_from_account_key(key)?,
account: Account::try_from_slice(&value)?,
}),
col::CONTRACT_DATA => {
let account_id = parse_account_id_from_contract_data_key(&key)?;
let data_key = parse_data_key_from_contract_data_key(&key, &account_id)?;
let account_id = parse_account_id_from_contract_data_key(key)?;
let data_key = parse_data_key_from_contract_data_key(key, &account_id)?;
Some(StateRecord::Data {
account_id,
data_key: data_key.to_vec().into(),
value: value.into(),
})
}
col::CONTRACT_CODE => Some(StateRecord::Contract {
account_id: parse_account_id_from_contract_code_key(&key)?,
account_id: parse_account_id_from_contract_code_key(key)?,
code: value,
}),
col::ACCESS_KEY => {
let access_key = AccessKey::try_from_slice(&value)?;
let account_id = parse_account_id_from_access_key_key(&key)?;
let public_key = parse_public_key_from_access_key_key(&key, &account_id)?;
let account_id = parse_account_id_from_access_key_key(key)?;
let public_key = parse_public_key_from_access_key_key(key, &account_id)?;
Some(StateRecord::AccessKey { account_id, public_key, access_key })
}
col::RECEIVED_DATA => {
let data = ReceivedData::try_from_slice(&value)?.data;
let account_id = parse_account_id_from_received_data_key(&key)?;
let data_id = parse_data_id_from_received_data_key(&key, &account_id)?;
let account_id = parse_account_id_from_received_data_key(key)?;
let data_id = parse_data_id_from_received_data_key(key, &account_id)?;
Some(StateRecord::ReceivedData { account_id, data_id, data })
}
col::POSTPONED_RECEIPT_ID => None,
Expand All @@ -100,8 +110,12 @@ impl StateRecord {
None
}
col::DELAYED_RECEIPT_OR_INDICES => {
let receipt = Receipt::try_from_slice(&value)?;
Some(StateRecord::DelayedReceipt(Box::new(receipt)))
let receipt = ReceiptOrStateStoredReceipt::try_from_slice(&value)?.into_receipt();
let index = Some(parse_index_from_delayed_receipt_key(key)?);
Some(StateRecord::DelayedReceipt(DelayedReceipt {
index,
receipt: Box::new(receipt),
}))
}
_ => {
println!("key[0]: {} is unreachable", key[0]);
Expand Down Expand Up @@ -178,9 +192,8 @@ pub fn state_record_to_account_id(state_record: &StateRecord) -> &AccountId {
| StateRecord::Contract { account_id, .. }
| StateRecord::ReceivedData { account_id, .. }
| StateRecord::Data { account_id, .. } => account_id,
StateRecord::PostponedReceipt(receipt) | StateRecord::DelayedReceipt(receipt) => {
receipt.receiver_id()
}
StateRecord::PostponedReceipt(receipt) => receipt.receiver_id(),
StateRecord::DelayedReceipt(receipt) => receipt.receipt.receiver_id(),
}
}

Expand Down
16 changes: 15 additions & 1 deletion core/primitives/src/trie_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ pub mod trie_key_parsers {
}
}

pub fn parse_index_from_delayed_receipt_key(raw_key: &[u8]) -> Result<u64, std::io::Error> {
// The length of TrieKey::DelayedReceipt { .. } should be 9 since it's a single byte for the
// column and then 8 bytes for a u64 index.
if raw_key.len() != 9 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("unexpected raw key len of {} for delayed receipt index", raw_key.len()),
));
}
let index = raw_key[1..9].try_into().unwrap();
Ok(u64::from_le_bytes(index))
}

pub fn parse_account_id_from_contract_code_key(
raw_key: &[u8],
) -> Result<AccountId, std::io::Error> {
Expand Down Expand Up @@ -856,9 +869,10 @@ mod tests {
let key = TrieKey::DelayedReceiptIndices;
let raw_key = key.to_vec();
assert!(trie_key_parsers::parse_account_id_from_raw_key(&raw_key).unwrap().is_none());
let key = TrieKey::DelayedReceipt { index: 0 };
let key = TrieKey::DelayedReceipt { index: 123 };
let raw_key = key.to_vec();
assert!(trie_key_parsers::parse_account_id_from_raw_key(&raw_key).unwrap().is_none());
assert_eq!(trie_key_parsers::parse_index_from_delayed_receipt_key(&raw_key).unwrap(), 123);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/genesis/state_applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl GenesisStateApplier {
})
}
StateRecord::DelayedReceipt(receipt) => storage.modify(|state_update| {
set_delayed_receipt(state_update, delayed_receipts_indices, &*receipt);
set_delayed_receipt(state_update, delayed_receipts_indices, &*receipt.receipt);
}),
}
});
Expand Down
4 changes: 2 additions & 2 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ impl Trie {
Ok(value) => String::from(value),
Err(_) => " ".repeat(key.len()),
};
let state_record = StateRecord::from_raw_key_value(key.clone(), value);
let state_record = StateRecord::from_raw_key_value(&key, value);

limit -= 1;
writeln!(f, "{} {state_record:?}", key_string).expect("write failed");
Expand Down Expand Up @@ -1015,7 +1015,7 @@ impl Trie {
assert!(remainder.is_empty());

if !Self::should_prune_view_trie(&leaf_key, record_type, from, to) {
let state_record = StateRecord::from_raw_key_value(leaf_key, bytes.to_vec());
let state_record = StateRecord::from_raw_key_value(&leaf_key, bytes.to_vec());

writeln!(
f,
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/test_loop/utils/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn print_and_assert_shard_accounts(clients: &[&Client], tip: &Tip) {
let mut shard_accounts = vec![];
for item in trie.lock_for_iter().iter().unwrap() {
let (key, value) = item.unwrap();
let state_record = StateRecord::from_raw_key_value(key, value);
let state_record = StateRecord::from_raw_key_value(&key, value);
if let Some(StateRecord::Account { account_id, .. }) = state_record {
shard_accounts.push(account_id.to_string());
}
Expand Down
4 changes: 2 additions & 2 deletions nearcore/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: LazyLock<IntGaugeVec> = LazyLock
.unwrap()
});

fn log_trie_item(key: Vec<u8>, value: Vec<u8>) {
fn log_trie_item(key: &[u8], value: Vec<u8>) {
if !tracing::level_enabled!(tracing::Level::TRACE) {
return;
}
Expand Down Expand Up @@ -184,7 +184,7 @@ fn get_postponed_receipt_count_for_trie(trie: Trie) -> Result<i64, anyhow::Error
break;
}
count += 1;
log_trie_item(key, value);
log_trie_item(&key, value);
}
tracing::trace!(target: "metrics", "trie-stats - postponed receipt count {count}");
Ok(count)
Expand Down
24 changes: 14 additions & 10 deletions tools/fork-network/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ impl ForkNetworkCommand {
Ok((key, FlatStateValue::Inlined(value))) => (key, value),
otherwise => panic!("Unexpected flat state value: {otherwise:?}"),
};
if let Some(sr) = StateRecord::from_raw_key_value(key.clone(), value.clone()) {
if let Some(sr) = StateRecord::from_raw_key_value(&key, value.clone()) {
match sr {
StateRecord::AccessKey { account_id, public_key, access_key } => {
// TODO(eth-implicit) Change back to is_implicit() when ETH-implicit accounts are supported.
Expand All @@ -630,7 +630,7 @@ impl ForkNetworkCommand {
}
let new_account_id = map_account(&account_id, None);
let replacement = map_key(&public_key, None);
storage_mutator.delete_access_key(account_id, public_key)?;
storage_mutator.remove(key)?;
storage_mutator.set_access_key(
new_account_id,
replacement.public_key(),
Expand All @@ -643,7 +643,7 @@ impl ForkNetworkCommand {
// TODO(eth-implicit) Change back to is_implicit() when ETH-implicit accounts are supported.
if account_id.get_account_type() == AccountType::NearImplicitAccount {
let new_account_id = map_account(&account_id, None);
storage_mutator.delete_account(account_id)?;
storage_mutator.remove(key)?;
storage_mutator.set_account(new_account_id, account)?;
accounts_implicit_updated += 1;
}
Expand All @@ -652,7 +652,7 @@ impl ForkNetworkCommand {
// TODO(eth-implicit) Change back to is_implicit() when ETH-implicit accounts are supported.
if account_id.get_account_type() == AccountType::NearImplicitAccount {
let new_account_id = map_account(&account_id, None);
storage_mutator.delete_data(account_id, &data_key)?;
storage_mutator.remove(key)?;
storage_mutator.set_data(new_account_id, &data_key, value)?;
contract_data_updated += 1;
}
Expand All @@ -661,13 +661,13 @@ impl ForkNetworkCommand {
// TODO(eth-implicit) Change back to is_implicit() when ETH-implicit accounts are supported.
if account_id.get_account_type() == AccountType::NearImplicitAccount {
let new_account_id = map_account(&account_id, None);
storage_mutator.delete_code(account_id)?;
storage_mutator.remove(key)?;
storage_mutator.set_code(new_account_id, code)?;
contract_code_updated += 1;
}
}
StateRecord::PostponedReceipt(mut receipt) => {
storage_mutator.delete_postponed_receipt(&receipt)?;
storage_mutator.remove(key)?;
near_mirror::genesis::map_receipt(&mut receipt, None, &default_key);
storage_mutator.set_postponed_receipt(&receipt)?;
postponed_receipts_updated += 1;
Expand All @@ -676,15 +676,18 @@ impl ForkNetworkCommand {
// TODO(eth-implicit) Change back to is_implicit() when ETH-implicit accounts are supported.
if account_id.get_account_type() == AccountType::NearImplicitAccount {
let new_account_id = map_account(&account_id, None);
storage_mutator.delete_received_data(account_id, data_id)?;
storage_mutator.remove(key)?;
storage_mutator.set_received_data(new_account_id, data_id, &data)?;
received_data_updated += 1;
}
}
StateRecord::DelayedReceipt(mut receipt) => {
storage_mutator.delete_delayed_receipt(index_delayed_receipt)?;
near_mirror::genesis::map_receipt(&mut receipt, None, &default_key);
storage_mutator.set_delayed_receipt(index_delayed_receipt, &receipt)?;
storage_mutator.remove(key)?;
near_mirror::genesis::map_receipt(&mut receipt.receipt, None, &default_key);
// The index is guaranteed to be set when iterating over the trie rather than reading
// serialized StateRecords
let index = receipt.index.unwrap();
storage_mutator.set_delayed_receipt(index, &receipt.receipt)?;
index_delayed_receipt += 1;
}
}
Expand Down Expand Up @@ -785,6 +788,7 @@ impl ForkNetworkCommand {
}
tracing::info!(?shard_uid, num_accounts, num_added, "Pass 2 done");

storage_mutator.set_delayed_receipt_indices()?;
let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?;

tracing::info!(?shard_uid, "Commit done");
Expand Down
Loading

0 comments on commit 94513ac

Please sign in to comment.