Skip to content

Commit

Permalink
Added PubSub client init success/failure handling and channel messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
dmackdev committed Sep 21, 2023
1 parent 0f5ccfb commit d17c70e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
5 changes: 4 additions & 1 deletion pubsubman/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ impl App {
let (back_tx, back_rx) = tokio::sync::mpsc::channel(10);

std::thread::spawn(|| {
Backend::new(back_tx, front_rx, emulator_project_id).init();
if let Ok(mut backend) = Backend::new(back_tx, front_rx, emulator_project_id) {
backend.init();
};
});

refresh_topics(&front_tx, None);
Expand All @@ -67,6 +69,7 @@ impl App {
fn handle_backend_message(&mut self) {
match self.back_rx.try_recv() {
Ok(message) => match message {
BackendMessage::ClientInitialised => {}
BackendMessage::TopicsUpdated(topic_names) => {
self.topic_names = topic_names;
refresh_topics(&self.front_tx, Some(5000));
Expand Down
42 changes: 30 additions & 12 deletions pubsubman_backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{error::Error, sync::Arc};

use futures_util::StreamExt;
use google_cloud_gax::conn::Environment;
Expand Down Expand Up @@ -34,20 +34,38 @@ impl Backend {
back_tx: Sender<BackendMessage>,
front_rx: Receiver<FrontendMessage>,
emulator_project_id: Option<String>,
) -> Self {
) -> Result<Self, Box<dyn Error>> {
let rt = Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();

let client = rt.block_on(async { create_client(emulator_project_id).await });

Self {
back_tx,
front_rx,
client: Arc::new(client),
rt,
match rt.block_on(async { create_client(emulator_project_id).await }) {
Ok(client) => {
let back_tx_clone = back_tx.clone();
rt.spawn(async move {
back_tx_clone
.send(BackendMessage::ClientInitialised)
.await
.unwrap();
});
Ok(Self {
back_tx,
front_rx,
client: Arc::new(client),
rt,
})
}
Err(err) => {
rt.spawn(async move {
back_tx
.send(BackendMessage::Error(BackendError::ClientInitFailed))
.await
.unwrap();
});
Err(err)
}
}
}

Expand Down Expand Up @@ -217,14 +235,14 @@ impl Backend {
}
}

async fn create_client(emulator_project_id: Option<String>) -> Client {
let mut config = ClientConfig::default().with_auth().await.unwrap();
async fn create_client(emulator_project_id: Option<String>) -> Result<Client, Box<dyn Error>> {
let mut config = ClientConfig::default().with_auth().await?;

if let (Environment::Emulator(_), Some(emulator_project_id)) =
(&config.environment, emulator_project_id)
{
config.project_id = Some(emulator_project_id);
}

Client::new(config).await.unwrap()
Ok(Client::new(config).await?)
}
2 changes: 2 additions & 0 deletions pubsubman_backend/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum FrontendMessage {

#[derive(Debug)]
pub enum BackendMessage {
ClientInitialised,
TopicsUpdated(Vec<TopicName>),
SubscriptionCreated(TopicName, SubscriptionName),
MessageReceived(TopicName, PubsubMessage),
Expand All @@ -22,6 +23,7 @@ pub enum BackendMessage {

#[derive(Debug)]
pub enum BackendError {
ClientInitFailed,
GetTopicsFailed,
CreateSubscriptionFailed(TopicName),
StreamMessagesFailed(TopicName, SubscriptionName),
Expand Down

0 comments on commit d17c70e

Please sign in to comment.