Skip to content

Commit

Permalink
Schema cache rework + background task tests setup (polkadot-evm#626)
Browse files Browse the repository at this point in the history
* Schema cache task rework using block import

* Use parent id

* Add test for background task

* fmt

* Update Cargo.lock

* Remove unused tokio

* Cleanup

* Add license

* Template runtime feature aura

* Handle reorgs and test

* Add `tempfile` dev-dependency

* Move `cache` mod + tests
  • Loading branch information
tgmichel authored Apr 25, 2022
1 parent 8bf5e12 commit d5588ba
Show file tree
Hide file tree
Showing 4 changed files with 488 additions and 59 deletions.
126 changes: 121 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,12 @@ fc-rpc-core = { version = "1.1.0-dev", path = "../rpc-core" }
fp-rpc = { version = "3.0.0-dev", path = "../../primitives/rpc" }
fp-storage = { version = "2.0.0-dev", path = "../../primitives/storage" }

[dev-dependencies]
tempfile = "3.3.0"
substrate-test-runtime-client = { version = "2.0.0", git = "https://github.com/paritytech/substrate", branch = "master" }
frontier-template-runtime = { path = "../../template/runtime", default-features = false, features = ["std", "aura"] }
sc-block-builder = { version = "0.10.0-dev", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { version = "0.10.0-dev", git = "https://github.com/paritytech/substrate", branch = "master" }

[features]
rpc_binary_search_estimate = []
138 changes: 84 additions & 54 deletions client/rpc/src/eth/cache.rs → client/rpc/src/eth/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

mod tests;

use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
Expand Down Expand Up @@ -276,13 +278,10 @@ where
BE: Backend<B> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
/// Task that caches at which best hash a new EthereumStorageSchema was inserted in the Runtime Storage.
/// Task that caches at which substrate hash a new EthereumStorageSchema was inserted in the Runtime Storage.
pub async fn ethereum_schema_cache_task(client: Arc<C>, backend: Arc<fc_db::Backend<B>>) {
use fp_storage::PALLET_ETHEREUM_SCHEMA;
use log::warn;
use sp_storage::{StorageData, StorageKey};

if let Ok(None) = frontier_backend_client::load_cached_schema::<B>(backend.as_ref()) {
// Initialize the schema cache at genesis.
let mut cache: Vec<(EthereumStorageSchema, H256)> = Vec::new();
let id = BlockId::Number(Zero::zero());
if let Ok(Some(header)) = client.header(id) {
Expand All @@ -294,70 +293,101 @@ where
cache.push((genesis_schema_version, header.hash()));
let _ = frontier_backend_client::write_cached_schema::<B>(backend.as_ref(), cache)
.map_err(|err| {
warn!("Error schema cache insert for genesis: {:?}", err);
log::warn!("Error schema cache insert for genesis: {:?}", err);
});
} else {
warn!("Error genesis header unreachable");
log::warn!("Error genesis header unreachable");
}
}

// Subscribe to changes for the pallet-ethereum Schema.
if let Ok(mut stream) = client.storage_changes_notification_stream(
Some(&[StorageKey(PALLET_ETHEREUM_SCHEMA.to_vec())]),
None,
) {
while let Some(notification) = stream.next().await {
let (hash, changes) = (notification.block, notification.changes);
// Make sure only block hashes marked as best are referencing cache checkpoints.
if hash == client.info().best_hash {
// Just map the change set to the actual data.
let storage: Vec<Option<StorageData>> = changes
// Returns the schema for the given block hash and its parent.
let current_and_parent_schema =
|hash: B::Hash| -> Option<(EthereumStorageSchema, EthereumStorageSchema)> {
let id = BlockId::Hash(hash);
if let Ok(Some(header)) = client.header(id) {
let new_schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(
client.as_ref(),
id,
);

let parent_hash = header.parent_hash();
let parent_id: BlockId<B> = BlockId::Hash(*parent_hash);
let parent_schema = frontier_backend_client::onchain_storage_schema::<B, C, BE>(
client.as_ref(),
parent_id,
);
return Some((new_schema, parent_schema));
}
None
};

let mut notification_st = client.import_notification_stream();
while let Some(notification) = notification_st.next().await {
let imported_hash = notification.hash;
if let (Some((new_schema, parent_schema)), Ok(Some(old_cache))) = (
current_and_parent_schema(imported_hash),
frontier_backend_client::load_cached_schema::<B>(backend.as_ref()),
) {
let mut new_cache: Vec<(EthereumStorageSchema, H256)> = old_cache.clone();

if new_schema != parent_schema && notification.is_new_best {
// Always update cache on best block if there is a schema change.
new_cache.push((new_schema, imported_hash));
}

// Re-org handling.
if let Some(tree_route) = notification.tree_route {
// Imported block belongs to a re-org.
// First remove the retracted hashes from cache, if any.
let retracted = tree_route
.retracted()
.iter()
.map(|hash_and_number| hash_and_number.hash)
.collect::<Vec<_>>();
let to_remove = old_cache
.iter()
.filter_map(|(o_sk, _k, v)| {
if o_sk.is_none() {
Some(v.cloned())
.enumerate()
.filter_map(|(index, (_, hash))| {
if retracted.contains(hash) {
Some(index)
} else {
None
}
})
.collect();
for change in storage {
if let Some(data) = change {
// Decode the wrapped blob which's type is known.
let new_schema: EthereumStorageSchema =
Decode::decode(&mut &data.0[..]).unwrap();
// Cache new entry and overwrite the old database value.
if let Ok(Some(old_cache)) =
frontier_backend_client::load_cached_schema::<B>(backend.as_ref())
.collect::<Vec<_>>();
for index in to_remove {
new_cache.remove(index);
}
// Next add if there is a schema change in the branch.
let to_add = tree_route
.enacted()
.iter()
.filter_map(|hash_and_number| {
if let Some((new_schema, parent_schema)) =
current_and_parent_schema(hash_and_number.hash)
{
let mut new_cache: Vec<(EthereumStorageSchema, H256)> = old_cache;
match &new_cache[..] {
[.., (schema, _)] if *schema == new_schema => {
warn!(
"Schema version already in Frontier database, ignoring: {:?}",
new_schema
);
}
_ => {
new_cache.push((new_schema, hash));
let _ = frontier_backend_client::write_cached_schema::<B>(
backend.as_ref(),
new_cache,
)
.map_err(|err| {
warn!(
"Error schema cache insert for genesis: {:?}",
err
);
});
}
if new_schema != parent_schema {
return Some((new_schema, hash_and_number.hash));
}
} else {
warn!("Error schema cache is corrupted");
return None;
}
}
None
})
.collect::<Vec<_>>();
for item in to_add {
new_cache.push(item);
}
}
// Write cache.
if new_cache != old_cache {
let _ = frontier_backend_client::write_cached_schema::<B>(
backend.as_ref(),
new_cache,
)
.map_err(|err| {
log::warn!("Error schema cache insert: {:?}", err);
});
}
}
}
}
Expand Down
Loading

0 comments on commit d5588ba

Please sign in to comment.