Skip to content

Commit fd42094

Browse files
authored
Merge refactor/service-weak-references into staging (#58)
refactor: use Weak instead of Arc self-references
1 parent dd4452f commit fd42094

File tree

2 files changed

+68
-25
lines changed

2 files changed

+68
-25
lines changed

src/event/event_repeater.rs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use log::error;
2+
use std::{
3+
collections::HashMap,
4+
sync::{Arc, Weak},
5+
};
26
use thiserror::Error;
37
use tokio::{sync::Mutex, task::JoinHandle};
48
use uuid::Uuid;
59

10+
use crate::setlock::{SetLock, SetLockError};
11+
612
use super::{Event, Subscription};
713

814
#[derive(Debug, Error)]
915
pub enum AttachError {
10-
#[error("Tried to attach event {event_name} to EventRepeater {repeater_name} before it was initialized. Did you not use EventRepeater<T>::new()?")]
16+
#[error("Tried to attach event {event_name} to EventRepeater {repeater_name} while it was uninitialized. Did you not use EventRepeater<T>::new()?")]
1117
NotInitialized {
1218
event_name: String,
1319
repeater_name: String,
1420
},
1521

1622
#[error(
17-
"Tried to attach event {event_name} to EventRepeater {repeater_name}, which was already attached."
23+
"Tried to attach event {event_name} to EventRepeater {repeater_name}, which was already attached to it."
1824
)]
1925
AlreadyAttached {
2026
event_name: String,
@@ -47,7 +53,7 @@ where
4753
T: Send + Sync + 'static,
4854
{
4955
pub event: Event<T>,
50-
self_arc: Mutex<Option<Arc<Self>>>,
56+
weak: Mutex<SetLock<Weak<Self>>>,
5157
subscriptions: Mutex<HashMap<Uuid, (Subscription, JoinHandle<()>)>>,
5258
}
5359

@@ -62,27 +68,38 @@ where
6268
{
6369
let event = Event::new(name);
6470
let event_repeater = Self {
65-
self_arc: Mutex::new(None),
71+
weak: Mutex::new(SetLock::new()),
6672
event,
6773
subscriptions: Mutex::new(HashMap::new()),
6874
};
6975

70-
let self_arc = Arc::new(event_repeater);
71-
let mut lock = self_arc.self_arc.lock().await;
72-
let self_arc_clone = Arc::clone(&self_arc);
73-
*lock = Some(self_arc_clone);
74-
drop(lock);
76+
let arc = Arc::new(event_repeater);
77+
let weak = Arc::downgrade(&arc);
78+
79+
let result = arc.weak.lock().await.set(weak);
80+
if let Err(err) = result {
81+
match err {
82+
SetLockError::AlreadySet => {
83+
error!("Failed to set EventRepeater {}'s Weak self-reference because it was already set. This should never happen. Shutting down ungracefully to prevent further undefined behavior.", arc.event.name);
84+
unreachable!(
85+
"Unable to set EventRepeater {}'s Weak self-reference because it was already set.",
86+
arc.event.name
87+
);
88+
}
89+
}
90+
}
7591

76-
self_arc
92+
arc
7793
}
7894

7995
pub async fn subscription_count(&self) -> usize {
8096
self.subscriptions.lock().await.len()
8197
}
8298

8399
pub async fn attach(&self, event: &Event<T>, buffer: usize) -> Result<(), AttachError> {
84-
let self_arc = match self.self_arc.lock().await.as_ref() {
85-
Some(arc) => Arc::clone(arc),
100+
let lock = self.weak.lock().await;
101+
let weak = match lock.get() {
102+
Some(weak) => weak,
86103
None => {
87104
return Err(AttachError::NotInitialized {
88105
event_name: event.name.clone(),
@@ -91,6 +108,15 @@ where
91108
}
92109
};
93110

111+
// This can't fail because the Arc is guaranteed to be valid as long as &self is valid.
112+
let arc = match weak.upgrade() {
113+
Some(arc) => arc,
114+
None => {
115+
error!("EventRepeater {}'s Weak self-reference could not be upgraded to Arc while attaching event {}. This should never happen. Shutting down ungracefully to prevent further undefined behavior.", self.event.name, event.name);
116+
unreachable!("EventRepeater {}'s Weak self-reference could not be upgraded to Arc while attaching event {}.", self.event.name, event.name);
117+
}
118+
};
119+
94120
let mut subscriptions = self.subscriptions.lock().await;
95121
if subscriptions.contains_key(&event.uuid) {
96122
return Err(AttachError::AlreadyAttached {
@@ -108,7 +134,7 @@ where
108134

109135
let join_handle = tokio::spawn(async move {
110136
while let Some(value) = receiver.recv().await {
111-
let _ = self_arc.event.dispatch(value).await;
137+
let _ = arc.event.dispatch(value).await;
112138
}
113139
});
114140
subscriptions.insert(event.uuid, (subscription, join_handle));

src/service/service_manager.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
event::EventRepeater, service::Watchdog, setlock::{SetLock, SetLockError}
77
};
88
use log::{error, info, warn};
9-
use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration};
9+
use std::{collections::HashMap, fmt::Display, mem, sync::{Arc, Weak}, time::Duration};
1010
use tokio::{
1111
spawn,
1212
sync::{Mutex, MutexGuard},
@@ -54,31 +54,31 @@ pub fn new() -> Self {
5454

5555
pub async fn build(self) -> Arc<ServiceManager> {
5656
let service_manager = ServiceManager {
57-
arc: Mutex::new(SetLock::new()),
57+
weak: Mutex::new(SetLock::new()),
5858
services: self.services,
5959
background_tasks: Mutex::new(HashMap::new()),
6060
on_status_change: EventRepeater::new("service_manager_on_status_change").await,
6161
};
6262

63-
let self_arc = Arc::new(service_manager);
64-
let self_arc_clone = Arc::clone(&self_arc);
65-
66-
let result = self_arc_clone.arc.lock().await.set(Arc::clone(&self_arc_clone));
63+
let arc = Arc::new(service_manager);
64+
let weak = Arc::downgrade(&arc);
6765

66+
let result = arc.weak.lock().await.set(weak);
6867
if let Err(err) = result {
6968
match err {
7069
SetLockError::AlreadySet => {
71-
unreachable!("Unable to set ServiceManager's self-arc in ServiceManagerBuilder because it was already set. This should never happen. How did you...?");
70+
error!("Unable to set ServiceManager's Weak self-reference in ServiceManagerBuilder because it was already set. This should never happen. Shutting down ungracefully to prevent further undefined behavior.");
71+
unreachable!("Unable to set ServiceManager's Weak self-reference in ServiceManagerBuilder because it was already set.");
7272
}
7373
}
7474
}
7575

76-
self_arc
76+
arc
7777
}
7878
}
7979

8080
pub struct ServiceManager {
81-
arc: Mutex<SetLock<Arc<Self>>>,
81+
weak: Mutex<SetLock<Weak<Self>>>,
8282
background_tasks: Mutex<HashMap<String, JoinHandle<()>>>,
8383

8484
pub services: Vec<Arc<Mutex<dyn Service>>>,
@@ -326,10 +326,27 @@ impl ServiceManager {
326326
&self,
327327
service: &mut MutexGuard<'_, dyn Service>,
328328
) -> Result<(), StartupError> {
329-
let service_manager = Arc::clone(self.arc.lock().await.unwrap());
329+
let lock = self.weak.lock().await;
330+
let weak = match lock.get() {
331+
Some(weak) => weak,
332+
None => {
333+
error!("ServiceManager's Weak self-reference was None while initializing service {}. This should never happen. Did you not use a ServiceManagerBuilder? Shutting down ungracefully to prevent further undefined behavior.", service.info().name);
334+
unreachable!("ServiceManager's Weak self-reference was None while initializing service {}.", service.info().name);
335+
}
336+
};
337+
338+
// This can't fail because the Arc is guaranteed to be valid as long as &self is valid.
339+
let arc = match weak.upgrade() {
340+
Some(arc) => arc,
341+
None => {
342+
error!("ServiceManager's Weak self-reference could not be upgraded to Arc while initializing service {}. This should never happen. Shutting down ungracefully to prevent further undefined behavior.", service.info().name);
343+
unreachable!("ServiceManager's Weak self-reference could not be upgraded to Arc while initializing service {}.", service.info().name);
344+
}
345+
};
346+
330347

331348
//TODO: Add to config instead of hardcoding duration
332-
let start = service.start(service_manager);
349+
let start = service.start(arc);
333350
let timeout_result = timeout(Duration::from_secs(10), start).await;
334351

335352
match timeout_result {

0 commit comments

Comments
 (0)