Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::fmt;
use std::{fmt::Display, sync::Arc};

use log::error;
use tokio::{signal, sync::Mutex};
use tokio::{signal, sync::Mutex, task};

use crate::service::{OverallStatus, Service, ServiceManager, ServiceManagerBuilder};

Expand Down Expand Up @@ -91,16 +91,22 @@ impl Bot {
}
});

let task_id = match task::try_id() {
Some(id) => id.to_string(),
None => "None".to_string(),
};
let subscriber_name = format!("Bot join on task {}", task_id);

let service_manager_clone = self.service_manager.clone();
let mut receiver = self
let (_, mut receiver) = self
.service_manager
.on_status_change
.event
.subscribe_channel("t", 2, true, true)
.subscribe_channel(subscriber_name, 2, true, true)
.await;
let status_task = tokio::spawn(async move {
let service_manager = service_manager_clone;
while (receiver.receiver.recv().await).is_some() {
while (receiver.recv().await).is_some() {
let overall_status = service_manager.overall_status().await;
if overall_status == OverallStatus::Unhealthy {
return;
Expand Down
2 changes: 0 additions & 2 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ pub mod event;
pub mod event_repeater;
pub mod observable;
pub mod subscriber;
pub mod subscription;

pub use arc_observable::ArcObservable;
pub use event::Event;
pub use event_repeater::EventRepeater;
pub use observable::{Observable, ObservableResult};
pub use subscriber::{Callback, DispatchError, Subscriber};
pub use subscription::{ReceiverSubscription, Subscription};
11 changes: 10 additions & 1 deletion src/event/arc_observable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ where
}

pub async fn get(&self) -> Arc<Mutex<T>> {
Arc::clone(&self.value)
self.value.clone()
}

pub async fn set(&self, value: T) -> ObservableResult<Mutex<T>> {
Expand Down Expand Up @@ -58,3 +58,12 @@ where
}
}
}

impl<T> AsRef<Event<Mutex<T>>> for ArcObservable<T>
where
T: Send + 'static + Hash,
{
fn as_ref(&self) -> &Event<Mutex<T>> {
&self.on_change
}
}
53 changes: 29 additions & 24 deletions src/event/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use std::{
fmt::{self, Debug, Formatter},
sync::Arc,
};
use tokio::sync::{mpsc::channel, Mutex};
use tokio::sync::{
mpsc::{channel, Receiver},
Mutex,
};
use uuid::Uuid;

use super::{Callback, DispatchError, ReceiverSubscription, Subscriber, Subscription};
use super::{Callback, DispatchError, Subscriber};

pub struct Event<T>
where
Expand Down Expand Up @@ -45,7 +48,7 @@ where
buffer: usize,
log_on_error: bool,
remove_on_error: bool,
) -> ReceiverSubscription<Arc<T>>
) -> (Uuid, Receiver<Arc<T>>)
where
S: Into<String>,
{
Expand All @@ -57,13 +60,12 @@ where
Callback::Channel(sender),
);

let subscription = Subscription::from(&subscriber);
let receiver_subscription = ReceiverSubscription::new(subscription, receiver);
let uuid = subscriber.uuid;

let mut subscribers = self.subscribers.lock().await;
subscribers.push(subscriber);

receiver_subscription
(uuid, receiver)
}

pub async fn subscribe_async_closure<S>(
Expand All @@ -72,7 +74,7 @@ where
closure: impl Fn(Arc<T>) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static,
log_on_error: bool,
remove_on_error: bool,
) -> Subscription
) -> Uuid
where
S: Into<String>,
{
Expand All @@ -82,12 +84,13 @@ where
remove_on_error,
Callback::AsyncClosure(Box::new(closure)),
);
let subscription = Subscription::from(&subscriber);

let uuid = subscriber.uuid;

let mut subscribers = self.subscribers.lock().await;
subscribers.push(subscriber);

subscription
uuid
}

pub async fn subscribe_closure<S>(
Expand All @@ -96,7 +99,7 @@ where
closure: impl Fn(Arc<T>) -> Result<(), BoxedError> + Send + Sync + 'static,
log_on_error: bool,
remove_on_error: bool,
) -> Subscription
) -> Uuid
where
S: Into<String>,
{
Expand All @@ -106,30 +109,32 @@ where
remove_on_error,
Callback::Closure(Box::new(closure)),
);
let subscription = Subscription::from(&subscriber);

let uuid = subscriber.uuid;

let mut subscribers = self.subscribers.lock().await;
subscribers.push(subscriber);

subscription
uuid
}

pub async fn unsubscribe<S>(&self, subscription: S) -> Option<Subscription>
pub async fn unsubscribe<UUID>(&self, uuid: &UUID) -> bool
where
S: Into<Subscription>,
UUID: AsRef<Uuid>,
{
let subscription_to_remove = subscription.into();
let uuid = uuid.as_ref();

let mut subscribers = self.subscribers.lock().await;
let index = subscribers.iter().position(|subscription_of_event| {
subscription_of_event.uuid == subscription_to_remove.uuid
});

if let Some(index) = index {
subscribers.remove(index);
None
} else {
Some(subscription_to_remove)
let index = subscribers
.iter()
.position(|subscriber| subscriber.uuid == *uuid);

match index {
Some(index) => {
subscribers.remove(index);
true
}
None => false,
}
}

Expand Down
11 changes: 4 additions & 7 deletions src/event/event_repeater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use thiserror::Error;
use tokio::{sync::Mutex, task::JoinHandle};
use uuid::Uuid;

use super::{Event, Subscription};
use super::Event;

#[derive(Debug, Error)]
pub enum AttachError {
Expand Down Expand Up @@ -52,7 +52,7 @@ where
{
pub event: Event<T>,
weak: OnceLock<Weak<Self>>,
subscriptions: Mutex<HashMap<Uuid, (Subscription, JoinHandle<()>)>>,
subscriptions: Mutex<HashMap<Uuid, (Uuid, JoinHandle<()>)>>,
}

impl<T> EventRepeater<T>
Expand Down Expand Up @@ -118,19 +118,16 @@ where
});
}

let receiver_subscription = event
let (uuid, mut receiver) = event
.subscribe_channel(&self.event.name, buffer, true, true)
.await;

let subscription = receiver_subscription.subscription;
let mut receiver = receiver_subscription.receiver;

let join_handle = tokio::spawn(async move {
while let Some(value) = receiver.recv().await {
let _ = arc.event.dispatch(value).await;
}
});
subscriptions.insert(event.uuid, (subscription, join_handle));
subscriptions.insert(event.uuid, (uuid, join_handle));

Ok(())
}
Expand Down
71 changes: 0 additions & 71 deletions src/event/subscription.rs

This file was deleted.

Loading