diff --git a/Cargo.lock b/Cargo.lock index ef567d8acac..2d16a53b989 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1752,6 +1752,7 @@ name = "iroha_telemetry" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "chrono", "eyre", "futures", diff --git a/core/genesis.json b/core/genesis.json index c4e915ddaa2..ec91a3184bf 100644 --- a/core/genesis.json +++ b/core/genesis.json @@ -11,7 +11,7 @@ "name": "wonderland", "accounts": {}, "asset_definitions": {}, - "metadata": {} + "metadata": {} } } } diff --git a/docs/source/references/config.md b/docs/source/references/config.md index 53da3d71830..7e6fe551c74 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -83,6 +83,8 @@ Configuration of iroha is done via options in the following document. Here is de "TELEMETRY": { "NAME": null, "URL": null, + "MIN_PERIOD": 1, + "MAX_EXPONENT": 4, "FILE": null }, "NETWORK": { @@ -530,6 +532,8 @@ Has type `iroha_telemetry::Configuration`. Can be configured via environment var ```json { "FILE": null, + "MAX_EXPONENT": 4, + "MIN_PERIOD": 1, "NAME": null, "URL": null } @@ -545,6 +549,26 @@ Has type `Option`. Can be configured via environment variable `TELEMETR null ``` +### `telemetry.max_exponent` + +The maximum exponent of 2 that is used for increasing delay between reconnections + +Has type `u8`. Can be configured via environment variable `TELEMETRY_MAX_EXPONENT` + +```json +4 +``` + +### `telemetry.min_period` + +The minimum period of time in seconds to wait before reconnecting + +Has type `u64`. Can be configured via environment variable `TELEMETRY_MIN_PERIOD` + +```json +1 +``` + ### `telemetry.name` The node's name to be seen on the telemetry diff --git a/telemetry/Cargo.toml b/telemetry/Cargo.toml index a9fc714edb9..5839774a37d 100644 --- a/telemetry/Cargo.toml +++ b/telemetry/Cargo.toml @@ -9,6 +9,7 @@ build = "build.rs" dev-telemetry = [] [dependencies] +async-trait = "0.1" chrono = "0.4" eyre = "0.6.5" futures = { version = "0.3.17", default-features = false, features = ["std", "async-await"] } diff --git a/telemetry/src/config.rs b/telemetry/src/config.rs index 3062c649a04..1c6e5eb7c77 100644 --- a/telemetry/src/config.rs +++ b/telemetry/src/config.rs @@ -5,9 +5,12 @@ use iroha_config::derive::Configurable; use serde::{Deserialize, Serialize}; use url::Url; +use crate::types::RetryPeriod; + /// Configuration parameters container -#[derive(Clone, Default, Deserialize, Serialize, Debug, Configurable, PartialEq, Eq)] +#[derive(Clone, Deserialize, Serialize, Debug, Configurable, PartialEq, Eq)] #[serde(rename_all = "UPPERCASE")] +#[serde(default)] #[config(env_prefix = "TELEMETRY_")] pub struct Configuration { /// The node's name to be seen on the telemetry @@ -16,8 +19,35 @@ pub struct Configuration { /// The url of the telemetry, e.g., ws://127.0.0.1:8001/submit #[config(serde_as_str)] pub url: Option, + /// The minimum period of time in seconds to wait before reconnecting + #[serde(default = "default_min_period")] + pub min_period: u64, + /// The maximum exponent of 2 that is used for increasing delay between reconnections + #[serde(default = "default_max_exponent")] + pub max_exponent: u8, /// The filepath that to write dev-telemetry to #[cfg(feature = "dev-telemetry")] #[config(serde_as_str)] pub file: Option, } + +impl Default for Configuration { + fn default() -> Self { + Self { + name: None, + url: None, + min_period: RetryPeriod::DEFAULT_MIN_PERIOD, + max_exponent: RetryPeriod::DEFAULT_MAX_EXPONENT, + #[cfg(feature = "dev-telemetry")] + file: None, + } + } +} + +fn default_min_period() -> u64 { + RetryPeriod::DEFAULT_MIN_PERIOD +} + +fn default_max_exponent() -> u8 { + RetryPeriod::DEFAULT_MAX_EXPONENT +} diff --git a/telemetry/src/lib.rs b/telemetry/src/lib.rs index 42cb4082876..713a9d5a968 100644 --- a/telemetry/src/lib.rs +++ b/telemetry/src/lib.rs @@ -4,6 +4,7 @@ mod config; #[cfg(feature = "dev-telemetry")] pub mod dev; pub mod futures; +mod types; pub mod ws; pub use config::Configuration; diff --git a/telemetry/src/types.rs b/telemetry/src/types.rs new file mode 100644 index 00000000000..3215025d531 --- /dev/null +++ b/telemetry/src/types.rs @@ -0,0 +1,41 @@ +/// Encapsulates the retry period that is calculated as `min_period + 2 ^ min(exponent, max_exponent)` +pub struct RetryPeriod { + /// The minimum period + min_period: u64, + /// The maximum exponent + max_exponent: u8, + /// The current exponent + exponent: u8, +} + +impl RetryPeriod { + pub const DEFAULT_MIN_PERIOD: u64 = 1; + pub const DEFAULT_MAX_EXPONENT: u8 = 4; + + /// Constructs a new object + pub fn new(min_period: u64, max_exponent: u8) -> Self { + Self { + min_period, + max_exponent, + exponent: 0, + } + } + + /// Increases the exponent if it isn't at its maximum + pub fn increase_exponent(&mut self) { + if self.exponent < self.max_exponent { + self.exponent += 1; + } + } + + /// Returns the period + pub fn period(&mut self) -> u64 { + self.min_period + 2_u64.saturating_mul(self.exponent.into()) + } +} + +impl Default for RetryPeriod { + fn default() -> Self { + Self::new(Self::DEFAULT_MIN_PERIOD, Self::DEFAULT_MAX_EXPONENT) + } +} diff --git a/telemetry/src/ws.rs b/telemetry/src/ws.rs index 62c257c17b1..3a88100644d 100644 --- a/telemetry/src/ws.rs +++ b/telemetry/src/ws.rs @@ -1,15 +1,26 @@ //! Telemetry sent to a server +use std::time::Duration; + use chrono::Local; use eyre::{eyre, Result}; -use futures::{Sink, SinkExt, StreamExt}; +use futures::{stream::SplitSink, Sink, SinkExt, StreamExt}; use iroha_logger::telemetry::Telemetry; use serde_json::Map; -use tokio::sync::mpsc::Receiver; +use tokio::{ + net::TcpStream, + sync::mpsc::{self, Receiver, Sender}, +}; use tokio_stream::wrappers::ReceiverStream; -use tokio_tungstenite::tungstenite::{Error, Message}; +use tokio_tungstenite::{ + tungstenite::{Error, Message}, + MaybeTlsStream, WebSocketStream, +}; +use url::Url; + +use crate::{types::RetryPeriod, Configuration}; -use crate::Configuration; +type WebSocketSplitSink = SplitSink>, Message>; /// Starts telemetry sending data to a server /// # Errors @@ -18,10 +29,17 @@ pub async fn start(config: &Configuration, telemetry: Receiver) -> Re if let (Some(name), Some(url)) = (&config.name, &config.url) { iroha_logger::info!("Starting telemetry to {}", url); let (ws, _) = tokio_tungstenite::connect_async(url).await?; - let name = name.clone(); + let (write, _read) = ws.split(); + let (internal_sender, internal_receiver) = mpsc::channel(10); + let client = Client::new( + name.clone(), + write, + WebsocketSinkFactory::new(url.clone()), + RetryPeriod::new(config.min_period, config.max_exponent), + internal_sender, + ); tokio::task::spawn(async move { - let (write, _read) = ws.split(); - run(name, telemetry, write).await; + client.run(telemetry, internal_receiver).await; }); Ok(true) } else { @@ -29,47 +47,139 @@ pub async fn start(config: &Configuration, telemetry: Receiver) -> Re } } -async fn run(name: String, receiver: Receiver, mut sink: S) +struct Client { + name: String, + sink_factory: F, + retry_period: RetryPeriod, + internal_sender: Sender, + sink: Option, + init_msg: Option, +} + +impl Client where S: SinkExt + Sink + Send + Unpin, + F: SinkFactory + Send, { - let mut stream = ReceiverStream::new(receiver); - while let Some(telemetry) = stream.next().await { - match prepare_message(&name, telemetry) { - Ok(msg) => { - match sink.send(msg).await { - Ok(_) => {} - Err(Error::AlreadyClosed | Error::ConnectionClosed) => { - iroha_logger::debug!("websocket closed"); - // TBD: It makes sense to retry connection. Should we wait, should we have limit number of attempts? + pub fn new( + name: String, + sink: S, + sink_factory: F, + retry_period: RetryPeriod, + internal_sender: Sender, + ) -> Self { + Self { + name, + sink_factory, + retry_period, + internal_sender, + sink: Some(sink), + init_msg: None, + } + } + + pub async fn run( + mut self, + receiver: Receiver, + internal_receiver: Receiver, + ) { + let mut stream = ReceiverStream::new(receiver).fuse(); + let mut internal_stream = ReceiverStream::new(internal_receiver).fuse(); + #[allow(clippy::restriction)] + loop { + tokio::select! { + msg = stream.next() => { + if let Some(msg) = msg { + self.on_telemetry(msg).await; + } else { + break; } - Err(e) => { - iroha_logger::error!("send failed: {:?}", e); - // TBD: What is the proper way to signal about it? + } + msg = internal_stream.next() => { + if let Some(InternalMessage::Reconnect) = msg { + self.on_reconnect().await; } } } + } + } + + async fn on_telemetry(&mut self, telemetry: Telemetry) { + match prepare_message(&self.name, telemetry) { + Ok((msg, msg_kind)) => { + if let Some(MessageKind::Initialization) = msg_kind { + self.init_msg = Some(msg.clone()); + } + self.send_msg(msg).await; + } Err(e) => { iroha_logger::error!("prepare_message failed: {:?}", e); - // TBD: What is the proper way to signal about it? } } } -} -fn prepare_message(name: &str, telemetry: Telemetry) -> Result { - enum Msg { - SystemConnected, - Other, + async fn on_reconnect(&mut self) { + if let Ok(sink) = self.sink_factory.create().await { + if let Some(msg) = self.init_msg.as_ref() { + iroha_logger::debug!("Reconnected telemetry"); + self.sink = Some(sink); + let msg = msg.clone(); + self.send_msg(msg).await; + } else { + // The reconnect is required if sending a message fails. + // The first message to be sent is initialization. + // The path is assumed to be unreachable. + iroha_logger::error!( + "Cannot reconnect telemetry because there is no initialization message" + ); + } + } else { + self.schedule_reconnect(); + } } + + async fn send_msg(&mut self, msg: Message) { + if let Some(sink) = self.sink.as_mut() { + match sink.send(msg).await { + Ok(_) => {} + Err(Error::AlreadyClosed | Error::ConnectionClosed) => { + iroha_logger::debug!("Closed connection to telemetry"); + self.sink = None; + self.schedule_reconnect(); + } + Err(e) => { + iroha_logger::error!("send failed: {:?}", e); + } + } + } + } + + fn schedule_reconnect(&mut self) { + self.retry_period.increase_exponent(); + let period = self.retry_period.period(); + iroha_logger::debug!("Scheduled reconnecting to telemetry in {} seconds", period); + let sender = self.internal_sender.clone(); + tokio::task::spawn(async move { + tokio::time::sleep(Duration::from_secs(period)).await; + let _ = sender.send(InternalMessage::Reconnect).await; + }); + } +} + +#[derive(Debug)] +enum InternalMessage { + Reconnect, +} + +fn prepare_message(name: &str, telemetry: Telemetry) -> Result<(Message, Option)> { let fields = telemetry.fields.0; - let msg = fields + let msg_kind = fields .iter() .find_map(|(name, map)| (*name == "msg").then(|| map)) .and_then(|v| { v.as_str().map(|v| match v { - "system.connected" => Msg::SystemConnected, - _ => Msg::Other, + "system.connected" => Some(MessageKind::Initialization), + _ => None, }) }) .ok_or_else(|| eyre!("Failed to read 'msg'"))?; @@ -89,7 +199,7 @@ fn prepare_message(name: &str, telemetry: Telemetry) -> Result { (field, map) }) .collect(); - if let Msg::SystemConnected = msg { + if let Some(MessageKind::Initialization) = msg_kind { payload.insert("name".into(), name.into()); payload.insert("chain".into(), "Iroha".into()); payload.insert("implementation".into(), "".into()); @@ -115,7 +225,41 @@ fn prepare_message(name: &str, telemetry: Telemetry) -> Result { map.insert("id".into(), 0.into()); map.insert("ts".into(), Local::now().to_rfc3339().into()); map.insert("payload".into(), payload.into()); - Ok(Message::Binary(serde_json::to_vec(&map)?)) + let msg = Message::Binary(serde_json::to_vec(&map)?); + Ok((msg, msg_kind)) +} + +#[derive(Debug, Clone, Copy)] +enum MessageKind { + Initialization, +} + +#[async_trait::async_trait] +trait SinkFactory { + type Sink: SinkExt + Sink + Send + Unpin; + + async fn create(&mut self) -> Result; +} + +struct WebsocketSinkFactory { + url: Url, +} + +impl WebsocketSinkFactory { + pub fn new(url: Url) -> Self { + Self { url } + } +} + +#[async_trait::async_trait] +impl SinkFactory for WebsocketSinkFactory { + type Sink = WebSocketSplitSink; + + async fn create(&mut self) -> Result { + let (ws, _) = tokio_tungstenite::connect_async(&self.url).await?; + let (write, _) = ws.split(); + Ok(write) + } } #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] @@ -123,24 +267,34 @@ fn prepare_message(name: &str, telemetry: Telemetry) -> Result { mod tests { use std::{ pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, task::{Context, Poll}, + time::Duration, }; - use futures::{channel::mpsc::Sender, Sink, StreamExt}; - use iroha_logger::telemetry::{Telemetry, TelemetryFields}; + use eyre::{eyre, Result}; + use futures::{Sink, StreamExt}; + use iroha_logger::{ + config::LoggerConfiguration, + telemetry::{Telemetry, TelemetryFields}, + }; use serde_json::{Map, Value}; + use tokio::task::JoinHandle; use tokio_tungstenite::tungstenite::{Error, Message}; - pub struct ManagedSender { - sender: Sender, - before_send: Box Result<(), E> + Send>, + use crate::ws::{Client, RetryPeriod, SinkFactory}; + + #[derive(Clone)] + pub struct FailableSender { + sender: futures::channel::mpsc::Sender, + before_send: F, } - impl ManagedSender { - pub fn new( - sender: Sender, - before_send: Box Result<(), E> + Send>, - ) -> Self { + impl FailableSender { + pub fn new(sender: futures::channel::mpsc::Sender, before_send: F) -> Self { Self { sender, before_send, @@ -148,14 +302,18 @@ mod tests { } } - impl Sink for ManagedSender { + impl Sink for FailableSender + where + F: FnMut() -> Result<(), E> + Unpin, + { type Error = E; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = Pin::into_inner(self); match this.sender.poll_ready(cx) { Poll::Ready(r) => { - Poll::Ready((this.before_send)().map(|_| r.expect("failed to send"))) + let result = (this.before_send)().map(|_| r.expect("failed to send")); + Poll::Ready(result) } Poll::Pending => Poll::Pending, } @@ -181,58 +339,119 @@ mod tests { } } + struct MockSinkFactory { + fail: Arc, + sender: FailableSender, + } + + #[async_trait::async_trait] + impl SinkFactory for MockSinkFactory + where + F: FnMut() -> Result<(), Error> + Clone + Send + Unpin, + { + type Sink = FailableSender; + + async fn create(&mut self) -> Result { + if self.fail.load(Ordering::Acquire) { + Err(eyre!("failed to create")) + } else { + Ok(self.sender.clone()) + } + } + } + + struct Suite { + fail_send: Arc, + fail_factory_create: Arc, + telemetry_sender: tokio::sync::mpsc::Sender, + message_receiver: futures::channel::mpsc::Receiver, + run_handle: JoinHandle<()>, + } + + impl Suite { + pub fn new() -> Self { + let (telemetry_sender, telemetry_receiver) = tokio::sync::mpsc::channel(100); + let (message_sender, message_receiver) = futures::channel::mpsc::channel(100); + let fail_send = Arc::new(AtomicBool::new(false)); + let message_sender = { + let fail = Arc::clone(&fail_send); + FailableSender::new(message_sender, move || { + if fail.load(Ordering::Acquire) { + Err(Error::ConnectionClosed) + } else { + Ok(()) + } + }) + }; + let fail_factory_create = Arc::new(AtomicBool::new(false)); + let (internal_sender, internal_receiver) = tokio::sync::mpsc::channel(10); + let run_handle = { + let client = Client::new( + "node".to_owned(), + message_sender.clone(), + MockSinkFactory { + fail: Arc::clone(&fail_factory_create), + sender: message_sender, + }, + RetryPeriod::new(1, 0), + internal_sender, + ); + tokio::task::spawn(async move { + client.run(telemetry_receiver, internal_receiver).await; + }) + }; + Self { + fail_send, + fail_factory_create, + telemetry_sender, + message_receiver, + run_handle, + } + } + } + + fn system_connected_telemetry() -> Telemetry { + Telemetry { + target: "telemetry::test", + fields: TelemetryFields(vec![ + ("msg", Value::String("system.connected".to_owned())), + ( + "genesis_hash", + Value::String("00000000000000000000000000000000".to_owned()), + ), + ]), + } + } + + fn system_interval_telemetry(peers: u64) -> Telemetry { + Telemetry { + target: "telemetry::test", + fields: TelemetryFields(vec![ + ("msg", Value::String("system.interval".to_owned())), + ("peers", Value::Number(peers.into())), + ]), + } + } + #[tokio::test] - async fn run() { - let (sender, receiver) = tokio::sync::mpsc::channel(100); - let (sender_sink, mut receiver_sink) = futures::channel::mpsc::channel(100); - let mut send_index = 0; - let sender_sink = ManagedSender::new( - sender_sink, - Box::new(move || { - if send_index < 2 { - send_index += 1; - Ok(()) - } else { - Err(Error::ConnectionClosed) - } - }), - ); - let run_handle = tokio::task::spawn(super::run("Node".to_owned(), receiver, sender_sink)); - sender - .send(Telemetry { - target: "telemetry::test", - fields: TelemetryFields(vec![ - ("msg", Value::String("system.connected".to_owned())), - ( - "genesis_hash", - Value::String("00000000000000000000000000000000".to_owned()), - ), - ]), - }) - .await - .unwrap(); - sender - .send(Telemetry { - target: "telemetry::test", - fields: TelemetryFields(vec![ - ("msg", Value::String("system.interval".to_owned())), - ("peers", Value::Number(10.into())), - ]), - }) - .await - .unwrap(); - sender - .send(Telemetry { - target: "telemetry::test", - fields: TelemetryFields(vec![ - ("msg", Value::String("system.interval".to_owned())), - ("peers", Value::Number(10.into())), - ]), - }) + async fn send_succeeds() { + iroha_logger::init(&LoggerConfiguration::default()).unwrap(); + + let Suite { + telemetry_sender, + mut message_receiver, + run_handle, + .. + } = Suite::new(); + + // The first message is initialization + telemetry_sender + .send(system_connected_telemetry()) .await .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; { - let msg = receiver_sink.next().await.unwrap(); + let msg = message_receiver.next().await.unwrap(); let bytes = if let Message::Binary(bytes) = msg { bytes } else { @@ -260,8 +479,15 @@ mod tests { assert!(payload.contains_key("startup_time")); assert!(payload.contains_key("network_id")); } + + // The second message is update + telemetry_sender + .send(system_interval_telemetry(2)) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; { - let msg = receiver_sink.next().await.unwrap(); + let msg = message_receiver.next().await.unwrap(); let bytes = if let Message::Binary(bytes) = msg { bytes } else { @@ -276,9 +502,99 @@ mod tests { payload.get("msg"), Some(&Value::String("system.interval".to_owned())) ); - assert_eq!(payload.get("peers"), Some(&Value::Number(10.into()))); + assert_eq!(payload.get("peers"), Some(&Value::Number(2.into()))); } - drop(sender); + + drop(telemetry_sender); + run_handle.await.unwrap(); + } + + #[tokio::test] + async fn reconnect_fails() { + iroha_logger::init(&LoggerConfiguration::default()).unwrap(); + + let Suite { + fail_send, + fail_factory_create, + telemetry_sender, + mut message_receiver, + run_handle, + } = Suite::new(); + + // Fail sending the first message + fail_send.store(true, Ordering::Release); + telemetry_sender + .send(system_connected_telemetry()) + .await + .unwrap(); + assert!(message_receiver.try_next().is_err()); + tokio::time::sleep(Duration::from_millis(100)).await; + + // The second message is not sent because the sink is reset + fail_send.store(false, Ordering::Release); + telemetry_sender + .send(system_interval_telemetry(1)) + .await + .unwrap(); + assert!(message_receiver.try_next().is_err()); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Fail the reconnection + fail_factory_create.store(true, Ordering::Release); + tokio::time::sleep(Duration::from_secs(1)).await; + + // The third message is not sent because the sink is not created yet + telemetry_sender + .send(system_interval_telemetry(1)) + .await + .unwrap(); + assert!(message_receiver.try_next().is_err()); + + drop(telemetry_sender); + run_handle.await.unwrap(); + } + + #[tokio::test] + async fn send_after_reconnect_fails() { + iroha_logger::init(&LoggerConfiguration::default()).unwrap(); + + let Suite { + fail_send, + telemetry_sender, + mut message_receiver, + run_handle, + .. + } = Suite::new(); + + // Fail sending the first message + fail_send.store(true, Ordering::Release); + telemetry_sender + .send(system_connected_telemetry()) + .await + .unwrap(); + assert!(message_receiver.try_next().is_err()); + tokio::time::sleep(Duration::from_millis(100)).await; + + // The second message is not sent because the sink is reset + fail_send.store(false, Ordering::Release); + telemetry_sender + .send(system_interval_telemetry(1)) + .await + .unwrap(); + assert!(message_receiver.try_next().is_err()); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Fail sending the first message after reconnect + fail_send.store(true, Ordering::Release); + tokio::time::sleep(Duration::from_secs(1)).await; + assert!(message_receiver.try_next().is_err()); + + // The message is sent + fail_send.store(false, Ordering::Release); + tokio::time::sleep(Duration::from_secs(1)).await; + assert!(message_receiver.try_next().is_ok()); + + drop(telemetry_sender); run_handle.await.unwrap(); } }