From 411d9bb4c551c65acb55bc67443f05fe631f9330 Mon Sep 17 00:00:00 2001 From: Adrian Catangiu Date: Fri, 25 Mar 2022 17:31:42 +0200 Subject: [PATCH] Implement Lean BEEFY (#10882) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplified BEEFY worker logic based on the invariant that GRANDPA will always finalize 1st block of each new session, meaning BEEFY worker is guaranteed to receive finality notification for the BEEFY mandatory blocks. Under these conditions the current design is as follows: - session changes are detected based on BEEFY Digest present in BEEFY mandatory blocks, - on each new session new `Rounds` of voting is created, with old rounds being dropped (for gossip rounds, last 3 are still alive so votes are still being gossiped), - after processing finality for a block, the worker votes if a new voting target has become available as a result of said block finality processing, - incoming votes as well as self-created votes are processed and signed commitments are created for completed BEEFY voting rounds, - the worker votes if a new voting target becomes available once a round successfully completes. On worker startup, the current validator set is retrieved from the BEEFY pallet. If it is the genesis validator set, worker starts voting right away considering Block #1 as session start. Otherwise (not genesis), the worker will vote starting with mandatory block of the next session. Later on when we add the BEEFY initial-sync (catch-up) logic, the worker will sync all past mandatory blocks Signed Commitments and will be able to start voting right away. BEEFY mandatory block is the block with header containing the BEEFY `AuthoritiesChange` Digest, this block is guaranteed to be finalized by GRANDPA. This session-boundary block is signed by the ending-session's validator set. Next blocks will be signed by the new session's validator set. This behavior is consistent with what GRANDPA does as well. Also drop the limit N on active gossip rounds. In an adversarial network, a bad actor could create and gossip N invalid votes with round numbers larger than the current correct round number. This would lead to votes for correct rounds to no longer be gossiped. Add unit-tests for all components, including full voter consensus tests. Signed-off-by: Adrian Catangiu Co-authored-by: Tomasz Drwięga Co-authored-by: David Salami --- Cargo.lock | 10 + client/beefy/Cargo.toml | 14 +- client/beefy/src/gossip.rs | 306 ++++++----- client/beefy/src/lib.rs | 73 +-- client/beefy/src/metrics.rs | 13 +- client/beefy/src/round.rs | 358 +++++++++---- client/beefy/src/tests.rs | 590 ++++++++++++++++++++ client/beefy/src/worker.rs | 978 ++++++++++++++++++++++++---------- frame/beefy/src/lib.rs | 28 +- primitives/beefy/Cargo.toml | 5 + test-utils/runtime/Cargo.toml | 2 + test-utils/runtime/src/lib.rs | 6 + 12 files changed, 1815 insertions(+), 568 deletions(-) create mode 100644 client/beefy/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 1731ded2906cf..81a53ed858859 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -484,12 +484,15 @@ dependencies = [ "beefy-primitives", "fnv", "futures 0.3.19", + "futures-timer", "hex", "log 0.4.14", "parity-scale-codec", "parking_lot 0.12.0", "sc-chain-spec", "sc-client-api", + "sc-consensus", + "sc-finality-grandpa", "sc-keystore", "sc-network", "sc-network-gossip", @@ -500,13 +503,19 @@ dependencies = [ "sp-application-crypto", "sp-arithmetic", "sp-blockchain", + "sp-consensus", "sp-core", + "sp-finality-grandpa", + "sp-keyring", "sp-keystore", "sp-runtime", "sp-tracing", "strum", "substrate-prometheus-endpoint", + "substrate-test-runtime-client", + "tempfile", "thiserror", + "tokio", "wasm-timer", ] @@ -10688,6 +10697,7 @@ dependencies = [ name = "substrate-test-runtime" version = "2.0.0" dependencies = [ + "beefy-primitives", "cfg-if 1.0.0", "frame-support", "frame-system", diff --git a/client/beefy/Cargo.toml b/client/beefy/Cargo.toml index 1cd0f1fd50d80..02be645b3fc08 100644 --- a/client/beefy/Cargo.toml +++ b/client/beefy/Cargo.toml @@ -10,6 +10,7 @@ description = "BEEFY Client gadget for substrate" [dependencies] fnv = "1.0.6" futures = "0.3" +futures-timer = "3.0.1" hex = "0.4.2" log = "0.4" parking_lot = "0.12.0" @@ -23,22 +24,31 @@ sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-application-crypto = { version = "6.0.0", path = "../../primitives/application-crypto" } sp-arithmetic = { version = "5.0.0", path = "../../primitives/arithmetic" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } +sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-core = { version = "6.0.0", path = "../../primitives/core" } sp-keystore = { version = "0.12.0", path = "../../primitives/keystore" } sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } sc-chain-spec = { version = "4.0.0-dev", path = "../../client/chain-spec" } -sc-utils = { version = "4.0.0-dev", path = "../utils" } sc-client-api = { version = "4.0.0-dev", path = "../api" } +sc-finality-grandpa = { version = "0.10.0-dev", path = "../../client/finality-grandpa" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } sc-network = { version = "0.10.0-dev", path = "../network" } sc-network-gossip = { version = "0.10.0-dev", path = "../network-gossip" } +sc-utils = { version = "4.0.0-dev", path = "../utils" } beefy-primitives = { version = "4.0.0-dev", path = "../../primitives/beefy" } [dev-dependencies] -sp-tracing = { version = "5.0.0", path = "../../primitives/tracing" } +sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-network-test = { version = "0.8.0", path = "../network/test" } +sp-finality-grandpa = { version = "4.0.0-dev", path = "../../primitives/finality-grandpa" } +sp-keyring = { version = "6.0.0", path = "../../primitives/keyring" } +sp-tracing = { version = "5.0.0", path = "../../primitives/tracing" } +substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } + serde = "1.0.136" strum = { version = "0.23", features = ["derive"] } +tokio = "1.15" +tempfile = "3.1.0" diff --git a/client/beefy/src/gossip.rs b/client/beefy/src/gossip.rs index 37358441ef88a..54d283fede32e 100644 --- a/client/beefy/src/gossip.rs +++ b/client/beefy/src/gossip.rs @@ -35,9 +35,6 @@ use beefy_primitives::{ use crate::keystore::BeefyKeystore; -// Limit BEEFY gossip by keeping only a bound number of voting rounds alive. -const MAX_LIVE_GOSSIP_ROUNDS: usize = 3; - // Timeout for rebroadcasting messages. const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); @@ -52,13 +49,50 @@ where /// A type that represents hash of the message. pub type MessageHash = [u8; 8]; -type KnownVotes = BTreeMap, fnv::FnvHashSet>; +struct KnownVotes { + last_done: Option>, + live: BTreeMap, fnv::FnvHashSet>, +} + +impl KnownVotes { + pub fn new() -> Self { + Self { last_done: None, live: BTreeMap::new() } + } + + /// Create new round votes set if not already present. + fn insert(&mut self, round: NumberFor) { + self.live.entry(round).or_default(); + } + + /// Remove `round` and older from live set, update `last_done` accordingly. + fn conclude(&mut self, round: NumberFor) { + self.live.retain(|&number, _| number > round); + self.last_done = self.last_done.max(Some(round)); + } + + /// Return true if `round` is newer than previously concluded rounds. + /// + /// Latest concluded round is still considered alive to allow proper gossiping for it. + fn is_live(&self, round: &NumberFor) -> bool { + Some(*round) >= self.last_done + } + + /// Add new _known_ `hash` to the round's known votes. + fn add_known(&mut self, round: &NumberFor, hash: MessageHash) { + self.live.get_mut(round).map(|known| known.insert(hash)); + } + + /// Check if `hash` is already part of round's known votes. + fn is_known(&self, round: &NumberFor, hash: &MessageHash) -> bool { + self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false) + } +} /// BEEFY gossip validator /// /// Validate BEEFY gossip messages and limit the number of live BEEFY voting rounds. /// -/// Allows messages from last [`MAX_LIVE_GOSSIP_ROUNDS`] to flow, everything else gets +/// Allows messages for 'rounds >= last concluded' to flow, everything else gets /// rejected/expired. /// ///All messaging is handled in a single BEEFY global topic. @@ -78,57 +112,25 @@ where pub fn new() -> GossipValidator { GossipValidator { topic: topic::(), - known_votes: RwLock::new(BTreeMap::new()), + known_votes: RwLock::new(KnownVotes::new()), next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER), } } /// Note a voting round. /// - /// Noting `round` will keep `round` live. - /// - /// We retain the [`MAX_LIVE_GOSSIP_ROUNDS`] most **recent** voting rounds as live. - /// As long as a voting round is live, it will be gossiped to peer nodes. + /// Noting round will start a live `round`. pub(crate) fn note_round(&self, round: NumberFor) { - debug!(target: "beefy", "🥩 About to note round #{}", round); - - let mut live = self.known_votes.write(); - - if !live.contains_key(&round) { - live.insert(round, Default::default()); - } - - if live.len() > MAX_LIVE_GOSSIP_ROUNDS { - let to_remove = live.iter().next().map(|x| x.0).copied(); - if let Some(first) = to_remove { - live.remove(&first); - } - } - } - - fn add_known(known_votes: &mut KnownVotes, round: &NumberFor, hash: MessageHash) { - known_votes.get_mut(round).map(|known| known.insert(hash)); - } - - // Note that we will always keep the most recent unseen round alive. - // - // This is a preliminary fix and the detailed description why we are - // doing this can be found as part of the issue below - // - // https://github.com/paritytech/grandpa-bridge-gadget/issues/237 - // - fn is_live(known_votes: &KnownVotes, round: &NumberFor) -> bool { - let unseen_round = if let Some(max_known_round) = known_votes.keys().last() { - round > max_known_round - } else { - known_votes.is_empty() - }; - - known_votes.contains_key(round) || unseen_round + debug!(target: "beefy", "🥩 About to note gossip round #{}", round); + self.known_votes.write().insert(round); } - fn is_known(known_votes: &KnownVotes, round: &NumberFor, hash: &MessageHash) -> bool { - known_votes.get(round).map(|known| known.contains(hash)).unwrap_or(false) + /// Conclude a voting round. + /// + /// This can be called once round is complete so we stop gossiping for it. + pub(crate) fn conclude_round(&self, round: NumberFor) { + debug!(target: "beefy", "🥩 About to drop gossip round #{}", round); + self.known_votes.write().conclude(round); } } @@ -152,17 +154,17 @@ where { let known_votes = self.known_votes.read(); - if !GossipValidator::::is_live(&known_votes, &round) { + if !known_votes.is_live(&round) { return ValidationResult::Discard } - if GossipValidator::::is_known(&known_votes, &round, &msg_hash) { + if known_votes.is_known(&round, &msg_hash) { return ValidationResult::ProcessAndKeep(self.topic) } } if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) { - GossipValidator::::add_known(&mut *self.known_votes.write(), &round, msg_hash); + self.known_votes.write().add_known(&round, msg_hash); return ValidationResult::ProcessAndKeep(self.topic) } else { // TODO: report peer @@ -182,7 +184,7 @@ where }; let round = msg.commitment.block_number; - let expired = !GossipValidator::::is_live(&known_votes, &round); + let expired = !known_votes.is_live(&round); trace!(target: "beefy", "🥩 Message for round #{} expired: {}", round, expired); @@ -212,11 +214,11 @@ where let msg = match VoteMessage::, Public, Signature>::decode(&mut data) { Ok(vote) => vote, - Err(_) => return true, + Err(_) => return false, }; let round = msg.commitment.block_number; - let allowed = GossipValidator::::is_live(&known_votes, &round); + let allowed = known_votes.is_live(&round); debug!(target: "beefy", "🥩 Message for round #{} allowed: {}", round, allowed); @@ -240,60 +242,58 @@ mod tests { use super::*; #[test] - fn note_round_works() { - let gv = GossipValidator::::new(); - - gv.note_round(1u64); - - let live = gv.known_votes.read(); - assert!(GossipValidator::::is_live(&live, &1u64)); - - drop(live); - - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); - - let live = gv.known_votes.read(); - - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); - - assert!(!GossipValidator::::is_live(&live, &1u64)); - assert!(GossipValidator::::is_live(&live, &3u64)); - assert!(GossipValidator::::is_live(&live, &7u64)); - assert!(GossipValidator::::is_live(&live, &10u64)); + fn known_votes_insert_remove() { + let mut kv = KnownVotes::::new(); + + kv.insert(1); + kv.insert(1); + kv.insert(2); + assert_eq!(kv.live.len(), 2); + + let mut kv = KnownVotes::::new(); + kv.insert(1); + kv.insert(2); + kv.insert(3); + + assert!(kv.last_done.is_none()); + kv.conclude(2); + assert_eq!(kv.live.len(), 1); + assert!(!kv.live.contains_key(&2)); + assert_eq!(kv.last_done, Some(2)); + + kv.conclude(1); + assert_eq!(kv.last_done, Some(2)); + + kv.conclude(3); + assert_eq!(kv.last_done, Some(3)); + assert!(kv.live.is_empty()); } #[test] - fn keeps_most_recent_max_rounds() { + fn note_and_drop_round_works() { let gv = GossipValidator::::new(); - gv.note_round(3u64); - gv.note_round(7u64); - gv.note_round(10u64); gv.note_round(1u64); - let live = gv.known_votes.read(); - - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); + assert!(gv.known_votes.read().is_live(&1u64)); - assert!(GossipValidator::::is_live(&live, &3u64)); - assert!(!GossipValidator::::is_live(&live, &1u64)); - - drop(live); + gv.note_round(3u64); + gv.note_round(7u64); + gv.note_round(10u64); - gv.note_round(23u64); - gv.note_round(15u64); - gv.note_round(20u64); - gv.note_round(2u64); + assert_eq!(gv.known_votes.read().live.len(), 4); - let live = gv.known_votes.read(); + gv.conclude_round(7u64); - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); + let votes = gv.known_votes.read(); - assert!(GossipValidator::::is_live(&live, &15u64)); - assert!(GossipValidator::::is_live(&live, &20u64)); - assert!(GossipValidator::::is_live(&live, &23u64)); + // rounds 1 and 3 are outdated, don't gossip anymore + assert!(!votes.is_live(&1u64)); + assert!(!votes.is_live(&3u64)); + // latest concluded round is still gossiped + assert!(votes.is_live(&7u64)); + // round 10 is alive and in-progress + assert!(votes.is_live(&10u64)); } #[test] @@ -304,22 +304,18 @@ mod tests { gv.note_round(7u64); gv.note_round(10u64); - let live = gv.known_votes.read(); - - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); - - drop(live); + assert_eq!(gv.known_votes.read().live.len(), 3); // note round #7 again -> should not change anything gv.note_round(7u64); - let live = gv.known_votes.read(); + let votes = gv.known_votes.read(); - assert_eq!(live.len(), MAX_LIVE_GOSSIP_ROUNDS); + assert_eq!(votes.live.len(), 3); - assert!(GossipValidator::::is_live(&live, &3u64)); - assert!(GossipValidator::::is_live(&live, &7u64)); - assert!(GossipValidator::::is_live(&live, &10u64)); + assert!(votes.is_live(&3u64)); + assert!(votes.is_live(&7u64)); + assert!(votes.is_live(&10u64)); } struct TestContext; @@ -349,29 +345,32 @@ mod tests { beefy_keystore.sign(&who.public(), &commitment.encode()).unwrap() } + fn dummy_vote(block_number: u64) -> VoteMessage { + let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, MmrRootHash::default().encode()); + let commitment = Commitment { payload, block_number, validator_set_id: 0 }; + let signature = sign_commitment(&Keyring::Alice, &commitment); + + VoteMessage { commitment, id: Keyring::Alice.public(), signature } + } + #[test] fn should_avoid_verifying_signatures_twice() { let gv = GossipValidator::::new(); let sender = sc_network::PeerId::random(); let mut context = TestContext; - let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, MmrRootHash::default().encode()); - let commitment = Commitment { payload, block_number: 3_u64, validator_set_id: 0 }; - - let signature = sign_commitment(&Keyring::Alice, &commitment); - - let vote = VoteMessage { commitment, id: Keyring::Alice.public(), signature }; + let vote = dummy_vote(3); gv.note_round(3u64); gv.note_round(7u64); gv.note_round(10u64); - // first time the cache should be populated. + // first time the cache should be populated let res = gv.validate(&mut context, &sender, &vote.encode()); assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); assert_eq!( - gv.known_votes.read().get(&vote.commitment.block_number).map(|x| x.len()), + gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()), Some(1) ); @@ -380,17 +379,84 @@ mod tests { assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); - // next we should quickly reject if the round is not live. - gv.note_round(11_u64); - gv.note_round(12_u64); + // next we should quickly reject if the round is not live + gv.conclude_round(7_u64); - assert!(!GossipValidator::::is_live( - &*gv.known_votes.read(), - &vote.commitment.block_number - )); + assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number)); let res = gv.validate(&mut context, &sender, &vote.encode()); assert!(matches!(res, ValidationResult::Discard)); } + + #[test] + fn messages_allowed_and_expired() { + let gv = GossipValidator::::new(); + let sender = sc_network::PeerId::random(); + let topic = Default::default(); + let intent = MessageIntent::Broadcast; + + // note round 2 and 3, then conclude 2 + gv.note_round(2u64); + gv.note_round(3u64); + gv.conclude_round(2u64); + let mut allowed = gv.message_allowed(); + let mut expired = gv.message_expired(); + + // check bad vote format + assert!(!allowed(&sender, intent, &topic, &mut [0u8; 16])); + assert!(expired(topic, &mut [0u8; 16])); + + // inactive round 1 -> expired + let vote = dummy_vote(1); + let mut encoded_vote = vote.encode(); + assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(expired(topic, &mut encoded_vote)); + + // active round 2 -> !expired - concluded but still gossiped + let vote = dummy_vote(2); + let mut encoded_vote = vote.encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(!expired(topic, &mut encoded_vote)); + + // in progress round 3 -> !expired + let vote = dummy_vote(3); + let mut encoded_vote = vote.encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(!expired(topic, &mut encoded_vote)); + + // unseen round 4 -> !expired + let vote = dummy_vote(3); + let mut encoded_vote = vote.encode(); + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + assert!(!expired(topic, &mut encoded_vote)); + } + + #[test] + fn messages_rebroadcast() { + let gv = GossipValidator::::new(); + let sender = sc_network::PeerId::random(); + let topic = Default::default(); + + let vote = dummy_vote(1); + let mut encoded_vote = vote.encode(); + + // re-broadcasting only allowed at `REBROADCAST_AFTER` intervals + let intent = MessageIntent::PeriodicRebroadcast; + let mut allowed = gv.message_allowed(); + + // rebroadcast not allowed so soon after GossipValidator creation + assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); + + // hack the inner deadline to be `now` + *gv.next_rebroadcast.lock() = Instant::now(); + + // still not allowed on old `allowed` closure result + assert!(!allowed(&sender, intent, &topic, &mut encoded_vote)); + + // renew closure result + let mut allowed = gv.message_allowed(); + // rebroadcast should be allowed now + assert!(allowed(&sender, intent, &topic, &mut encoded_vote)); + } } diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index 29d74c15dd599..8a6e175f58321 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -18,14 +18,14 @@ use std::sync::Arc; -use log::debug; use prometheus::Registry; use sc_client_api::{Backend, BlockchainEvents, Finalizer}; -use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; +use sc_network_gossip::Network as GossipNetwork; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; +use sp_consensus::SyncOracle; use sp_keystore::SyncCryptoStorePtr; use sp_runtime::traits::Block; @@ -41,6 +41,10 @@ mod round; mod worker; pub mod notification; + +#[cfg(test)] +mod tests; + pub use beefy_protocol_name::standard_name as protocol_standard_name; pub(crate) mod beefy_protocol_name { @@ -112,7 +116,7 @@ where BE: Backend, C: Client, C::Api: BeefyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, { /// BEEFY client pub client: Arc, @@ -134,6 +138,7 @@ where pub protocol_name: std::borrow::Cow<'static, str>, } +#[cfg(not(test))] /// Start the BEEFY gadget. /// /// This is a thin shim around running and awaiting a BEEFY worker. @@ -143,7 +148,7 @@ where BE: Backend, C: Client, C::Api: BeefyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, { let BeefyParams { client, @@ -157,18 +162,24 @@ where protocol_name, } = beefy_params; + let sync_oracle = network.clone(); let gossip_validator = Arc::new(gossip::GossipValidator::new()); - let gossip_engine = GossipEngine::new(network, protocol_name, gossip_validator.clone(), None); + let gossip_engine = sc_network_gossip::GossipEngine::new( + network, + protocol_name, + gossip_validator.clone(), + None, + ); let metrics = prometheus_registry.as_ref().map(metrics::Metrics::register).and_then( |result| match result { Ok(metrics) => { - debug!(target: "beefy", "🥩 Registered metrics"); + log::debug!(target: "beefy", "🥩 Registered metrics"); Some(metrics) }, Err(err) => { - debug!(target: "beefy", "🥩 Failed to register metrics: {:?}", err); + log::debug!(target: "beefy", "🥩 Failed to register metrics: {:?}", err); None }, }, @@ -184,54 +195,10 @@ where gossip_validator, min_block_delta, metrics, + sync_oracle, }; - let worker = worker::BeefyWorker::<_, _, _>::new(worker_params); + let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params); worker.run().await } - -#[cfg(test)] -mod tests { - use super::*; - use sc_chain_spec::{ChainSpec, GenericChainSpec}; - use serde::{Deserialize, Serialize}; - use sp_core::H256; - use sp_runtime::{BuildStorage, Storage}; - - #[derive(Debug, Serialize, Deserialize)] - struct Genesis(std::collections::BTreeMap); - impl BuildStorage for Genesis { - fn assimilate_storage(&self, storage: &mut Storage) -> Result<(), String> { - storage.top.extend( - self.0.iter().map(|(a, b)| (a.clone().into_bytes(), b.clone().into_bytes())), - ); - Ok(()) - } - } - - #[test] - fn beefy_protocol_name() { - let chain_spec = GenericChainSpec::::from_json_file(std::path::PathBuf::from( - "../chain-spec/res/chain_spec.json", - )) - .unwrap() - .cloned_box(); - - // Create protocol name using random genesis hash. - let genesis_hash = H256::random(); - let expected = format!("/{}/beefy/1", hex::encode(genesis_hash)); - let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); - assert_eq!(proto_name.to_string(), expected); - - // Create protocol name using hardcoded genesis hash. Verify exact representation. - let genesis_hash = [ - 50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123, - 94, 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147, - ]; - let expected = - "/32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93/beefy/1".to_string(); - let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); - assert_eq!(proto_name.to_string(), expected); - } -} diff --git a/client/beefy/src/metrics.rs b/client/beefy/src/metrics.rs index 4b2a5c8dfd5c9..20fa98e52fdd5 100644 --- a/client/beefy/src/metrics.rs +++ b/client/beefy/src/metrics.rs @@ -18,7 +18,9 @@ //! BEEFY Prometheus metrics definition -use prometheus::{register, Counter, Gauge, PrometheusError, Registry, U64}; +#[cfg(not(test))] +use prometheus::{register, PrometheusError, Registry}; +use prometheus::{Counter, Gauge, U64}; /// BEEFY metrics exposed through Prometheus pub(crate) struct Metrics { @@ -37,6 +39,7 @@ pub(crate) struct Metrics { } impl Metrics { + #[cfg(not(test))] pub(crate) fn register(registry: &Registry) -> Result { Ok(Self { beefy_validator_set_id: register( @@ -97,3 +100,11 @@ macro_rules! metric_inc { } }}; } + +#[cfg(test)] +#[macro_export] +macro_rules! metric_get { + ($self:ident, $m:ident) => {{ + $self.metrics.as_ref().map(|metrics| metrics.$m.clone()) + }}; +} diff --git a/client/beefy/src/round.rs b/client/beefy/src/round.rs index e5404cfa6d216..eba769b2356f0 100644 --- a/client/beefy/src/round.rs +++ b/client/beefy/src/round.rs @@ -16,7 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeMap, hash::Hash}; +use std::{ + collections::{BTreeMap, HashMap}, + hash::Hash, +}; use log::{debug, trace}; @@ -24,25 +27,33 @@ use beefy_primitives::{ crypto::{Public, Signature}, ValidatorSet, ValidatorSetId, }; -use sp_arithmetic::traits::AtLeast32BitUnsigned; -use sp_runtime::traits::MaybeDisplay; +use sp_runtime::traits::{Block, NumberFor}; +/// Tracks for each round which validators have voted/signed and +/// whether the local `self` validator has voted/signed. +/// +/// Does not do any validation on votes or signatures, layers above need to handle that (gossip). #[derive(Default)] struct RoundTracker { - votes: Vec<(Public, Signature)>, + self_vote: bool, + votes: HashMap, } impl RoundTracker { - fn add_vote(&mut self, vote: (Public, Signature)) -> bool { - // this needs to handle equivocations in the future - if self.votes.contains(&vote) { + fn add_vote(&mut self, vote: (Public, Signature), self_vote: bool) -> bool { + if self.votes.contains_key(&vote.0) { return false } - self.votes.push(vote); + self.self_vote = self.self_vote || self_vote; + self.votes.insert(vote.0, vote.1); true } + fn has_self_vote(&self) -> bool { + self.self_vote + } + fn is_done(&self, threshold: usize) -> bool { self.votes.len() >= threshold } @@ -53,74 +64,125 @@ fn threshold(authorities: usize) -> usize { authorities - faulty } -pub(crate) struct Rounds { - rounds: BTreeMap<(Payload, Number), RoundTracker>, +/// Keeps track of all voting rounds (block numbers) within a session. +/// Only round numbers > `best_done` are of interest, all others are considered stale. +/// +/// Does not do any validation on votes or signatures, layers above need to handle that (gossip). +pub(crate) struct Rounds { + rounds: BTreeMap<(Payload, NumberFor), RoundTracker>, + best_done: Option>, + session_start: NumberFor, validator_set: ValidatorSet, + prev_validator_set: ValidatorSet, } -impl Rounds +impl Rounds where - P: Ord + Hash, - N: Ord + AtLeast32BitUnsigned + MaybeDisplay, + P: Ord + Hash + Clone, + B: Block, { - pub(crate) fn new(validator_set: ValidatorSet) -> Self { - Rounds { rounds: BTreeMap::new(), validator_set } + pub(crate) fn new( + session_start: NumberFor, + validator_set: ValidatorSet, + prev_validator_set: ValidatorSet, + ) -> Self { + Rounds { + rounds: BTreeMap::new(), + best_done: None, + session_start, + validator_set, + prev_validator_set, + } } } -impl Rounds +impl Rounds where - H: Ord + Hash + Clone, - N: Ord + AtLeast32BitUnsigned + MaybeDisplay + Clone, + P: Ord + Hash + Clone, + B: Block, { - pub(crate) fn validator_set_id(&self) -> ValidatorSetId { - self.validator_set.id() + pub(crate) fn validator_set_id_for(&self, block_number: NumberFor) -> ValidatorSetId { + if block_number > self.session_start { + self.validator_set.id() + } else { + self.prev_validator_set.id() + } } - pub(crate) fn validators(&self) -> &[Public] { - self.validator_set.validators() + pub(crate) fn validators_for(&self, block_number: NumberFor) -> &[Public] { + if block_number > self.session_start { + self.validator_set.validators() + } else { + self.prev_validator_set.validators() + } } - pub(crate) fn add_vote(&mut self, round: &(H, N), vote: (Public, Signature)) -> bool { - if self.validator_set.validators().iter().any(|id| vote.0 == *id) { - self.rounds.entry(round.clone()).or_default().add_vote(vote) - } else { + pub(crate) fn validator_set(&self) -> &ValidatorSet { + &self.validator_set + } + + pub(crate) fn session_start(&self) -> &NumberFor { + &self.session_start + } + + pub(crate) fn should_self_vote(&self, round: &(P, NumberFor)) -> bool { + Some(round.1.clone()) > self.best_done && + self.rounds.get(round).map(|tracker| !tracker.has_self_vote()).unwrap_or(true) + } + + pub(crate) fn add_vote( + &mut self, + round: &(P, NumberFor), + vote: (Public, Signature), + self_vote: bool, + ) -> bool { + if Some(round.1.clone()) <= self.best_done { + debug!( + target: "beefy", + "🥩 received vote for old stale round {:?}, ignoring", + round.1 + ); + false + } else if !self.validator_set.validators().iter().any(|id| vote.0 == *id) { + debug!( + target: "beefy", + "🥩 received vote {:?} from validator that is not in the validator set, ignoring", + vote + ); false + } else { + self.rounds.entry(round.clone()).or_default().add_vote(vote, self_vote) } } - pub(crate) fn is_done(&self, round: &(H, N)) -> bool { + pub(crate) fn try_conclude( + &mut self, + round: &(P, NumberFor), + ) -> Option>> { let done = self .rounds .get(round) .map(|tracker| tracker.is_done(threshold(self.validator_set.len()))) .unwrap_or(false); - - debug!(target: "beefy", "🥩 Round #{} done: {}", round.1, done); - - done - } - - pub(crate) fn drop(&mut self, round: &(H, N)) -> Option>> { - trace!(target: "beefy", "🥩 About to drop round #{}", round.1); - - let signatures = self.rounds.remove(round)?.votes; - - Some( - self.validator_set - .validators() - .iter() - .map(|authority_id| { - signatures.iter().find_map(|(id, sig)| { - if id == authority_id { - Some(sig.clone()) - } else { - None - } - }) - }) - .collect(), - ) + trace!(target: "beefy", "🥩 Round #{} done: {}", round.1, done); + + if done { + // remove this and older (now stale) rounds + let signatures = self.rounds.remove(round)?.votes; + self.rounds.retain(|&(_, number), _| number > round.1); + self.best_done = self.best_done.clone().max(Some(round.1.clone())); + trace!(target: "beefy", "🥩 Concluded round #{}", round.1); + + Some( + self.validator_set + .validators() + .iter() + .map(|authority_id| signatures.get(authority_id).cloned()) + .collect(), + ) + } else { + None + } } } @@ -128,13 +190,52 @@ where mod tests { use sc_network_test::Block; use sp_core::H256; - use sp_runtime::traits::NumberFor; use beefy_primitives::{crypto::Public, ValidatorSet}; - use super::Rounds; + use super::{threshold, RoundTracker, Rounds}; use crate::keystore::tests::Keyring; + #[test] + fn round_tracker() { + let mut rt = RoundTracker::default(); + let bob_vote = (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")); + let threshold = 2; + + // self vote not added yet + assert!(!rt.has_self_vote()); + + // adding new vote allowed + assert!(rt.add_vote(bob_vote.clone(), false)); + // adding existing vote not allowed + assert!(!rt.add_vote(bob_vote, false)); + + // self vote still not added yet + assert!(!rt.has_self_vote()); + + // vote is not done + assert!(!rt.is_done(threshold)); + + let alice_vote = (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")); + // adding new vote (self vote this time) allowed + assert!(rt.add_vote(alice_vote, true)); + + // self vote registered + assert!(rt.has_self_vote()); + // vote is now done + assert!(rt.is_done(threshold)); + } + + #[test] + fn vote_threshold() { + assert_eq!(threshold(1), 1); + assert_eq!(threshold(2), 2); + assert_eq!(threshold(3), 3); + assert_eq!(threshold(4), 3); + assert_eq!(threshold(100), 67); + assert_eq!(threshold(300), 201); + } + #[test] fn new_rounds() { sp_tracing::try_init_simple(); @@ -145,116 +246,175 @@ mod tests { ) .unwrap(); - let rounds = Rounds::>::new(validators); - - assert_eq!(42, rounds.validator_set_id()); + let session_start = 1u64.into(); + let rounds = Rounds::::new(session_start, validators.clone(), validators); + assert_eq!(42, rounds.validator_set_id_for(session_start)); + assert_eq!(1, *rounds.session_start()); assert_eq!( &vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], - rounds.validators() + rounds.validators_for(session_start) ); } #[test] - fn add_vote() { + fn add_and_conclude_votes() { sp_tracing::try_init_simple(); let validators = ValidatorSet::::new( - vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], + vec![ + Keyring::Alice.public(), + Keyring::Bob.public(), + Keyring::Charlie.public(), + Keyring::Eve.public(), + ], Default::default(), ) .unwrap(); + let round = (H256::from_low_u64_le(1), 1); - let mut rounds = Rounds::>::new(validators); + let session_start = 1u64.into(); + let mut rounds = Rounds::::new(session_start, validators.clone(), validators); + // no self vote yet, should self vote + assert!(rounds.should_self_vote(&round)); + + // add 1st good vote assert!(rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")) + &round, + (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")), + true )); + // round not concluded + assert!(rounds.try_conclude(&round).is_none()); + // self vote already present, should not self vote + assert!(!rounds.should_self_vote(&round)); - assert!(!rounds.is_done(&(H256::from_low_u64_le(1), 1))); - - // invalid vote + // double voting not allowed assert!(!rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Dave.public(), Keyring::Dave.sign(b"I am committed")) + &round, + (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")), + true )); - assert!(!rounds.is_done(&(H256::from_low_u64_le(1), 1))); + // invalid vote (Dave is not a validator) + assert!(!rounds.add_vote( + &round, + (Keyring::Dave.public(), Keyring::Dave.sign(b"I am committed")), + false + )); + assert!(rounds.try_conclude(&round).is_none()); + // add 2nd good vote assert!(rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")) + &round, + (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")), + false )); + // round not concluded + assert!(rounds.try_conclude(&round).is_none()); - assert!(!rounds.is_done(&(H256::from_low_u64_le(1), 1))); - + // add 3rd good vote assert!(rounds.add_vote( - &(H256::from_low_u64_le(1), 1), - (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am committed")) + &round, + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am committed")), + false )); + // round concluded + assert!(rounds.try_conclude(&round).is_some()); - assert!(rounds.is_done(&(H256::from_low_u64_le(1), 1))); + // Eve is a validator, but round was concluded, adding vote disallowed + assert!(!rounds.add_vote( + &round, + (Keyring::Eve.public(), Keyring::Eve.sign(b"I am committed")), + false + )); } #[test] - fn drop() { + fn multiple_rounds() { sp_tracing::try_init_simple(); let validators = ValidatorSet::::new( - vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], + vec![ + Keyring::Alice.public(), + Keyring::Bob.public(), + Keyring::Charlie.public(), + Keyring::Dave.public(), + ], Default::default(), ) .unwrap(); - let mut rounds = Rounds::>::new(validators); + let session_start = 1u64.into(); + let mut rounds = Rounds::::new(session_start, validators.clone(), validators); // round 1 - rounds.add_vote( + assert!(rounds.add_vote( &(H256::from_low_u64_le(1), 1), (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")), - ); - rounds.add_vote( + true, + )); + assert!(rounds.add_vote( &(H256::from_low_u64_le(1), 1), (Keyring::Bob.public(), Keyring::Bob.sign(b"I am committed")), - ); + false, + )); + assert!(rounds.add_vote( + &(H256::from_low_u64_le(1), 1), + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am committed")), + false, + )); // round 2 - rounds.add_vote( + assert!(rounds.add_vote( &(H256::from_low_u64_le(2), 2), (Keyring::Alice.public(), Keyring::Alice.sign(b"I am again committed")), - ); - rounds.add_vote( + true, + )); + assert!(rounds.add_vote( &(H256::from_low_u64_le(2), 2), (Keyring::Bob.public(), Keyring::Bob.sign(b"I am again committed")), - ); + false, + )); + assert!(rounds.add_vote( + &(H256::from_low_u64_le(2), 2), + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am again committed")), + false, + )); // round 3 - rounds.add_vote( + assert!(rounds.add_vote( &(H256::from_low_u64_le(3), 3), (Keyring::Alice.public(), Keyring::Alice.sign(b"I am still committed")), - ); - rounds.add_vote( + true, + )); + assert!(rounds.add_vote( &(H256::from_low_u64_le(3), 3), (Keyring::Bob.public(), Keyring::Bob.sign(b"I am still committed")), - ); - + false, + )); + assert!(rounds.add_vote( + &(H256::from_low_u64_le(3), 3), + (Keyring::Charlie.public(), Keyring::Charlie.sign(b"I am still committed")), + false, + )); assert_eq!(3, rounds.rounds.len()); - // drop unknown round - assert!(rounds.drop(&(H256::from_low_u64_le(5), 5)).is_none()); + // conclude unknown round + assert!(rounds.try_conclude(&(H256::from_low_u64_le(5), 5)).is_none()); assert_eq!(3, rounds.rounds.len()); - // drop round 2 - let signatures = rounds.drop(&(H256::from_low_u64_le(2), 2)).unwrap(); - - assert_eq!(2, rounds.rounds.len()); + // conclude round 2 + let signatures = rounds.try_conclude(&(H256::from_low_u64_le(2), 2)).unwrap(); + assert_eq!(1, rounds.rounds.len()); assert_eq!( signatures, vec![ Some(Keyring::Alice.sign(b"I am again committed")), Some(Keyring::Bob.sign(b"I am again committed")), + Some(Keyring::Charlie.sign(b"I am again committed")), None ] ); diff --git a/client/beefy/src/tests.rs b/client/beefy/src/tests.rs new file mode 100644 index 0000000000000..92b5ad91c11e1 --- /dev/null +++ b/client/beefy/src/tests.rs @@ -0,0 +1,590 @@ +// This file is part of Substrate. + +// Copyright (C) 2018-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Tests and test helpers for BEEFY. + +use futures::{future, stream::FuturesUnordered, Future, StreamExt}; +use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; +use std::{sync::Arc, task::Poll}; +use tokio::{runtime::Runtime, time::Duration}; + +use sc_chain_spec::{ChainSpec, GenericChainSpec}; +use sc_client_api::HeaderBackend; +use sc_consensus::BoxJustificationImport; +use sc_keystore::LocalKeystore; +use sc_network::{config::ProtocolConfig, NetworkService}; +use sc_network_gossip::GossipEngine; +use sc_network_test::{ + Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, + PeersFullClient, TestNetFactory, +}; +use sc_utils::notification::NotificationReceiver; + +use beefy_primitives::{ + crypto::AuthorityId, ConsensusLog, MmrRootHash, ValidatorSet, BEEFY_ENGINE_ID, + KEY_TYPE as BeefyKeyType, +}; +use sp_consensus::BlockOrigin; +use sp_core::H256; +use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; +use sp_runtime::{ + codec::Encode, generic::BlockId, traits::Header as HeaderT, BuildStorage, DigestItem, Storage, +}; + +use substrate_test_runtime_client::{runtime::Header, Backend, ClientExt}; + +use crate::{ + beefy_protocol_name, + keystore::tests::Keyring as BeefyKeyring, + notification::*, + worker::{tests::TestModifiers, BeefyWorker}, +}; + +const BEEFY_PROTOCOL_NAME: &'static str = "/beefy/1"; + +type BeefyValidatorSet = ValidatorSet; +type BeefyPeer = Peer; + +#[derive(Debug, Serialize, Deserialize)] +struct Genesis(std::collections::BTreeMap); +impl BuildStorage for Genesis { + fn assimilate_storage(&self, storage: &mut Storage) -> Result<(), String> { + storage + .top + .extend(self.0.iter().map(|(a, b)| (a.clone().into_bytes(), b.clone().into_bytes()))); + Ok(()) + } +} + +#[test] +fn beefy_protocol_name() { + let chain_spec = GenericChainSpec::::from_json_file(std::path::PathBuf::from( + "../chain-spec/res/chain_spec.json", + )) + .unwrap() + .cloned_box(); + + // Create protocol name using random genesis hash. + let genesis_hash = H256::random(); + let expected = format!("/{}/beefy/1", hex::encode(genesis_hash)); + let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); + assert_eq!(proto_name.to_string(), expected); + + // Create protocol name using hardcoded genesis hash. Verify exact representation. + let genesis_hash = [ + 50, 4, 60, 123, 58, 106, 216, 246, 194, 188, 139, 193, 33, 212, 202, 171, 9, 55, 123, 94, + 8, 43, 12, 251, 187, 57, 173, 19, 188, 74, 205, 147, + ]; + let expected = + "/32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93/beefy/1".to_string(); + let proto_name = beefy_protocol_name::standard_name(&genesis_hash, &chain_spec); + assert_eq!(proto_name.to_string(), expected); +} + +// TODO: compiler warns us about unused `signed_commitment_stream`, will use in later tests +#[allow(dead_code)] +#[derive(Clone)] +pub(crate) struct BeefyLinkHalf { + signed_commitment_stream: BeefySignedCommitmentStream, + beefy_best_block_stream: BeefyBestBlockStream, +} + +#[derive(Default)] +pub(crate) struct PeerData { + pub(crate) beefy_link_half: Mutex>, + pub(crate) test_modifiers: Option, +} + +impl PeerData { + pub(crate) fn use_validator_set(&mut self, validator_set: &ValidatorSet) { + if let Some(tm) = self.test_modifiers.as_mut() { + tm.active_validators = validator_set.clone(); + } else { + self.test_modifiers = Some(TestModifiers { + active_validators: validator_set.clone(), + corrupt_mmr_roots: false, + }); + } + } +} + +pub(crate) struct BeefyTestNet { + peers: Vec, +} + +impl BeefyTestNet { + pub(crate) fn new(n_authority: usize, n_full: usize) -> Self { + let mut net = BeefyTestNet { peers: Vec::with_capacity(n_authority + n_full) }; + for _ in 0..n_authority { + net.add_authority_peer(); + } + for _ in 0..n_full { + net.add_full_peer(); + } + net + } + + pub(crate) fn add_authority_peer(&mut self) { + self.add_full_peer_with_config(FullPeerConfig { + notifications_protocols: vec![BEEFY_PROTOCOL_NAME.into()], + is_authority: true, + ..Default::default() + }) + } + + pub(crate) fn generate_blocks( + &mut self, + count: usize, + session_length: u64, + validator_set: &BeefyValidatorSet, + ) { + self.peer(0).generate_blocks(count, BlockOrigin::File, |builder| { + let mut block = builder.build().unwrap().block; + + let block_num = *block.header.number(); + let num_byte = block_num.to_le_bytes().into_iter().next().unwrap(); + let mmr_root = MmrRootHash::repeat_byte(num_byte); + + add_mmr_digest(&mut block.header, mmr_root); + + if block_num % session_length == 0 { + add_auth_change_digest(&mut block.header, validator_set.clone()); + } + + block + }); + } +} + +impl TestNetFactory for BeefyTestNet { + type Verifier = PassThroughVerifier; + type BlockImport = PeersClient; + type PeerData = PeerData; + + /// Create new test network with peers and given config. + fn from_config(_config: &ProtocolConfig) -> Self { + BeefyTestNet { peers: Vec::new() } + } + + fn make_verifier( + &self, + _client: PeersClient, + _cfg: &ProtocolConfig, + _: &PeerData, + ) -> Self::Verifier { + PassThroughVerifier::new(false) // use non-instant finality. + } + + fn make_block_import( + &self, + client: PeersClient, + ) -> ( + BlockImportAdapter, + Option>, + Self::PeerData, + ) { + (client.as_block_import(), None, PeerData::default()) + } + + fn peer(&mut self, i: usize) -> &mut BeefyPeer { + &mut self.peers[i] + } + + fn peers(&self) -> &Vec { + &self.peers + } + + fn mut_peers)>(&mut self, closure: F) { + closure(&mut self.peers); + } + + fn add_full_peer(&mut self) { + self.add_full_peer_with_config(FullPeerConfig { + notifications_protocols: vec![BEEFY_PROTOCOL_NAME.into()], + is_authority: false, + ..Default::default() + }) + } +} + +fn add_mmr_digest(header: &mut Header, mmr_hash: MmrRootHash) { + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::MmrRoot(mmr_hash).encode(), + )); +} + +fn add_auth_change_digest(header: &mut Header, new_auth_set: BeefyValidatorSet) { + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::AuthoritiesChange(new_auth_set).encode(), + )); +} + +pub(crate) fn make_beefy_ids(keys: &[BeefyKeyring]) -> Vec { + keys.iter().map(|key| key.clone().public().into()).collect() +} + +pub(crate) fn create_beefy_keystore(authority: BeefyKeyring) -> SyncCryptoStorePtr { + let keystore = Arc::new(LocalKeystore::in_memory()); + SyncCryptoStore::ecdsa_generate_new(&*keystore, BeefyKeyType, Some(&authority.to_seed())) + .expect("Creates authority key"); + keystore +} + +pub(crate) fn create_beefy_worker( + peer: &BeefyPeer, + key: &BeefyKeyring, + min_block_delta: u32, +) -> BeefyWorker>> { + let keystore = create_beefy_keystore(*key); + + let (signed_commitment_sender, signed_commitment_stream) = + BeefySignedCommitmentStream::::channel(); + let (beefy_best_block_sender, beefy_best_block_stream) = + BeefyBestBlockStream::::channel(); + + let beefy_link_half = BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream }; + *peer.data.beefy_link_half.lock() = Some(beefy_link_half); + let test_modifiers = peer.data.test_modifiers.clone().unwrap(); + + let network = peer.network_service().clone(); + let sync_oracle = network.clone(); + let gossip_validator = Arc::new(crate::gossip::GossipValidator::new()); + let gossip_engine = + GossipEngine::new(network, BEEFY_PROTOCOL_NAME, gossip_validator.clone(), None); + let worker_params = crate::worker::WorkerParams { + client: peer.client().as_client(), + backend: peer.client().as_backend(), + key_store: Some(keystore).into(), + signed_commitment_sender, + beefy_best_block_sender, + gossip_engine, + gossip_validator, + min_block_delta, + metrics: None, + sync_oracle, + }; + + BeefyWorker::<_, _, _, _>::new(worker_params, test_modifiers) +} + +// Spawns beefy voters. Returns a future to spawn on the runtime. +fn initialize_beefy( + net: &mut BeefyTestNet, + peers: &[BeefyKeyring], + min_block_delta: u32, +) -> impl Future { + let voters = FuturesUnordered::new(); + + for (peer_id, key) in peers.iter().enumerate() { + let worker = create_beefy_worker(&net.peers[peer_id], key, min_block_delta); + let gadget = worker.run(); + + fn assert_send(_: &T) {} + assert_send(&gadget); + voters.push(gadget); + } + + voters.for_each(|_| async move {}) +} + +fn block_until(future: impl Future + Unpin, net: &Arc>, runtime: &mut Runtime) { + let drive_to_completion = futures::future::poll_fn(|cx| { + net.lock().poll(cx); + Poll::<()>::Pending + }); + runtime.block_on(future::select(future, drive_to_completion)); +} + +fn run_for(duration: Duration, net: &Arc>, runtime: &mut Runtime) { + let sleep = runtime.spawn(async move { tokio::time::sleep(duration).await }); + block_until(sleep, net, runtime); +} + +pub(crate) fn get_beefy_streams( + net: &mut BeefyTestNet, + peers: &[BeefyKeyring], +) -> (Vec>, Vec>>) { + let mut best_block_streams = Vec::new(); + let mut signed_commitment_streams = Vec::new(); + for peer_id in 0..peers.len() { + let beefy_link_half = + net.peer(peer_id).data.beefy_link_half.lock().as_ref().unwrap().clone(); + let BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream } = beefy_link_half; + best_block_streams.push(beefy_best_block_stream.subscribe()); + signed_commitment_streams.push(signed_commitment_stream.subscribe()); + } + (best_block_streams, signed_commitment_streams) +} + +fn wait_for_best_beefy_blocks( + streams: Vec>, + net: &Arc>, + runtime: &mut Runtime, + expected_beefy_blocks: &[u64], +) { + let mut wait_for = Vec::new(); + let len = expected_beefy_blocks.len(); + streams.into_iter().enumerate().for_each(|(i, stream)| { + let mut expected = expected_beefy_blocks.iter(); + wait_for.push(Box::pin(stream.take(len).for_each(move |best_beefy_hash| { + let expected = expected.next(); + async move { + let block_id = BlockId::hash(best_beefy_hash); + let header = + net.lock().peer(i).client().as_client().expect_header(block_id).unwrap(); + let best_beefy = *header.number(); + + assert_eq!(expected, Some(best_beefy).as_ref()); + } + }))); + }); + let wait_for = futures::future::join_all(wait_for); + block_until(wait_for, net, runtime); +} + +fn wait_for_beefy_signed_commitments( + streams: Vec>>, + net: &Arc>, + runtime: &mut Runtime, + expected_commitment_block_nums: &[u64], +) { + let mut wait_for = Vec::new(); + let len = expected_commitment_block_nums.len(); + streams.into_iter().for_each(|stream| { + let mut expected = expected_commitment_block_nums.iter(); + wait_for.push(Box::pin(stream.take(len).for_each(move |signed_commitment| { + let expected = expected.next(); + async move { + let commitment_block_num = signed_commitment.commitment.block_number; + assert_eq!(expected, Some(commitment_block_num).as_ref()); + // TODO: also verify commitment payload, validator set id, and signatures. + } + }))); + }); + let wait_for = futures::future::join_all(wait_for); + block_until(wait_for, net, runtime); +} + +fn streams_empty_after_timeout( + streams: Vec>, + net: &Arc>, + runtime: &mut Runtime, + timeout: Option, +) where + T: std::fmt::Debug, + T: std::cmp::PartialEq, +{ + if let Some(timeout) = timeout { + run_for(timeout, net, runtime); + } + streams.into_iter().for_each(|mut stream| { + runtime.block_on(future::poll_fn(move |cx| { + assert_eq!(stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + }); +} + +fn finalize_block_and_wait_for_beefy( + net: &Arc>, + peers: &[BeefyKeyring], + runtime: &mut Runtime, + finalize_targets: &[u64], + expected_beefy: &[u64], +) { + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + + for block in finalize_targets { + let finalize = BlockId::number(*block); + for i in 0..peers.len() { + net.lock().peer(i).client().as_client().finalize_block(finalize, None).unwrap(); + } + } + + if expected_beefy.is_empty() { + // run for 1 second then verify no new best beefy block available + let timeout = Some(Duration::from_millis(500)); + streams_empty_after_timeout(best_blocks, &net, runtime, timeout); + streams_empty_after_timeout(signed_commitments, &net, runtime, None); + } else { + // run until expected beefy blocks are received + wait_for_best_beefy_blocks(best_blocks, &net, runtime, expected_beefy); + wait_for_beefy_signed_commitments(signed_commitments, &net, runtime, expected_beefy); + } +} + +#[test] +fn beefy_finalizing_blocks() { + sp_tracing::try_init_simple(); + + let mut runtime = Runtime::new().unwrap(); + let peers = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let session_len = 10; + let min_block_delta = 4; + + let mut net = BeefyTestNet::new(2, 0); + + for i in 0..peers.len() { + net.peer(i).data.use_validator_set(&validator_set); + } + runtime.spawn(initialize_beefy(&mut net, peers, min_block_delta)); + + // push 42 blocks including `AuthorityChange` digests every 10 blocks. + net.generate_blocks(42, session_len, &validator_set); + net.block_until_sync(); + + let net = Arc::new(Mutex::new(net)); + + // Minimum BEEFY block delta is 4. + + // finalize block #5 -> BEEFY should finalize #1 (mandatory) and #5 from diff-power-of-two rule. + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[5], &[1, 5]); + + // GRANDPA finalize #10 -> BEEFY finalize #10 (mandatory) + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[10], &[10]); + + // GRANDPA finalize #18 -> BEEFY finalize #14, then #18 (diff-power-of-two rule) + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[18], &[14, 18]); + + // GRANDPA finalize #20 -> BEEFY finalize #20 (mandatory) + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[20], &[20]); + + // GRANDPA finalize #21 -> BEEFY finalize nothing (yet) because min delta is 4 + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[21], &[]); +} + +#[test] +fn lagging_validators() { + sp_tracing::try_init_simple(); + + let mut runtime = Runtime::new().unwrap(); + let peers = &[BeefyKeyring::Charlie, BeefyKeyring::Dave]; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let session_len = 30; + let min_block_delta = 1; + + let mut net = BeefyTestNet::new(2, 0); + for i in 0..peers.len() { + net.peer(i).data.use_validator_set(&validator_set); + } + runtime.spawn(initialize_beefy(&mut net, peers, min_block_delta)); + + // push 42 blocks including `AuthorityChange` digests every 30 blocks. + net.generate_blocks(42, session_len, &validator_set); + net.block_until_sync(); + + let net = Arc::new(Mutex::new(net)); + + // finalize block #15 -> BEEFY should finalize #1 (mandatory) and #9, #13, #14, #15 from + // diff-power-of-two rule. + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[15], &[1, 9, 13, 14, 15]); + + // Charlie finalizes #25, Dave lags behind + let finalize = BlockId::number(25); + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap(); + // verify nothing gets finalized by BEEFY + let timeout = Some(Duration::from_millis(500)); + streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout); + streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None); + + // Dave catches up and also finalizes #25 + let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers); + net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap(); + // expected beefy finalizes block #17 from diff-power-of-two + wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[23, 24, 25]); + wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[23, 24, 25]); + + // Both finalize #30 (mandatory session) and #32 -> BEEFY finalize #30 (mandatory), #31, #32 + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[30, 32], &[30, 31, 32]); +} + +#[test] +fn correct_beefy_payload() { + sp_tracing::try_init_simple(); + + let mut runtime = Runtime::new().unwrap(); + let peers = + &[BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie, BeefyKeyring::Dave]; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let session_len = 20; + let min_block_delta = 2; + + let mut net = BeefyTestNet::new(4, 0); + for i in 0..peers.len() { + net.peer(i).data.use_validator_set(&validator_set); + } + + // Dave will vote on bad mmr roots + net.peer(3).data.test_modifiers.as_mut().map(|tm| tm.corrupt_mmr_roots = true); + runtime.spawn(initialize_beefy(&mut net, peers, min_block_delta)); + + // push 10 blocks + net.generate_blocks(12, session_len, &validator_set); + net.block_until_sync(); + + let net = Arc::new(Mutex::new(net)); + // with 3 good voters and 1 bad one, consensus should happen and best blocks produced. + finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[10], &[1, 9]); + + let (best_blocks, signed_commitments) = + get_beefy_streams(&mut *net.lock(), &[BeefyKeyring::Alice]); + + // now 2 good validators and 1 bad one are voting + net.lock() + .peer(0) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + net.lock() + .peer(1) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + net.lock() + .peer(3) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + + // verify consensus is _not_ reached + let timeout = Some(Duration::from_millis(500)); + streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout); + streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None); + + // 3rd good validator catches up and votes as well + let (best_blocks, signed_commitments) = + get_beefy_streams(&mut *net.lock(), &[BeefyKeyring::Alice]); + net.lock() + .peer(2) + .client() + .as_client() + .finalize_block(BlockId::number(11), None) + .unwrap(); + + // verify consensus is reached + wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[11]); + wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[11]); +} diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index 3f23638758eca..85674c09a278b 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeSet, fmt::Debug, marker::PhantomData, sync::Arc}; +use std::{collections::BTreeSet, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration}; use codec::{Codec, Decode, Encode}; use futures::{future, FutureExt, StreamExt}; @@ -28,6 +28,7 @@ use sc_network_gossip::GossipEngine; use sp_api::BlockId; use sp_arithmetic::traits::AtLeast32Bit; +use sp_consensus::SyncOracle; use sp_runtime::{ generic::OpaqueDigestItemId, traits::{Block, Header, NumberFor}, @@ -35,7 +36,7 @@ use sp_runtime::{ }; use beefy_primitives::{ - crypto::{AuthorityId, Public, Signature}, + crypto::{AuthorityId, Signature}, known_payload_ids, BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, SignedCommitment, ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, GENESIS_AUTHORITY_SET_ID, }; @@ -47,10 +48,11 @@ use crate::{ metric_inc, metric_set, metrics::Metrics, notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}, - round, Client, + round::Rounds, + Client, }; -pub(crate) struct WorkerParams +pub(crate) struct WorkerParams where B: Block, { @@ -63,14 +65,16 @@ where pub gossip_validator: Arc>, pub min_block_delta: u32, pub metrics: Option, + pub sync_oracle: SO, } /// A BEEFY worker plays the BEEFY protocol -pub(crate) struct BeefyWorker +pub(crate) struct BeefyWorker where B: Block, BE: Backend, C: Client, + SO: SyncOracle + Send + Sync + Clone + 'static, { client: Arc, backend: Arc, @@ -81,26 +85,32 @@ where /// Min delta in block numbers between two blocks, BEEFY should vote on min_block_delta: u32, metrics: Option, - rounds: Option>>, + rounds: Option>, finality_notifications: FinalityNotifications, /// Best block we received a GRANDPA notification for - best_grandpa_block: NumberFor, + best_grandpa_block_header: ::Header, /// Best block a BEEFY voting round has been concluded for best_beefy_block: Option>, /// Used to keep RPC worker up to date on latest/best beefy beefy_best_block_sender: BeefyBestBlockSender, /// Validator set id for the last signed commitment last_signed_id: u64, + /// Handle to the sync oracle + sync_oracle: SO, // keep rustc happy _backend: PhantomData, + #[cfg(test)] + // behavior modifiers used in tests + test_res: tests::TestModifiers, } -impl BeefyWorker +impl BeefyWorker where B: Block + Codec, BE: Backend, C: Client, C::Api: BeefyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { /// Return a new BEEFY worker instance. /// @@ -108,7 +118,12 @@ where /// BEEFY pallet has been deployed on-chain. /// /// The BEEFY pallet is needed in order to keep track of the BEEFY authority set. - pub(crate) fn new(worker_params: WorkerParams) -> Self { + pub(crate) fn new( + worker_params: WorkerParams, + #[cfg(test)] + // behavior modifiers used in tests + test_res: tests::TestModifiers, + ) -> Self { let WorkerParams { client, backend, @@ -119,8 +134,13 @@ where gossip_validator, min_block_delta, metrics, + sync_oracle, } = worker_params; + let last_finalized_header = client + .expect_header(BlockId::number(client.info().finalized_number)) + .expect("latest block always has header available; qed."); + BeefyWorker { client: client.clone(), backend, @@ -128,200 +148,176 @@ where signed_commitment_sender, gossip_engine: Arc::new(Mutex::new(gossip_engine)), gossip_validator, - min_block_delta, + // always target at least one block better than current best beefy + min_block_delta: min_block_delta.max(1), metrics, rounds: None, finality_notifications: client.finality_notification_stream(), - best_grandpa_block: client.info().finalized_number, + best_grandpa_block_header: last_finalized_header, best_beefy_block: None, last_signed_id: 0, beefy_best_block_sender, + sync_oracle, _backend: PhantomData, + #[cfg(test)] + test_res, } } } -impl BeefyWorker +impl BeefyWorker where B: Block, BE: Backend, C: Client, C::Api: BeefyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { - /// Return `true`, if we should vote on block `number` - fn should_vote_on(&self, number: NumberFor) -> bool { - let best_beefy_block = if let Some(block) = self.best_beefy_block { - block - } else { - debug!(target: "beefy", "🥩 Missing best BEEFY block - won't vote for: {:?}", number); - return false - }; - - let target = vote_target(self.best_grandpa_block, best_beefy_block, self.min_block_delta); - - trace!(target: "beefy", "🥩 should_vote_on: #{:?}, next_block_to_vote_on: #{:?}", number, target); - - metric_set!(self, beefy_should_vote_on, target); - - number == target - } - - /// Return the current active validator set at header `header`. - /// - /// Note that the validator set could be `None`. This is the case if we don't find - /// a BEEFY authority set change and we can't fetch the authority set from the - /// BEEFY on-chain state. - /// - /// Such a failure is usually an indication that the BEEFY pallet has not been deployed (yet). - fn validator_set(&self, header: &B::Header) -> Option> { - let new = if let Some(new) = find_authorities_change::(header) { - Some(new) + /// Return `Some(number)` if we should be voting on block `number` now, + /// return `None` if there is no block we should vote on now. + fn current_vote_target(&self) -> Option> { + let rounds = if let Some(r) = &self.rounds { + r } else { - let at = BlockId::hash(header.hash()); - self.client.runtime_api().validator_set(&at).ok().flatten() + debug!(target: "beefy", "🥩 No voting round started"); + return None }; - trace!(target: "beefy", "🥩 active validator set: {:?}", new); - - new + let best_finalized = *self.best_grandpa_block_header.number(); + // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. + let target = vote_target( + best_finalized, + self.best_beefy_block, + *rounds.session_start(), + self.min_block_delta, + ); + trace!( + target: "beefy", + "🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}", + self.best_beefy_block, + best_finalized, + target + ); + if let Some(target) = &target { + metric_set!(self, beefy_should_vote_on, target); + } + target } /// Verify `active` validator set for `block` against the key store /// - /// The critical case is, if we do have a public key in the key store which is not - /// part of the active validator set. + /// We want to make sure that we have _at least one_ key in our keystore that + /// is part of the validator set, that's because if there are no local keys + /// then we can't perform our job as a validator. /// /// Note that for a non-authority node there will be no keystore, and we will /// return an error and don't check. The error can usually be ignored. fn verify_validator_set( &self, block: &NumberFor, - active: &ValidatorSet, + active: &ValidatorSet, ) -> Result<(), error::Error> { - let active: BTreeSet<&Public> = active.validators().iter().collect(); + let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); let public_keys = self.key_store.public_keys()?; - let store: BTreeSet<&Public> = public_keys.iter().collect(); + let store: BTreeSet<&AuthorityId> = public_keys.iter().collect(); - let missing: Vec<_> = store.difference(&active).cloned().collect(); - - if !missing.is_empty() { - debug!(target: "beefy", "🥩 for block {:?} public key missing in validator set: {:?}", block, missing); + if store.intersection(&active).count() == 0 { + let msg = "no authority public key found in store".to_string(); + debug!(target: "beefy", "🥩 for block {:?} {}", block, msg); + Err(error::Error::Keystore(msg)) + } else { + Ok(()) } - - Ok(()) } - fn handle_finality_notification(&mut self, notification: FinalityNotification) { - trace!(target: "beefy", "🥩 Finality notification: {:?}", notification); - - // update best GRANDPA finalized block we have seen - self.best_grandpa_block = *notification.header.number(); - - if let Some(active) = self.validator_set(¬ification.header) { - // Authority set change or genesis set id triggers new voting rounds - // - // TODO: (grandpa-bridge-gadget#366) Enacting a new authority set will also - // implicitly 'conclude' the currently active BEEFY voting round by starting a - // new one. This should be replaced by proper round life-cycle handling. - if self.rounds.is_none() || - active.id() != self.rounds.as_ref().unwrap().validator_set_id() || - (active.id() == GENESIS_AUTHORITY_SET_ID && self.best_beefy_block.is_none()) - { - debug!(target: "beefy", "🥩 New active validator set id: {:?}", active); - metric_set!(self, beefy_validator_set_id, active.id()); - - // BEEFY should produce a signed commitment for each session - if active.id() != self.last_signed_id + 1 && active.id() != GENESIS_AUTHORITY_SET_ID - { - metric_inc!(self, beefy_skipped_sessions); - } - - if log_enabled!(target: "beefy", log::Level::Debug) { - // verify the new validator set - only do it if we're also logging the warning - let _ = self.verify_validator_set(notification.header.number(), &active); - } - - let id = active.id(); - self.rounds = Some(round::Rounds::new(active)); - - debug!(target: "beefy", "🥩 New Rounds for id: {:?}", id); - - self.best_beefy_block = Some(*notification.header.number()); + /// Set best BEEFY block to `block_num`. + /// + /// Also sends/updates the best BEEFY block hash to the RPC worker. + fn set_best_beefy_block(&mut self, block_num: NumberFor) { + if Some(block_num) > self.best_beefy_block { + // Try to get block hash ourselves. + let block_hash = match self.client.hash(block_num) { + Ok(h) => h, + Err(e) => { + error!(target: "beefy", "🥩 Failed to get hash for block number {}: {}", + block_num, e); + None + }, + }; + // Update RPC worker with new best BEEFY block hash. + block_hash.map(|hash| { self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(notification.hash.clone())) - .expect("forwards closure result; the closure always returns Ok; qed."); - - // this metric is kind of 'fake'. Best BEEFY block should only be updated once we - // have a signed commitment for the block. Remove once the above TODO is done. - metric_set!(self, beefy_best_block, *notification.header.number()); - } + .notify(|| Ok::<_, ()>(hash)) + .expect("forwards closure result; the closure always returns Ok; qed.") + }); + // Set new best BEEFY block number. + self.best_beefy_block = Some(block_num); + metric_set!(self, beefy_best_block, block_num); + } else { + debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num); } + } - if self.should_vote_on(*notification.header.number()) { - let (validators, validator_set_id) = if let Some(rounds) = &self.rounds { - (rounds.validators(), rounds.validator_set_id()) - } else { - debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", notification.header.hash()); - return - }; - let authority_id = if let Some(id) = self.key_store.authority_id(validators) { - debug!(target: "beefy", "🥩 Local authority id: {:?}", id); - id - } else { - debug!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", notification.header.hash()); - return - }; - - let mmr_root = - if let Some(hash) = find_mmr_root_digest::(¬ification.header) { - hash - } else { - warn!(target: "beefy", "🥩 No MMR root digest found for: {:?}", notification.header.hash()); - return - }; - - let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, mmr_root.encode()); - let commitment = Commitment { - payload, - block_number: notification.header.number(), - validator_set_id, - }; - let encoded_commitment = commitment.encode(); + /// Handle session changes by starting new voting round for mandatory blocks. + fn init_session_at(&mut self, active: ValidatorSet, session_start: NumberFor) { + debug!(target: "beefy", "🥩 New active validator set: {:?}", active); + metric_set!(self, beefy_validator_set_id, active.id()); + // BEEFY should produce a signed commitment for each session + if active.id() != self.last_signed_id + 1 && active.id() != GENESIS_AUTHORITY_SET_ID { + metric_inc!(self, beefy_skipped_sessions); + } - let signature = match self.key_store.sign(&authority_id, &*encoded_commitment) { - Ok(sig) => sig, - Err(err) => { - warn!(target: "beefy", "🥩 Error signing commitment: {:?}", err); - return - }, - }; + if log_enabled!(target: "beefy", log::Level::Debug) { + // verify the new validator set - only do it if we're also logging the warning + let _ = self.verify_validator_set(&session_start, &active); + } - trace!( - target: "beefy", - "🥩 Produced signature using {:?}, is_valid: {:?}", - authority_id, - BeefyKeystore::verify(&authority_id, &signature, &*encoded_commitment) - ); + let prev_validator_set = if let Some(r) = &self.rounds { + r.validator_set().clone() + } else { + // no previous rounds present use new validator set instead (genesis case) + active.clone() + }; + let id = active.id(); + self.rounds = Some(Rounds::new(session_start, active, prev_validator_set)); + info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, session_start); + } - let message = VoteMessage { commitment, id: authority_id, signature }; + fn handle_finality_notification(&mut self, notification: &FinalityNotification) { + trace!(target: "beefy", "🥩 Finality notification: {:?}", notification); + let number = *notification.header.number(); - let encoded_message = message.encode(); + // On start-up ignore old finality notifications that we're not interested in. + if number <= *self.best_grandpa_block_header.number() { + debug!(target: "beefy", "🥩 Got unexpected finality for old block #{:?}", number); + return + } - metric_inc!(self, beefy_votes_sent); + // update best GRANDPA finalized block we have seen + self.best_grandpa_block_header = notification.header.clone(); - debug!(target: "beefy", "🥩 Sent vote message: {:?}", message); + self.handle_finality(¬ification.header); + } - self.handle_vote( - (message.commitment.payload, *message.commitment.block_number), - (message.id, message.signature), - ); + fn handle_finality(&mut self, header: &B::Header) { + // Check for and handle potential new session. + if let Some(new_validator_set) = find_authorities_change::(header) { + self.init_session_at(new_validator_set, *header.number()); + } - self.gossip_engine.lock().gossip_message(topic::(), encoded_message, false); + // Vote if there's now a new vote target. + if let Some(target_number) = self.current_vote_target() { + self.do_vote(target_number); } } - fn handle_vote(&mut self, round: (Payload, NumberFor), vote: (Public, Signature)) { + fn handle_vote( + &mut self, + round: (Payload, NumberFor), + vote: (AuthorityId, Signature), + self_vote: bool, + ) { self.gossip_validator.note_round(round.1); let rounds = if let Some(rounds) = self.rounds.as_mut() { @@ -331,12 +327,12 @@ where return }; - let vote_added = rounds.add_vote(&round, vote); + if rounds.add_vote(&round, vote, self_vote) { + if let Some(signatures) = rounds.try_conclude(&round) { + self.gossip_validator.conclude_round(round.1); - if vote_added && rounds.is_done(&round) { - if let Some(signatures) = rounds.drop(&round) { // id is stored for skipped session metric calculation - self.last_signed_id = rounds.validator_set_id(); + self.last_signed_id = rounds.validator_set_id_for(round.1); let block_num = round.1; let commitment = Commitment { @@ -351,48 +347,167 @@ where info!(target: "beefy", "🥩 Round #{} concluded, committed: {:?}.", round.1, signed_commitment); - if self - .backend - .append_justification( - BlockId::Number(block_num), - ( - BEEFY_ENGINE_ID, - VersionedFinalityProof::V1(signed_commitment.clone()).encode(), - ), - ) - .is_err() - { - // just a trace, because until the round lifecycle is improved, we will - // conclude certain rounds multiple times. - trace!(target: "beefy", "🥩 Failed to append justification: {:?}", signed_commitment); + if let Err(e) = self.backend.append_justification( + BlockId::Number(block_num), + ( + BEEFY_ENGINE_ID, + VersionedFinalityProof::V1(signed_commitment.clone()).encode(), + ), + ) { + trace!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, signed_commitment); } self.signed_commitment_sender .notify(|| Ok::<_, ()>(signed_commitment)) .expect("forwards closure result; the closure always returns Ok; qed."); - self.best_beefy_block = Some(block_num); - if let Err(err) = self.client.hash(block_num).map(|h| { - if let Some(hash) = h { - self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(hash)) - .expect("forwards closure result; the closure always returns Ok; qed."); - } - }) { - error!(target: "beefy", "🥩 Failed to get hash for block number {}: {}", - block_num, err); - } + self.set_best_beefy_block(block_num); - metric_set!(self, beefy_best_block, block_num); + // Vote if there's now a new vote target. + if let Some(target_number) = self.current_vote_target() { + self.do_vote(target_number); + } } } } + /// Create and gossip Signed Commitment for block number `target_number`. + /// + /// Also handle this self vote by calling `self.handle_vote()` for it. + fn do_vote(&mut self, target_number: NumberFor) { + trace!(target: "beefy", "🥩 Try voting on {}", target_number); + + // Most of the time we get here, `target` is actually `best_grandpa`, + // avoid asking `client` for header in that case. + let target_header = if target_number == *self.best_grandpa_block_header.number() { + self.best_grandpa_block_header.clone() + } else { + match self.client.expect_header(BlockId::Number(target_number)) { + Ok(h) => h, + Err(err) => { + debug!( + target: "beefy", + "🥩 Could not get header for block #{:?} (error: {:?}), skipping vote..", + target_number, + err + ); + return + }, + } + }; + let target_hash = target_header.hash(); + + let mmr_root = if let Some(hash) = self.extract_mmr_root_digest(&target_header) { + hash + } else { + warn!(target: "beefy", "🥩 No MMR root digest found for: {:?}", target_hash); + return + }; + let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, mmr_root.encode()); + + let (validators, validator_set_id) = if let Some(rounds) = &self.rounds { + if !rounds.should_self_vote(&(payload.clone(), target_number)) { + debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number); + return + } + (rounds.validators_for(target_number), rounds.validator_set_id_for(target_number)) + } else { + debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", target_hash); + return + }; + let authority_id = if let Some(id) = self.key_store.authority_id(validators) { + debug!(target: "beefy", "🥩 Local authority id: {:?}", id); + id + } else { + debug!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", target_hash); + return + }; + + let commitment = Commitment { payload, block_number: target_number, validator_set_id }; + let encoded_commitment = commitment.encode(); + + let signature = match self.key_store.sign(&authority_id, &*encoded_commitment) { + Ok(sig) => sig, + Err(err) => { + warn!(target: "beefy", "🥩 Error signing commitment: {:?}", err); + return + }, + }; + + trace!( + target: "beefy", + "🥩 Produced signature using {:?}, is_valid: {:?}", + authority_id, + BeefyKeystore::verify(&authority_id, &signature, &*encoded_commitment) + ); + + let message = VoteMessage { commitment, id: authority_id, signature }; + + let encoded_message = message.encode(); + + metric_inc!(self, beefy_votes_sent); + + debug!(target: "beefy", "🥩 Sent vote message: {:?}", message); + + self.handle_vote( + (message.commitment.payload, message.commitment.block_number), + (message.id, message.signature), + true, + ); + + self.gossip_engine.lock().gossip_message(topic::(), encoded_message, false); + } + + /// Wait for BEEFY runtime pallet to be available. + #[cfg(not(test))] + async fn wait_for_runtime_pallet(&mut self) { + self.client + .finality_notification_stream() + .take_while(|notif| { + let at = BlockId::hash(notif.header.hash()); + if let Some(active) = self.client.runtime_api().validator_set(&at).ok().flatten() { + if active.id() == GENESIS_AUTHORITY_SET_ID { + // When starting from genesis, there is no session boundary digest. + // Just initialize `rounds` to Block #1 as BEEFY mandatory block. + self.init_session_at(active, 1u32.into()); + } + // In all other cases, we just go without `rounds` initialized, meaning the + // worker won't vote until it witnesses a session change. + // Once we'll implement 'initial sync' (catch-up), the worker will be able to + // start voting right away. + self.handle_finality_notification(notif); + future::ready(false) + } else { + trace!(target: "beefy", "🥩 Finality notification: {:?}", notif); + trace!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available..."); + future::ready(true) + } + }) + .for_each(|_| future::ready(())) + .await; + // get a new stream that provides _new_ notifications (from here on out) + self.finality_notifications = self.client.finality_notification_stream(); + } + + /// For tests don't use runtime pallet. Start rounds from block #1. + #[cfg(test)] + async fn wait_for_runtime_pallet(&mut self) { + let active = self.test_res.active_validators.clone(); + self.init_session_at(active, 1u32.into()); + } + + /// Main loop for BEEFY worker. + /// + /// Wait for BEEFY runtime pallet to be available, then start the main async loop + /// which is driven by finality notifications and gossiped votes. pub(crate) async fn run(mut self) { + info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); + self.wait_for_runtime_pallet().await; + let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map( |notification| async move { debug!(target: "beefy", "🥩 Got vote message: {:?}", notification); - VoteMessage::, Public, Signature>::decode( + VoteMessage::, AuthorityId, Signature>::decode( &mut ¬ification.message[..], ) .ok() @@ -400,13 +515,18 @@ where )); loop { + while self.sync_oracle.is_major_syncing() { + debug!(target: "beefy", "Waiting for major sync to complete..."); + futures_timer::Delay::new(Duration::from_secs(5)).await; + } + let engine = self.gossip_engine.clone(); let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); futures::select! { notification = self.finality_notifications.next().fuse() => { if let Some(notification) = notification { - self.handle_finality_notification(notification); + self.handle_finality_notification(¬ification); } else { return; } @@ -416,6 +536,7 @@ where self.handle_vote( (vote.commitment.payload, vote.commitment.block_number), (vote.id, vote.signature), + false ); } else { return; @@ -428,20 +549,36 @@ where } } } + + /// Simple wrapper over mmr root extraction. + #[cfg(not(test))] + fn extract_mmr_root_digest(&self, header: &B::Header) -> Option { + find_mmr_root_digest::(header) + } + + /// For tests, have the option to modify mmr root. + #[cfg(test)] + fn extract_mmr_root_digest(&self, header: &B::Header) -> Option { + let mut mmr_root = find_mmr_root_digest::(header); + if self.test_res.corrupt_mmr_roots { + mmr_root.as_mut().map(|hash| *hash ^= MmrRootHash::random()); + } + mmr_root + } } /// Extract the MMR root hash from a digest in the given header, if it exists. -fn find_mmr_root_digest(header: &B::Header) -> Option +fn find_mmr_root_digest(header: &B::Header) -> Option where B: Block, - Id: Codec, { - header.digest().logs().iter().find_map(|log| { - match log.try_to::>(OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID)) { - Some(ConsensusLog::MmrRoot(root)) => Some(root), - _ => None, - } - }) + let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID); + + let filter = |log: ConsensusLog| match log { + ConsensusLog::MmrRoot(root) => Some(root), + _ => None, + }; + header.digest().convert_first(|l| l.try_to(id).and_then(filter)) } /// Scan the `header` digest log for a BEEFY validator set change. Return either the new @@ -456,119 +593,402 @@ where ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set), _ => None, }; - header.digest().convert_first(|l| l.try_to(id).and_then(filter)) } -/// Calculate next block number to vote on -fn vote_target(best_grandpa: N, best_beefy: N, min_delta: u32) -> N +/// Calculate next block number to vote on. +/// +/// Return `None` if there is no voteable target yet. +fn vote_target( + best_grandpa: N, + best_beefy: Option, + session_start: N, + min_delta: u32, +) -> Option where N: AtLeast32Bit + Copy + Debug, { - let diff = best_grandpa.saturating_sub(best_beefy); - let diff = diff.saturated_into::(); - let target = best_beefy + min_delta.max(diff.next_power_of_two()).into(); - - trace!( - target: "beefy", - "🥩 vote target - diff: {:?}, next_power_of_two: {:?}, target block: #{:?}", - diff, - diff.next_power_of_two(), - target, - ); - - target + // if the mandatory block (session_start) does not have a beefy justification yet, + // we vote on it + let target = match best_beefy { + None => { + trace!( + target: "beefy", + "🥩 vote target - mandatory block: #{:?}", + session_start, + ); + session_start + }, + Some(bbb) if bbb < session_start => { + trace!( + target: "beefy", + "🥩 vote target - mandatory block: #{:?}", + session_start, + ); + session_start + }, + Some(bbb) => { + let diff = best_grandpa.saturating_sub(bbb) + 1u32.into(); + let diff = diff.saturated_into::() / 2; + let target = bbb + min_delta.max(diff.next_power_of_two()).into(); + + trace!( + target: "beefy", + "🥩 vote target - diff: {:?}, next_power_of_two: {:?}, target block: #{:?}", + diff, + diff.next_power_of_two(), + target, + ); + + target + }, + }; + + // Don't vote for targets until they've been finalized + // (`target` can be > `best_grandpa` when `min_delta` is big enough). + if target > best_grandpa { + None + } else { + Some(target) + } } #[cfg(test)] -mod tests { - use super::vote_target; +pub(crate) mod tests { + use super::*; + use crate::{ + keystore::tests::Keyring, + tests::{create_beefy_worker, get_beefy_streams, make_beefy_ids, BeefyTestNet}, + }; + + use futures::{executor::block_on, future::poll_fn, task::Poll}; + + use sc_client_api::HeaderBackend; + use sc_network::NetworkService; + use sc_network_test::{PeersFullClient, TestNetFactory}; + use sp_api::HeaderT; + use substrate_test_runtime_client::{ + runtime::{Block, Digest, DigestItem, Header, H256}, + Backend, + }; + + #[derive(Clone)] + pub struct TestModifiers { + pub active_validators: ValidatorSet, + pub corrupt_mmr_roots: bool, + } #[test] fn vote_on_min_block_delta() { - let t = vote_target(1u32, 0, 4); - assert_eq!(4, t); - let t = vote_target(2u32, 0, 4); - assert_eq!(4, t); - let t = vote_target(3u32, 0, 4); - assert_eq!(4, t); - let t = vote_target(4u32, 0, 4); - assert_eq!(4, t); - - let t = vote_target(4u32, 4, 4); - assert_eq!(8, t); - - let t = vote_target(10u32, 10, 4); - assert_eq!(14, t); - let t = vote_target(11u32, 10, 4); - assert_eq!(14, t); - let t = vote_target(12u32, 10, 4); - assert_eq!(14, t); - let t = vote_target(13u32, 10, 4); - assert_eq!(14, t); - - let t = vote_target(10u32, 10, 8); - assert_eq!(18, t); - let t = vote_target(11u32, 10, 8); - assert_eq!(18, t); - let t = vote_target(12u32, 10, 8); - assert_eq!(18, t); - let t = vote_target(13u32, 10, 8); - assert_eq!(18, t); + let t = vote_target(1u32, Some(1), 1, 4); + assert_eq!(None, t); + let t = vote_target(2u32, Some(1), 1, 4); + assert_eq!(None, t); + let t = vote_target(4u32, Some(2), 1, 4); + assert_eq!(None, t); + let t = vote_target(6u32, Some(2), 1, 4); + assert_eq!(Some(6), t); + + let t = vote_target(9u32, Some(4), 1, 4); + assert_eq!(Some(8), t); + + let t = vote_target(10u32, Some(10), 1, 8); + assert_eq!(None, t); + let t = vote_target(12u32, Some(10), 1, 8); + assert_eq!(None, t); + let t = vote_target(18u32, Some(10), 1, 8); + assert_eq!(Some(18), t); } #[test] fn vote_on_power_of_two() { - let t = vote_target(1008u32, 1000, 4); - assert_eq!(1008, t); + let t = vote_target(1008u32, Some(1000), 1, 4); + assert_eq!(Some(1004), t); - let t = vote_target(1016u32, 1000, 4); - assert_eq!(1016, t); + let t = vote_target(1016u32, Some(1000), 1, 4); + assert_eq!(Some(1008), t); - let t = vote_target(1032u32, 1000, 4); - assert_eq!(1032, t); + let t = vote_target(1032u32, Some(1000), 1, 4); + assert_eq!(Some(1016), t); - let t = vote_target(1064u32, 1000, 4); - assert_eq!(1064, t); + let t = vote_target(1064u32, Some(1000), 1, 4); + assert_eq!(Some(1032), t); - let t = vote_target(1128u32, 1000, 4); - assert_eq!(1128, t); + let t = vote_target(1128u32, Some(1000), 1, 4); + assert_eq!(Some(1064), t); - let t = vote_target(1256u32, 1000, 4); - assert_eq!(1256, t); + let t = vote_target(1256u32, Some(1000), 1, 4); + assert_eq!(Some(1128), t); - let t = vote_target(1512u32, 1000, 4); - assert_eq!(1512, t); + let t = vote_target(1512u32, Some(1000), 1, 4); + assert_eq!(Some(1256), t); - let t = vote_target(1024u32, 0, 4); - assert_eq!(1024, t); + let t = vote_target(1024u32, Some(1), 1, 4); + assert_eq!(Some(513), t); } #[test] fn vote_on_target_block() { - let t = vote_target(1008u32, 1002, 4); - assert_eq!(1010, t); - let t = vote_target(1010u32, 1002, 4); - assert_eq!(1010, t); - - let t = vote_target(1016u32, 1006, 4); - assert_eq!(1022, t); - let t = vote_target(1022u32, 1006, 4); - assert_eq!(1022, t); - - let t = vote_target(1032u32, 1012, 4); - assert_eq!(1044, t); - let t = vote_target(1044u32, 1012, 4); - assert_eq!(1044, t); - - let t = vote_target(1064u32, 1014, 4); - assert_eq!(1078, t); - let t = vote_target(1078u32, 1014, 4); - assert_eq!(1078, t); - - let t = vote_target(1128u32, 1008, 4); - assert_eq!(1136, t); - let t = vote_target(1136u32, 1008, 4); - assert_eq!(1136, t); + let t = vote_target(1008u32, Some(1002), 1, 4); + assert_eq!(Some(1006), t); + let t = vote_target(1010u32, Some(1002), 1, 4); + assert_eq!(Some(1006), t); + + let t = vote_target(1016u32, Some(1006), 1, 4); + assert_eq!(Some(1014), t); + let t = vote_target(1022u32, Some(1006), 1, 4); + assert_eq!(Some(1014), t); + + let t = vote_target(1032u32, Some(1012), 1, 4); + assert_eq!(Some(1028), t); + let t = vote_target(1044u32, Some(1012), 1, 4); + assert_eq!(Some(1028), t); + + let t = vote_target(1064u32, Some(1014), 1, 4); + assert_eq!(Some(1046), t); + let t = vote_target(1078u32, Some(1014), 1, 4); + assert_eq!(Some(1046), t); + + let t = vote_target(1128u32, Some(1008), 1, 4); + assert_eq!(Some(1072), t); + let t = vote_target(1136u32, Some(1008), 1, 4); + assert_eq!(Some(1072), t); + } + + #[test] + fn vote_on_mandatory_block() { + let t = vote_target(1008u32, Some(1002), 1004, 4); + assert_eq!(Some(1004), t); + let t = vote_target(1016u32, Some(1006), 1007, 4); + assert_eq!(Some(1007), t); + let t = vote_target(1064u32, Some(1014), 1063, 4); + assert_eq!(Some(1063), t); + let t = vote_target(1320u32, Some(1012), 1234, 4); + assert_eq!(Some(1234), t); + + let t = vote_target(1128u32, Some(1008), 1008, 4); + assert_eq!(Some(1072), t); + } + + #[test] + fn extract_authorities_change_digest() { + let mut header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + + // verify empty digest shows nothing + assert!(find_authorities_change::(&header).is_none()); + + let peers = &[Keyring::One, Keyring::Two]; + let id = 42; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), id).unwrap(); + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::AuthoritiesChange(validator_set.clone()).encode(), + )); + + // verify validator set is correctly extracted from digest + let extracted = find_authorities_change::(&header); + assert_eq!(extracted, Some(validator_set)); + } + + #[test] + fn extract_mmr_root_digest() { + let mut header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + + // verify empty digest shows nothing + assert!(find_mmr_root_digest::(&header).is_none()); + + let mmr_root_hash = H256::random(); + header.digest_mut().push(DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::::MmrRoot(mmr_root_hash.clone()).encode(), + )); + + // verify validator set is correctly extracted from digest + let extracted = find_mmr_root_digest::(&header); + assert_eq!(extracted, Some(mmr_root_hash)); + } + + #[test] + fn should_vote_target() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + // rounds not initialized -> should vote: `None` + assert_eq!(worker.current_vote_target(), None); + + let set_up = |worker: &mut BeefyWorker< + Block, + PeersFullClient, + Backend, + Arc>, + >, + best_grandpa: u64, + best_beefy: Option, + session_start: u64, + min_delta: u32| { + let grandpa_header = Header::new( + best_grandpa, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ); + worker.best_grandpa_block_header = grandpa_header; + worker.best_beefy_block = best_beefy; + worker.min_block_delta = min_delta; + worker.rounds = + Some(Rounds::new(session_start, validator_set.clone(), validator_set.clone())); + }; + + // under min delta + set_up(&mut worker, 1, Some(1), 1, 4); + assert_eq!(worker.current_vote_target(), None); + set_up(&mut worker, 5, Some(2), 1, 4); + assert_eq!(worker.current_vote_target(), None); + + // vote on min delta + set_up(&mut worker, 9, Some(4), 1, 4); + assert_eq!(worker.current_vote_target(), Some(8)); + set_up(&mut worker, 18, Some(10), 1, 8); + assert_eq!(worker.current_vote_target(), Some(18)); + + // vote on power of two + set_up(&mut worker, 1008, Some(1000), 1, 1); + assert_eq!(worker.current_vote_target(), Some(1004)); + set_up(&mut worker, 1016, Some(1000), 1, 2); + assert_eq!(worker.current_vote_target(), Some(1008)); + + // nothing new to vote on + set_up(&mut worker, 1000, Some(1000), 1, 1); + assert_eq!(worker.current_vote_target(), None); + + // vote on mandatory + set_up(&mut worker, 1008, None, 1000, 8); + assert_eq!(worker.current_vote_target(), Some(1000)); + set_up(&mut worker, 1008, Some(1000), 1001, 8); + assert_eq!(worker.current_vote_target(), Some(1001)); + } + + #[test] + fn keystore_vs_validator_set() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + // keystore doesn't contain other keys than validators' + assert_eq!(worker.verify_validator_set(&1, &validator_set), Ok(())); + + // unknown `Bob` key + let keys = &[Keyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let err_msg = "no authority public key found in store".to_string(); + let expected = Err(error::Error::Keystore(err_msg)); + assert_eq!(worker.verify_validator_set(&1, &validator_set), expected); + + // worker has no keystore + worker.key_store = None.into(); + let expected_err = Err(error::Error::Keystore("no Keystore".into())); + assert_eq!(worker.verify_validator_set(&1, &validator_set), expected_err); + } + + #[test] + fn setting_best_beefy_block() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + + // no 'best beefy block' + assert_eq!(worker.best_beefy_block, None); + block_on(poll_fn(move |cx| { + assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + + // unknown hash for block #1 + let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + worker.set_best_beefy_block(1); + assert_eq!(worker.best_beefy_block, Some(1)); + block_on(poll_fn(move |cx| { + assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + + // generate 2 blocks, try again expect success + let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + net.generate_blocks(2, 10, &validator_set); + + worker.set_best_beefy_block(2); + assert_eq!(worker.best_beefy_block, Some(2)); + block_on(poll_fn(move |cx| { + match best_block_stream.poll_next_unpin(cx) { + // expect Some(hash-of-block-2) + Poll::Ready(Some(hash)) => { + let block_num = net.peer(0).client().as_client().number(hash).unwrap(); + assert_eq!(block_num, Some(2)); + }, + v => panic!("unexpected value: {:?}", v), + } + Poll::Ready(()) + })); + } + + #[test] + fn setting_initial_session() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + net.peer(0).data.use_validator_set(&validator_set); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + assert!(worker.rounds.is_none()); + + // verify setting the correct validator sets and boundary for genesis session + worker.init_session_at(validator_set.clone(), 1); + + let worker_rounds = worker.rounds.as_ref().unwrap(); + assert_eq!(worker_rounds.validator_set(), &validator_set); + assert_eq!(worker_rounds.session_start(), &1); + // in genesis case both current and prev validator sets are the same + assert_eq!(worker_rounds.validator_set_id_for(1), validator_set.id()); + assert_eq!(worker_rounds.validator_set_id_for(2), validator_set.id()); + + // new validator set + let keys = &[Keyring::Bob]; + let new_validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); + + // verify setting the correct validator sets and boundary for non-genesis session + worker.init_session_at(new_validator_set.clone(), 11); + + let worker_rounds = worker.rounds.as_ref().unwrap(); + assert_eq!(worker_rounds.validator_set(), &new_validator_set); + assert_eq!(worker_rounds.session_start(), &11); + // mandatory block gets prev set, further blocks get new set + assert_eq!(worker_rounds.validator_set_id_for(11), validator_set.id()); + assert_eq!(worker_rounds.validator_set_id_for(12), new_validator_set.id()); + assert_eq!(worker_rounds.validator_set_id_for(13), new_validator_set.id()); } } diff --git a/frame/beefy/src/lib.rs b/frame/beefy/src/lib.rs index 4aa1d1337cd0a..744a06561e8c2 100644 --- a/frame/beefy/src/lib.rs +++ b/frame/beefy/src/lib.rs @@ -105,20 +105,20 @@ impl Pallet { } fn change_authorities(new: Vec, queued: Vec) { - // As in GRANDPA, we trigger a validator set change only if the the validator - // set has actually changed. - if new != Self::authorities() { - >::put(&new); - - let next_id = Self::validator_set_id() + 1u64; - >::put(next_id); - if let Some(validator_set) = ValidatorSet::::new(new, next_id) { - let log = DigestItem::Consensus( - BEEFY_ENGINE_ID, - ConsensusLog::AuthoritiesChange(validator_set).encode(), - ); - >::deposit_log(log); - } + // Always issue a change if `session` says that the validators have changed. + // Even if their session keys are the same as before, the underlying economic + // identities have changed. Furthermore, the digest below is used to signal + // BEEFY mandatory blocks. + >::put(&new); + + let next_id = Self::validator_set_id() + 1u64; + >::put(next_id); + if let Some(validator_set) = ValidatorSet::::new(new, next_id) { + let log = DigestItem::Consensus( + BEEFY_ENGINE_ID, + ConsensusLog::AuthoritiesChange(validator_set).encode(), + ); + >::deposit_log(log); } >::put(&queued); diff --git a/primitives/beefy/Cargo.toml b/primitives/beefy/Cargo.toml index 4aa53aff2c3cb..cf901f4a34fc6 100644 --- a/primitives/beefy/Cargo.toml +++ b/primitives/beefy/Cargo.toml @@ -4,8 +4,13 @@ version = "4.0.0-dev" authors = ["Parity Technologies "] edition = "2021" license = "Apache-2.0" +homepage = "https://substrate.io" repository = "https://github.com/paritytech/substrate" description = "Primitives for BEEFY protocol." +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] [dependencies] codec = { version = "3.0.0", package = "parity-scale-codec", default-features = false, features = ["derive"] } diff --git a/test-utils/runtime/Cargo.toml b/test-utils/runtime/Cargo.toml index 8c61cbbf8adbe..ad7dca5e08fb1 100644 --- a/test-utils/runtime/Cargo.toml +++ b/test-utils/runtime/Cargo.toml @@ -13,6 +13,7 @@ publish = false targets = ["x86_64-unknown-linux-gnu"] [dependencies] +beefy-primitives = { version = "4.0.0-dev", default-features = false, path = "../../primitives/beefy" } sp-application-crypto = { version = "6.0.0", default-features = false, path = "../../primitives/application-crypto" } sp-consensus-aura = { version = "0.10.0-dev", default-features = false, path = "../../primitives/consensus/aura" } sp-consensus-babe = { version = "0.10.0-dev", default-features = false, path = "../../primitives/consensus/babe" } @@ -65,6 +66,7 @@ default = [ "std", ] std = [ + "beefy-primitives/std", "sp-application-crypto/std", "sp-consensus-aura/std", "sp-consensus-babe/std", diff --git a/test-utils/runtime/src/lib.rs b/test-utils/runtime/src/lib.rs index 861d95efb3087..743652a0ee899 100644 --- a/test-utils/runtime/src/lib.rs +++ b/test-utils/runtime/src/lib.rs @@ -926,6 +926,12 @@ cfg_if! { } } + impl beefy_primitives::BeefyApi for RuntimeApi { + fn validator_set() -> Option> { + None + } + } + impl frame_system_rpc_runtime_api::AccountNonceApi for Runtime { fn account_nonce(_account: AccountId) -> Index { 0