Skip to content

Commit a51a905

Browse files
committed
refactor: events
- Remove Subscription - Remove ReceiverSubscription - Fix wrong event name used in Bot::join() - Adapt code to use UUIDs instead of Subscriptions
1 parent 30a3f10 commit a51a905

File tree

5 files changed

+43
-108
lines changed

5 files changed

+43
-108
lines changed

src/bot.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use core::fmt;
22
use std::{fmt::Display, sync::Arc};
33

44
use log::error;
5-
use tokio::{signal, sync::Mutex};
5+
use tokio::{signal, sync::Mutex, task};
66

77
use crate::service::{OverallStatus, Service, ServiceManager, ServiceManagerBuilder};
88

@@ -91,16 +91,22 @@ impl Bot {
9191
}
9292
});
9393

94+
let task_id = match task::try_id() {
95+
Some(id) => id.to_string(),
96+
None => "None".to_string(),
97+
};
98+
let subscriber_name = format!("Bot join on task {}", task_id);
99+
94100
let service_manager_clone = self.service_manager.clone();
95-
let mut receiver = self
101+
let (_, mut receiver) = self
96102
.service_manager
97103
.on_status_change
98104
.event
99-
.subscribe_channel("t", 2, true, true)
105+
.subscribe_channel(subscriber_name, 2, true, true)
100106
.await;
101107
let status_task = tokio::spawn(async move {
102108
let service_manager = service_manager_clone;
103-
while (receiver.receiver.recv().await).is_some() {
109+
while (receiver.recv().await).is_some() {
104110
let overall_status = service_manager.overall_status().await;
105111
if overall_status == OverallStatus::Unhealthy {
106112
return;

src/event.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@ pub mod event;
33
pub mod event_repeater;
44
pub mod observable;
55
pub mod subscriber;
6-
pub mod subscription;
76

87
pub use arc_observable::ArcObservable;
98
pub use event::Event;
109
pub use event_repeater::EventRepeater;
1110
pub use observable::{Observable, ObservableResult};
1211
pub use subscriber::{Callback, DispatchError, Subscriber};
13-
pub use subscription::{ReceiverSubscription, Subscription};

src/event/event.rs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ use std::{
44
fmt::{self, Debug, Formatter},
55
sync::Arc,
66
};
7-
use tokio::sync::{mpsc::channel, Mutex};
7+
use tokio::sync::{
8+
mpsc::{channel, Receiver},
9+
Mutex,
10+
};
811
use uuid::Uuid;
912

10-
use super::{Callback, DispatchError, ReceiverSubscription, Subscriber, Subscription};
13+
use super::{Callback, DispatchError, Subscriber};
1114

1215
pub struct Event<T>
1316
where
@@ -45,7 +48,7 @@ where
4548
buffer: usize,
4649
log_on_error: bool,
4750
remove_on_error: bool,
48-
) -> ReceiverSubscription<Arc<T>>
51+
) -> (Uuid, Receiver<Arc<T>>)
4952
where
5053
S: Into<String>,
5154
{
@@ -57,13 +60,12 @@ where
5760
Callback::Channel(sender),
5861
);
5962

60-
let subscription = Subscription::from(&subscriber);
61-
let receiver_subscription = ReceiverSubscription::new(subscription, receiver);
63+
let uuid = subscriber.uuid;
6264

6365
let mut subscribers = self.subscribers.lock().await;
6466
subscribers.push(subscriber);
6567

66-
receiver_subscription
68+
(uuid, receiver)
6769
}
6870

6971
pub async fn subscribe_async_closure<S>(
@@ -72,7 +74,7 @@ where
7274
closure: impl Fn(Arc<T>) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static,
7375
log_on_error: bool,
7476
remove_on_error: bool,
75-
) -> Subscription
77+
) -> Uuid
7678
where
7779
S: Into<String>,
7880
{
@@ -82,12 +84,13 @@ where
8284
remove_on_error,
8385
Callback::AsyncClosure(Box::new(closure)),
8486
);
85-
let subscription = Subscription::from(&subscriber);
87+
88+
let uuid = subscriber.uuid;
8689

8790
let mut subscribers = self.subscribers.lock().await;
8891
subscribers.push(subscriber);
8992

90-
subscription
93+
uuid
9194
}
9295

9396
pub async fn subscribe_closure<S>(
@@ -96,7 +99,7 @@ where
9699
closure: impl Fn(Arc<T>) -> Result<(), BoxedError> + Send + Sync + 'static,
97100
log_on_error: bool,
98101
remove_on_error: bool,
99-
) -> Subscription
102+
) -> Uuid
100103
where
101104
S: Into<String>,
102105
{
@@ -106,30 +109,32 @@ where
106109
remove_on_error,
107110
Callback::Closure(Box::new(closure)),
108111
);
109-
let subscription = Subscription::from(&subscriber);
112+
113+
let uuid = subscriber.uuid;
110114

111115
let mut subscribers = self.subscribers.lock().await;
112116
subscribers.push(subscriber);
113117

114-
subscription
118+
uuid
115119
}
116120

117-
pub async fn unsubscribe<S>(&self, subscription: S) -> Option<Subscription>
121+
pub async fn unsubscribe<UUID>(&self, uuid: &UUID) -> bool
118122
where
119-
S: Into<Subscription>,
123+
UUID: AsRef<Uuid>,
120124
{
121-
let subscription_to_remove = subscription.into();
125+
let uuid = uuid.as_ref();
122126

123127
let mut subscribers = self.subscribers.lock().await;
124-
let index = subscribers.iter().position(|subscription_of_event| {
125-
subscription_of_event.uuid == subscription_to_remove.uuid
126-
});
127-
128-
if let Some(index) = index {
129-
subscribers.remove(index);
130-
None
131-
} else {
132-
Some(subscription_to_remove)
128+
let index = subscribers
129+
.iter()
130+
.position(|subscriber| subscriber.uuid == *uuid);
131+
132+
match index {
133+
Some(index) => {
134+
subscribers.remove(index);
135+
true
136+
}
137+
None => false,
133138
}
134139
}
135140

src/event/event_repeater.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use thiserror::Error;
77
use tokio::{sync::Mutex, task::JoinHandle};
88
use uuid::Uuid;
99

10-
use super::{Event, Subscription};
10+
use super::Event;
1111

1212
#[derive(Debug, Error)]
1313
pub enum AttachError {
@@ -52,7 +52,7 @@ where
5252
{
5353
pub event: Event<T>,
5454
weak: OnceLock<Weak<Self>>,
55-
subscriptions: Mutex<HashMap<Uuid, (Subscription, JoinHandle<()>)>>,
55+
subscriptions: Mutex<HashMap<Uuid, (Uuid, JoinHandle<()>)>>,
5656
}
5757

5858
impl<T> EventRepeater<T>
@@ -118,19 +118,16 @@ where
118118
});
119119
}
120120

121-
let receiver_subscription = event
121+
let (uuid, mut receiver) = event
122122
.subscribe_channel(&self.event.name, buffer, true, true)
123123
.await;
124124

125-
let subscription = receiver_subscription.subscription;
126-
let mut receiver = receiver_subscription.receiver;
127-
128125
let join_handle = tokio::spawn(async move {
129126
while let Some(value) = receiver.recv().await {
130127
let _ = arc.event.dispatch(value).await;
131128
}
132129
});
133-
subscriptions.insert(event.uuid, (subscription, join_handle));
130+
subscriptions.insert(event.uuid, (uuid, join_handle));
134131

135132
Ok(())
136133
}

src/event/subscription.rs

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)