Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[store] ChainStoreUpdate remove dependency on reading tentatively updated values #12162

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test update access
  • Loading branch information
shreyan-gupta committed Sep 27, 2024
commit 7471c0551be7aff6ae8abab315cc74382ae3c1e2
94 changes: 79 additions & 15 deletions chain/chain/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
/// The chain head.
fn head(&self) -> Result<Tip, Error> {
if let Some(head) = &self.head {
assert_eq!(*head, self.chain_store.head().unwrap());
Ok(head.clone())
} else {
self.chain_store.head()
Expand All @@ -1169,6 +1170,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
/// The chain Block Tail height, used by GC.
fn tail(&self) -> Result<BlockHeight, Error> {
if let Some(tail) = &self.tail {
assert_eq!(*tail, self.chain_store.tail().unwrap());
Ok(*tail)
} else {
self.chain_store.tail()
Expand All @@ -1178,6 +1180,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
/// The chain Chunks Tail height, used by GC.
fn chunk_tail(&self) -> Result<BlockHeight, Error> {
if let Some(chunk_tail) = &self.chunk_tail {
assert_eq!(*chunk_tail, self.chain_store.chunk_tail().unwrap());
Ok(*chunk_tail)
} else {
self.chain_store.chunk_tail()
Expand All @@ -1187,6 +1190,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
/// Fork tail used by GC
fn fork_tail(&self) -> Result<BlockHeight, Error> {
if let Some(fork_tail) = &self.fork_tail {
assert_eq!(*fork_tail, self.chain_store.fork_tail().unwrap());
Ok(*fork_tail)
} else {
self.chain_store.fork_tail()
Expand All @@ -1196,6 +1200,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
/// Head of the header chain (not the same thing as head_header).
fn header_head(&self) -> Result<Tip, Error> {
if let Some(header_head) = &self.header_head {
assert_eq!(*header_head, self.chain_store.header_head().unwrap());
Ok(header_head.clone())
} else {
self.chain_store.header_head()
Expand All @@ -1204,6 +1209,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn final_head(&self) -> Result<Tip, Error> {
if let Some(final_head) = self.final_head.as_ref() {
assert_eq!(*final_head, self.chain_store.final_head().unwrap());
Ok(final_head.clone())
} else {
self.chain_store.final_head()
Expand All @@ -1212,6 +1218,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn largest_target_height(&self) -> Result<BlockHeight, Error> {
if let Some(largest_target_height) = &self.largest_target_height {
assert_eq!(*largest_target_height, self.chain_store.largest_target_height().unwrap());
Ok(*largest_target_height)
} else {
self.chain_store.largest_target_height()
Expand All @@ -1226,6 +1233,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
/// Get full block.
fn get_block(&self, h: &CryptoHash) -> Result<Block, Error> {
if let Some(block) = self.chain_store_cache_update.blocks.get(h) {
assert_eq!(*block, self.chain_store.get_block(h).unwrap());
Ok(block.clone())
} else {
self.chain_store.get_block(h)
Expand All @@ -1234,11 +1242,17 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

/// Does this full block exist?
fn block_exists(&self, h: &CryptoHash) -> Result<bool, Error> {
if self.chain_store_cache_update.blocks.contains_key(h) {
assert!(self.chain_store.block_exists(h).unwrap());
}
Ok(self.chain_store_cache_update.blocks.contains_key(h)
|| self.chain_store.block_exists(h)?)
}

fn chunk_exists(&self, h: &ChunkHash) -> Result<bool, Error> {
if self.chain_store_cache_update.chunks.contains_key(h) {
assert!(self.chain_store.chunk_exists(h).unwrap());
}
Ok(self.chain_store_cache_update.chunks.contains_key(h)
|| self.chain_store.chunk_exists(h)?)
}
Expand All @@ -1250,6 +1264,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn get_block_extra(&self, block_hash: &CryptoHash) -> Result<Arc<BlockExtra>, Error> {
if let Some(block_extra) = self.chain_store_cache_update.block_extras.get(block_hash) {
assert_eq!(*block_extra, self.chain_store.get_block_extra(block_hash).unwrap());
Ok(Arc::clone(block_extra))
} else {
self.chain_store.get_block_extra(block_hash)
Expand All @@ -1265,6 +1280,10 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
if let Some(chunk_extra) =
self.chain_store_cache_update.chunk_extras.get(&(*block_hash, *shard_uid))
{
assert_eq!(
*chunk_extra,
self.chain_store.get_chunk_extra(block_hash, shard_uid).unwrap()
);
Ok(Arc::clone(chunk_extra))
} else {
self.chain_store.get_chunk_extra(block_hash, shard_uid)
Expand All @@ -1274,6 +1293,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
/// Get block header.
fn get_block_header(&self, hash: &CryptoHash) -> Result<BlockHeader, Error> {
if let Some(header) = self.chain_store_cache_update.headers.get(hash).cloned() {
assert_eq!(header, self.chain_store.get_block_header(hash).unwrap());
Ok(header)
} else {
self.chain_store.get_block_header(hash)
Expand All @@ -1282,6 +1302,9 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

/// Get block header from the current chain by height.
fn get_block_hash_by_height(&self, height: BlockHeight) -> Result<CryptoHash, Error> {
if let Some(Some(hash)) = self.chain_store_cache_update.height_to_hashes.get(&height) {
assert_eq!(*hash, self.chain_store.get_block_hash_by_height(height).unwrap());
}
match self.chain_store_cache_update.height_to_hashes.get(&height) {
Some(Some(hash)) => Ok(*hash),
Some(None) => Err(Error::DBNotFoundErr(format!("BLOCK HEIGHT: {}", height))),
Expand All @@ -1290,22 +1313,24 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
}

fn get_block_refcount(&self, block_hash: &CryptoHash) -> Result<u64, Error> {
let store_refcount = match self.chain_store.get_block_refcount(block_hash) {
Ok(refcount) => refcount,
Err(e) => match e {
Error::DBNotFoundErr(_) => 0,
_ => return Err(e),
},
};
if let Some(refcount) = self.chain_store_cache_update.block_refcounts.get(block_hash) {
assert_eq!(*refcount, store_refcount);
Ok(*refcount)
} else {
let refcount = match self.chain_store.get_block_refcount(block_hash) {
Ok(refcount) => refcount,
Err(e) => match e {
Error::DBNotFoundErr(_) => 0,
_ => return Err(e),
},
};
Ok(refcount)
Ok(store_refcount)
}
}

fn get_next_block_hash(&self, hash: &CryptoHash) -> Result<CryptoHash, Error> {
if let Some(next_hash) = self.chain_store_cache_update.next_block_hashes.get(hash) {
assert_eq!(*next_hash, self.chain_store.get_next_block_hash(hash).unwrap());
Ok(*next_hash)
} else {
self.chain_store.get_next_block_hash(hash)
Expand All @@ -1319,6 +1344,10 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
if let Some(light_client_block) =
self.chain_store_cache_update.epoch_light_client_blocks.get(hash)
{
assert_eq!(
*light_client_block,
self.chain_store.get_epoch_light_client_block(hash).unwrap()
);
Ok(Arc::clone(light_client_block))
} else {
self.chain_store.get_epoch_light_client_block(hash)
Expand All @@ -1334,6 +1363,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
if let Some(receipts) =
self.chain_store_cache_update.outgoing_receipts.get(&(*hash, shard_id))
{
assert_eq!(*receipts, self.chain_store.get_outgoing_receipts(hash, shard_id).unwrap());
Ok(Arc::clone(receipts))
} else {
self.chain_store.get_outgoing_receipts(hash, shard_id)
Expand All @@ -1349,6 +1379,10 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
if let Some(receipt_proofs) =
self.chain_store_cache_update.incoming_receipts.get(&(*hash, shard_id))
{
assert_eq!(
*receipt_proofs,
self.chain_store.get_incoming_receipts(hash, shard_id).unwrap()
);
Ok(Arc::clone(receipt_proofs))
} else {
self.chain_store.get_incoming_receipts(hash, shard_id)
Expand All @@ -1357,6 +1391,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn get_chunk(&self, chunk_hash: &ChunkHash) -> Result<Arc<ShardChunk>, Error> {
if let Some(chunk) = self.chain_store_cache_update.chunks.get(chunk_hash) {
assert_eq!(*chunk, self.chain_store.get_chunk(chunk_hash).unwrap());
Ok(Arc::clone(chunk))
} else {
self.chain_store.get_chunk(chunk_hash)
Expand All @@ -1365,6 +1400,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn get_partial_chunk(&self, chunk_hash: &ChunkHash) -> Result<Arc<PartialEncodedChunk>, Error> {
if let Some(partial_chunk) = self.chain_store_cache_update.partial_chunks.get(chunk_hash) {
assert_eq!(*partial_chunk, self.chain_store.get_partial_chunk(chunk_hash).unwrap());
Ok(Arc::clone(partial_chunk))
} else {
self.chain_store.get_partial_chunk(chunk_hash)
Expand All @@ -1373,6 +1409,10 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn get_chunk_clone_from_header(&self, header: &ShardChunkHeader) -> Result<ShardChunk, Error> {
if let Some(chunk) = self.chain_store_cache_update.chunks.get(&header.chunk_hash()) {
assert_eq!(
chunk.as_ref(),
&self.chain_store.get_chunk_clone_from_header(header).unwrap()
);
Ok(ShardChunk::clone(chunk))
} else {
self.chain_store.get_chunk_clone_from_header(header)
Expand All @@ -1390,6 +1430,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn is_block_challenged(&self, hash: &CryptoHash) -> Result<bool, Error> {
if self.challenged_blocks.contains(hash) {
assert_eq!(true, self.chain_store.is_block_challenged(hash).unwrap());
return Ok(true);
}
self.chain_store.is_block_challenged(hash)
Expand All @@ -1400,6 +1441,10 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
chunk_hash: &ChunkHash,
) -> Result<Option<Arc<EncodedShardChunk>>, Error> {
if let Some(chunk) = self.chain_store_cache_update.invalid_chunks.get(chunk_hash) {
assert_eq!(
Some(Arc::clone(chunk)),
self.chain_store.is_invalid_chunk(chunk_hash).unwrap()
);
Ok(Some(Arc::clone(chunk)))
} else {
self.chain_store.is_invalid_chunk(chunk_hash)
Expand All @@ -1411,6 +1456,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
tx_hash: &CryptoHash,
) -> Result<Option<Arc<SignedTransaction>>, Error> {
if let Some(tx) = self.chain_store_cache_update.transactions.get(tx_hash) {
assert_eq!(Some(Arc::clone(tx)), self.chain_store.get_transaction(tx_hash).unwrap());
Ok(Some(Arc::clone(tx)))
} else {
self.chain_store.get_transaction(tx_hash)
Expand All @@ -1419,6 +1465,10 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn get_receipt(&self, receipt_id: &CryptoHash) -> Result<Option<Arc<Receipt>>, Error> {
if let Some(receipt) = self.chain_store_cache_update.receipts.get(receipt_id) {
assert_eq!(
Some(Arc::clone(receipt)),
self.chain_store.get_receipt(receipt_id).unwrap()
);
Ok(Some(Arc::clone(receipt)))
} else {
self.chain_store.get_receipt(receipt_id)
Expand All @@ -1434,6 +1484,8 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
block_hash: &CryptoHash,
) -> Result<Arc<PartialMerkleTree>, Error> {
if let Some(merkle_tree) = self.chain_store_cache_update.block_merkle_tree.get(block_hash) {
let store_merkel_tree = self.chain_store.get_block_merkle_tree(block_hash).unwrap();
assert_eq!(Arc::clone(merkle_tree), store_merkel_tree);
Ok(Arc::clone(&merkle_tree))
} else {
self.chain_store.get_block_merkle_tree(block_hash)
Expand All @@ -1444,6 +1496,10 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {
if let Some(block_hash) =
self.chain_store_cache_update.block_ordinal_to_hash.get(&block_ordinal)
{
assert_eq!(
*block_hash,
self.chain_store.get_block_hash_from_ordinal(block_ordinal).unwrap()
);
Ok(*block_hash)
} else {
self.chain_store.get_block_hash_from_ordinal(block_ordinal)
Expand All @@ -1452,6 +1508,7 @@ impl<'a> ChainStoreAccess for ChainStoreUpdate<'a> {

fn is_height_processed(&self, height: BlockHeight) -> Result<bool, Error> {
if self.chain_store_cache_update.processed_block_heights.contains(&height) {
assert_eq!(true, self.chain_store.is_height_processed(height).unwrap());
Ok(true)
} else {
self.chain_store.is_height_processed(height)
Expand Down Expand Up @@ -1497,7 +1554,7 @@ impl<'a> ChainStoreUpdate<'a> {
}
// Override block ordinal to hash mapping for blocks in between.
// At this point block_merkle_tree for header is already saved.
let block_ordinal = self.get_block_merkle_tree(&header_hash)?.size();
let block_ordinal = self.generate_block_merkel_tree(&header_prev_hash)?.size();
self.chain_store_cache_update.block_ordinal_to_hash.insert(block_ordinal, header_hash);
match self.get_block_hash_by_height(header_height) {
Ok(cur_hash) if cur_hash == header_hash => {
Expand Down Expand Up @@ -1562,7 +1619,7 @@ impl<'a> ChainStoreUpdate<'a> {
}

// save block ordinal and height if we need to update header head
let block_ordinal = self.get_block_merkle_tree(&t.last_block_hash)?.size();
let block_ordinal = self.generate_block_merkel_tree(&t.prev_block_hash)?.size();
self.chain_store_cache_update
.block_ordinal_to_hash
.insert(block_ordinal, t.last_block_hash);
Expand Down Expand Up @@ -1651,16 +1708,23 @@ impl<'a> ChainStoreUpdate<'a> {
}

fn update_and_save_block_merkle_tree(&mut self, header: &BlockHeader) -> Result<(), Error> {
if header.is_genesis() {
self.save_block_merkle_tree(*header.hash(), PartialMerkleTree::default());
let new_merkle_tree = self.generate_block_merkel_tree(header.prev_hash())?;
self.save_block_merkle_tree(*header.hash(), new_merkle_tree);
Ok(())
}

fn generate_block_merkel_tree(
&self,
prev_hash: &CryptoHash,
) -> Result<PartialMerkleTree, Error> {
if prev_hash == &CryptoHash::default() {
Ok(PartialMerkleTree::default())
} else {
let prev_hash = header.prev_hash();
let old_merkle_tree = self.get_block_merkle_tree(prev_hash)?;
let mut new_merkle_tree = PartialMerkleTree::clone(&old_merkle_tree);
new_merkle_tree.insert(*prev_hash);
self.save_block_merkle_tree(*header.hash(), new_merkle_tree);
Ok(new_merkle_tree)
}
Ok(())
}

/// Used only in Epoch Sync finalization
Expand Down
Loading