Skip to content

Commit

Permalink
Changed FE message to a generic subscribe request, which can be cance…
Browse files Browse the repository at this point in the history
…lled with a token. Added subscribe functionality when Stream mode enabled.
  • Loading branch information
dmackdev committed Aug 3, 2023
1 parent 2917eda commit 11575d2
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = ["pubsubman_gui", "pubsubman_backend"]
[workspace.dependencies]
chrono = { version = "0.4.26", features = ["clock"] }
tokio = { version = "1.29.1", features = ["full"] }
tokio-util = "0.7.8"

[profile.release]
opt-level = 2 # fast and small wasm
Expand Down
1 change: 1 addition & 0 deletions pubsubman_backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ chrono = { workspace = true }
futures-util = "0.3.28"
google-cloud-pubsub = "0.18.0"
tokio = { workspace = true }
tokio-util = { workspace = true }

[dependencies.uuid]
version = "1.4.1"
Expand Down
11 changes: 6 additions & 5 deletions pubsubman_backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use futures_util::StreamExt;
use google_cloud_pubsub::{
client::{Client, ClientConfig},
Expand All @@ -8,8 +6,8 @@ use google_cloud_pubsub::{
use message::{BackendMessage, FrontendMessage};
use tokio::{
runtime::Builder,
select,
sync::mpsc::{Receiver, Sender},
time::timeout,
};
use uuid::Uuid;

Expand Down Expand Up @@ -72,7 +70,7 @@ impl Backend {
.unwrap();
});
}
FrontendMessage::PullMessages(sub_id) => {
FrontendMessage::Subscribe(sub_id, cancel_token) => {
let back_tx = self.back_tx.clone();

rt.spawn(async move {
Expand All @@ -95,7 +93,10 @@ impl Backend {
}
};

let _ = timeout(Duration::from_secs(1), pull_messages_future).await;
select! {
_ = cancel_token.cancelled() => {}
_ = pull_messages_future => {}
}
});
}
}
Expand Down
4 changes: 3 additions & 1 deletion pubsubman_backend/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use tokio_util::sync::CancellationToken;

use crate::pubsub_message::PubsubMessage;

#[derive(Debug)]
pub enum FrontendMessage {
RefreshTopicsRequest,
CreateSubscriptionRequest(String),
PullMessages(String),
Subscribe(String, CancellationToken),
}

#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions pubsubman_gui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ serde = { version = "1", features = ["derive"] }

env_logger = "0.10"
tokio = { workspace = true }
tokio-util = { workspace = true }
pubsubman_backend = { version = "0.1.0", path = "../pubsubman_backend" }
55 changes: 39 additions & 16 deletions pubsubman_gui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use pubsubman_backend::{
Backend,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_util::sync::CancellationToken;

use crate::{subscription::Subscription, topic::Topic};

Expand Down Expand Up @@ -112,9 +113,10 @@ impl TemplateApp {
ui.heading("Topics");

for topic in self.topics.iter() {
let is_selected = self.topic_view.as_ref().is_some_and(
|topic_view| *topic_view.selected_topic_id == topic.id,
);
let is_selected = self
.topic_view
.as_ref()
.is_some_and(|topic_view| *topic_view.selected_topic_id == topic.id);

let on_click = || {
self.topic_view = Some(TopicViewState::new(topic.id.to_string()));
Expand Down Expand Up @@ -160,28 +162,49 @@ impl TemplateApp {
ui.add_enabled_ui(!topic_view.stream_messages_enabled, |ui| {

let pull_button = ui.button("Pull");

if pull_button.clicked() {
let front_tx = self.front_tx.clone();
let sub_id = subscription.id.clone();

let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();

tokio::spawn(async move {
front_tx
.send(FrontendMessage::PullMessages(sub_id))
.send(FrontendMessage::Subscribe(sub_id, cancel_token))
.await
.unwrap();
});

tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
cancel_token_clone.cancel();
});
}
pull_button.on_hover_text(
"Retrieve batch of all undelivered messages on this subscription.",
).on_disabled_hover_text("Disable Stream mode to Pull messages.");
});
}

pull_button.on_hover_text(
"Retrieve batch of all undelivered messages on this subscription.",
).on_disabled_hover_text("Disable Stream mode to Pull messages.");
});

ui.toggle_value(
&mut topic_view.stream_messages_enabled,

let stream_mode_toggle = ui.toggle_value( &mut topic_view.stream_messages_enabled,
"Stream",
).on_hover_text("Continuously retrieve messages delivered to this subscription.");
);

if stream_mode_toggle.changed() && topic_view.stream_messages_enabled {
let front_tx = self.front_tx.clone();
let sub_id = subscription.id.clone();
let cancel_token = CancellationToken::new();

tokio::spawn(async move {
front_tx
.send(FrontendMessage::Subscribe(sub_id, cancel_token))
.await
.unwrap();
});

// TODO: Store the cancellation token and cancel it when steam mode is disabled or this topic view is closed.
}

stream_mode_toggle.on_hover_text("Continuously retrieve messages delivered to this subscription.");
});

egui::ScrollArea::vertical()
Expand Down

0 comments on commit 11575d2

Please sign in to comment.