Skip to content

Commit

Permalink
Merge pull request #1198 from iced-rs/subscription-helpers
Browse files Browse the repository at this point in the history
`websocket` example and helpers to create custom subscriptions
  • Loading branch information
hecrj authored Jan 17, 2022
2 parents b7bc169 + 5ce8653 commit 92a699b
Show file tree
Hide file tree
Showing 10 changed files with 615 additions and 137 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ members = [
"examples/tooltip",
"examples/tour",
"examples/url_handler",
"examples/websocket",
]

[dependencies]
Expand Down
6 changes: 1 addition & 5 deletions examples/clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,12 @@ impl Application for Clock {
}

fn view(&mut self) -> Element<Message> {
let canvas = Canvas::new(self)
.width(Length::Units(400))
.height(Length::Units(400));
let canvas = Canvas::new(self).width(Length::Fill).height(Length::Fill);

Container::new(canvas)
.width(Length::Fill)
.height(Length::Fill)
.padding(20)
.center_x()
.center_y()
.into()
}
}
Expand Down
142 changes: 55 additions & 87 deletions examples/download_progress/src/download.rs
Original file line number Diff line number Diff line change
@@ -1,111 +1,79 @@
use iced_futures::futures;
use std::hash::{Hash, Hasher};
use iced_native::subscription;

use std::hash::Hash;

// Just a little utility function
pub fn file<I: 'static + Hash + Copy + Send, T: ToString>(
pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>(
id: I,
url: T,
) -> iced::Subscription<(I, Progress)> {
iced::Subscription::from_recipe(Download {
id,
url: url.to_string(),
subscription::unfold(id, State::Ready(url.to_string()), move |state| {
download(id, state)
})
}

#[derive(Debug, Hash, Clone)]
pub struct Download<I> {
id: I,
url: String,
}

// Make sure iced can use our download stream
impl<H, I, T> iced_native::subscription::Recipe<H, I> for Download<T>
where
T: 'static + Hash + Copy + Send,
H: Hasher,
{
type Output = (T, Progress);

fn hash(&self, state: &mut H) {
struct Marker;
std::any::TypeId::of::<Marker>().hash(state);

self.id.hash(state);
}
async fn download<I: Copy>(
id: I,
state: State,
) -> (Option<(I, Progress)>, State) {
match state {
State::Ready(url) => {
let response = reqwest::get(&url).await;

fn stream(
self: Box<Self>,
_input: futures::stream::BoxStream<'static, I>,
) -> futures::stream::BoxStream<'static, Self::Output> {
let id = self.id;
match response {
Ok(response) => {
if let Some(total) = response.content_length() {
(
Some((id, Progress::Started)),
State::Downloading {
response,
total,
downloaded: 0,
},
)
} else {
(Some((id, Progress::Errored)), State::Finished)
}
}
Err(_) => (Some((id, Progress::Errored)), State::Finished),
}
}
State::Downloading {
mut response,
total,
downloaded,
} => match response.chunk().await {
Ok(Some(chunk)) => {
let downloaded = downloaded + chunk.len() as u64;

Box::pin(futures::stream::unfold(
State::Ready(self.url),
move |state| async move {
match state {
State::Ready(url) => {
let response = reqwest::get(&url).await;
let percentage = (downloaded as f32 / total as f32) * 100.0;

match response {
Ok(response) => {
if let Some(total) = response.content_length() {
Some((
(id, Progress::Started),
State::Downloading {
response,
total,
downloaded: 0,
},
))
} else {
Some((
(id, Progress::Errored),
State::Finished,
))
}
}
Err(_) => {
Some(((id, Progress::Errored), State::Finished))
}
}
}
(
Some((id, Progress::Advanced(percentage))),
State::Downloading {
mut response,
response,
total,
downloaded,
} => match response.chunk().await {
Ok(Some(chunk)) => {
let downloaded = downloaded + chunk.len() as u64;

let percentage =
(downloaded as f32 / total as f32) * 100.0;

Some((
(id, Progress::Advanced(percentage)),
State::Downloading {
response,
total,
downloaded,
},
))
}
Ok(None) => {
Some(((id, Progress::Finished), State::Finished))
}
Err(_) => {
Some(((id, Progress::Errored), State::Finished))
}
},
State::Finished => {
// We do not let the stream die, as it would start a
// new download repeatedly if the user is not careful
// in case of errors.
let _: () = iced::futures::future::pending().await;
)
}
Ok(None) => (Some((id, Progress::Finished)), State::Finished),
Err(_) => (Some((id, Progress::Errored)), State::Finished),
},
State::Finished => {
// We do not let the stream die, as it would start a
// new download repeatedly if the user is not careful
// in case of errors.
let _: () = iced::futures::future::pending().await;

None
}
}
},
))
unreachable!()
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions examples/websocket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "websocket"
version = "0.1.0"
authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"]
edition = "2018"
publish = false

[dependencies]
iced = { path = "../..", features = ["tokio", "debug"] }
iced_native = { path = "../../native" }
iced_futures = { path = "../../futures" }

[dependencies.async-tungstenite]
version = "0.16"
features = ["tokio-rustls-webpki-roots"]

[dependencies.tokio]
version = "1"
features = ["time"]

[dependencies.warp]
version = "0.3"
17 changes: 17 additions & 0 deletions examples/websocket/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## Websocket

A simple example that keeps a WebSocket connection open to an echo server.

The example consists of 3 modules:
- [`main`] contains the `Application` logic.
- [`echo`] implements a WebSocket client for the [`echo::server`] with `async-tungstenite`.
- [`echo::server`] implements a simple WebSocket echo server with `warp`.

You can run it with `cargo run`:
```
cargo run --package websocket
```

[`main`]: src/main.rs
[`echo`]: src/echo.rs
[`echo::server`]: src/echo/server.rs
146 changes: 146 additions & 0 deletions examples/websocket/src/echo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
pub mod server;

use iced_futures::futures;
use iced_native::subscription::{self, Subscription};

use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::stream::StreamExt;

use async_tungstenite::tungstenite;

pub fn connect() -> Subscription<Event> {
struct Connect;

subscription::unfold(
std::any::TypeId::of::<Connect>(),
State::Disconnected,
|state| async move {
match state {
State::Disconnected => {
const ECHO_SERVER: &str = "ws://localhost:3030";

match async_tungstenite::tokio::connect_async(ECHO_SERVER)
.await
{
Ok((websocket, _)) => {
let (sender, receiver) = mpsc::channel(100);

(
Some(Event::Connected(Connection(sender))),
State::Connected(websocket, receiver),
)
}
Err(_) => {
let _ = tokio::time::sleep(
tokio::time::Duration::from_secs(1),
)
.await;

(Some(Event::Disconnected), State::Disconnected)
}
}
}
State::Connected(mut websocket, mut input) => {
let mut fused_websocket = websocket.by_ref().fuse();

futures::select! {
received = fused_websocket.select_next_some() => {
match received {
Ok(tungstenite::Message::Text(message)) => {
(
Some(Event::MessageReceived(Message::User(message))),
State::Connected(websocket, input)
)
}
Ok(_) => {
(None, State::Connected(websocket, input))
}
Err(_) => {
(Some(Event::Disconnected), State::Disconnected)
}
}
}

message = input.select_next_some() => {
let result = websocket.send(tungstenite::Message::Text(String::from(message))).await;

if result.is_ok() {
(None, State::Connected(websocket, input))
} else {
(Some(Event::Disconnected), State::Disconnected)
}
}
}
}
}
},
)
}

#[derive(Debug)]
enum State {
Disconnected,
Connected(
async_tungstenite::WebSocketStream<
async_tungstenite::tokio::ConnectStream,
>,
mpsc::Receiver<Message>,
),
}

#[derive(Debug, Clone)]
pub enum Event {
Connected(Connection),
Disconnected,
MessageReceived(Message),
}

#[derive(Debug, Clone)]
pub struct Connection(mpsc::Sender<Message>);

impl Connection {
pub fn send(&mut self, message: Message) {
let _ = self
.0
.try_send(message)
.expect("Send message to echo server");
}
}

#[derive(Debug, Clone)]
pub enum Message {
Connected,
Disconnected,
User(String),
}

impl Message {
pub fn new(message: &str) -> Option<Self> {
if message.is_empty() {
None
} else {
Some(Self::User(message.to_string()))
}
}

pub fn connected() -> Self {
Message::Connected
}

pub fn disconnected() -> Self {
Message::Disconnected
}
}

impl From<Message> for String {
fn from(message: Message) -> Self {
match message {
Message::Connected => String::from("Connected successfully!"),
Message::Disconnected => {
String::from("Connection lost... Retrying...")
}
Message::User(message) => message,
}
}
}
Loading

0 comments on commit 92a699b

Please sign in to comment.