Skip to content

Commit

Permalink
[feature] #1210 Block streaming - server side (#1724)
Browse files Browse the repository at this point in the history
* move transaction related functionality to data_model/transaction module

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>

* split web socket event producer and consumer

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>

* use publisher/subscriber terminology

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>

* add block streaming over websocket to torii

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>

* add server side tests

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>

* refactor implemented block streaming code

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>

* update api_spec

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic authored Dec 20, 2021
1 parent b122578 commit 494431d
Show file tree
Hide file tree
Showing 12 changed files with 522 additions and 112 deletions.
93 changes: 93 additions & 0 deletions core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,99 @@ impl From<&CommittedBlock> for Vec<Event> {
}
}

// TODO: Move to data_model after release
pub mod stream {
//! Blocks for streaming API.

use iroha_macro::FromVariant;
use iroha_schema::prelude::*;
use iroha_version::prelude::*;
use parity_scale_codec::{Decode, Encode};

use crate::block::VersionedCommittedBlock;

declare_versioned_with_scale!(VersionedBlockPublisherMessage 1..2, Debug, Clone, FromVariant, IntoSchema);

impl VersionedBlockPublisherMessage {
/// Converts from `&VersionedBlockPublisherMessage` to V1 reference
pub const fn as_v1(&self) -> &BlockPublisherMessage {
match self {
Self::V1(v1) => v1,
}
}

/// Converts from `&mut VersionedBlockPublisherMessage` to V1 mutable reference
pub fn as_mut_v1(&mut self) -> &mut BlockPublisherMessage {
match self {
Self::V1(v1) => v1,
}
}

/// Performs the conversion from `VersionedBlockPublisherMessage` to V1
pub fn into_v1(self) -> BlockPublisherMessage {
match self {
Self::V1(v1) => v1,
}
}
}

/// Message sent by the stream producer
#[version_with_scale(n = 1, versioned = "VersionedBlockPublisherMessage")]
#[derive(Debug, Clone, Decode, Encode, FromVariant, IntoSchema)]
#[allow(clippy::large_enum_variant)]
pub enum BlockPublisherMessage {
/// Answer sent by the peer.
/// The message means that block stream connection is initialized and will be supplying
/// events starting with the next message.
SubscriptionAccepted,
/// Block sent by the peer.
Block(VersionedCommittedBlock),
}

declare_versioned_with_scale!(VersionedBlockSubscriberMessage 1..2, Debug, Clone, FromVariant, IntoSchema);

impl VersionedBlockSubscriberMessage {
/// Converts from `&VersionedBlockSubscriberMessage` to V1 reference
pub const fn as_v1(&self) -> &BlockSubscriberMessage {
match self {
Self::V1(v1) => v1,
}
}

/// Converts from `&mut VersionedBlockSubscriberMessage` to V1 mutable reference
pub fn as_mut_v1(&mut self) -> &mut BlockSubscriberMessage {
match self {
Self::V1(v1) => v1,
}
}

/// Performs the conversion from `VersionedBlockSubscriberMessage` to V1
pub fn into_v1(self) -> BlockSubscriberMessage {
match self {
Self::V1(v1) => v1,
}
}
}

/// Message sent by the stream consumer
#[version_with_scale(n = 1, versioned = "VersionedBlockSubscriberMessage")]
#[derive(Debug, Clone, Copy, Decode, Encode, FromVariant, IntoSchema)]
pub enum BlockSubscriberMessage {
/// Request sent to subscribe to blocks stream starting from the given height.
SubscriptionRequest(u64),
/// Acknowledgment of receiving block sent from the peer.
BlockReceived,
}

/// Exports common structs and enums from this module.
pub mod prelude {
pub use super::{
BlockPublisherMessage, BlockSubscriberMessage, VersionedBlockPublisherMessage,
VersionedBlockSubscriberMessage,
};
}
}

#[cfg(test)]
mod tests {
#![allow(clippy::restriction)]
Expand Down
7 changes: 6 additions & 1 deletion core/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,12 @@ pub mod message {
return;
}

let blocks = block_sync.wsv.blocks_after(*hash, block_sync.batch_size);
let blocks: Vec<_> = block_sync
.wsv
.blocks_after_hash(*hash)
.take(block_sync.batch_size as usize)
.collect();

if blocks.is_empty() {
warn!(%hash, "Block hash not found");
} else {
Expand Down
79 changes: 21 additions & 58 deletions core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,18 @@
//! This module contains descriptions of such an events and
//! utilitary Iroha Special Instructions to work with them.

use std::{fmt::Debug, time::Duration};

use eyre::{eyre, Result, WrapErr};
use futures::{SinkExt, StreamExt};
use eyre::{eyre, Result};
use iroha_data_model::events::prelude::*;
use iroha_version::prelude::*;
use tokio::{sync::broadcast, time};
use warp::ws::{self, WebSocket};
use tokio::sync::broadcast;
use warp::ws::WebSocket;

use crate::stream::{Sink, Stream};

/// Type of `Sender<Event>` which should be used for channels of `Event` messages.
pub type EventsSender = broadcast::Sender<Event>;
/// Type of `Receiver<Event>` which should be used for channels of `Event` messages.
pub type EventsReceiver = broadcast::Receiver<Event>;

#[cfg(test)]
const TIMEOUT: Duration = Duration::from_millis(10_000);
#[cfg(not(test))]
const TIMEOUT: Duration = Duration::from_millis(1000);

/// Consumer for Iroha `Event`(s).
/// Passes the events over the corresponding connection `stream` if they match the `filter`.
#[derive(Debug)]
Expand All @@ -36,29 +29,14 @@ impl Consumer {
/// Can fail due to timeout or without message at websocket or during decoding request
#[iroha_futures::telemetry_future]
pub async fn new(mut stream: WebSocket) -> Result<Self> {
let message = time::timeout(TIMEOUT, stream.next())
.await
.wrap_err("Read message timeout")?
.ok_or_else(|| eyre!("Failed to read message: no message"))?
.wrap_err("Web Socket failure")?;

if !message.is_binary() {
return Err(eyre!("Unexpected message type"));
}
let filter = VersionedEventSubscriberMessage::decode_versioned(message.as_bytes())?
.into_v1()
.try_into()?;
let subscription_request: VersionedEventSubscriberMessage = stream.recv().await?;
let filter = subscription_request.into_v1().try_into()?;

time::timeout(
TIMEOUT,
stream.send(ws::Message::binary(
VersionedEventPublisherMessage::from(EventPublisherMessage::SubscriptionAccepted)
.encode_versioned()?,
)),
)
.await
.wrap_err("Send message timeout")?
.wrap_err("Failed to send message")?;
stream
.send(VersionedEventPublisherMessage::from(
EventPublisherMessage::SubscriptionAccepted,
))
.await?;

Ok(Consumer { stream, filter })
}
Expand All @@ -68,34 +46,19 @@ impl Consumer {
/// # Errors
/// Can fail due to timeout or sending event. Also receiving might fail
#[iroha_futures::telemetry_future]
pub async fn consume(&mut self, event: &Event) -> Result<()> {
if !self.filter.apply(event) {
pub async fn consume(&mut self, event: Event) -> Result<()> {
if !self.filter.apply(&event) {
return Ok(());
}

let event =
VersionedEventPublisherMessage::from(EventPublisherMessage::from(event.clone()))
.encode_versioned()
.wrap_err("Failed to serialize event")?;
time::timeout(TIMEOUT, self.stream.send(ws::Message::binary(event)))
.await
.wrap_err("Send message timeout")?
.wrap_err("Failed to send message")?;

let message = time::timeout(TIMEOUT, self.stream.next())
.await
.wrap_err("Failed to read receipt")?
.ok_or_else(|| eyre!("Failed to read receipt: no receipt"))?
.wrap_err("Web Socket failure")?;

if !message.is_binary() {
return Err(eyre!("Unexpected message type"));
}
self.stream
.send(VersionedEventPublisherMessage::from(
EventPublisherMessage::from(event),
))
.await?;

if let EventSubscriberMessage::EventReceived =
VersionedEventSubscriberMessage::decode_versioned(message.as_bytes())?.into_v1()
{
self.stream.flush().await?;
let message: VersionedEventSubscriberMessage = self.stream.recv().await?;
if let EventSubscriberMessage::EventReceived = message.into_v1() {
Ok(())
} else {
Err(eyre!("Expected `EventReceived`."))
Expand Down
3 changes: 2 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod modules;
pub mod queue;
pub mod samples;
pub mod smartcontracts;
pub mod stream;
pub mod sumeragi;
pub mod torii;
pub mod tx;
Expand Down Expand Up @@ -194,7 +195,7 @@ where
let network_addr = network.start().await;

let (events_sender, _) = broadcast::channel(100);
let wsv = Arc::new(WorldStateView::from_config(
let wsv = Arc::new(WorldStateView::from_configuration(
config.wsv,
W::with(
init::domains(&config).wrap_err("Failed to get initial domains")?,
Expand Down
125 changes: 125 additions & 0 deletions core/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//! Extension to the [`futures::StreamExt`] and [`futures::SinkExt`].
//! Adds support for sending custom Iroha messages over the stream, taking care
//! of encoding/decoding as well as timeouts

use std::time::Duration;

use eyre::{eyre, Context, Result};
use futures::{SinkExt, StreamExt};
use iroha_version::prelude::*;

#[cfg(test)]
const TIMEOUT: Duration = Duration::from_millis(10_000);
#[cfg(not(test))]
const TIMEOUT: Duration = Duration::from_millis(1000);

/// Represents messsage used by the stream
pub trait StreamMessage {
/// Constructs new binary message
fn binary(source: Vec<u8>) -> Self;
/// Decodes the message into byte slice
fn as_bytes(&self) -> &[u8];
/// Returns `true` if the message is binary
fn is_binary(&self) -> bool;
}

/// Trait for writing custom messages into stream
#[async_trait::async_trait]
pub trait Sink<S: EncodeVersioned>: SinkExt<Self::Message, Error = Self::Err> + Unpin
where
S: Send + Sync + 'static,
{
/// Error type returned by the sink
type Err: std::error::Error + Send + Sync + 'static;

/// Message type used by the underlying sink
type Message: StreamMessage + Send;

/// Encoded message and sends it to the stream
async fn send(&mut self, message: S) -> Result<()> {
Ok(tokio::time::timeout(
TIMEOUT,
<Self as SinkExt<Self::Message>>::send(
self,
Self::Message::binary(message.encode_versioned()?),
),
)
.await
.wrap_err("Send message timeout")??)
}
}

/// Trait for reading custom messages from stream
#[async_trait::async_trait]
pub trait Stream<R: DecodeVersioned>:
StreamExt<Item = std::result::Result<Self::Message, Self::Err>> + Unpin
{
/// Error type returned by the stream
type Err: std::error::Error + Send + Sync + 'static;

/// Message type used by the underlying stream
type Message: StreamMessage;

/// Receives and decodes message from the stream
async fn recv(&mut self) -> Result<R> {
let subscription_request_message = tokio::time::timeout(TIMEOUT, self.next())
.await
.wrap_err("Read message timeout")?
.ok_or_else(|| eyre!("No message"))??;

if !subscription_request_message.is_binary() {
return Err(eyre!("Expected binary message"));
}

Ok(R::decode_versioned(
subscription_request_message.as_bytes(),
)?)
}
}

impl StreamMessage for warp::ws::Message {
fn binary(source: Vec<u8>) -> Self {
Self::binary(source)
}
fn as_bytes(&self) -> &[u8] {
self.as_bytes()
}
fn is_binary(&self) -> bool {
self.is_binary()
}
}

#[async_trait::async_trait]
impl<M: EncodeVersioned> Sink<M> for warp::ws::WebSocket
where
M: Send + Sync + 'static,
{
type Err = warp::Error;
type Message = warp::ws::Message;
}
#[async_trait::async_trait]
impl<M: DecodeVersioned> Stream<M> for warp::ws::WebSocket {
type Err = warp::Error;
type Message = warp::ws::Message;
}

#[cfg(test)]
mod tests {
use warp::test::WsClient;

use super::*;

#[async_trait::async_trait]
impl<M: DecodeVersioned> Stream<M> for WsClient {
type Err = warp::test::WsError;
type Message = warp::ws::Message;
}
#[async_trait::async_trait]
impl<M: EncodeVersioned> Sink<M> for WsClient
where
M: Send + Sync + 'static,
{
type Err = warp::test::WsError;
type Message = warp::ws::Message;
}
}
Loading

0 comments on commit 494431d

Please sign in to comment.