Skip to content

Commit eb7ef1b

Browse files
committed
Reduce clone(). Fix lock() issues.
1 parent d8897d2 commit eb7ef1b

File tree

8 files changed

+72
-71
lines changed

8 files changed

+72
-71
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ let monadio_async = MonadIO::new_with_handlers(
116116
let latch = CountDownLatch::new(1);
117117
let latch2 = latch.clone();
118118

119-
thread::sleep(time::Duration::from_millis(100));
119+
thread::sleep(time::Duration::from_millis(1));
120120

121121
let subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
122122
println!("monadio_async {:?}", x); // monadio_async ok
@@ -142,7 +142,7 @@ monadio_async.subscribe(Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
142142
handler_observe_on.post(RawFunc::new(move || {}));
143143
handler_observe_on.post(RawFunc::new(move || {}));
144144
}
145-
thread::sleep(time::Duration::from_millis(100));
145+
thread::sleep(time::Duration::from_millis(1));
146146

147147
// Waiting for being unlcoked
148148
latch.clone().wait();
@@ -263,7 +263,7 @@ let _cor2 = cor_newmutex!(
263263
cor_start!(_cor1);
264264
cor_start!(_cor2);
265265

266-
thread::sleep(time::Duration::from_millis(100));
266+
thread::sleep(time::Duration::from_millis(1));
267267
```
268268

269269
## Do Notation (Haskell DoNotation-like)
@@ -452,7 +452,7 @@ root_handle.send(Value::Int(30));
452452
// Send Shutdown
453453
root_handle.send(Value::Shutdown);
454454

455-
thread::sleep(Duration::from_millis(10));
455+
thread::sleep(Duration::from_millis(1));
456456
// 3 children Actors
457457
assert_eq!(3, result_string.pop_front().unwrap().len());
458458

src/actor.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ It defines simple and practical hehaviors of `Actor` model.
2727
``
2828
*/
2929
pub trait Actor<Msg, ContextValue, HandleType, Functor>: UniqueId<String> {
30-
fn receive(&mut self, message: Msg, context: &mut HashMap<String, ContextValue>);
30+
fn receive(
31+
&mut self,
32+
this: &mut Self,
33+
message: Msg,
34+
context: &mut HashMap<String, ContextValue>,
35+
);
3136
fn spawn_with_handle(&self, func: Functor) -> HandleType;
3237

3338
fn get_handle(&self) -> HandleType;
@@ -199,6 +204,7 @@ where
199204
}
200205

201206
let mut this = self.clone();
207+
let mut this_for_receive = self.clone();
202208
let this_for_context = self.clone();
203209
let started_alive_thread = self.started_alive.clone();
204210
self.join_handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
@@ -213,7 +219,7 @@ where
213219
match v {
214220
Some(m) => {
215221
let mut context = this_for_context.context.lock().unwrap();
216-
this.receive(m, context.as_mut());
222+
this.receive(&mut this_for_receive, m, context.as_mut());
217223
}
218224
None => {
219225
let started_alive = started_alive_thread.lock().unwrap();
@@ -254,10 +260,14 @@ where
254260
Msg: Clone + Send + 'static,
255261
ContextValue: Send + 'static,
256262
{
257-
fn receive(&mut self, message: Msg, context: &mut HashMap<String, ContextValue>) {
263+
fn receive(
264+
&mut self,
265+
this: &mut Self,
266+
message: Msg,
267+
context: &mut HashMap<String, ContextValue>,
268+
) {
258269
{
259-
let effect = self.effect.clone();
260-
effect.lock().unwrap()(self, message, context);
270+
self.effect.lock().unwrap()(this, message, context);
261271
}
262272
}
263273
fn spawn_with_handle(
@@ -389,7 +399,7 @@ fn test_actor_common() {
389399
// Send Shutdown
390400
root_handle.send(Value::Shutdown);
391401

392-
thread::sleep(Duration::from_millis(10));
402+
thread::sleep(Duration::from_millis(1));
393403
// 3 children Actors
394404
assert_eq!(3, result_string.pop_front().unwrap().len());
395405

src/common.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ impl<T> LinkedListAsync<T> {
162162
#[cfg(feature = "for_futures")]
163163
{
164164
{
165-
let alive = { self.alive.lock().unwrap().load(Ordering::SeqCst) };
166-
if alive {
165+
let alive = self.alive.lock().unwrap();
166+
if alive.load(Ordering::SeqCst) {
167167
self.inner.lock().unwrap().push_back(input);
168168
}
169169

@@ -186,7 +186,8 @@ impl<T> LinkedListAsync<T> {
186186
#[cfg(feature = "for_futures")]
187187
#[inline]
188188
fn wake(&self) {
189-
if let Some(waker) = self.waker.lock().unwrap().take() {
189+
let mut waker = self.waker.lock().unwrap();
190+
if let Some(waker) = waker.take() {
190191
waker.wake();
191192
}
192193
}
@@ -249,8 +250,8 @@ where
249250

250251
fn size_hint(&self) -> (usize, Option<usize>) {
251252
// Check alive
252-
let alive = { self.alive.lock().unwrap().load(Ordering::SeqCst) };
253-
if alive {
253+
let alive = self.alive.lock().unwrap();
254+
if alive.load(Ordering::SeqCst) {
254255
return (0, Some(0));
255256
}
256257
return (0, None);

src/cor.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,7 @@ impl<RETURN: Send + Sync + 'static, RECEIVE: Send + Sync + 'static> Cor<RETURN,
395395

396396
let op;
397397
{
398-
let op_ch = &*_op_ch_receiver.lock().unwrap();
399-
op = op_ch.recv();
398+
op = _op_ch_receiver.lock().unwrap().recv();
400399
}
401400

402401
if let Ok(_x) = op {
@@ -481,8 +480,7 @@ impl<RETURN: Send + Sync + 'static, RECEIVE: Send + Sync + 'static> Cor<RETURN,
481480
482481
*/
483482
pub fn is_started(&self) -> bool {
484-
let _started_alive = self.started_alive.clone();
485-
let started_alive = _started_alive.lock().unwrap();
483+
let started_alive = self.started_alive.lock().unwrap();
486484
let &(ref started, _) = &*started_alive;
487485
started.load(Ordering::SeqCst)
488486
}
@@ -493,8 +491,7 @@ impl<RETURN: Send + Sync + 'static, RECEIVE: Send + Sync + 'static> Cor<RETURN,
493491
494492
*/
495493
pub fn is_alive(&self) -> bool {
496-
let _started_alive = self.started_alive.clone();
497-
let started_alive = _started_alive.lock().unwrap();
494+
let started_alive = self.started_alive.lock().unwrap();
498495
let &(_, ref alive) = &*started_alive;
499496
alive.load(Ordering::SeqCst)
500497
}
@@ -509,8 +506,7 @@ impl<RETURN: Send + Sync + 'static, RECEIVE: Send + Sync + 'static> Cor<RETURN,
509506
*/
510507
pub fn stop(&mut self) {
511508
{
512-
let _started_alive = self.started_alive.clone();
513-
let started_alive = _started_alive.lock().unwrap();
509+
let started_alive = self.started_alive.lock().unwrap();
514510
let &(ref started, ref alive) = &*started_alive;
515511

516512
if !started.load(Ordering::SeqCst) {
@@ -532,19 +528,17 @@ impl<RETURN: Send + Sync + 'static, RECEIVE: Send + Sync + 'static> Cor<RETURN,
532528
result_ch_sender: Arc<Mutex<Sender<Option<RETURN>>>>,
533529
given_as_request: Option<RECEIVE>,
534530
) {
535-
let _op_ch_sender = self.op_ch_sender.clone();
536-
537531
// do_close_safe
538-
if !self.is_alive() {
532+
// if !self.is_alive() {
533+
let started_alive = self.started_alive.lock().unwrap();
534+
let &(_, ref alive) = &*started_alive;
535+
if !alive.load(Ordering::SeqCst) {
539536
return;
540537
}
541538

542539
{
543-
let __started_alive = self.started_alive.clone();
544-
let _started_alive = __started_alive.lock().unwrap();
545-
546-
// do: (effect)();
547-
let _result = _op_ch_sender.lock().unwrap().send(CorOp {
540+
let op_ch_sender = self.op_ch_sender.lock().unwrap();
541+
let _result = op_ch_sender.send(CorOp {
548542
// cor: cor,
549543
result_ch_sender,
550544
val: given_as_request,
@@ -559,8 +553,7 @@ impl<RETURN: Send + Sync + 'static, RECEIVE: Send + Sync + 'static> Cor<RETURN,
559553
}
560554
561555
{
562-
let __started_alive = self.started_alive.clone();
563-
let _started_alive = __started_alive.lock().unwrap();
556+
let _started_alive = self.started_alive.lock().unwrap();
564557
565558
(effect)();
566559
}
@@ -710,5 +703,5 @@ fn test_cor_new() {
710703
cor_start!(_cor1);
711704
cor_start!(_cor2);
712705

713-
thread::sleep(time::Duration::from_millis(10));
706+
thread::sleep(time::Duration::from_millis(1));
714707
}

src/handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,13 +275,13 @@ fn test_handler_new() {
275275
}));
276276
println!("Test");
277277

278-
thread::sleep(time::Duration::from_millis(10));
278+
thread::sleep(time::Duration::from_millis(1));
279279

280280
assert_eq!(true, h.is_alive());
281281
assert_eq!(true, h.is_started());
282282

283283
h.stop();
284-
thread::sleep(time::Duration::from_millis(10));
284+
thread::sleep(time::Duration::from_millis(1));
285285

286286
assert_eq!(false, h.is_alive());
287287
assert_eq!(true, h.is_started());

src/monadio.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ fn test_monadio_new() {
236236
let latch = CountDownLatch::new(1);
237237
let latch2 = latch.clone();
238238

239-
thread::sleep(time::Duration::from_millis(10));
239+
thread::sleep(time::Duration::from_millis(1));
240240

241241
let subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
242242
println!("monadio_async {:?}", x); // monadio_async ok
@@ -262,7 +262,7 @@ fn test_monadio_new() {
262262
handler_observe_on.post(RawFunc::new(move || {}));
263263
handler_observe_on.post(RawFunc::new(move || {}));
264264
}
265-
thread::sleep(time::Duration::from_millis(10));
265+
thread::sleep(time::Duration::from_millis(1));
266266

267267
// Waiting for being unlcoked
268268
latch.clone().wait();

src/publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ async fn test_publisher_stream() {
241241
}));
242242
}
243243

244-
std::thread::sleep(std::time::Duration::from_millis(10));
244+
std::thread::sleep(std::time::Duration::from_millis(1));
245245

246246
got_list = s.clone().collect::<Vec<_>>().await;
247247
assert_eq!(

src/sync.rs

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -138,19 +138,16 @@ where
138138
}
139139

140140
fn start(&mut self) {
141-
{
142-
let started_alive = self.started_alive.lock().unwrap();
143-
let &(ref started, ref alive) = &*started_alive;
144-
145-
if started.load(Ordering::SeqCst) {
146-
return;
147-
}
148-
started.store(true, Ordering::SeqCst);
149-
if alive.load(Ordering::SeqCst) {
150-
return;
151-
}
152-
alive.store(true, Ordering::SeqCst);
141+
let started_alive = self.started_alive.lock().unwrap();
142+
let &(ref started, ref alive) = &*started_alive;
143+
if started.load(Ordering::SeqCst) {
144+
return;
145+
}
146+
started.store(true, Ordering::SeqCst);
147+
if alive.load(Ordering::SeqCst) {
148+
return;
153149
}
150+
alive.store(true, Ordering::SeqCst);
154151

155152
let mut this = self.clone();
156153
let _effect = self.effect.clone();
@@ -171,18 +168,15 @@ where
171168
}
172169

173170
fn stop(&mut self) {
174-
{
175-
let started_alive = self.started_alive.lock().unwrap();
176-
let &(ref started, ref alive) = &*started_alive;
177-
178-
if !started.load(Ordering::SeqCst) {
179-
return;
180-
}
181-
if !alive.load(Ordering::SeqCst) {
182-
return;
183-
}
184-
alive.store(false, Ordering::SeqCst);
171+
let started_alive = self.started_alive.lock().unwrap();
172+
let &(ref started, ref alive) = &*started_alive;
173+
if !started.load(Ordering::SeqCst) {
174+
return;
175+
}
176+
if !alive.load(Ordering::SeqCst) {
177+
return;
185178
}
179+
alive.store(false, Ordering::SeqCst);
186180

187181
#[cfg(feature = "for_futures")]
188182
{
@@ -212,9 +206,12 @@ where
212206
{
213207
type Output = Option<T>;
214208

215-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216-
if self.is_started() && (!self.is_alive()) {
217-
Poll::Ready(self.result())
209+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210+
let started_alive = self.started_alive.lock().unwrap();
211+
let &(ref started, ref alive) = &*started_alive;
212+
213+
if started.load(Ordering::SeqCst) && (!alive.load(Ordering::SeqCst)) {
214+
Poll::Ready(self.clone().result())
218215
} else {
219216
{
220217
self.waker.lock().unwrap().replace(cx.waker().clone());
@@ -265,15 +262,16 @@ impl CountDownLatch {
265262

266263
#[cfg(feature = "for_futures")]
267264
{
268-
if let Some(waker) = self.waker.lock().unwrap().take() {
265+
let mut waker = self.waker.lock().unwrap();
266+
if let Some(waker) = waker.take() {
269267
waker.wake()
270268
}
271269
}
272270
}
273271
}
274272

275273
pub fn wait(&self) {
276-
let &(ref lock, ref cvar) = &*self.pair.clone();
274+
let &(ref lock, ref cvar) = &*self.pair;
277275

278276
/*
279277
let mut result = lock.lock();
@@ -303,8 +301,7 @@ impl Future for CountDownLatch {
303301
type Output = ();
304302

305303
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
306-
let pair = self.pair.clone();
307-
let &(ref remaining, _) = &*pair;
304+
let &(ref remaining, _) = &*self.pair;
308305
let count = remaining.lock().unwrap();
309306
if *count > 0 {
310307
{
@@ -568,7 +565,7 @@ async fn test_sync_future() {
568565
h.start();
569566
println!("test_sync_future hh2 running");
570567
}
571-
std::thread::sleep(Duration::from_millis(10));
568+
std::thread::sleep(Duration::from_millis(1));
572569

573570
pub1.publish(1);
574571
println!("test_sync_future pub1.publish");
@@ -603,7 +600,7 @@ fn test_will_sync_new() {
603600
})));
604601
h.start();
605602
latch.clone().wait();
606-
thread::sleep(time::Duration::from_millis(10));
603+
thread::sleep(time::Duration::from_millis(1));
607604
assert_eq!(false, h.is_alive());
608605
assert_eq!(true, h.is_started());
609606
assert_eq!(1, h.result().unwrap());

0 commit comments

Comments
 (0)