Skip to content

Commit 0d3ca6d

Browse files
committed
Refactor codes.
1 parent f63d315 commit 0d3ca6d

File tree

1 file changed

+22
-3
lines changed

1 file changed

+22
-3
lines changed

src/publisher.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use super::common::SubscriptionFuncStream;
99

1010
use super::common::{Observable, RawFunc, Subscription, SubscriptionFunc, UniqueId};
1111
use super::handler::Handler;
12+
use super::sync::{BlockingQueue, Queue};
1213

1314
/**
1415
The `Publisher` implements *PubSub-like* features.
@@ -43,7 +44,7 @@ impl<X> Default for Publisher<X> {
4344
}
4445
}
4546

46-
impl<X: Send + Sync + 'static> Publisher<X> {
47+
impl<X> Publisher<X> {
4748
pub fn new() -> Publisher<X> {
4849
Default::default()
4950
}
@@ -53,6 +54,14 @@ impl<X: Send + Sync + 'static> Publisher<X> {
5354
new_one
5455
}
5556

57+
pub fn subscribe_on(&mut self, h: Option<Arc<Mutex<dyn Handler + 'static>>>) {
58+
self.sub_handler = h;
59+
}
60+
}
61+
impl<X> Publisher<X>
62+
where
63+
X: Send + Sync + 'static,
64+
{
5665
pub fn publish(&mut self, val: X) {
5766
self.notify_observers(Arc::new(val));
5867
}
@@ -85,8 +94,18 @@ impl<X: Send + Sync + 'static> Publisher<X> {
8594
self.delete_observer(s);
8695
}
8796

88-
pub fn subscribe_on(&mut self, h: Option<Arc<Mutex<dyn Handler + 'static>>>) {
89-
self.sub_handler = h;
97+
pub fn subscribe_blocking_queue(
98+
&mut self,
99+
queue: Arc<Mutex<BlockingQueue<Arc<X>>>>,
100+
) -> Arc<Mutex<SubscriptionFunc<X>>> {
101+
self.subscribe_fn(move |v| queue.lock().unwrap().offer(v))
102+
}
103+
pub fn as_blocking_queue(&mut self) -> BlockingQueue<Arc<X>> {
104+
let queue = BlockingQueue::new();
105+
let queue_result = queue.clone();
106+
self.subscribe_blocking_queue(Arc::new(Mutex::new(queue)));
107+
108+
queue_result
90109
}
91110
}
92111

0 commit comments

Comments
 (0)