From 494431d784369cc11b55bf27d7eeafcdc4ab449a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Ver=C5=A1i=C4=87?= Date: Mon, 20 Dec 2021 09:23:03 +0100 Subject: [PATCH] [feature] #1210 Block streaming - server side (#1724) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * move transaction related functionality to data_model/transaction module Signed-off-by: Marin Veršić * split web socket event producer and consumer Signed-off-by: Marin Veršić * use publisher/subscriber terminology Signed-off-by: Marin Veršić * add block streaming over websocket to torii Signed-off-by: Marin Veršić * add server side tests Signed-off-by: Marin Veršić * refactor implemented block streaming code Signed-off-by: Marin Veršić * update api_spec Signed-off-by: Marin Veršić --- core/src/block.rs | 93 +++++++++++++++++++++ core/src/block_sync.rs | 7 +- core/src/event.rs | 79 +++++------------- core/src/lib.rs | 3 +- core/src/stream.rs | 125 +++++++++++++++++++++++++++++ core/src/torii/mod.rs | 102 +++++++++++++++++++---- core/src/torii/tests.rs | 60 ++++++++++++++ core/src/wsv.rs | 86 ++++++++++++++++---- data_model/src/lib.rs | 4 +- data_model/src/transaction.rs | 6 +- docs/source/references/api_spec.md | 58 +++++++++---- schema/bin/src/main.rs | 11 ++- 12 files changed, 522 insertions(+), 112 deletions(-) create mode 100644 core/src/stream.rs diff --git a/core/src/block.rs b/core/src/block.rs index cc67af06a35..1a760e11a99 100644 --- a/core/src/block.rs +++ b/core/src/block.rs @@ -779,6 +779,99 @@ impl From<&CommittedBlock> for Vec { } } +// TODO: Move to data_model after release +pub mod stream { + //! Blocks for streaming API. + + use iroha_macro::FromVariant; + use iroha_schema::prelude::*; + use iroha_version::prelude::*; + use parity_scale_codec::{Decode, Encode}; + + use crate::block::VersionedCommittedBlock; + + declare_versioned_with_scale!(VersionedBlockPublisherMessage 1..2, Debug, Clone, FromVariant, IntoSchema); + + impl VersionedBlockPublisherMessage { + /// Converts from `&VersionedBlockPublisherMessage` to V1 reference + pub const fn as_v1(&self) -> &BlockPublisherMessage { + match self { + Self::V1(v1) => v1, + } + } + + /// Converts from `&mut VersionedBlockPublisherMessage` to V1 mutable reference + pub fn as_mut_v1(&mut self) -> &mut BlockPublisherMessage { + match self { + Self::V1(v1) => v1, + } + } + + /// Performs the conversion from `VersionedBlockPublisherMessage` to V1 + pub fn into_v1(self) -> BlockPublisherMessage { + match self { + Self::V1(v1) => v1, + } + } + } + + /// Message sent by the stream producer + #[version_with_scale(n = 1, versioned = "VersionedBlockPublisherMessage")] + #[derive(Debug, Clone, Decode, Encode, FromVariant, IntoSchema)] + #[allow(clippy::large_enum_variant)] + pub enum BlockPublisherMessage { + /// Answer sent by the peer. + /// The message means that block stream connection is initialized and will be supplying + /// events starting with the next message. + SubscriptionAccepted, + /// Block sent by the peer. + Block(VersionedCommittedBlock), + } + + declare_versioned_with_scale!(VersionedBlockSubscriberMessage 1..2, Debug, Clone, FromVariant, IntoSchema); + + impl VersionedBlockSubscriberMessage { + /// Converts from `&VersionedBlockSubscriberMessage` to V1 reference + pub const fn as_v1(&self) -> &BlockSubscriberMessage { + match self { + Self::V1(v1) => v1, + } + } + + /// Converts from `&mut VersionedBlockSubscriberMessage` to V1 mutable reference + pub fn as_mut_v1(&mut self) -> &mut BlockSubscriberMessage { + match self { + Self::V1(v1) => v1, + } + } + + /// Performs the conversion from `VersionedBlockSubscriberMessage` to V1 + pub fn into_v1(self) -> BlockSubscriberMessage { + match self { + Self::V1(v1) => v1, + } + } + } + + /// Message sent by the stream consumer + #[version_with_scale(n = 1, versioned = "VersionedBlockSubscriberMessage")] + #[derive(Debug, Clone, Copy, Decode, Encode, FromVariant, IntoSchema)] + pub enum BlockSubscriberMessage { + /// Request sent to subscribe to blocks stream starting from the given height. + SubscriptionRequest(u64), + /// Acknowledgment of receiving block sent from the peer. + BlockReceived, + } + + /// Exports common structs and enums from this module. + pub mod prelude { + pub use super::{ + BlockPublisherMessage, BlockSubscriberMessage, VersionedBlockPublisherMessage, + VersionedBlockSubscriberMessage, + }; + } +} + #[cfg(test)] mod tests { #![allow(clippy::restriction)] diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs index 17672aded88..76746e9e581 100644 --- a/core/src/block_sync.rs +++ b/core/src/block_sync.rs @@ -302,7 +302,12 @@ pub mod message { return; } - let blocks = block_sync.wsv.blocks_after(*hash, block_sync.batch_size); + let blocks: Vec<_> = block_sync + .wsv + .blocks_after_hash(*hash) + .take(block_sync.batch_size as usize) + .collect(); + if blocks.is_empty() { warn!(%hash, "Block hash not found"); } else { diff --git a/core/src/event.rs b/core/src/event.rs index fdc99aa3acd..d8813705ca9 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -2,25 +2,18 @@ //! This module contains descriptions of such an events and //! utilitary Iroha Special Instructions to work with them. -use std::{fmt::Debug, time::Duration}; - -use eyre::{eyre, Result, WrapErr}; -use futures::{SinkExt, StreamExt}; +use eyre::{eyre, Result}; use iroha_data_model::events::prelude::*; -use iroha_version::prelude::*; -use tokio::{sync::broadcast, time}; -use warp::ws::{self, WebSocket}; +use tokio::sync::broadcast; +use warp::ws::WebSocket; + +use crate::stream::{Sink, Stream}; /// Type of `Sender` which should be used for channels of `Event` messages. pub type EventsSender = broadcast::Sender; /// Type of `Receiver` which should be used for channels of `Event` messages. pub type EventsReceiver = broadcast::Receiver; -#[cfg(test)] -const TIMEOUT: Duration = Duration::from_millis(10_000); -#[cfg(not(test))] -const TIMEOUT: Duration = Duration::from_millis(1000); - /// Consumer for Iroha `Event`(s). /// Passes the events over the corresponding connection `stream` if they match the `filter`. #[derive(Debug)] @@ -36,29 +29,14 @@ impl Consumer { /// Can fail due to timeout or without message at websocket or during decoding request #[iroha_futures::telemetry_future] pub async fn new(mut stream: WebSocket) -> Result { - let message = time::timeout(TIMEOUT, stream.next()) - .await - .wrap_err("Read message timeout")? - .ok_or_else(|| eyre!("Failed to read message: no message"))? - .wrap_err("Web Socket failure")?; - - if !message.is_binary() { - return Err(eyre!("Unexpected message type")); - } - let filter = VersionedEventSubscriberMessage::decode_versioned(message.as_bytes())? - .into_v1() - .try_into()?; + let subscription_request: VersionedEventSubscriberMessage = stream.recv().await?; + let filter = subscription_request.into_v1().try_into()?; - time::timeout( - TIMEOUT, - stream.send(ws::Message::binary( - VersionedEventPublisherMessage::from(EventPublisherMessage::SubscriptionAccepted) - .encode_versioned()?, - )), - ) - .await - .wrap_err("Send message timeout")? - .wrap_err("Failed to send message")?; + stream + .send(VersionedEventPublisherMessage::from( + EventPublisherMessage::SubscriptionAccepted, + )) + .await?; Ok(Consumer { stream, filter }) } @@ -68,34 +46,19 @@ impl Consumer { /// # Errors /// Can fail due to timeout or sending event. Also receiving might fail #[iroha_futures::telemetry_future] - pub async fn consume(&mut self, event: &Event) -> Result<()> { - if !self.filter.apply(event) { + pub async fn consume(&mut self, event: Event) -> Result<()> { + if !self.filter.apply(&event) { return Ok(()); } - let event = - VersionedEventPublisherMessage::from(EventPublisherMessage::from(event.clone())) - .encode_versioned() - .wrap_err("Failed to serialize event")?; - time::timeout(TIMEOUT, self.stream.send(ws::Message::binary(event))) - .await - .wrap_err("Send message timeout")? - .wrap_err("Failed to send message")?; - - let message = time::timeout(TIMEOUT, self.stream.next()) - .await - .wrap_err("Failed to read receipt")? - .ok_or_else(|| eyre!("Failed to read receipt: no receipt"))? - .wrap_err("Web Socket failure")?; - - if !message.is_binary() { - return Err(eyre!("Unexpected message type")); - } + self.stream + .send(VersionedEventPublisherMessage::from( + EventPublisherMessage::from(event), + )) + .await?; - if let EventSubscriberMessage::EventReceived = - VersionedEventSubscriberMessage::decode_versioned(message.as_bytes())?.into_v1() - { - self.stream.flush().await?; + let message: VersionedEventSubscriberMessage = self.stream.recv().await?; + if let EventSubscriberMessage::EventReceived = message.into_v1() { Ok(()) } else { Err(eyre!("Expected `EventReceived`.")) diff --git a/core/src/lib.rs b/core/src/lib.rs index 9cd105c7cdc..373090b9bb4 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -11,6 +11,7 @@ pub mod modules; pub mod queue; pub mod samples; pub mod smartcontracts; +pub mod stream; pub mod sumeragi; pub mod torii; pub mod tx; @@ -194,7 +195,7 @@ where let network_addr = network.start().await; let (events_sender, _) = broadcast::channel(100); - let wsv = Arc::new(WorldStateView::from_config( + let wsv = Arc::new(WorldStateView::from_configuration( config.wsv, W::with( init::domains(&config).wrap_err("Failed to get initial domains")?, diff --git a/core/src/stream.rs b/core/src/stream.rs new file mode 100644 index 00000000000..996e8f2fa5a --- /dev/null +++ b/core/src/stream.rs @@ -0,0 +1,125 @@ +//! Extension to the [`futures::StreamExt`] and [`futures::SinkExt`]. +//! Adds support for sending custom Iroha messages over the stream, taking care +//! of encoding/decoding as well as timeouts + +use std::time::Duration; + +use eyre::{eyre, Context, Result}; +use futures::{SinkExt, StreamExt}; +use iroha_version::prelude::*; + +#[cfg(test)] +const TIMEOUT: Duration = Duration::from_millis(10_000); +#[cfg(not(test))] +const TIMEOUT: Duration = Duration::from_millis(1000); + +/// Represents messsage used by the stream +pub trait StreamMessage { + /// Constructs new binary message + fn binary(source: Vec) -> Self; + /// Decodes the message into byte slice + fn as_bytes(&self) -> &[u8]; + /// Returns `true` if the message is binary + fn is_binary(&self) -> bool; +} + +/// Trait for writing custom messages into stream +#[async_trait::async_trait] +pub trait Sink: SinkExt + Unpin +where + S: Send + Sync + 'static, +{ + /// Error type returned by the sink + type Err: std::error::Error + Send + Sync + 'static; + + /// Message type used by the underlying sink + type Message: StreamMessage + Send; + + /// Encoded message and sends it to the stream + async fn send(&mut self, message: S) -> Result<()> { + Ok(tokio::time::timeout( + TIMEOUT, + >::send( + self, + Self::Message::binary(message.encode_versioned()?), + ), + ) + .await + .wrap_err("Send message timeout")??) + } +} + +/// Trait for reading custom messages from stream +#[async_trait::async_trait] +pub trait Stream: + StreamExt> + Unpin +{ + /// Error type returned by the stream + type Err: std::error::Error + Send + Sync + 'static; + + /// Message type used by the underlying stream + type Message: StreamMessage; + + /// Receives and decodes message from the stream + async fn recv(&mut self) -> Result { + let subscription_request_message = tokio::time::timeout(TIMEOUT, self.next()) + .await + .wrap_err("Read message timeout")? + .ok_or_else(|| eyre!("No message"))??; + + if !subscription_request_message.is_binary() { + return Err(eyre!("Expected binary message")); + } + + Ok(R::decode_versioned( + subscription_request_message.as_bytes(), + )?) + } +} + +impl StreamMessage for warp::ws::Message { + fn binary(source: Vec) -> Self { + Self::binary(source) + } + fn as_bytes(&self) -> &[u8] { + self.as_bytes() + } + fn is_binary(&self) -> bool { + self.is_binary() + } +} + +#[async_trait::async_trait] +impl Sink for warp::ws::WebSocket +where + M: Send + Sync + 'static, +{ + type Err = warp::Error; + type Message = warp::ws::Message; +} +#[async_trait::async_trait] +impl Stream for warp::ws::WebSocket { + type Err = warp::Error; + type Message = warp::ws::Message; +} + +#[cfg(test)] +mod tests { + use warp::test::WsClient; + + use super::*; + + #[async_trait::async_trait] + impl Stream for WsClient { + type Err = warp::test::WsError; + type Message = warp::ws::Message; + } + #[async_trait::async_trait] + impl Sink for WsClient + where + M: Send + Sync + 'static, + { + type Err = warp::test::WsError; + type Message = warp::ws::Message; + } +} diff --git a/core/src/torii/mod.rs b/core/src/torii/mod.rs index 1c058171a99..32a256cba10 100644 --- a/core/src/torii/mod.rs +++ b/core/src/torii/mod.rs @@ -3,8 +3,8 @@ use std::{convert::Infallible, fmt::Debug, net::ToSocketAddrs, sync::Arc}; -use eyre::Context; -use futures::stream::{FuturesUnordered, StreamExt}; +use eyre::{eyre, Context}; +use futures::{stream::FuturesUnordered, StreamExt}; use iroha_config::{Configurable, GetConfiguration, PostConfiguration}; use iroha_data_model::prelude::*; use iroha_telemetry::metrics::Status; @@ -19,10 +19,11 @@ use warp::{ Filter, Reply, }; -#[macro_use] -mod utils; - use crate::{ + block::stream::{ + BlockPublisherMessage, BlockSubscriberMessage, VersionedBlockPublisherMessage, + VersionedBlockSubscriberMessage, + }, event::{Consumer, EventsSender}, prelude::*, queue::{self, Queue}, @@ -30,10 +31,14 @@ use crate::{ isi::query::{self, VerifiedQueryRequest}, permissions::IsQueryAllowedBoxed, }, + stream::{Sink, Stream}, wsv::WorldTrait, Addr, Configuration, IrohaNetwork, }; +#[macro_use] +mod utils; + /// Main network handler and the only entrypoint of the Iroha. pub struct Torii { iroha_cfg: Configuration, @@ -226,7 +231,7 @@ impl Torii { .and(warp::body::json()), )); - let ws_router = warp::path(uri::SUBSCRIPTION) + let events_ws_router = warp::path(uri::SUBSCRIPTION) .and(add_state!(self.events)) .and(warp::ws()) .map(|events, ws: Ws| { @@ -237,6 +242,28 @@ impl Torii { }) }); + // `warp` panics if there is `/` in the string given to the `warp::path` filter + // Path filter has to be boxed to have a single uniform type during iteration + let block_ws_router_path = uri::BLOCKS_STREAM + .split('/') + .skip_while(|p| p.is_empty()) + .fold(warp::any().boxed(), |path_filter, path| { + path_filter.and(warp::path(path)).boxed() + }); + + let blocks_ws_router = block_ws_router_path + .and(add_state!(self.wsv)) + .and(warp::ws()) + .map(|wsv: Arc<_>, ws: Ws| { + ws.on_upgrade(|this_ws| async move { + if let Err(error) = handle_blocks_stream(&wsv, this_ws).await { + iroha_logger::error!(%error, "Failed to subscribe to blocks stream"); + } + }) + }); + + let ws_router = events_ws_router.or(blocks_ws_router); + ws_router .or(warp::post().and(post_router)) .or(warp::get().and(get_router)) @@ -438,22 +465,69 @@ async fn handle_post_configuration( } #[iroha_futures::telemetry_future] -async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { - let mut events = events.subscribe(); - let mut consumer = Consumer::new(stream).await?; +async fn handle_blocks_stream( + wsv: &WorldStateView, + mut stream: WebSocket, +) -> eyre::Result<()> { + let subscription_request: VersionedBlockSubscriberMessage = stream.recv().await?; + let mut from_height = subscription_request.into_v1().try_into()?; + + stream + .send(VersionedBlockPublisherMessage::from( + BlockPublisherMessage::SubscriptionAccepted, + )) + .await?; - while let Ok(change) = events.recv().await { - iroha_logger::trace!(event = ?change); + let mut rx = wsv.subscribe_to_new_block_notifications(); + stream_blocks(&mut from_height, wsv, &mut stream).await?; + + loop { + rx.changed().await?; + stream_blocks(&mut from_height, wsv, &mut stream).await?; + } +} + +async fn stream_blocks( + from_height: &mut u64, + wsv: &WorldStateView, + stream: &mut WebSocket, +) -> eyre::Result<()> { + #[allow(clippy::expect_used)] + for block in wsv.blocks_from_height( + (*from_height) + .try_into() + .expect("Blockchain size limit reached"), + ) { + stream + .send(VersionedBlockPublisherMessage::from( + BlockPublisherMessage::from(block), + )) + .await?; - if let Err(error) = consumer.consume(&change).await { - iroha_logger::error!(%error, "Failed to notify client. Closed connection."); - break; + let message: VersionedBlockSubscriberMessage = stream.recv().await?; + if let BlockSubscriberMessage::BlockReceived = message.into_v1() { + *from_height += 1; + } else { + return Err(eyre!("Expected `BlockReceived` message")); } } Ok(()) } +#[iroha_futures::telemetry_future] +async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { + let mut events = events.subscribe(); + let mut consumer = Consumer::new(stream).await?; + + loop { + let event = events.recv().await?; + + iroha_logger::trace!(?event); + consumer.consume(event).await?; + } +} + async fn handle_metrics( wsv: Arc>, network: Addr, diff --git a/core/src/torii/tests.rs b/core/src/torii/tests.rs index fcccc556613..c7c6d021b5c 100644 --- a/core/src/torii/tests.rs +++ b/core/src/torii/tests.rs @@ -6,12 +6,14 @@ use futures::future::FutureExt; use iroha_actor::{broker::Broker, Actor}; use iroha_version::prelude::*; use tokio::time; +use warp::test::WsClient; use super::*; use crate::{ queue::Queue, samples::{get_config, get_trusted_peers}, smartcontracts::permissions::DenyAll, + stream::{Sink, Stream}, wsv::World, }; @@ -480,3 +482,61 @@ async fn query_with_no_find() { .assert() .await } +#[tokio::test] +async fn blocks_stream() { + const BLOCK_COUNT: usize = 4; + + let (torii, _) = create_torii().await; + let router = torii.create_api_router(); + + // Initialize blockchain + let mut block = ValidBlock::new_dummy().commit(); + for i in 1..=BLOCK_COUNT { + block.header.height = i as u64; + let block: VersionedCommittedBlock = block.clone().into(); + torii.wsv.apply(block).await.unwrap(); + } + + let mut client = warp::test::ws() + .path("/block/stream") + .handshake(router) + .await + .unwrap(); + + >::send( + &mut client, + VersionedBlockSubscriberMessage::from(BlockSubscriberMessage::SubscriptionRequest(2)), + ) + .await + .unwrap(); + + let subscription_accepted_message: VersionedBlockPublisherMessage = + >::recv(&mut client).await.unwrap(); + assert!(matches!( + subscription_accepted_message.into_v1(), + BlockPublisherMessage::SubscriptionAccepted + )); + + for i in 2..=BLOCK_COUNT { + let block_message: VersionedBlockPublisherMessage = + >::recv(&mut client).await.unwrap(); + let block: VersionedCommittedBlock = block_message.into_v1().try_into().unwrap(); + assert_eq!(block.header().height, i as u64); + + >::send( + &mut client, + VersionedBlockSubscriberMessage::from(BlockSubscriberMessage::BlockReceived), + ) + .await + .unwrap(); + } + + block.header.height = BLOCK_COUNT as u64 + 1; + let block: VersionedCommittedBlock = block.clone().into(); + torii.wsv.apply(block).await.unwrap(); + + let block_message: VersionedBlockPublisherMessage = + >::recv(&mut client).await.unwrap(); + let block: VersionedCommittedBlock = block_message.into_v1().try_into().unwrap(); + assert_eq!(block.header().height, BLOCK_COUNT as u64 + 1); +} diff --git a/core/src/wsv.rs b/core/src/wsv.rs index 3c42d872317..0baa962ca5b 100644 --- a/core/src/wsv.rs +++ b/core/src/wsv.rs @@ -25,6 +25,11 @@ use crate::{ smartcontracts::{Execute, FindError}, }; +/// Sender type of the new block notification channel +pub type NewBlockNotificationSender = tokio::sync::watch::Sender<()>; +/// Receiver type of the new block notification channel +pub type NewBlockNotificationReceiver = tokio::sync::watch::Receiver<()>; + /// World trait for mocking pub trait WorldTrait: Deref + DerefMut + Send + Sync + 'static + Debug + Default + Sized + Clone @@ -67,6 +72,8 @@ pub struct WorldStateView { pub transactions: DashSet>, /// Metrics for prometheus endpoint. pub metrics: Arc, + /// Notifies subscribers when new block is applied + new_block_notifier: Arc, } impl Default for WorldStateView { @@ -103,17 +110,20 @@ impl WorldTrait for World { impl WorldStateView { /// Default `WorldStateView` constructor. pub fn new(world: W) -> Self { + let (new_block_notifier, _) = tokio::sync::watch::channel(()); + WorldStateView { world, config: Configuration::default(), transactions: DashSet::new(), blocks: Arc::new(Chain::new()), metrics: Arc::new(Metrics::default()), + new_block_notifier: Arc::new(new_block_notifier), } } /// [`WorldStateView`] constructor. - pub fn from_config(config: Configuration, world: W) -> Self { + pub fn from_configuration(config: Configuration, world: W) -> Self { WorldStateView { config, ..WorldStateView::new(world) @@ -178,9 +188,16 @@ impl WorldStateView { } self.block_commit_metrics_update_callback(); self.blocks.push(block); + self.new_block_notifier.send_replace(()); Ok(()) } + /// Returns receiving end of the spmc channel through which subscribers are notified when + /// new block is added to the blockchain(after block validation) + pub fn subscribe_to_new_block_notifications(&self) -> NewBlockNotificationReceiver { + self.new_block_notifier.subscribe() + } + /// Hash of latest block pub fn latest_block_hash(&self) -> HashOf { self.blocks @@ -242,18 +259,40 @@ impl WorldStateView { self.metrics.block_height.inc(); } - /// Returns blocks after hash - pub fn blocks_after( + // TODO: There could be just this one method `blocks` instead of `blocks_from_height` and + // `blocks_after_height`. Also, this method would return references instead of cloning + // blockchain but comes with the risk of deadlock if consumer of the iterator stores + // references to blocks + /// Returns iterator over blockchain blocks + /// + /// **Locking behaviour**: Holding references to blocks stored in the blockchain can induce + /// deadlock. This limitation is imposed by the fact that blockchain is backed by [`Dashmap`] + pub fn blocks( + &self, + ) -> impl Iterator + '_> + '_ { + self.blocks.iter() + } + + /// Returns iterator over blockchain blocks starting with the block of the given `height` + pub fn blocks_from_height( + &self, + height: usize, + ) -> impl Iterator + '_ { + self.blocks + .iter() + .skip(height.saturating_sub(1)) + .map(|block_entry| block_entry.value().clone()) + } + + /// Returns iterator over blockchain blocks after the block with the given `hash` + pub fn blocks_after_hash( &self, hash: HashOf, - max_blocks: u32, - ) -> Vec { + ) -> impl Iterator + '_ { self.blocks .iter() - .skip_while(|block_entry| block_entry.value().header().previous_block_hash != hash) - .take(max_blocks as usize) + .skip_while(move |block_entry| block_entry.value().header().previous_block_hash != hash) .map(|block_entry| block_entry.value().clone()) - .collect() } /// Get an immutable view of the `World`. @@ -606,7 +645,6 @@ mod tests { #[tokio::test] async fn get_blocks_after_hash() { const BLOCK_CNT: usize = 10; - const BATCH_SIZE: u32 = 3; let mut block = ValidBlock::new_dummy().commit(); let wsv = WorldStateView::::default(); @@ -622,16 +660,30 @@ mod tests { wsv.apply(block).await.unwrap(); } + assert!(wsv + .blocks_after_hash(block_hashes[6]) + .map(|block| block.hash()) + .eq(block_hashes.into_iter().skip(7))); + } + + #[tokio::test] + async fn get_blocks_from_height() { + const BLOCK_CNT: usize = 10; + + let mut block = ValidBlock::new_dummy().commit(); + let wsv = WorldStateView::::default(); + + for i in 1..=BLOCK_CNT { + block.header.height = i as u64; + let block: VersionedCommittedBlock = block.clone().into(); + wsv.apply(block).await.unwrap(); + } + assert_eq!( - wsv.blocks_after(block_hashes[2], BATCH_SIZE) - .iter() - .map(VersionedCommittedBlock::hash) - .collect::>(), - block_hashes - .into_iter() - .skip(3) - .take(BATCH_SIZE as usize) + &wsv.blocks_from_height(8) + .map(|block| block.header().height) .collect::>(), + &[8, 9, 10] ); } } diff --git a/data_model/src/lib.rs b/data_model/src/lib.rs index 56077999e82..adcdd079ff7 100644 --- a/data_model/src/lib.rs +++ b/data_model/src/lib.rs @@ -2133,9 +2133,11 @@ pub mod uri { /// Health URI is used to handle incoming Healthcheck requests. pub const HEALTH: &str = "health"; /// The URI used for block synchronization. - pub const BLOCK_SYNC: &str = "block"; + pub const BLOCK_SYNC: &str = "block/sync"; /// The web socket uri used to subscribe to block and transactions statuses. pub const SUBSCRIPTION: &str = "events"; + /// The web socket uri used to subscribe to blocks stream. + pub const BLOCKS_STREAM: &str = "block/stream"; /// Get pending transactions. pub const PENDING_TRANSACTIONS: &str = "pending_transactions"; /// The URI for local config changing inspecting diff --git a/data_model/src/transaction.rs b/data_model/src/transaction.rs index cddb4c3b587..fa674ab6c2d 100644 --- a/data_model/src/transaction.rs +++ b/data_model/src/transaction.rs @@ -258,7 +258,7 @@ impl Txn for Transaction { } } -declare_versioned_with_scale!(VersionedPendingTransactions 1..2, FromVariant, Debug, Clone); +declare_versioned_with_scale!(VersionedPendingTransactions 1..2, Debug, Clone, FromVariant); impl VersionedPendingTransactions { /// Converts from `&VersionedPendingTransactions` to V1 reference @@ -504,7 +504,7 @@ pub struct InstructionExecutionFail { impl Display for InstructionExecutionFail { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { use Instruction::*; - let type_ = match self.instruction { + let kind = match self.instruction { Burn(_) => "burn", Fail(_) => "fail", If(_) => "if", @@ -521,7 +521,7 @@ impl Display for InstructionExecutionFail { write!( f, "Failed to execute instruction of type {}: {}", - type_, self.reason + kind, self.reason ) } } diff --git a/docs/source/references/api_spec.md b/docs/source/references/api_spec.md index 9f8d33c1a07..56ffed2b1ea 100644 --- a/docs/source/references/api_spec.md +++ b/docs/source/references/api_spec.md @@ -71,35 +71,59 @@ Hint and whether each object exists: **Protocol Upgrade**: `WebSocket` -**Encoding**: JSON +**Encoding**: [Parity Scale Codec](#parity-scale-codec) **Endpoint**: `/events` -**Expects**: +**Expects**: -First message after handshake from client: `SubscriptionRequest` [*](#iroha-structures) +First message after handshake from client: `EventStreamSubscriptionRequest` [*](#iroha-structures) -When server is ready to transmit events it sends: `SubscriptionAccepted` [*](#iroha-structures) +When server is ready to transmit events it sends: `EventStreamSubscriptionAccepted` [*](#iroha-structures) Server sends `Event` and expects `EventReceived` [*](#iroha-structures) after each, before sending the next event. **Notes**: -Usually, the client waits for Transaction events. +Usually, the client waits for Transaction events. Transaction event statuses can be either `Validating`, `Committed` or `Rejected`. -Transaction statuses proceed from `Validating` to either `Committed` or `Rejected`. +Transaction statuses proceed from `Validating` to either `Committed` or `Rejected`. However, due to the distributed nature of the network, some peers might receive events out of order (e.g. `Committed` before `Validating`). It's possible that some peers in the network are offline for the validation round. If the client connects to them while they are offline, the peers might not respond with the `Validating` status. But when the offline peers come back online they will synchronize the blocks. They are then guaranteed to respond with the `Committed` (or `Rejected`) status depending on the information found in the block. +### Blocks stream + +**Protocol**: HTTP + +**Protocol Upgrade**: `WebSocket` + +**Encoding**: [Parity Scale Codec](#parity-scale-codec) + +**Endpoint**: `/block/stream` + +**Expects**: + +First message after handshake to initiate communication from client: `BlockStreamSubscriptionRequest` [*](#iroha-structures) + +When server is ready to transmit blocks it sends: `BlockStreamSubscriptionAccepted` [*](#iroha-structures) + +Server sends `Block` and expects `BlockReceived` [*](#iroha-structures) after each, before sending the next block. + +**Notes**: + +Via this endpoint client first provides the starting block number(i.e. height) in the subscription request. After sending +the confirmation message, server starts streaming all the blocks from the given block number up to the current block and +continues to stream blocks as they are added to the blockchain. + ### Configuration **Protocol**: HTTP -**Encoding**: Json +**Encoding**: JSON **Endpoint**: `/configuration` @@ -115,7 +139,7 @@ There are 2 variants: } ``` -**Examples**: +**Examples**: To get the top-level configuration docs for [`Torii`] ```bash curl -X GET -H 'content-type: application/json' http://127.0.0.1:8080/configuration -d '{"Docs" : ["torii"]} ' -i @@ -129,7 +153,7 @@ curl -X GET -H 'content-type: application/json' http://127.0.0.1:8080/configurat **Protocol**: HTTP -**Encoding**: Json +**Encoding**: JSON **Endpoint**: `/health` @@ -150,7 +174,7 @@ Also returns current status of peer in json string: **Protocol**: HTTP -**Encoding**: Json +**Encoding**: JSON **Endpoint**: `/status` @@ -221,7 +245,13 @@ For more information on codec check [Substrate Dev Hub](https://substrate.dev/do - `VersionedTransaction` - `iroha_data_model::transaction::VersionedTransaction` - `VersionedSignedQueryRequest` - `iroha_data_model::query::VersionedSignedQueryRequest` - `VersionedQueryResult` - `iroha_data_model::query::VersionedQueryResult` -- `SubscriptionRequest` - `iroha_data_model::events::EventSocketMessage::SubscriptionRequest` -- `SubscriptionAccepted` - `iroha_data_model::events::EventSocketMessage::SubscriptionAccepted` -- `Event` - `iroha_data_model::events::EventSocketMessage::Event` -- `EventReceived` - `iroha_data_model::events::EventSocketMessage::EventReceived` + +- `EventStreamSubscriptionRequest` - `iroha_data_model::events::EventSubscriberMessage::SubscriptionRequest` +- `EventStreamSubscriptionAccepted` - `iroha_data_model::events::EventPublisherMessage::SubscriptionAccepted` +- `Event` - `iroha_data_model::events::EventPublisherMessage::Event` +- `EventReceived` - `iroha_data_model::events::EventSubscriberMessage::EventReceived` + +- `BlockStreamSubscriptionAccepted` - `iroha_core::block::stream::BlockPublisherMessage::SubscriptionAccepted` +- `BlockStreamSubscriptionRequest` - `iroha_core::block::stream::BlockSubscriberMessage::SubscriptionRequest` +- `Block` - `iroha_core::block::stream::BlockPublisherMessage::Block` +- `BlockReceived` - `iroha_core::block::stream::BlockSubscriberMessage::BlockReceived` diff --git a/schema/bin/src/main.rs b/schema/bin/src/main.rs index 6162a61c656..228b548ba6c 100644 --- a/schema/bin/src/main.rs +++ b/schema/bin/src/main.rs @@ -4,6 +4,7 @@ use std::collections::BTreeMap; +use iroha_core::block::stream::prelude::*; use iroha_schema::prelude::*; macro_rules! to_json { @@ -26,6 +27,8 @@ fn main() { // $ rg '^pub (struct|enum)' | rg -v '(<|Builder|LengthLimits|QueryRequest)' | cut -d' ' -f3 | sed -e 's/[(].*//' -e 's/$/,/' | sort Add, And, + BlockPublisherMessage, + BlockSubscriberMessage, BurnBox, Contains, ContainsAll, @@ -71,11 +74,13 @@ fn main() { Where, // All versioned - VersionedTransaction, - VersionedSignedQueryRequest, - VersionedQueryResult, + VersionedBlockPublisherMessage, + VersionedBlockSubscriberMessage, VersionedEventPublisherMessage, VersionedEventSubscriberMessage, + VersionedQueryResult, + VersionedSignedQueryRequest, + VersionedTransaction, RawGenesisBlock };