Skip to content

Commit efc8c8f

Browse files
committed
fix publisher.subscribe_blocking_queue() strange implementations.
1 parent c964df8 commit efc8c8f

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fp_rust"
3-
version = "0.2.0"
3+
version = "0.2.1"
44
license = "MIT"
55
authors = ["JunYi JohnTeee Lee <johnteee@gmail.com>"]
66
edition = "2018"

src/publisher.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,16 +96,18 @@ where
9696

9797
pub fn subscribe_blocking_queue(
9898
&mut self,
99-
queue: Arc<Mutex<BlockingQueue<Arc<X>>>>,
100-
) -> Arc<Mutex<SubscriptionFunc<X>>> {
101-
self.subscribe_fn(move |v| queue.lock().unwrap().offer(v))
99+
queue: BlockingQueue<Arc<X>>,
100+
) -> BlockingQueue<Arc<X>> {
101+
let mut queue_new: BlockingQueue<Arc<X>>;
102+
{
103+
queue_new = queue.clone();
104+
}
105+
self.subscribe_fn(move |v| queue_new.put(v));
106+
107+
queue
102108
}
103109
pub fn as_blocking_queue(&mut self) -> BlockingQueue<Arc<X>> {
104-
let queue = BlockingQueue::new();
105-
let queue_result = queue.clone();
106-
self.subscribe_blocking_queue(Arc::new(Mutex::new(queue)));
107-
108-
queue_result
110+
self.subscribe_blocking_queue(BlockingQueue::new())
109111
}
110112
}
111113

0 commit comments

Comments
 (0)