Skip to content

feat: backoff + websocket example reconnect #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@
- [BREAKING] Renamed `to_kbevent` to `to_keyboard_event`.
- [BREAKING] `after_next_render` returns `RenderInfo`.
- `web_sys` and `wasm_bindgen` available in `seed::web_sys` and `seed::wasm_bindgen`.
- Added `WebSocket` + related items.
- Added `WebSocket` + related items (#8).
- Exposed `App::mailbox`.
- Added `streams::backoff` + updated `websocket` example.

## v0.6.0
- Implemented `UpdateEl` for `Filter` and `FilterMap`.
Expand Down
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ enclose = "1.1.8"
gloo-timers = { version = "0.2.1", features = ["futures"] }
gloo-file = { version = "0.1.0", features = ["futures"] }
indexmap = "1.3.2"
js-sys = "0.3.37"
pulldown-cmark = "0.7.0"
js-sys = "0.3.39"
pulldown-cmark = "0.7.1"
rand = { version = "0.7.3", features = ["wasm-bindgen", "small_rng"] }
serde = { version = "1.0.106", features = ['derive'] }
serde_json = "1.0.51"
wasm-bindgen = {version = "0.2.60", features = ["serde-serialize"]}
wasm-bindgen-futures = "0.4.10"
wasm-bindgen = {version = "0.2.62", features = ["serde-serialize"]}
wasm-bindgen-futures = "0.4.12"
# @TODO: remove once we can use entities without `Debug` in `log!` and `error!` on `stable` Rust.
# https://github.com/Centril/rfcs/blob/rfc/quick-debug-macro/text/0000-quick-debug-macro.md#types-which-are-not-debug
dbg = "1.0.4"
futures = "0.3.4"
uuid = { version = "0.8.1", features = ["v4", "wasm-bindgen"] }

[dependencies.web-sys]
version = "0.3.37"
version = "0.3.39"
features = [
"AbortController",
"AbortSignal",
Expand Down
51 changes: 41 additions & 10 deletions examples/websocket/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,62 @@ struct Model {
messages: Vec<String>,
input_text: String,
web_socket: WebSocket,
web_socket_reconnector: Option<StreamHandle>,
}

// ------ ------
// Init
// ------ ------

fn init(_: Url, orders: &mut impl Orders<Msg>) -> Model {
let web_socket = WebSocket::builder(WS_URL, orders)
.on_open(|| log!("WebSocket connection is open now"))
.on_message(Msg::MessageReceived)
.on_close(Msg::WebSocketClosed)
.on_error(|| log!("Error"))
.build_and_open()
.unwrap();

Model {
sent_messages_count: 0,
messages: Vec::new(),
input_text: String::new(),
web_socket,
web_socket: create_websocket(orders),
web_socket_reconnector: None,
}
}

fn create_websocket(orders: &impl Orders<Msg>) -> WebSocket {
WebSocket::builder(WS_URL, orders)
.on_open(|| Msg::WebSocketOpened)
.on_message(Msg::MessageReceived)
.on_close(Msg::WebSocketClosed)
.on_error(|| Msg::WebSocketFailed)
.build_and_open()
.unwrap()
}

// ------ ------
// Update
// ------ ------

enum Msg {
WebSocketOpened,
MessageReceived(WebSocketMessage),
CloseWebSocket,
WebSocketClosed(CloseEvent),
WebSocketFailed,
ReconnectWebSocket(usize),
InputTextChanged(String),
SendMessage(shared::ClientMessage),
}

fn update(msg: Msg, mut model: &mut Model, _: &mut impl Orders<Msg>) {
fn update(msg: Msg, mut model: &mut Model, orders: &mut impl Orders<Msg>) {
match msg {
Msg::WebSocketOpened => {
model.web_socket_reconnector = None;
log!("WebSocket connection is open now");
}
Msg::MessageReceived(message) => {
log!("Client received a message");
model
.messages
.push(message.json::<shared::ServerMessage>().unwrap().text);
}
Msg::CloseWebSocket => {
model.web_socket_reconnector = None;
model
.web_socket
.close(None, Some("user clicked Close button"))
Expand All @@ -69,6 +81,25 @@ fn update(msg: Msg, mut model: &mut Model, _: &mut impl Orders<Msg>) {
log!("Code:", close_event.code());
log!("Reason:", close_event.reason());
log!("==================");

// Chrome doesn't invoke `on_error` when the connection is lost.
if !close_event.was_clean() && model.web_socket_reconnector.is_none() {
model.web_socket_reconnector = Some(
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
);
}
}
Msg::WebSocketFailed => {
log!("WebSocket failed");
if model.web_socket_reconnector.is_none() {
model.web_socket_reconnector = Some(
orders.stream_with_handle(streams::backoff(None, Msg::ReconnectWebSocket)),
);
}
}
Msg::ReconnectWebSocket(retries) => {
log!("Reconnect attempt:", retries);
model.web_socket = create_websocket(orders);
}
Msg::InputTextChanged(input_text) => {
model.input_text = input_text;
Expand Down
32 changes: 32 additions & 0 deletions src/app/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use web_sys::Event;
mod event_stream;
use event_stream::EventStream;

mod backoff_stream;
use backoff_stream::BackoffStream;

// ------ Interval stream ------

/// Stream no values on predefined time interval in milliseconds.
Expand All @@ -31,6 +34,35 @@ pub fn interval<MsU>(
IntervalStream::new(ms).map(move |_| handler.clone()())
}

// ------ Backoff stream ------

/// Stream retries count in increasing intervals.
///
/// Algorithm - [Truncated exponential backoff](https://cloud.google.com/storage/docs/exponential-backoff)
///
/// # Arguments
///
/// * `max_seconds` - Typically `32` or `64` seconds. Default is `32`.
/// * `handler` - Receives the number of retries (starting from 1); Has to return `Msg`, `Option<Msg>` or `()`.
///
/// # Example
///
/// ```rust,no_run
///orders.stream(streams::backoff(None, |_retries| Msg::OnTick));
///orders.stream_with_handle(streams::backoff(Some(15), |_| log!("Tick!")));
/// ```
///
/// # Panics
///
/// Panics when the handler doesn't return `Msg`, `Option<Msg>` or `()`.
/// (It will be changed to a compile-time error).
pub fn backoff<MsU>(
max_seconds: Option<u32>,
handler: impl FnOnce(usize) -> MsU + Clone + 'static,
) -> impl Stream<Item = MsU> {
BackoffStream::new(max_seconds.unwrap_or(32)).map(move |retries| handler.clone()(retries))
}

// ------ Window Event stream ------

/// Stream `Window` `web_sys::Event`s.
Expand Down
73 changes: 73 additions & 0 deletions src/app/streams/backoff_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use futures::channel::mpsc;
use futures::stream::Stream;
use gloo_timers::callback::Timeout;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::convert::TryFrom;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

// ------ BackoffStream ------

/// [Truncated exponential backoff](https://cloud.google.com/storage/docs/exponential-backoff)
#[derive(Debug)]
pub struct BackoffStream {
max_seconds: u32,
retries: usize,
timeout: Timeout,
tick_sender: Rc<mpsc::UnboundedSender<()>>,
tick_receiver: mpsc::UnboundedReceiver<()>,
}

impl BackoffStream {
pub fn new(max_seconds: u32) -> Self {
let (tick_sender, tick_receiver) = mpsc::unbounded();
let tick_sender = Rc::new(tick_sender);

let retries = 0;
Self {
max_seconds,
retries,
timeout: start_timeout(wait_time(retries, max_seconds), &tick_sender),
tick_sender,
tick_receiver,
}
}
}

impl Stream for BackoffStream {
type Item = usize;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match Stream::poll_next(Pin::new(&mut self.tick_receiver), cx) {
Poll::Ready(Some(_)) => {
self.retries += 1;
self.timeout =
start_timeout(wait_time(self.retries, self.max_seconds), &self.tick_sender);
Poll::Ready(Some(self.retries))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

fn wait_time(retries: usize, max_seconds: u32) -> u32 {
let retries = u32::try_from(retries).unwrap_or(u32::max_value());
let random_ms = SmallRng::from_entropy().gen_range(0, 1001);

let duration = 2_u32
.saturating_pow(retries)
.saturating_mul(1000)
.saturating_add(random_ms);
let max_duration = max_seconds.saturating_mul(1000);

u32::min(duration, max_duration)
}

fn start_timeout(ms: u32, tick_sender: &Rc<mpsc::UnboundedSender<()>>) -> Timeout {
let tick_sender = Rc::clone(tick_sender);
Timeout::new(ms, move || {
tick_sender.unbounded_send(()).expect("send backoff tick")
})
}
3 changes: 2 additions & 1 deletion src/app/streams/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use web_sys::EventTarget;

// ------ EventStream ------

// @TODO Replace `mpsc` with `crossbeam`? (And integrate it into the other Seed parts (e.g. `Listener`, `SubManager`)).
// @TODO Replace `mpsc` with `crossbeam`, `futures-signals` or `flume`?
// (And integrate it into the other Seed parts (e.g. `Listener`, `SubManager`, `BackoffStream`)).

// @TODO Update it to support different `web_sys` events
// during implementation of https://github.com/seed-rs/seed/issues/331
Expand Down
3 changes: 2 additions & 1 deletion src/browser/web_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub enum WebSocketError {
///
/// [MDN reference](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
#[derive(Debug)]
#[must_use = "WebSocket is closed on drop"]
pub struct WebSocket {
ws: web_sys::WebSocket,
callbacks: Callbacks,
Expand All @@ -93,7 +94,7 @@ impl WebSocket {
/// _Note:_ Always prefer `wss://` - encrypted and more reliable.
pub fn builder<U: AsRef<str>, Ms: 'static, O: Orders<Ms>>(
url: U,
orders: &mut O,
orders: &O,
) -> Builder<U, Ms, O> {
Builder::new(url, orders)
}
Expand Down
7 changes: 5 additions & 2 deletions src/browser/web_socket/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub(crate) struct Callbacks {
///```
pub struct Builder<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> {
url: U,
orders: &'a mut O,
orders: &'a O,
callbacks: Callbacks,
protocols: &'a [&'a str],
binary_type: Option<BinaryType>,
Expand All @@ -40,7 +40,7 @@ pub struct Builder<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> {

impl<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> Builder<'a, U, Ms, O> {
// Note: `WebSocket::builder` is the preferred way how to crate a new `Builder` instance.
pub(crate) fn new(url: U, orders: &'a mut O) -> Self {
pub(crate) fn new(url: U, orders: &'a O) -> Self {
Self {
url,
orders,
Expand Down Expand Up @@ -160,6 +160,9 @@ impl<'a, U: AsRef<str>, Ms: 'static, O: Orders<Ms>> Builder<'a, U, Ms, O> {
///
/// Returns `WebSocketError::OpenError` when Web Socket opening fails.
/// E.g. when the chosen port is blocked.
///
/// _Note:_: It doesn't return error when the socket is open on the client side,
/// but fails to connect to the server - use `on_error` handler to resolve such cases.
pub fn build_and_open(self) -> Result<WebSocket> {
WebSocket::new(
self.url.as_ref(),
Expand Down