Skip to content

Commit 2aa02e2

Browse files
committed
Make the Subscription of Publisher by Arc<SubscriptionFunc<T>> instead of Arc<Mutex>
1 parent baafbe3 commit 2aa02e2

File tree

5 files changed

+93
-151
lines changed

5 files changed

+93
-151
lines changed

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fp_rust"
3-
version = "0.2.8"
3+
version = "0.3.0"
44
license = "MIT"
55
authors = ["JunYi JohnTeee Lee <johnteee@gmail.com>"]
66
edition = "2018"
@@ -20,7 +20,10 @@ name = "fp_rust"
2020
path = "src/lib.rs"
2121

2222
[features]
23-
default = [ "pure" ]
23+
default = [
24+
"pure",
25+
# "for_futures",
26+
]
2427
for_futures = [ "futures", "futures-test" ]
2528
# for_futures = [ "futures", "tokio" ]
2629
pure = [

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ use fp_rust::monadio::{
8585
use fp_rust::sync::CountDownLatch;
8686

8787
// fmap & map (sync)
88-
let mut _subscription = Arc::new(Mutex::new(SubscriptionFunc::new(move |x: Arc<u16>| {
88+
let mut _subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<u16>| {
8989
println!("monadio_sync {:?}", x); // monadio_sync 36
9090
assert_eq!(36, *Arc::make_mut(&mut x.clone()));
91-
})));
91+
}));
9292
let subscription = _subscription.clone();
9393
let monadio_sync = MonadIO::just(1)
9494
.fmap(|x| MonadIO::new(move || x * 4))
@@ -113,15 +113,15 @@ let latch2 = latch.clone();
113113

114114
thread::sleep(time::Duration::from_millis(100));
115115

116-
let subscription = Arc::new(Mutex::new(SubscriptionFunc::new(move |x: Arc<String>| {
116+
let subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
117117
println!("monadio_async {:?}", x); // monadio_async ok
118118

119119
latch2.countdown(); // Unlock here
120-
})));
120+
}));
121121
monadio_async.subscribe(subscription);
122-
monadio_async.subscribe(Arc::new(Mutex::new(SubscriptionFunc::new(move |x: Arc<String>| {
122+
monadio_async.subscribe(Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
123123
println!("monadio_async sub2 {:?}", x); // monadio_async sub2 ok
124-
}))));
124+
})));
125125
{
126126
let mut handler_observe_on = _handler_observe_on.lock().unwrap();
127127
let mut handler_subscribe_on = _handler_subscribe_on.lock().unwrap();
@@ -171,11 +171,11 @@ let mut pub2 = Publisher::new_with_handlers(Some(_h.clone()));
171171
let latch = CountDownLatch::new(1);
172172
let latch2 = latch.clone();
173173

174-
let s = Arc::new(Mutex::new(SubscriptionFunc::new(move |x: Arc<String>| {
174+
let s = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
175175
println!("pub2-s1 I got {:?}", x);
176176

177177
latch2.countdown();
178-
})));
178+
}));
179179
pub2.subscribe(s.clone());
180180
pub2.map(move |x: Arc<String>| {
181181
println!("pub2-s2 I got {:?}", x);

src/common.rs

Lines changed: 49 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pub struct LinkedListAsync<T> {
137137
inner: Arc<Mutex<LinkedList<T>>>,
138138

139139
#[cfg(feature = "for_futures")]
140-
alive: Option<Arc<Mutex<AtomicBool>>>,
140+
alive: Arc<Mutex<AtomicBool>>,
141141
#[cfg(feature = "for_futures")]
142142
waker: Arc<Mutex<Option<Waker>>>,
143143

@@ -150,7 +150,7 @@ impl<T> LinkedListAsync<T> {
150150
inner: Arc::new(Mutex::new(LinkedList::new())),
151151

152152
#[cfg(feature = "for_futures")]
153-
alive: None,
153+
alive: Arc::new(Mutex::new(AtomicBool::new(true))),
154154
#[cfg(feature = "for_futures")]
155155
waker: Arc::new(Mutex::new(None)),
156156

@@ -159,10 +159,23 @@ impl<T> LinkedListAsync<T> {
159159
}
160160

161161
pub fn push_back(&self, input: T) {
162-
self.inner.lock().unwrap().push_back(input);
163-
164162
#[cfg(feature = "for_futures")]
165-
self.wake();
163+
{
164+
{
165+
let alive = { self.alive.lock().unwrap().load(Ordering::SeqCst) };
166+
if alive {
167+
self.inner.lock().unwrap().push_back(input);
168+
}
169+
}
170+
171+
self.wake();
172+
return;
173+
}
174+
175+
#[cfg(not(feature = "for_futures"))]
176+
{
177+
self.inner.lock().unwrap().push_back(input);
178+
}
166179
}
167180

168181
pub fn pop_front(&self) -> Option<T> {
@@ -178,26 +191,14 @@ impl<T> LinkedListAsync<T> {
178191

179192
#[cfg(feature = "for_futures")]
180193
fn open_stream(&mut self) {
181-
match &self.alive {
182-
Some(alive) => {
183-
alive.lock().unwrap().store(true, Ordering::SeqCst);
184-
}
185-
None => {
186-
self.alive = Some(Arc::new(Mutex::new(AtomicBool::new(true))));
187-
}
188-
}
194+
self.alive.lock().unwrap().store(true, Ordering::SeqCst);
189195
}
190196

191197
#[cfg(feature = "for_futures")]
192198
pub fn close_stream(&mut self) {
193-
if let Some(alive) = &self.alive {
194-
{
195-
alive.lock().unwrap().store(false, Ordering::SeqCst);
196-
}
197-
self.alive = None;
199+
self.alive.lock().unwrap().store(false, Ordering::SeqCst);
198200

199-
self.wake()
200-
}
201+
self.wake()
201202
}
202203
}
203204

@@ -209,10 +210,6 @@ where
209210
type Item = T;
210211

211212
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212-
if self.alive.is_none() {
213-
return Poll::Ready(None);
214-
}
215-
216213
let picked: Option<T>;
217214
{
218215
picked = self.inner.lock().unwrap().pop_front();
@@ -222,45 +219,34 @@ where
222219
}
223220

224221
// Check alive
225-
if let Some(alive) = &self.alive {
226-
// Check alive
227-
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
228-
if alive {
229-
// Check cached
230-
let picked: Option<T>;
231-
{
232-
picked = self.inner.lock().unwrap().pop_front();
233-
}
222+
let alive = { self.alive.lock().unwrap().load(Ordering::SeqCst) };
223+
if alive {
224+
// Check cached
225+
let picked: Option<T>;
226+
{
227+
picked = self.inner.lock().unwrap().pop_front();
228+
}
234229

235-
// Check Pending(None) or Ready(Some(item))
236-
if picked.is_none() {
237-
// Keep Pending
238-
{
239-
self.waker.lock().unwrap().replace(cx.waker().clone());
240-
};
241-
return Poll::Pending;
242-
}
243-
return Poll::Ready(picked);
230+
// Check Pending(None) or Ready(Some(item))
231+
if picked.is_none() {
232+
// Keep Pending
233+
{
234+
self.waker.lock().unwrap().replace(cx.waker().clone());
235+
};
236+
return Poll::Pending;
244237
}
245-
return Poll::Ready(None);
238+
return Poll::Ready(picked);
246239
}
247240
return Poll::Ready(None);
248241
}
249242

250243
fn size_hint(&self) -> (usize, Option<usize>) {
251-
if self.alive.is_some() {
252-
if let Some(alive) = &self.alive {
253-
// Check alive
254-
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
255-
if alive {
256-
return (0, Some(0));
257-
}
258-
return (0, None);
259-
}
260-
return (0, None);
261-
} else {
262-
return (0, None);
244+
// Check alive
245+
let alive = { self.alive.lock().unwrap().load(Ordering::SeqCst) };
246+
if alive {
247+
return (0, Some(0));
263248
}
249+
return (0, None);
264250
}
265251
}
266252

@@ -293,7 +279,7 @@ pub trait Observable<X, T: Subscription<X>> {
293279
* `observer` - The given `Subscription`.
294280
295281
*/
296-
fn add_observer(&mut self, observer: Arc<Mutex<T>>);
282+
fn add_observer(&mut self, observer: Arc<T>);
297283

298284
/**
299285
Remove the observer.
@@ -303,7 +289,7 @@ pub trait Observable<X, T: Subscription<X>> {
303289
* `observer` - The given `Subscription`.
304290
305291
*/
306-
fn delete_observer(&mut self, observer: Arc<Mutex<T>>);
292+
fn delete_observer(&mut self, observer: Arc<T>);
307293

308294
/**
309295
Notify all `Subscription` subscribers with a given value `Arc<X>`.
@@ -389,7 +375,7 @@ pub struct SubscriptionFunc<T> {
389375
pub receiver: RawReceiver<T>,
390376

391377
#[cfg(feature = "for_futures")]
392-
cached: Option<LinkedListAsync<Arc<T>>>,
378+
cached: LinkedListAsync<Arc<T>>,
393379
}
394380

395381
impl<T> SubscriptionFunc<T> {
@@ -403,7 +389,7 @@ impl<T> SubscriptionFunc<T> {
403389
receiver: RawReceiver::new(func),
404390

405391
#[cfg(feature = "for_futures")]
406-
cached: None,
392+
cached: LinkedListAsync::new(),
407393
}
408394
}
409395
}
@@ -426,18 +412,7 @@ impl<T> Clone for SubscriptionFunc<T> {
426412
#[cfg(feature = "for_futures")]
427413
impl<T> SubscriptionFunc<T> {
428414
pub fn close_stream(&mut self) {
429-
// let old_cached = self.cached.clone();
430-
if let Some(cached) = &mut self.cached {
431-
cached.close_stream();
432-
self.cached = None;
433-
}
434-
/*
435-
if let Some(cached) = &old_cached {
436-
{
437-
cached.lock().unwrap().clear();
438-
}
439-
}
440-
// */
415+
self.cached.close_stream();
441416
}
442417
}
443418

@@ -447,17 +422,13 @@ where
447422
T: Unpin,
448423
{
449424
fn open_stream(&mut self) {
450-
if self.cached.is_none() {
451-
let mut inner = LinkedListAsync::new();
452-
inner.open_stream();
453-
self.cached = Some(inner);
454-
}
425+
self.cached.open_stream();
455426
}
456427

457428
pub fn as_stream(&mut self) -> LinkedListAsync<Arc<T>> {
458429
self.open_stream();
459430

460-
self.cached.clone().unwrap()
431+
self.cached.clone()
461432
}
462433
}
463434

@@ -479,30 +450,11 @@ impl<T: Send + Sync + 'static> Subscription<T> for SubscriptionFunc<T> {
479450

480451
#[cfg(feature = "for_futures")]
481452
{
482-
if let Some(cached) = &self.cached {
483-
if let Some(alive) = &cached.alive {
484-
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
485-
if alive {
486-
cached.push_back(x.clone());
487-
}
488-
}
489-
}
453+
self.cached.push_back(x);
490454
}
491455
}
492456
}
493457

494-
/*
495-
#[cfg(feature = "for_futures")]
496-
impl<T> LinkedListAsync<Arc<T>>
497-
where
498-
T: 'static + Send + Unpin,
499-
{
500-
pub fn open_stream(&mut self) {
501-
self.0.open_stream();
502-
}
503-
}
504-
*/
505-
506458
/**
507459
`RawReceiver` struct implements an useful container of `FnMut`(`Arc<T>`)
508460
, receiving an `Arc<T>` as its parameter.

0 commit comments

Comments
 (0)