Skip to content

Commit 68b5e91

Browse files
committed
Extract LinkedListAsync(simple wrapping).
1 parent dcf48db commit 68b5e91

File tree

1 file changed

+38
-10
lines changed

1 file changed

+38
-10
lines changed

src/common.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ for general purposes crossing over many modules of `fpRust`.
44
*/
55

66
use std::cmp::PartialEq;
7+
use std::collections::LinkedList;
78
use std::marker::PhantomData;
89
use std::sync::{Arc, Mutex};
910
use std::thread;
@@ -14,8 +15,6 @@ use futures::executor::ThreadPool;
1415
#[cfg(feature = "for_futures")]
1516
use futures::stream::Stream;
1617
#[cfg(feature = "for_futures")]
17-
use std::collections::LinkedList;
18-
#[cfg(feature = "for_futures")]
1918
use std::mem;
2019
#[cfg(feature = "for_futures")]
2120
use std::pin::Pin;
@@ -133,6 +132,37 @@ pub fn get_mut<'a, T>(v: &'a mut Vec<T>, index: usize) -> Option<&'a mut T> {
133132
None
134133
}
135134

135+
#[derive(Debug, Clone)]
136+
pub struct LinkedListAsync<T> {
137+
inner: Arc<Mutex<LinkedList<T>>>,
138+
139+
_t: PhantomData<T>,
140+
}
141+
142+
impl<T> LinkedListAsync<T> {
143+
pub fn new() -> Self {
144+
Self {
145+
inner: Arc::new(Mutex::new(LinkedList::new())),
146+
147+
_t: PhantomData,
148+
}
149+
}
150+
151+
pub fn push_back(&self, input: T) {
152+
self.inner.lock().unwrap().push_back(input)
153+
}
154+
155+
pub fn pop_front(&self) -> Option<T> {
156+
self.inner.lock().unwrap().pop_front()
157+
}
158+
}
159+
160+
impl<T> Default for LinkedListAsync<T> {
161+
fn default() -> Self {
162+
Self::new()
163+
}
164+
}
165+
136166
/**
137167
`Observable` memorizes all `Subscription` and send notifications.
138168
@@ -252,7 +282,7 @@ pub struct SubscriptionFunc<T> {
252282
pub receiver: RawReceiver<T>,
253283

254284
#[cfg(feature = "for_futures")]
255-
cached: Option<Arc<Mutex<LinkedList<Arc<T>>>>>,
285+
cached: Option<LinkedListAsync<Arc<T>>>,
256286
#[cfg(feature = "for_futures")]
257287
alive: Option<Arc<Mutex<AtomicBool>>>,
258288
#[cfg(feature = "for_futures")]
@@ -345,7 +375,7 @@ where
345375
}
346376

347377
if self.cached.is_none() {
348-
self.cached = Some(Arc::new(Mutex::new(LinkedList::new())));
378+
self.cached = Some(LinkedListAsync::new());
349379
}
350380
}
351381

@@ -378,10 +408,8 @@ impl<T: Send + Sync + 'static> Subscription<T> for SubscriptionFunc<T> {
378408
if let Some(cached) = &self.cached {
379409
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
380410
if alive {
381-
{
382-
let mut cached = cached.lock().unwrap();
383-
cached.push_back(x.clone())
384-
};
411+
cached.push_back(x.clone());
412+
385413
{
386414
if let Some(waker) = self.waker.lock().unwrap().take() {
387415
waker.wake()
@@ -432,7 +460,7 @@ where
432460
if let Some(cached) = &self.0.cached {
433461
let picked: Option<Arc<T>>;
434462
{
435-
picked = cached.lock().unwrap().pop_front();
463+
picked = cached.pop_front();
436464
}
437465
if picked.is_some() {
438466
return Poll::Ready(picked);
@@ -448,7 +476,7 @@ where
448476
if let Some(cached) = &self.0.cached {
449477
let picked: Option<Arc<T>>;
450478
{
451-
picked = cached.lock().unwrap().pop_front();
479+
picked = cached.pop_front();
452480
}
453481

454482
// Check Pending(None) or Ready(Some(item))

0 commit comments

Comments
 (0)