Skip to content

Commit ed11da8

Browse files
committed
Implement Actor model. Make standalone features as possible.
1 parent 907a68d commit ed11da8

File tree

4 files changed

+314
-12
lines changed

4 files changed

+314
-12
lines changed

Cargo.toml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,24 @@ name = "fp_rust"
2020
path = "src/lib.rs"
2121

2222
[features]
23-
default = []
23+
default = [ "pure" ]
2424
for_futures = [ "futures", "futures-test" ]
2525
# for_futures = [ "futures", "tokio" ]
26-
pure = [ ]
26+
pure = [
27+
"fp",
28+
"maybe",
29+
"monadio",
30+
"cor",
31+
"actor",
32+
]
33+
fp = [ ]
34+
maybe = [ ]
35+
monadio = [ ]
36+
cor = [ ]
37+
actor = [ ]
2738

2839
# For test
29-
test_runtime = [ "for_futures" ]
40+
test_runtime = [ "pure", "for_futures" ]
3041

3142
[dependencies]
3243
# tokio = { version = "^1.9.0", features = ["full"], optional = true }

src/actor.rs

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/*!
2+
In this module there're implementations & tests of `Actor`.
3+
*/
4+
5+
use std::collections::HashMap;
6+
use std::sync::{
7+
atomic::{AtomicBool, Ordering},
8+
Arc, Mutex,
9+
};
10+
use std::thread;
11+
12+
use super::common::{generate_id, UniqueId};
13+
use super::sync::{BlockingQueue, Queue};
14+
15+
/**
16+
`Actor` defines concepts of `Actor`: Send/Receive Messages, States, Methods.
17+
18+
# Arguments
19+
20+
* `Msg` - The generic type of Message data
21+
* `ContextValue` - The generic type of ContextValue
22+
23+
# Remarks
24+
25+
It defines simple and practical hehaviors of `Actor` model.
26+
27+
``
28+
*/
29+
pub trait Actor<Msg, ContextValue, HandleType, Functor>: UniqueId<String> {
30+
fn receive(&mut self, message: Msg, context: &mut HashMap<String, ContextValue>);
31+
fn spawn_with_handle(&self, name: impl Into<String>, func: Functor) -> HandleType;
32+
33+
fn get_handle(&self) -> HandleType;
34+
fn get_handle_child(&self, name: impl Into<String>) -> Option<HandleType>;
35+
fn get_handle_parent(&self) -> Option<HandleType>;
36+
}
37+
38+
pub trait Handle<Msg, ContextValue>: UniqueId<String> {
39+
fn send(&self, message: Msg);
40+
}
41+
42+
#[derive(Debug, Clone)]
43+
pub struct HandleAsync<Msg>
44+
where
45+
Msg: Send + 'static,
46+
{
47+
id: String,
48+
queue: Arc<Mutex<BlockingQueue<Msg>>>,
49+
}
50+
51+
impl<Msg, ContextValue> Handle<Msg, ContextValue> for HandleAsync<Msg>
52+
where
53+
Msg: Send + 'static,
54+
{
55+
fn send(&self, message: Msg) {
56+
self.queue.lock().unwrap().offer(message);
57+
}
58+
}
59+
impl<Msg> UniqueId<String> for HandleAsync<Msg>
60+
where
61+
Msg: Send + 'static,
62+
{
63+
fn get_id(&self) -> String {
64+
self.id.clone()
65+
}
66+
}
67+
68+
// #[derive(Clone)]
69+
pub struct ActorAsync<Msg, ContextValue> {
70+
started_alive: Arc<Mutex<(AtomicBool, AtomicBool)>>,
71+
72+
id: String,
73+
parent_handle: Option<Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>>,
74+
children_handle_map:
75+
Arc<Mutex<HashMap<String, Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>>>>,
76+
77+
context: Arc<Mutex<Box<HashMap<String, ContextValue>>>>,
78+
queue: Arc<Mutex<BlockingQueue<Msg>>>,
79+
effect: Arc<Mutex<dyn FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static>>,
80+
81+
join_handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
82+
}
83+
impl<Msg, ContextValue> Clone for ActorAsync<Msg, ContextValue> {
84+
fn clone(&self) -> Self {
85+
Self {
86+
started_alive: self.started_alive.clone(),
87+
88+
id: self.id.clone(),
89+
parent_handle: self.parent_handle.clone(),
90+
children_handle_map: self.children_handle_map.clone(),
91+
92+
context: self.context.clone(),
93+
queue: self.queue.clone(),
94+
effect: self.effect.clone(),
95+
join_handle: self.join_handle.clone(),
96+
}
97+
}
98+
}
99+
100+
impl<Msg, ContextValue> ActorAsync<Msg, ContextValue> {
101+
pub fn new(
102+
effect: impl FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static,
103+
) -> Self {
104+
Self::new_with_options(effect, None, Arc::new(Mutex::new(BlockingQueue::new())))
105+
}
106+
107+
pub fn new_with_options(
108+
effect: impl FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static,
109+
parent_handle: Option<Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>>,
110+
queue: Arc<Mutex<BlockingQueue<Msg>>>,
111+
) -> Self {
112+
Self {
113+
queue,
114+
parent_handle,
115+
id: generate_id(),
116+
children_handle_map: Arc::new(Mutex::new(HashMap::new())),
117+
context: Arc::new(Mutex::new(Box::new(HashMap::new()))),
118+
started_alive: Arc::new(Mutex::new((AtomicBool::new(false), AtomicBool::new(false)))),
119+
join_handle: Arc::new(Mutex::new(None)),
120+
effect: Arc::new(Mutex::new(effect)),
121+
}
122+
}
123+
124+
pub fn is_started(&mut self) -> bool {
125+
let _started_alive = self.started_alive.clone();
126+
let started_alive = _started_alive.lock().unwrap();
127+
let &(ref started, _) = &*started_alive;
128+
started.load(Ordering::SeqCst)
129+
}
130+
131+
pub fn is_alive(&mut self) -> bool {
132+
let _started_alive = self.started_alive.clone();
133+
let started_alive = _started_alive.lock().unwrap();
134+
let &(_, ref alive) = &*started_alive;
135+
alive.load(Ordering::SeqCst)
136+
}
137+
138+
pub fn stop(&mut self) {
139+
{
140+
let _started_alive = self.started_alive.clone();
141+
let started_alive = _started_alive.lock().unwrap();
142+
let &(ref started, ref alive) = &*started_alive;
143+
144+
if !started.load(Ordering::SeqCst) {
145+
return;
146+
}
147+
if !alive.load(Ordering::SeqCst) {
148+
return;
149+
}
150+
alive.store(false, Ordering::SeqCst);
151+
}
152+
153+
// NOTE: Kill thread <- OS depending
154+
// let mut join_handle = self.join_handle.lock().unwrap();
155+
// join_handle
156+
// .take()
157+
// .expect("Called stop on non-running thread")
158+
// .join()
159+
// .expect("Could not join spawned thread");
160+
}
161+
}
162+
163+
impl<Msg, ContextValue> ActorAsync<Msg, ContextValue>
164+
where
165+
Msg: Clone + Send + 'static,
166+
ContextValue: Send + 'static,
167+
{
168+
pub fn start(&mut self) {
169+
{
170+
let _started_alive = self.started_alive.clone();
171+
let started_alive = _started_alive.lock().unwrap();
172+
let &(ref started, ref alive) = &*started_alive;
173+
174+
if started.load(Ordering::SeqCst) {
175+
return;
176+
}
177+
started.store(true, Ordering::SeqCst);
178+
if alive.load(Ordering::SeqCst) {
179+
return;
180+
}
181+
alive.store(true, Ordering::SeqCst);
182+
}
183+
184+
let mut this = self.clone();
185+
let started_alive_thread = self.started_alive.clone();
186+
self.join_handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
187+
let mut queue = { this.queue.lock().unwrap().clone() };
188+
189+
while {
190+
let started_alive = started_alive_thread.lock().unwrap();
191+
let &(_, ref alive) = &*started_alive;
192+
193+
alive.load(Ordering::SeqCst)
194+
} {
195+
let v = queue.take();
196+
197+
match v {
198+
Some(m) => {
199+
this.receive(m, this.context.clone().lock().unwrap().as_mut());
200+
}
201+
None => {
202+
let started_alive = started_alive_thread.lock().unwrap();
203+
let &(_, ref alive) = &*started_alive;
204+
205+
alive.store(false, Ordering::SeqCst);
206+
}
207+
}
208+
}
209+
210+
this.stop();
211+
}))));
212+
}
213+
}
214+
215+
impl<Msg, ContextValue> UniqueId<String> for ActorAsync<Msg, ContextValue>
216+
where
217+
Msg: Send + 'static,
218+
{
219+
fn get_id(&self) -> String {
220+
self.id.clone()
221+
}
222+
}
223+
224+
impl<Msg, ContextValue>
225+
Actor<
226+
Msg,
227+
ContextValue,
228+
Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>,
229+
Box<dyn FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static>,
230+
> for ActorAsync<Msg, ContextValue>
231+
where
232+
Msg: Send + 'static,
233+
ContextValue: 'static,
234+
{
235+
fn receive(&mut self, message: Msg, context: &mut HashMap<String, ContextValue>) {
236+
{
237+
self.effect.lock().unwrap()(message, context);
238+
}
239+
}
240+
fn spawn_with_handle(
241+
&self,
242+
name: impl Into<String>,
243+
func: Box<dyn FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static>,
244+
) -> Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static> {
245+
let mut new_one = Self::new(func);
246+
new_one.parent_handle = Some(self.get_handle());
247+
{
248+
self.children_handle_map
249+
.lock()
250+
.unwrap()
251+
.insert(name.into(), new_one.get_handle());
252+
}
253+
return new_one.get_handle();
254+
}
255+
fn get_handle(&self) -> Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static> {
256+
return Arc::new(HandleAsync {
257+
id: self.id.clone(),
258+
queue: self.queue.clone(),
259+
});
260+
}
261+
fn get_handle_child(
262+
&self,
263+
name: impl Into<String>,
264+
) -> Option<Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>> {
265+
match self.children_handle_map.lock().unwrap().get(&name.into()) {
266+
Some(v) => Some(v.clone()),
267+
None => None,
268+
}
269+
}
270+
fn get_handle_parent(
271+
&self,
272+
) -> Option<Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>> {
273+
return self.parent_handle.clone();
274+
}
275+
}
276+
277+
#[test]
278+
fn test_actor_common() {
279+
// assert_eq!(false, Maybe::just(None::<bool>).or(false));
280+
}

src/common.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,14 @@ pub trait UniqueId<T> {
231231
fn get_id(&self) -> T;
232232
}
233233

234+
pub fn generate_id() -> String {
235+
let since_the_epoch = SystemTime::now()
236+
.duration_since(UNIX_EPOCH)
237+
.expect("Time went backwards");
238+
239+
format!("{:?}{:?}", thread::current().id(), since_the_epoch)
240+
}
241+
234242
/**
235243
`SubscriptionFunc` struct implements the interface of `Subscription`,
236244
for general purposes crossing over many modules of fpRust.
@@ -261,11 +269,7 @@ pub struct SubscriptionFunc<T> {
261269

262270
impl<T> SubscriptionFunc<T> {
263271
fn generate_id() -> String {
264-
let since_the_epoch = SystemTime::now()
265-
.duration_since(UNIX_EPOCH)
266-
.expect("Time went backwards");
267-
268-
format!("{:?}{:?}", thread::current().id(), since_the_epoch)
272+
generate_id()
269273
}
270274

271275
pub fn new(func: impl FnMut(Arc<T>) + Send + Sync + 'static) -> SubscriptionFunc<T> {

src/lib.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,19 @@ extern crate futures;
33
// #[cfg(feature = "for_futures")]
44
// extern crate tokio;
55

6-
pub mod cor;
7-
pub mod fp;
86
pub mod handler;
9-
pub mod maybe;
10-
pub mod monadio;
117
pub mod publisher;
128
pub mod sync;
139

1410
pub mod common;
11+
12+
#[cfg(feature = "actor")]
13+
pub mod actor;
14+
#[cfg(feature = "cor")]
15+
pub mod cor;
16+
#[cfg(feature = "fp")]
17+
pub mod fp;
18+
#[cfg(feature = "maybe")]
19+
pub mod maybe;
20+
#[cfg(feature = "monadio")]
21+
pub mod monadio;

0 commit comments

Comments
 (0)