Skip to content

Commit 36f5a64

Browse files
committed
Make LinkedListAsync General & Streamable.
1 parent 68b5e91 commit 36f5a64

File tree

2 files changed

+125
-129
lines changed

2 files changed

+125
-129
lines changed

src/common.rs

Lines changed: 121 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ pub fn get_mut<'a, T>(v: &'a mut Vec<T>, index: usize) -> Option<&'a mut T> {
136136
pub struct LinkedListAsync<T> {
137137
inner: Arc<Mutex<LinkedList<T>>>,
138138

139+
#[cfg(feature = "for_futures")]
140+
alive: Option<Arc<Mutex<AtomicBool>>>,
141+
#[cfg(feature = "for_futures")]
142+
waker: Arc<Mutex<Option<Waker>>>,
143+
139144
_t: PhantomData<T>,
140145
}
141146

@@ -144,6 +149,11 @@ impl<T> LinkedListAsync<T> {
144149
Self {
145150
inner: Arc::new(Mutex::new(LinkedList::new())),
146151

152+
#[cfg(feature = "for_futures")]
153+
alive: None,
154+
#[cfg(feature = "for_futures")]
155+
waker: Arc::new(Mutex::new(None)),
156+
147157
_t: PhantomData,
148158
}
149159
}
@@ -155,9 +165,108 @@ impl<T> LinkedListAsync<T> {
155165
pub fn pop_front(&self) -> Option<T> {
156166
self.inner.lock().unwrap().pop_front()
157167
}
168+
169+
#[cfg(feature = "for_futures")]
170+
fn wake(&self) {
171+
if let Some(waker) = self.waker.lock().unwrap().take() {
172+
waker.wake()
173+
}
174+
}
175+
176+
#[cfg(feature = "for_futures")]
177+
fn open_stream(&mut self) {
178+
match &self.alive {
179+
Some(alive) => {
180+
alive.lock().unwrap().store(true, Ordering::SeqCst);
181+
}
182+
None => {
183+
self.alive = Some(Arc::new(Mutex::new(AtomicBool::new(true))));
184+
}
185+
}
186+
}
187+
188+
#[cfg(feature = "for_futures")]
189+
pub fn close_stream(&mut self) {
190+
if let Some(alive) = &self.alive {
191+
{
192+
alive.lock().unwrap().store(false, Ordering::SeqCst);
193+
}
194+
self.alive = None;
195+
196+
{
197+
if let Some(waker) = self.waker.clone().lock().unwrap().take() {
198+
self.waker = Arc::new(Mutex::new(None));
199+
waker.wake();
200+
}
201+
}
202+
}
203+
}
204+
}
205+
206+
#[cfg(feature = "for_futures")]
207+
impl<T> Stream for LinkedListAsync<T>
208+
where
209+
T: 'static + Send + Unpin,
210+
{
211+
type Item = T;
212+
213+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
214+
if self.alive.is_none() {
215+
return Poll::Ready(None);
216+
}
217+
218+
let picked: Option<T>;
219+
{
220+
picked = self.inner.lock().unwrap().pop_front();
221+
}
222+
if picked.is_some() {
223+
return Poll::Ready(picked);
224+
}
225+
226+
// Check alive
227+
if let Some(alive) = &self.alive {
228+
// Check alive
229+
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
230+
if alive {
231+
// Check cached
232+
let picked: Option<T>;
233+
{
234+
picked = self.inner.lock().unwrap().pop_front();
235+
}
236+
237+
// Check Pending(None) or Ready(Some(item))
238+
if picked.is_none() {
239+
// Keep Pending
240+
{
241+
self.waker.lock().unwrap().replace(cx.waker().clone());
242+
};
243+
return Poll::Pending;
244+
}
245+
return Poll::Ready(picked);
246+
}
247+
return Poll::Ready(None);
248+
}
249+
return Poll::Ready(None);
250+
}
251+
252+
fn size_hint(&self) -> (usize, Option<usize>) {
253+
if self.alive.is_some() {
254+
if let Some(alive) = &self.alive {
255+
// Check alive
256+
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
257+
if alive {
258+
return (0, Some(0));
259+
}
260+
return (0, None);
261+
}
262+
return (0, None);
263+
} else {
264+
return (0, None);
265+
}
266+
}
158267
}
159268

160-
impl<T> Default for LinkedListAsync<T> {
269+
impl<T> Default for LinkedListAsync<Arc<T>> {
161270
fn default() -> Self {
162271
Self::new()
163272
}
@@ -283,10 +392,6 @@ pub struct SubscriptionFunc<T> {
283392

284393
#[cfg(feature = "for_futures")]
285394
cached: Option<LinkedListAsync<Arc<T>>>,
286-
#[cfg(feature = "for_futures")]
287-
alive: Option<Arc<Mutex<AtomicBool>>>,
288-
#[cfg(feature = "for_futures")]
289-
waker: Arc<Mutex<Option<Waker>>>,
290395
}
291396

292397
impl<T> SubscriptionFunc<T> {
@@ -301,10 +406,6 @@ impl<T> SubscriptionFunc<T> {
301406

302407
#[cfg(feature = "for_futures")]
303408
cached: None,
304-
#[cfg(feature = "for_futures")]
305-
alive: None,
306-
#[cfg(feature = "for_futures")]
307-
waker: Arc::new(Mutex::new(None)),
308409
}
309410
}
310411
}
@@ -320,26 +421,16 @@ impl<T> Clone for SubscriptionFunc<T> {
320421
},
321422
#[cfg(feature = "for_futures")]
322423
cached: self.cached.clone(),
323-
#[cfg(feature = "for_futures")]
324-
alive: self.alive.clone(),
325-
#[cfg(feature = "for_futures")]
326-
waker: self.waker.clone(),
327424
}
328425
}
329426
}
330427

331428
#[cfg(feature = "for_futures")]
332429
impl<T> SubscriptionFunc<T> {
333430
pub fn close_stream(&mut self) {
334-
if let Some(alive) = &self.alive {
335-
{
336-
alive.lock().unwrap().store(false, Ordering::SeqCst);
337-
}
338-
self.alive = None;
339-
}
340-
341431
// let old_cached = self.cached.clone();
342-
if self.cached.is_some() {
432+
if let Some(cached) = &mut self.cached {
433+
cached.close_stream();
343434
self.cached = None;
344435
}
345436
/*
@@ -349,13 +440,6 @@ impl<T> SubscriptionFunc<T> {
349440
}
350441
}
351442
// */
352-
353-
{
354-
if let Some(waker) = self.waker.clone().lock().unwrap().take() {
355-
self.waker = Arc::new(Mutex::new(None));
356-
waker.wake();
357-
}
358-
}
359443
}
360444
}
361445

@@ -365,24 +449,17 @@ where
365449
T: Unpin,
366450
{
367451
fn open_stream(&mut self) {
368-
match &self.alive {
369-
Some(alive) => {
370-
alive.lock().unwrap().store(true, Ordering::SeqCst);
371-
}
372-
None => {
373-
self.alive = Some(Arc::new(Mutex::new(AtomicBool::new(true))));
374-
}
375-
}
376-
377452
if self.cached.is_none() {
378-
self.cached = Some(LinkedListAsync::new());
453+
let mut inner = LinkedListAsync::new();
454+
inner.open_stream();
455+
self.cached = Some(inner);
379456
}
380457
}
381458

382-
pub fn as_stream(&mut self) -> SubscriptionFuncStream<T> {
459+
pub fn as_stream(&mut self) -> LinkedListAsync<Arc<T>> {
383460
self.open_stream();
384461

385-
SubscriptionFuncStream { 0: self.clone() }
462+
self.cached.clone().unwrap()
386463
}
387464
}
388465

@@ -404,16 +481,14 @@ impl<T: Send + Sync + 'static> Subscription<T> for SubscriptionFunc<T> {
404481

405482
#[cfg(feature = "for_futures")]
406483
{
407-
if let Some(alive) = &self.alive {
408-
if let Some(cached) = &self.cached {
484+
if let Some(cached) = &self.cached {
485+
if let Some(alive) = &cached.alive {
409486
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
410487
if alive {
411488
cached.push_back(x.clone());
412489

413490
{
414-
if let Some(waker) = self.waker.lock().unwrap().take() {
415-
waker.wake()
416-
}
491+
cached.wake();
417492
}
418493
}
419494
}
@@ -422,20 +497,9 @@ impl<T: Send + Sync + 'static> Subscription<T> for SubscriptionFunc<T> {
422497
}
423498
}
424499

425-
#[cfg(feature = "for_futures")]
426-
#[derive(Clone)]
427-
pub struct SubscriptionFuncStream<T>(SubscriptionFunc<T>);
428-
429-
#[cfg(feature = "for_futures")]
430-
impl<T> SubscriptionFuncStream<T> {
431-
pub fn close_stream(&mut self) {
432-
self.0.close_stream();
433-
}
434-
}
435-
436500
/*
437501
#[cfg(feature = "for_futures")]
438-
impl<T> SubscriptionFuncStream<T>
502+
impl<T> LinkedListAsync<Arc<T>>
439503
where
440504
T: 'static + Send + Unpin,
441505
{
@@ -445,74 +509,6 @@ where
445509
}
446510
*/
447511

448-
#[cfg(feature = "for_futures")]
449-
impl<T> Stream for SubscriptionFuncStream<T>
450-
where
451-
T: 'static + Send + Unpin,
452-
{
453-
type Item = Arc<T>;
454-
455-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
456-
if self.0.alive.is_none() && self.0.cached.is_none() {
457-
return Poll::Ready(None);
458-
}
459-
460-
if let Some(cached) = &self.0.cached {
461-
let picked: Option<Arc<T>>;
462-
{
463-
picked = cached.pop_front();
464-
}
465-
if picked.is_some() {
466-
return Poll::Ready(picked);
467-
}
468-
}
469-
470-
// Check alive
471-
if let Some(alive) = &self.0.alive {
472-
// Check alive
473-
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
474-
if alive {
475-
// Check cached
476-
if let Some(cached) = &self.0.cached {
477-
let picked: Option<Arc<T>>;
478-
{
479-
picked = cached.pop_front();
480-
}
481-
482-
// Check Pending(None) or Ready(Some(item))
483-
if picked.is_none() {
484-
// Keep Pending
485-
{
486-
self.0.waker.lock().unwrap().replace(cx.waker().clone());
487-
};
488-
return Poll::Pending;
489-
}
490-
return Poll::Ready(picked);
491-
}
492-
return Poll::Ready(None);
493-
}
494-
return Poll::Ready(None);
495-
}
496-
return Poll::Ready(None);
497-
}
498-
499-
fn size_hint(&self) -> (usize, Option<usize>) {
500-
if self.0.alive.is_some() && self.0.cached.is_some() {
501-
if let Some(alive) = &self.0.alive {
502-
// Check alive
503-
let alive = { alive.lock().unwrap().load(Ordering::SeqCst) };
504-
if alive {
505-
return (0, Some(0));
506-
}
507-
return (0, None);
508-
}
509-
return (0, None);
510-
} else {
511-
return (0, None);
512-
}
513-
}
514-
}
515-
516512
/**
517513
`RawReceiver` struct implements an useful container of `FnMut`(`Arc<T>`)
518514
, receiving an `Arc<T>` as its parameter.

src/publisher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::marker::PhantomData;
55
use std::sync::{Arc, Mutex};
66

77
#[cfg(feature = "for_futures")]
8-
use super::common::SubscriptionFuncStream;
8+
use super::common::LinkedListAsync;
99

1010
use super::common::{Observable, RawFunc, Subscription, SubscriptionFunc, UniqueId};
1111
use super::handler::Handler;
@@ -116,16 +116,16 @@ impl<X: Send + Sync + 'static + Unpin> Publisher<X> {
116116
pub fn subscribe_as_stream(
117117
&mut self,
118118
s: Arc<Mutex<SubscriptionFunc<X>>>,
119-
) -> SubscriptionFuncStream<X> {
119+
) -> LinkedListAsync<Arc<X>> {
120120
self.subscribe(s).lock().unwrap().as_stream()
121121
}
122122
pub fn subscribe_fn_as_stream(
123123
&mut self,
124124
func: impl FnMut(Arc<X>) + Send + Sync + 'static,
125-
) -> SubscriptionFuncStream<X> {
125+
) -> LinkedListAsync<Arc<X>> {
126126
self.subscribe_fn(func).lock().unwrap().as_stream()
127127
}
128-
pub fn as_stream(&mut self) -> SubscriptionFuncStream<X> {
128+
pub fn as_stream(&mut self) -> LinkedListAsync<Arc<X>> {
129129
self.subscribe_fn_as_stream(|_| {})
130130
}
131131
}

0 commit comments

Comments
 (0)