diff --git a/core/src/block.rs b/core/src/block.rs index 4f265d5b603..e1e620da704 100644 --- a/core/src/block.rs +++ b/core/src/block.rs @@ -5,7 +5,7 @@ use std::{collections::BTreeSet, iter, marker::PhantomData}; -use dashmap::{iter::Iter as MapIter, mapref::one::Ref as MapRef, DashMap}; +use dashmap::{mapref::one::Ref as MapRef, DashMap}; use eyre::{Context, Result}; use iroha_crypto::{HashOf, KeyPair, SignatureOf, SignaturesOf}; use iroha_data_model::{current_time, events::prelude::*, transaction::prelude::*}; @@ -62,8 +62,8 @@ impl Chain { } /// Iterator over height and block. - pub fn iter(&self) -> MapIter { - self.blocks.iter() + pub fn iter(&self) -> ChainIterator { + ChainIterator::new(self) } /// Latest block reference and its height. @@ -82,6 +82,77 @@ impl Chain { } } +/// Chain iterator +pub struct ChainIterator<'a> { + chain: &'a Chain, + pos_front: u64, + pos_back: u64, +} + +impl<'a> ChainIterator<'a> { + fn new(chain: &'a Chain) -> Self { + ChainIterator { + chain, + pos_front: 1, + pos_back: chain.len() as u64, + } + } + + fn is_exhausted(&self) -> bool { + self.pos_front > self.pos_back + } +} + +impl<'a> Iterator for ChainIterator<'a> { + type Item = MapRef<'a, u64, VersionedCommittedBlock>; + fn next(&mut self) -> Option { + if !self.is_exhausted() { + let val = self.chain.blocks.get(&self.pos_front); + self.pos_front += 1; + return val; + } + None + } + + fn nth(&mut self, n: usize) -> Option { + self.pos_front += n as u64; + self.next() + } + + fn last(mut self) -> Option { + self.pos_front = self.chain.len() as u64; + self.chain.blocks.get(&self.pos_front) + } + + fn count(self) -> usize { + #[allow(clippy::cast_possible_truncation)] + let count = (self.chain.len() as u64 - (self.pos_front - 1)) as usize; + count + } + + fn size_hint(&self) -> (usize, Option) { + #[allow(clippy::cast_possible_truncation)] + let height = (self.chain.len() as u64 - (self.pos_front - 1)) as usize; + (height, Some(height)) + } +} + +impl<'a> DoubleEndedIterator for ChainIterator<'a> { + fn next_back(&mut self) -> Option { + if !self.is_exhausted() { + let val = self.chain.blocks.get(&self.pos_back); + self.pos_back -= 1; + return val; + } + None + } + + fn nth_back(&mut self, n: usize) -> Option { + self.pos_back -= n as u64; + self.next_back() + } +} + declare_versioned_with_scale!(VersionedPendingBlock 1..2, Debug, Clone, iroha_derive::FromVariant); /// Transaction data is permanently recorded in files called blocks. Blocks are organized into @@ -464,6 +535,32 @@ impl ValidBlock { .iter() .any(|transaction| transaction.is_in_blockchain(wsv)) } + + /// Creates dummy `ValidBlock`. Used in tests + /// + /// # Panics + /// If generating keys or block signing fails. + #[cfg(test)] + #[allow(clippy::restriction)] + pub fn new_dummy() -> Self { + ValidBlock { + header: BlockHeader { + timestamp: 0, + height: 1, + previous_block_hash: EmptyChainHash::default().into(), + transactions_hash: EmptyChainHash::default().into(), + rejected_transactions_hash: EmptyChainHash::default().into(), + view_change_proofs: ViewChangeProofs::empty(), + invalidated_blocks_hashes: Vec::new(), + genesis_topology: None, + }, + rejected_transactions: vec![], + transactions: vec![], + signatures: BTreeSet::default(), + } + .sign(KeyPair::generate().unwrap()) + .unwrap() + } } impl From<&VersionedValidBlock> for Vec { @@ -674,35 +771,71 @@ impl From<&CommittedBlock> for Vec { mod tests { #![allow(clippy::restriction)] - use std::collections::BTreeSet; + use super::*; - use iroha_crypto::KeyPair; + #[test] + pub fn committed_and_valid_block_hashes_are_equal() { + let valid_block = ValidBlock::new_dummy(); + let committed_block = valid_block.clone().commit(); - use crate::{ - block::{BlockHeader, EmptyChainHash, ValidBlock}, - sumeragi::view_change, - }; + assert_eq!(*valid_block.hash(), *committed_block.hash()) + } #[test] - pub fn committed_and_valid_block_hashes_are_equal() { - let valid_block = ValidBlock { - header: BlockHeader { - timestamp: 0, - height: 0, - previous_block_hash: EmptyChainHash::default().into(), - transactions_hash: EmptyChainHash::default().into(), - rejected_transactions_hash: EmptyChainHash::default().into(), - view_change_proofs: view_change::ProofChain::empty(), - invalidated_blocks_hashes: Vec::new(), - genesis_topology: None, - }, - rejected_transactions: vec![], - transactions: vec![], - signatures: BTreeSet::default(), + pub fn chain_iter_returns_blocks_ordered() { + const BLOCK_COUNT: usize = 10; + let chain = Chain::new(); + + let mut block = ValidBlock::new_dummy().commit(); + + for i in 1..=BLOCK_COUNT { + block.header.height = i as u64; + chain.push(block.clone().into()); } - .sign(KeyPair::generate().unwrap()) - .unwrap(); - let commited_block = valid_block.clone().commit(); - assert_eq!(valid_block.hash().transmute(), commited_block.hash()) + + assert_eq!( + (BLOCK_COUNT - 5..=BLOCK_COUNT) + .map(|i| i as u64) + .collect::>(), + chain + .iter() + .skip(BLOCK_COUNT - 6) + .map(|b| *b.key()) + .collect::>() + ); + + assert_eq!(BLOCK_COUNT - 2, chain.iter().skip(2).count()); + assert_eq!(3, *chain.iter().nth(2).unwrap().key()); + } + + #[test] + pub fn chain_rev_iter_returns_blocks_ordered() { + const BLOCK_COUNT: usize = 10; + let chain = Chain::new(); + + let mut block = ValidBlock::new_dummy().commit(); + + for i in 1..=BLOCK_COUNT { + block.header.height = i as u64; + chain.push(block.clone().into()); + } + + assert_eq!( + (1..=BLOCK_COUNT - 4) + .rev() + .map(|i| i as u64) + .collect::>(), + chain + .iter() + .rev() + .skip(BLOCK_COUNT - 6) + .map(|b| *b.key()) + .collect::>() + ); + + assert_eq!( + (BLOCK_COUNT - 2) as u64, + *chain.iter().nth_back(2).unwrap().key() + ); } } diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs index efaae1d2b60..66e7b7bccf8 100644 --- a/core/src/block_sync.rs +++ b/core/src/block_sync.rs @@ -5,6 +5,7 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; use iroha_actor::{broker::*, prelude::*, Context}; use iroha_crypto::SignatureOf; use iroha_data_model::prelude::*; +use rand::{prelude::SliceRandom, SeedableRng}; use self::{ config::BlockSyncConfiguration, @@ -12,9 +13,7 @@ use self::{ }; use crate::{ prelude::*, - sumeragi::{ - network_topology::Role, CommitBlock, GetNetworkTopology, GetSortedPeers, SumeragiTrait, - }, + sumeragi::{network_topology::Role, CommitBlock, GetNetworkTopology, SumeragiTrait}, wsv::WorldTrait, VersionedCommittedBlock, }; @@ -91,11 +90,9 @@ impl BlockSynchronizerTrait for BlockSynchroniz #[derive(Debug, Clone, Copy, iroha_actor::Message)] pub struct ContinueSync; -/// Message for getting updates from other peers +/// Message to initiate receiving of latest blocks from other peers /// -/// Starts the `BlockSync`, meaning that every `gossip_period` -/// the peers would gossip about latest block hashes -/// and try to synchronize their blocks. +/// Every `gossip_period` peer will poll one randomly selected peer for latest blocks #[derive(Debug, Clone, Copy, Default, iroha_actor::Message)] pub struct ReceiveUpdates; @@ -116,13 +113,12 @@ impl Actor for BlockSynchronizer { impl Handler for BlockSynchronizer { type Result = (); async fn handle(&mut self, ReceiveUpdates: ReceiveUpdates) { - let peers = self.sumeragi.send(GetSortedPeers).await; - Message::LatestBlock(LatestBlock::new( - self.wsv.latest_block_hash(), - self.peer_id.clone(), - )) - .send_to_multiple(self.broker.clone(), &peers) - .await; + let rng = &mut rand::rngs::StdRng::from_entropy(); + + if let Some(random_peer) = self.wsv.peers().choose(rng) { + self.request_latest_blocks_from_peer(random_peer.id.clone()) + .await; + } } } @@ -138,11 +134,21 @@ impl Handler for BlockSynchronize impl Handler for BlockSynchronizer { type Result = (); async fn handle(&mut self, message: Message) { - message.handle(&mut self).await + message.handle(&mut self).await; } } impl BlockSynchronizer { + /// Sends request for latest blocks to a chosen peer + async fn request_latest_blocks_from_peer(&mut self, peer_id: PeerId) { + Message::GetBlocksAfter(GetBlocksAfter::new( + self.wsv.latest_block_hash(), + self.peer_id.clone(), + )) + .send_to(self.broker.clone(), peer_id) + .await; + } + /// Continues the synchronization if it was ongoing. Should be called after `WSV` update. #[iroha_futures::telemetry_future] pub async fn continue_sync(&mut self) { @@ -158,12 +164,7 @@ impl BlockSynchronizer { (block, blocks) } else { self.state = State::Idle; - Message::GetBlocksAfter(GetBlocksAfter::new( - self.wsv.latest_block_hash(), - self.peer_id.clone(), - )) - .send_to(self.broker.clone(), peer_id.clone()) - .await; + self.request_latest_blocks_from_peer(peer_id).await; return; }; @@ -171,7 +172,7 @@ impl BlockSynchronizer { .sumeragi .send(GetNetworkTopology(block.header().clone())) .await; - // If it is genesis topology we can not apply view changes as peers have custom order! + // If it is genesis topology we cannot apply view changes as peers have custom order! #[allow(clippy::expect_used)] if !block.header().is_genesis() { network_topology = network_topology @@ -204,7 +205,6 @@ impl BlockSynchronizer { /// The module for block synchronization related peer to peer messages. pub mod message { - use futures::{prelude::*, stream::FuturesUnordered}; use iroha_actor::broker::Broker; use iroha_crypto::*; use iroha_data_model::prelude::*; @@ -244,22 +244,6 @@ pub mod message { } } - /// Message variant to send our current latest block - #[derive(Io, Decode, Encode, Debug, Clone)] - pub struct LatestBlock { - /// Block hash - pub hash: HashOf, - /// Peer id - pub peer_id: PeerId, - } - - impl LatestBlock { - /// Default constructor - pub const fn new(hash: HashOf, peer_id: PeerId) -> Self { - Self { hash, peer_id } - } - } - /// Get blocks after some block #[derive(Io, Decode, Encode, Debug, Clone)] pub struct GetBlocksAfter { @@ -296,8 +280,6 @@ pub mod message { #[version_with_scale(n = 1, versioned = "VersionedMessage", derive = "Debug, Clone")] #[derive(Io, Decode, Encode, Debug, Clone, FromVariant, iroha_actor::Message)] pub enum Message { - /// Gossip about latest block. - LatestBlock(LatestBlock), /// Request for blocks after the block with `Hash` for the peer with `PeerId`. GetBlocksAfter(GetBlocksAfter), /// The response to `GetBlocksAfter`. Contains the requested blocks and the id of the peer who shared them. @@ -312,17 +294,6 @@ pub mod message { block_sync: &mut BlockSynchronizer, ) { match self { - Message::LatestBlock(LatestBlock { hash, peer_id }) => { - let latest_block_hash = block_sync.wsv.latest_block_hash(); - if *hash != latest_block_hash { - Message::GetBlocksAfter(GetBlocksAfter::new( - latest_block_hash, - block_sync.peer_id.clone(), - )) - .send_to(block_sync.broker.clone(), peer_id.clone()) - .await; - } - } Message::GetBlocksAfter(GetBlocksAfter { hash, peer_id }) => { if block_sync.batch_size == 0 { iroha_logger::warn!( @@ -330,18 +301,17 @@ pub mod message { ); return; } + if *hash == block_sync.wsv.latest_block_hash() { + return; + } - match block_sync.wsv.blocks_after(*hash, block_sync.batch_size) { - Ok(blocks) if !blocks.is_empty() => { - Message::ShareBlocks(ShareBlocks::new( - blocks.clone(), - block_sync.peer_id.clone(), - )) + let blocks = block_sync.wsv.blocks_after(*hash, block_sync.batch_size); + if blocks.is_empty() { + iroha_logger::warn!(%hash, "Block hash not found"); + } else { + Message::ShareBlocks(ShareBlocks::new(blocks, block_sync.peer_id.clone())) .send_to(block_sync.broker.clone(), peer_id.clone()) .await; - } - Ok(_) => (), - Err(error) => iroha_logger::error!(%error), } } Message::ShareBlocks(ShareBlocks { blocks, peer_id }) => { @@ -358,25 +328,9 @@ pub mod message { #[log("TRACE")] pub async fn send_to(self, broker: Broker, peer: PeerId) { let data = NetworkMessage::BlockSync(Box::new(VersionedMessage::from(self))); - let message = Post { - data, - id: peer.clone(), - }; + let message = Post { data, id: peer }; broker.issue_send(message).await; } - - /// Send this message over the network to the specified `peer`. - #[iroha_futures::telemetry_future] - #[log("TRACE")] - pub async fn send_to_multiple(self, broker: Broker, peers: &[PeerId]) { - let futures = peers - .iter() - .map(|peer| self.clone().send_to(broker.clone(), peer.clone())) - .collect::>() - .collect::<()>(); - - tokio::task::spawn(futures); - } } } @@ -395,9 +349,9 @@ pub mod config { #[serde(default)] #[config(env_prefix = "BLOCK_SYNC_")] pub struct BlockSyncConfiguration { - /// The time between peer sharing its latest block hash with other peers in milliseconds. + /// The time between sending request for latest block. pub gossip_period_ms: u64, - /// The number of blocks, which can be send in one message. + /// The number of blocks, which can be sent in one message. /// Underlying network (`iroha_network`) should support transferring messages this large. pub batch_size: u32, /// Mailbox size diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index 7ff8926e81c..c32c79e43b1 100644 --- a/core/src/sumeragi/mod.rs +++ b/core/src/sumeragi/mod.rs @@ -78,11 +78,6 @@ pub struct Init { pub height: u64, } -/// Get sorted peers -#[derive(Debug, Clone, Copy, iroha_actor::Message)] -#[message(result = "Vec")] -pub struct GetSortedPeers; - /// Get network topology #[derive(Debug, Clone, iroha_actor::Message)] #[message(result = "Topology")] @@ -147,7 +142,6 @@ pub trait SumeragiTrait: + ContextHandler + ContextHandler + ContextHandler - + ContextHandler> + ContextHandler + ContextHandler + ContextHandler @@ -376,22 +370,12 @@ impl ContextHandler } } -#[async_trait::async_trait] -impl Handler - for Sumeragi -{ - type Result = Vec; - async fn handle(&mut self, GetSortedPeers: GetSortedPeers) -> Vec { - self.topology.sorted_peers().to_vec() - } -} - #[async_trait::async_trait] impl Handler for Sumeragi { type Result = Topology; - async fn handle(&mut self, GetNetworkTopology(header): GetNetworkTopology) -> Topology { + async fn handle(&mut self, GetNetworkTopology(header): GetNetworkTopology) -> Self::Result { self.network_topology_current_or_genesis(&header) } } @@ -414,7 +398,7 @@ pub struct IsLeader; #[async_trait::async_trait] impl Handler for Sumeragi { type Result = bool; - async fn handle(&mut self, IsLeader: IsLeader) -> bool { + async fn handle(&mut self, IsLeader: IsLeader) -> Self::Result { self.is_leader() } } @@ -427,7 +411,7 @@ pub struct GetLeader; #[async_trait::async_trait] impl Handler for Sumeragi { type Result = PeerId; - async fn handle(&mut self, GetLeader: GetLeader) -> PeerId { + async fn handle(&mut self, GetLeader: GetLeader) -> Self::Result { self.topology.leader().clone() } } diff --git a/core/src/sumeragi/network_topology.rs b/core/src/sumeragi/network_topology.rs index 83217cd3f5a..9dcf071131d 100644 --- a/core/src/sumeragi/network_topology.rs +++ b/core/src/sumeragi/network_topology.rs @@ -391,7 +391,7 @@ impl Topology { signatures: impl IntoIterator> + 'a, ) -> Vec> { let roles: HashSet = roles.iter().copied().collect(); - let public_keys: Vec<_> = roles + let public_keys: HashSet<_> = roles .iter() .flat_map(|role| role.peers(self)) .map(|peer| peer.public_key) diff --git a/core/src/wsv.rs b/core/src/wsv.rs index 67e086b97be..bf8b889b7d0 100644 --- a/core/src/wsv.rs +++ b/core/src/wsv.rs @@ -17,11 +17,7 @@ use iroha_crypto::HashOf; use iroha_data_model::{domain::DomainsMap, peer::PeersIds, prelude::*}; use tokio::task; -use crate::{ - block::Chain, - prelude::*, - smartcontracts::{FindError, ParentHashNotFound}, -}; +use crate::{block::Chain, prelude::*, smartcontracts::FindError}; /// World proxy for using with `WorldTrait` #[derive(Debug, Default, Clone)] @@ -164,26 +160,17 @@ impl WorldStateView { } /// Returns blocks after hash - /// - /// # Errors - /// Block with `hash` was not found. pub fn blocks_after( &self, hash: HashOf, max_blocks: u32, - ) -> Result> { - let from_pos = self - .blocks - .iter() - .position(|block_entry| block_entry.value().header().previous_block_hash == hash) - .ok_or(FindError::Block(ParentHashNotFound(hash)))?; - Ok(self - .blocks + ) -> Vec { + self.blocks .iter() - .skip(from_pos) + .skip_while(|block_entry| block_entry.value().header().previous_block_hash != hash) .take(max_blocks as usize) .map(|block_entry| block_entry.value().clone()) - .collect()) + .collect() } /// Get `World` without an ability to modify it. @@ -507,3 +494,42 @@ pub mod config { } } } + +#[cfg(test)] +mod tests { + #![allow(clippy::restriction)] + + use super::{World, *}; + + #[tokio::test] + async fn get_blocks_after_hash() { + const BLOCK_CNT: usize = 10; + const BATCH_SIZE: u32 = 3; + + let mut block = ValidBlock::new_dummy().commit(); + let wsv = WorldStateView::::default(); + + let mut block_hashes = vec![]; + for i in 1..=BLOCK_CNT { + block.header.height = i as u64; + if let Some(block_hash) = block_hashes.last() { + block.header.previous_block_hash = *block_hash; + } + let block: VersionedCommittedBlock = block.clone().into(); + block_hashes.push(block.hash()); + wsv.apply(block).await; + } + + assert_eq!( + wsv.blocks_after(block_hashes[2], BATCH_SIZE) + .iter() + .map(VersionedCommittedBlock::hash) + .collect::>(), + block_hashes + .into_iter() + .skip(3) + .take(BATCH_SIZE as usize) + .collect::>(), + ); + } +} diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index 9b9d7faa9e2..d9db032d575 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -568,7 +568,7 @@ pub trait TestConfiguration { fn test() -> Self; /// Returns default pipeline time. fn pipeline_time() -> Duration; - /// Returns default time between bocksync gossips for new blocks. + /// Returns default time between block sync requests fn block_sync_gossip_time() -> Duration; } diff --git a/core/test_network/tests/sumeragi_with_mock.rs b/core/test_network/tests/sumeragi_with_mock.rs index 68f902e8ef1..ac2df139960 100644 --- a/core/test_network/tests/sumeragi_with_mock.rs +++ b/core/test_network/tests/sumeragi_with_mock.rs @@ -423,7 +423,6 @@ pub mod utils { Faulty: Handler + Handler + Handler - + Handler> + Handler + Handler + Handler diff --git a/docs/source/references/config.md b/docs/source/references/config.md index 0b01eace2f7..c8a42a3de1f 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -113,7 +113,7 @@ Has type `BlockSyncConfiguration`. Can be configured via environment variable `I ### `block_sync.batch_size` -The number of blocks, which can be send in one message. +The number of blocks, which can be sent in one message. Has type `u32`. Can be configured via environment variable `BLOCK_SYNC_BATCH_SIZE` @@ -123,7 +123,7 @@ Has type `u32`. Can be configured via environment variable `BLOCK_SYNC_BATCH_SIZ ### `block_sync.gossip_period_ms` -The time between peer sharing its latest block hash with other peers in milliseconds. +The time between sending request for latest block. Has type `u64`. Can be configured via environment variable `BLOCK_SYNC_GOSSIP_PERIOD_MS`