From 0cd747cc8398275f3218ce5fcf6f7bc10bdb9a18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 31 Jan 2020 19:59:49 +0100 Subject: [PATCH] Refactor im-online and print more debug info. (#4771) * Initial version. * Fix tests. * Refactor using StorageValueRef. * Add tests and apply review suggestions. * Bump runtime. Co-authored-by: Gavin Wood --- bin/node/runtime/src/lib.rs | 4 +- frame/im-online/src/lib.rs | 336 +++++++++++---------- frame/im-online/src/tests.rs | 13 +- primitives/runtime/src/offchain/mod.rs | 1 + primitives/runtime/src/offchain/storage.rs | 156 ++++++++++ 5 files changed, 353 insertions(+), 157 deletions(-) create mode 100644 primitives/runtime/src/offchain/storage.rs diff --git a/bin/node/runtime/src/lib.rs b/bin/node/runtime/src/lib.rs index df4423f85070d..a4a1c90665249 100644 --- a/bin/node/runtime/src/lib.rs +++ b/bin/node/runtime/src/lib.rs @@ -80,8 +80,8 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. 
- spec_version: 210, - impl_version: 1, + spec_version: 211, + impl_version: 0, apis: RUNTIME_API_VERSIONS, }; diff --git a/frame/im-online/src/lib.rs b/frame/im-online/src/lib.rs index ab1c6d7fd3e9e..ff06cb37d4e00 100644 --- a/frame/im-online/src/lib.rs +++ b/frame/im-online/src/lib.rs @@ -72,13 +72,14 @@ mod tests; use sp_application_crypto::RuntimeAppPublic; use codec::{Encode, Decode}; -use sp_core::offchain::{OpaqueNetworkState, StorageKind}; +use sp_core::offchain::OpaqueNetworkState; use sp_std::prelude::*; use sp_std::convert::TryInto; use pallet_session::historical::IdentificationTuple; use sp_runtime::{ + offchain::storage::StorageValueRef, RuntimeDebug, - traits::{Convert, Member, Printable, Saturating}, Perbill, + traits::{Convert, Member, Saturating, SimpleArithmetic}, Perbill, transaction_validity::{ TransactionValidity, ValidTransaction, InvalidTransaction, TransactionPriority, @@ -89,7 +90,7 @@ use sp_staking::{ offence::{ReportOffence, Offence, Kind}, }; use frame_support::{ - decl_module, decl_event, decl_storage, print, Parameter, debug, decl_error, + decl_module, decl_event, decl_storage, Parameter, debug, decl_error, traits::Get, }; use frame_system::{self as system, ensure_none}; @@ -129,37 +130,70 @@ pub mod ed25519 { pub type AuthorityId = app_ed25519::Public; } -/// The local storage database key under which the worker progress status -/// is tracked. -const DB_KEY: &[u8] = b"parity/im-online-worker-status"; +const DB_PREFIX: &[u8] = b"parity/im-online-heartbeat/"; +/// How many blocks do we wait for heartbeat transaction to be included +/// before sending another one. +const INCLUDE_THRESHOLD: u32 = 3; -/// It's important to persist the worker state, since e.g. the -/// server could be restarted while starting the gossip process, but before -/// finishing it. 
With every execution of the off-chain worker we check -/// if we need to recover and resume gossipping or if there is already -/// another off-chain worker in the process of gossipping. +/// Status of the offchain worker code. +/// +/// This stores the block number at which heartbeat was requested and when the worker +/// has actually managed to produce it. +/// Note we store such status for every `authority_index` separately. #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] -struct WorkerStatus { - done: bool, - gossipping_at: BlockNumber, +struct HeartbeatStatus { + /// An index of the session that we are supposed to send heartbeat for. + pub session_index: SessionIndex, + /// A block number at which the heartbeat for that session has been actually sent. + /// + /// It may be 0 in case the sending failed. In such case we should just retry + /// as soon as possible (i.e. in a worker running for the next block). + pub sent_at: BlockNumber, +} + +impl HeartbeatStatus { + /// Returns true if heartbeat has been recently sent. + /// + /// Parameters: + /// `session_index` - index of current session. + /// `now` - block at which the offchain worker is running. + /// + /// This function will return `true` iff: + /// 1. the session index is the same (we don't care if it went up or down) + /// 2. the heartbeat has been sent recently (within the threshold) + /// + /// The reasoning for 1. is that it's better to send an extra heartbeat than + /// to stall or not send one in case of a bug. + fn is_recent(&self, session_index: SessionIndex, now: BlockNumber) -> bool { + self.session_index == session_index && self.sent_at + INCLUDE_THRESHOLD.into() > now + } } /// Error which may occur while executing the off-chain code. 
-#[derive(RuntimeDebug)] -enum OffchainErr { - DecodeWorkerStatus, +#[cfg_attr(test, derive(PartialEq))] +enum OffchainErr { + TooEarly(BlockNumber), + WaitingForInclusion(BlockNumber), + AlreadyOnline(u32), FailedSigning, + FailedToAcquireLock, NetworkState, SubmitTransaction, } -impl Printable for OffchainErr { - fn print(&self) { - match self { - OffchainErr::DecodeWorkerStatus => print("Offchain error: decoding WorkerStatus failed!"), - OffchainErr::FailedSigning => print("Offchain error: signing failed!"), - OffchainErr::NetworkState => print("Offchain error: fetching network state failed!"), - OffchainErr::SubmitTransaction => print("Offchain error: submitting transaction failed!"), +impl sp_std::fmt::Debug for OffchainErr { + fn fmt(&self, fmt: &mut sp_std::fmt::Formatter) -> sp_std::fmt::Result { + match *self { + OffchainErr::TooEarly(ref block) => + write!(fmt, "Too early to send heartbeat, next expected at {:?}", block), + OffchainErr::WaitingForInclusion(ref block) => + write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block), + OffchainErr::AlreadyOnline(auth_idx) => + write!(fmt, "Authority {} is already online", auth_idx), + OffchainErr::FailedSigning => write!(fmt, "Failed to sign heartbeat"), + OffchainErr::FailedToAcquireLock => write!(fmt, "Failed to acquire lock"), + OffchainErr::NetworkState => write!(fmt, "Failed to fetch network state"), + OffchainErr::SubmitTransaction => write!(fmt, "Failed to submit transaction"), } } } @@ -197,7 +231,9 @@ pub trait Trait: frame_system::Trait + pallet_session::historical::Trait { /// An expected duration of the session. /// /// This parameter is used to determine the longevity of `heartbeat` transaction - /// and a rough time when the heartbeat should be sent. 
+ /// and a rough time when we should start considering sending heartbeats, + /// since the workers avoid sending them at the very beginning of the session, assuming + /// there is a chance the authority will produce a block and they won't be necessary. type SessionDuration: Get; /// A type that gives us the ability to submit unresponsiveness offence reports. @@ -225,8 +261,13 @@ decl_event!( decl_storage! { trait Store for Module as ImOnline { - /// The block number when we should gossip. - GossipAt get(fn gossip_at): T::BlockNumber; + /// The block number after which it's ok to send heartbeats in current session. + /// + /// At the beginning of each session we set this to a value that should + /// fall roughly in the middle of the session duration. + /// The idea is to first wait for the validators to produce a block + /// in the current session, so that the heartbeat later on will not be necessary. + HeartbeatAfter get(fn heartbeat_after): T::BlockNumber; /// The current set of keys that may issue a heartbeat. Keys get(fn keys): Vec; @@ -302,12 +343,29 @@ decl_module! { // Only send messages if we are a potential validator. if sp_io::offchain::is_validator() { - Self::offchain(now); + for res in Self::send_heartbeats(now).into_iter().flatten() { + if let Err(e) = res { + debug::debug!( + target: "imonline", + "Skipping heartbeat at {:?}: {:?}", + now, + e, + ) + } + } + } else { + debug::trace!( + target: "imonline", + "Skipping heartbeat at {:?}. Not a validator.", + now, + ) } } } } +type OffchainResult = Result::BlockNumber>>; + /// Keep track of number of authored blocks per authority, uncles are counted as /// well since they're a valid proof of onlineness. 
impl pallet_authorship::EventHandler for Module { @@ -365,156 +423,128 @@ impl Module { ); } - pub(crate) fn offchain(now: T::BlockNumber) { - let next_gossip = >::get(); - let check = Self::check_not_yet_gossipped(now, next_gossip); - let (curr_worker_status, not_yet_gossipped) = match check { - Ok((s, v)) => (s, v), - Err(err) => { - print(err); - return; - }, - }; - if next_gossip < now && not_yet_gossipped { - let value_set = Self::compare_and_set_worker_status(now, false, curr_worker_status); - if !value_set { - // value could not be set in local storage, since the value was - // different from `curr_worker_status`. this indicates that - // another worker was running in parallel. - return; - } - - match Self::do_gossip_at(now) { - Ok(_) => {}, - Err(err) => print(err), - } - } else { - debug::native::debug!( - target: "imonline", - "Skipping gossip at: {:?} >= {:?} || {:?}", - next_gossip, - now, - if not_yet_gossipped { "not gossipped" } else { "gossipped" } - ); + pub(crate) fn send_heartbeats(block_number: T::BlockNumber) + -> OffchainResult>> + { + let heartbeat_after = >::get(); + if block_number < heartbeat_after { + return Err(OffchainErr::TooEarly(heartbeat_after)) } + + let session_index = >::current_index(); + Ok(Self::local_authority_keys() + .map(move |(authority_index, key)| + Self::send_single_heartbeat(authority_index, key, session_index, block_number) + )) } - fn do_gossip_at(block_number: T::BlockNumber) -> Result<(), OffchainErr> { - // we run only when a local authority key is configured - let authorities = Keys::::get(); - let mut results = Vec::new(); - let mut local_keys = T::AuthorityId::all(); - local_keys.sort(); - - for (authority_index, key) in authorities.into_iter() - .enumerate() - .filter_map(|(index, authority)| { - local_keys.binary_search(&authority) - .ok() - .map(|location| (index as u32, &local_keys[location])) - }) - { - if Self::is_online(authority_index) { - debug::native::info!( - target: "imonline", - "[index: {:?}] 
Skipping sending heartbeat at block: {:?}. Already online.", - authority_index, - block_number - ); - continue; - } + fn send_single_heartbeat( + authority_index: u32, + key: T::AuthorityId, + session_index: SessionIndex, + block_number: T::BlockNumber + ) -> OffchainResult { + // A helper function to prepare heartbeat call. + let prepare_heartbeat = || -> OffchainResult> { let network_state = sp_io::offchain::network_state() .map_err(|_| OffchainErr::NetworkState)?; let heartbeat_data = Heartbeat { block_number, network_state, - session_index: >::current_index(), + session_index, authority_index, }; - let signature = key.sign(&heartbeat_data.encode()).ok_or(OffchainErr::FailedSigning)?; - let call = Call::heartbeat(heartbeat_data, signature); - - debug::info!( - target: "imonline", - "[index: {:?}] Reporting im-online at block: {:?}", - authority_index, - block_number - ); + Ok(Call::heartbeat(heartbeat_data, signature)) + }; - results.push( - T::SubmitTransaction::submit_unsigned(call) - .map_err(|_| OffchainErr::SubmitTransaction) - ); + if Self::is_online(authority_index) { + return Err(OffchainErr::AlreadyOnline(authority_index)); } - // fail only after trying all keys. - results.into_iter().collect::, OffchainErr>>()?; - - // once finished we set the worker status without comparing - // if the existing value changed in the meantime. this is - // because at this point the heartbeat was definitely submitted. - Self::set_worker_status(block_number, true); + // acquire lock for that authority at current heartbeat to make sure we don't + // send concurrent heartbeats. 
+ Self::with_heartbeat_lock( + authority_index, + session_index, + block_number, + || { + let call = prepare_heartbeat()?; + debug::info!( + target: "imonline", + "[index: {:?}] Reporting im-online at block: {:?} (session: {:?}): {:?}", + authority_index, + block_number, + session_index, + call, + ); - Ok(()) - } + T::SubmitTransaction::submit_unsigned(call) + .map_err(|_| OffchainErr::SubmitTransaction)?; - fn compare_and_set_worker_status( - gossipping_at: T::BlockNumber, - done: bool, - curr_worker_status: Option>, - ) -> bool { - let enc = WorkerStatus { - done, - gossipping_at, - }; - sp_io::offchain::local_storage_compare_and_set( - StorageKind::PERSISTENT, - DB_KEY, - curr_worker_status, - &enc.encode() + Ok(()) + }, ) } - fn set_worker_status( - gossipping_at: T::BlockNumber, - done: bool, - ) { - let enc = WorkerStatus { - done, - gossipping_at, - }; - sp_io::offchain::local_storage_set(StorageKind::PERSISTENT, DB_KEY, &enc.encode()); + fn local_authority_keys() -> impl Iterator { + // we run only when a local authority key is configured + let authorities = Keys::::get(); + let mut local_keys = T::AuthorityId::all(); + local_keys.sort(); + + authorities.into_iter() + .enumerate() + .filter_map(move |(index, authority)| { + local_keys.binary_search(&authority) + .ok() + .map(|location| (index as u32, local_keys[location].clone())) + }) } - // Checks if a heartbeat gossip already occurred at this block number. - // Returns a tuple of `(current worker status, bool)`, whereby the bool - // is true if not yet gossipped. 
- fn check_not_yet_gossipped( + fn with_heartbeat_lock( + authority_index: u32, + session_index: SessionIndex, now: T::BlockNumber, - next_gossip: T::BlockNumber, - ) -> Result<(Option>, bool), OffchainErr> { - let last_gossip = sp_io::offchain::local_storage_get(StorageKind::PERSISTENT, DB_KEY); - match last_gossip { - Some(last) => { - let worker_status: WorkerStatus = Decode::decode(&mut &last[..]) - .map_err(|_| OffchainErr::DecodeWorkerStatus)?; - - let was_aborted = !worker_status.done && worker_status.gossipping_at < now; + f: impl FnOnce() -> OffchainResult, + ) -> OffchainResult { + let key = { + let mut key = DB_PREFIX.to_vec(); + key.extend(authority_index.encode()); + key + }; + let storage = StorageValueRef::persistent(&key); + let res = storage.mutate(|status: Option>>| { + // Check if there is already a lock for that particular block. + // This means that the heartbeat has already been sent, and we are just waiting + // for it to be included. However if it doesn't get included for INCLUDE_THRESHOLD + // we will re-send it. + match status { + // we are still waiting for inclusion. + Some(Some(status)) if status.is_recent(session_index, now) => { + Err(OffchainErr::WaitingForInclusion(status.sent_at)) + }, + // attempt to set new status + _ => Ok(HeartbeatStatus { + session_index, + sent_at: now, + }), + } + })?; - // another off-chain worker is currently in the process of submitting - let already_submitting = - !worker_status.done && worker_status.gossipping_at == now; + let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?; - let not_yet_gossipped = - worker_status.done && worker_status.gossipping_at < next_gossip; + // we got the lock, let's try to send the heartbeat. + let res = f(); - let ret = (was_aborted && !already_submitting) || not_yet_gossipped; - Ok((Some(last), ret)) - }, - None => Ok((None, true)), + // clear the lock in case we have failed to send transaction. 
+ if res.is_err() { + new_status.sent_at = 0.into(); + storage.set(&new_status); } + + res } fn initialize_keys(keys: &[T::AuthorityId]) { @@ -544,10 +574,10 @@ impl pallet_session::OneSessionHandler for Module { { // Tell the offchain worker to start making the next session's heartbeats. // Since we consider producing blocks as being online, - // the hearbeat is defered a bit to prevent spaming. + // the heartbeat is deferred a bit to prevent spamming. let block_number = >::block_number(); let half_session = T::SessionDuration::get() / 2.into(); - >::put(block_number + half_session); + >::put(block_number + half_session); // Remember who the authorities are for the new session. Keys::::put(validators.map(|x| x.1).collect::>()); diff --git a/frame/im-online/src/tests.rs b/frame/im-online/src/tests.rs index 34eac233ae317..adc126094b612 100644 --- a/frame/im-online/src/tests.rs +++ b/frame/im-online/src/tests.rs @@ -210,7 +210,11 @@ fn should_generate_heartbeats() { // when UintAuthorityId::set_all_keys(vec![0, 1, 2]); - ImOnline::offchain(2); + ImOnline::send_heartbeats(2) + .unwrap() + // make sure to consume the iterator and check there are no errors. + .collect::, _>>().unwrap(); + // then let transaction = state.write().transactions.pop().unwrap(); @@ -315,7 +319,12 @@ fn should_not_send_a_report_if_already_online() { // when UintAuthorityId::set_all_keys(vec![0]); // all authorities use pallet_session key 0 - ImOnline::offchain(4); + // we expect error, since the authority is already online. 
+ let mut res = ImOnline::send_heartbeats(4).unwrap(); + assert_eq!(res.next().unwrap().unwrap(), ()); + assert_eq!(res.next().unwrap().unwrap_err(), OffchainErr::AlreadyOnline(1)); + assert_eq!(res.next().unwrap().unwrap_err(), OffchainErr::AlreadyOnline(2)); + assert_eq!(res.next(), None); // then let transaction = pool_state.write().transactions.pop().unwrap(); diff --git a/primitives/runtime/src/offchain/mod.rs b/primitives/runtime/src/offchain/mod.rs index 742388f9ec422..dfc15360c6c91 100644 --- a/primitives/runtime/src/offchain/mod.rs +++ b/primitives/runtime/src/offchain/mod.rs @@ -17,3 +17,4 @@ //! A collection of higher lever helpers for offchain calls. pub mod http; +pub mod storage; diff --git a/primitives/runtime/src/offchain/storage.rs b/primitives/runtime/src/offchain/storage.rs new file mode 100644 index 0000000000000..d5c2b47298450 --- /dev/null +++ b/primitives/runtime/src/offchain/storage.rs @@ -0,0 +1,156 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! A set of storage helpers for offchain workers. + +use sp_core::offchain::StorageKind; + +/// A storage value with a static key. +pub type StorageValue = StorageValueRef<'static>; + +/// An abstraction over local storage value. 
+pub struct StorageValueRef<'a> { + key: &'a [u8], + kind: StorageKind, +} + +impl<'a> StorageValueRef<'a> { + /// Create a new reference to a value in the persistent local storage. + pub fn persistent(key: &'a [u8]) -> Self { + Self { key, kind: StorageKind::PERSISTENT } + } + + /// Create a new reference to a value in the fork-aware local storage. + pub fn local(key: &'a [u8]) -> Self { + Self { key, kind: StorageKind::LOCAL } + } + + /// Set the value of the storage to encoding of given parameter. + /// + /// Note that the storage may be accessed by workers running concurrently, + /// if you happen to write a `get-check-set` pattern you should most likely + /// be using `mutate` instead. + pub fn set(&self, value: &impl codec::Encode) { + value.using_encoded(|val| { + sp_io::offchain::local_storage_set(self.kind, self.key, val) + }) + } + + /// Retrieve & decode the value from storage. + /// + /// Note that if you want to do some checks based on the value + /// and write changes after that you should rather be using `mutate`. + /// + /// The function returns `None` if the value was not found in storage, + /// otherwise a decoding of the value to requested type. + pub fn get(&self) -> Option> { + sp_io::offchain::local_storage_get(self.kind, self.key) + .map(|val| T::decode(&mut &*val).ok()) + } + + /// Retrieve & decode the value and set it to a new one atomically. + /// + /// Function `f` should return a new value that we should attempt to write to storage. + /// This function returns: + /// 1. `Ok(Ok(T))` in case the value has been successfully set. + /// 2. `Ok(Err(T))` in case the value was returned, but it couldn't have been set. + /// 3. `Err(_)` in case `f` returns an error. 
+ pub fn mutate(&self, f: F) -> Result, E> where + T: codec::Codec, + F: FnOnce(Option>) -> Result + { + let value = sp_io::offchain::local_storage_get(self.kind, self.key); + let decoded = value.as_deref().map(|mut v| T::decode(&mut v).ok()); + let val = f(decoded)?; + let set = val.using_encoded(|new_val| { + sp_io::offchain::local_storage_compare_and_set( + self.kind, + self.key, + value, + new_val, + ) + }); + + if set { + Ok(Ok(val)) + } else { + Ok(Err(val)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sp_io::TestExternalities; + use sp_core::offchain::{ + OffchainExt, + OffchainStorage, + testing, + }; + + #[test] + fn should_set_and_get() { + let (offchain, state) = testing::TestOffchainExt::new(); + let mut t = TestExternalities::default(); + t.register_extension(OffchainExt::new(offchain)); + + t.execute_with(|| { + let val = StorageValue::persistent(b"testval"); + + assert_eq!(val.get::(), None); + + val.set(&15_u32); + + assert_eq!(val.get::(), Some(Some(15_u32))); + assert_eq!(val.get::>(), Some(None)); + assert_eq!( + state.read().persistent_storage.get(b"", b"testval"), + Some(vec![15_u8, 0, 0, 0]) + ); + }) + } + + #[test] + fn should_mutate() { + let (offchain, state) = testing::TestOffchainExt::new(); + let mut t = TestExternalities::default(); + t.register_extension(OffchainExt::new(offchain)); + + t.execute_with(|| { + let val = StorageValue::persistent(b"testval"); + + let result = val.mutate::(|val| { + assert_eq!(val, None); + + Ok(16_u32) + }); + assert_eq!(result, Ok(Ok(16_u32))); + assert_eq!(val.get::(), Some(Some(16_u32))); + assert_eq!( + state.read().persistent_storage.get(b"", b"testval"), + Some(vec![16_u8, 0, 0, 0]) + ); + + // mutate again, but this time early-exit. + let res = val.mutate::(|val| { + assert_eq!(val, Some(Some(16_u32))); + Err(()) + }); + assert_eq!(res, Err(())); + }) + } +}