Skip to content

Commit d173df7

Browse files
committed
Add Actor Ask test/examples (Akka-style).
1 parent 942c68e commit d173df7

File tree

2 files changed

+141
-0
lines changed

2 files changed

+141
-0
lines changed

README.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,8 @@ println!("test_map_reduce_filter Result is {:?}", result);
359359

360360
## Actor
361361

362+
### Actor common(send/receive/spawn/states)
363+
362364
Example:
363365

364366
```rust
@@ -475,3 +477,74 @@ assert_eq!(
475477
v.as_slice()
476478
)
477479
```
480+
481+
### Actor Ask (inspired by Akka/Erlang)
482+
483+
Example:
484+
485+
```rust
486+
use std::time::Duration;
487+
488+
use fp_rust::common::LinkedListAsync;
489+
490+
#[derive(Clone, Debug)]
491+
enum Value {
492+
AskIntByLinkedListAsync((i32, LinkedListAsync<i32>)),
493+
AskIntByBlockingQueue((i32, BlockingQueue<i32>)),
494+
}
495+
496+
let mut root = ActorAsync::new(
497+
move |_: &mut ActorAsync<_, _>, msg: Value, _: &mut HashMap<String, Value>| match msg {
498+
Value::AskIntByLinkedListAsync(v) => {
499+
println!("Actor AskIntByLinkedListAsync");
500+
v.1.push_back(v.0 * 10);
501+
}
502+
Value::AskIntByBlockingQueue(mut v) => {
503+
println!("Actor AskIntByBlockingQueue");
504+
505+
// NOTE If negative, hanging for testing timeout
506+
if v.0 < 0 {
507+
return;
508+
}
509+
510+
// NOTE General Cases
511+
v.1.offer(v.0 * 10);
512+
} // _ => {}
513+
},
514+
);
515+
516+
let mut root_handle = root.get_handle();
517+
root.start();
518+
519+
// LinkedListAsync<i32>
520+
let result_i32 = LinkedListAsync::<i32>::new();
521+
root_handle.send(Value::AskIntByLinkedListAsync((1, result_i32.clone())));
522+
root_handle.send(Value::AskIntByLinkedListAsync((2, result_i32.clone())));
523+
root_handle.send(Value::AskIntByLinkedListAsync((3, result_i32.clone())));
524+
thread::sleep(Duration::from_millis(1));
525+
let i = result_i32.pop_front();
526+
assert_eq!(Some(10), i);
527+
let i = result_i32.pop_front();
528+
assert_eq!(Some(20), i);
529+
let i = result_i32.pop_front();
530+
assert_eq!(Some(30), i);
531+
532+
// BlockingQueue<i32>
533+
let mut result_i32 = BlockingQueue::<i32>::new();
534+
result_i32.timeout = Some(Duration::from_millis(1));
535+
root_handle.send(Value::AskIntByBlockingQueue((4, result_i32.clone())));
536+
root_handle.send(Value::AskIntByBlockingQueue((5, result_i32.clone())));
537+
root_handle.send(Value::AskIntByBlockingQueue((6, result_i32.clone())));
538+
thread::sleep(Duration::from_millis(1));
539+
let i = result_i32.take();
540+
assert_eq!(Some(40), i);
541+
let i = result_i32.take();
542+
assert_eq!(Some(50), i);
543+
let i = result_i32.take();
544+
assert_eq!(Some(60), i);
545+
546+
// Timeout case:
547+
root_handle.send(Value::AskIntByBlockingQueue((-1, result_i32.clone())));
548+
let i = result_i32.take();
549+
assert_eq!(None, i);
550+
```

src/actor.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,3 +422,71 @@ fn test_actor_common() {
422422
v.as_slice()
423423
)
424424
}
425+
426+
#[test]
427+
fn test_actor_ask() {
428+
use std::time::Duration;
429+
430+
use super::common::LinkedListAsync;
431+
432+
#[derive(Clone, Debug)]
433+
enum Value {
434+
AskIntByLinkedListAsync((i32, LinkedListAsync<i32>)),
435+
AskIntByBlockingQueue((i32, BlockingQueue<i32>)),
436+
}
437+
438+
let mut root = ActorAsync::new(
439+
move |_: &mut ActorAsync<_, _>, msg: Value, _: &mut HashMap<String, Value>| match msg {
440+
Value::AskIntByLinkedListAsync(v) => {
441+
println!("Actor AskIntByLinkedListAsync");
442+
v.1.push_back(v.0 * 10);
443+
}
444+
Value::AskIntByBlockingQueue(mut v) => {
445+
println!("Actor AskIntByBlockingQueue");
446+
447+
// NOTE If negative, hanging for testing timeout
448+
if v.0 < 0 {
449+
return;
450+
}
451+
452+
// NOTE General Cases
453+
v.1.offer(v.0 * 10);
454+
} // _ => {}
455+
},
456+
);
457+
458+
let mut root_handle = root.get_handle();
459+
root.start();
460+
461+
// LinkedListAsync<i32>
462+
let result_i32 = LinkedListAsync::<i32>::new();
463+
root_handle.send(Value::AskIntByLinkedListAsync((1, result_i32.clone())));
464+
root_handle.send(Value::AskIntByLinkedListAsync((2, result_i32.clone())));
465+
root_handle.send(Value::AskIntByLinkedListAsync((3, result_i32.clone())));
466+
thread::sleep(Duration::from_millis(1));
467+
let i = result_i32.pop_front();
468+
assert_eq!(Some(10), i);
469+
let i = result_i32.pop_front();
470+
assert_eq!(Some(20), i);
471+
let i = result_i32.pop_front();
472+
assert_eq!(Some(30), i);
473+
474+
// BlockingQueue<i32>
475+
let mut result_i32 = BlockingQueue::<i32>::new();
476+
result_i32.timeout = Some(Duration::from_millis(1));
477+
root_handle.send(Value::AskIntByBlockingQueue((4, result_i32.clone())));
478+
root_handle.send(Value::AskIntByBlockingQueue((5, result_i32.clone())));
479+
root_handle.send(Value::AskIntByBlockingQueue((6, result_i32.clone())));
480+
thread::sleep(Duration::from_millis(1));
481+
let i = result_i32.take();
482+
assert_eq!(Some(40), i);
483+
let i = result_i32.take();
484+
assert_eq!(Some(50), i);
485+
let i = result_i32.take();
486+
assert_eq!(Some(60), i);
487+
488+
// Timeout case:
489+
root_handle.send(Value::AskIntByBlockingQueue((-1, result_i32.clone())));
490+
let i = result_i32.take();
491+
assert_eq!(None, i);
492+
}

0 commit comments

Comments
 (0)