Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of actor runtime #99

Merged
merged 36 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e455e80
actor implementation
danielgerlag Apr 29, 2023
ae7aef7
wip
danielgerlag Apr 29, 2023
7db861e
wip
danielgerlag May 2, 2023
2bd95e1
wip
danielgerlag May 2, 2023
ebf701e
wip
danielgerlag May 4, 2023
2e34b22
nits
danielgerlag May 4, 2023
c95a9c0
tests
danielgerlag May 4, 2023
45eab14
make client cloneable
danielgerlag May 10, 2023
3f286d5
logs
danielgerlag May 10, 2023
dc6156f
logging
danielgerlag May 10, 2023
7981005
async methods
danielgerlag May 16, 2023
513b334
shutdown semantics
danielgerlag Jul 18, 2023
6dbe2b7
clone actor client context
danielgerlag Jul 18, 2023
7abc466
actor implementation
danielgerlag Aug 2, 2023
0a0ff9b
wip
danielgerlag Aug 2, 2023
188753d
move tests
danielgerlag Aug 2, 2023
b9cd1f6
actor factory
danielgerlag Aug 2, 2023
8581e82
wip
danielgerlag Aug 2, 2023
d18f355
wip
danielgerlag Aug 2, 2023
b6b6fe3
readme
danielgerlag Aug 3, 2023
6971ab5
pr feedback Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>
danielgerlag Aug 21, 2023
a4241d5
cargo fmt
danielgerlag Aug 21, 2023
8e77624
cargo clippy --fix
danielgerlag Aug 21, 2023
a58192b
proc macro
danielgerlag Aug 22, 2023
6fc06ac
dependency shuffle
danielgerlag Aug 22, 2023
029f482
Update lib.rs
danielgerlag Aug 22, 2023
a908fac
docs
danielgerlag Aug 22, 2023
877efed
enable decorating type alias
danielgerlag Aug 22, 2023
a979a1f
graceful shutdown
danielgerlag Oct 11, 2023
6a3c6d8
Merge remote-tracking branch 'upstream/master' into actors-refactor
danielgerlag Jan 8, 2024
8769c21
merge issues
danielgerlag Jan 8, 2024
0030ddd
cargo fmt
danielgerlag Jan 8, 2024
d0e5384
update rust version
danielgerlag Jan 8, 2024
0c0ee4a
publish macro crate
danielgerlag Jan 8, 2024
f32f4cd
dependency issue
danielgerlag Jan 8, 2024
a2d3494
clippy warnings
danielgerlag Jan 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>
  • Loading branch information
danielgerlag committed Aug 21, 2023
commit 2bd95e1a15976e8d726afd559da21f2068306a07
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = ["microservices", "dapr"]


[dependencies]
futures = "0.3"
tonic = "0.8"
prost = "0.11"
bytes = "1"
Expand Down
26 changes: 19 additions & 7 deletions examples/actors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ This example demonstrates the Dapr actor framework. To define an actor, impleme
- `on_reminder` - Called when a reminder is recieved from the Dapr sidecar
- `on_timer` - Called when a timer is recieved from the Dapr sidecar

The `invoke_method_json` method is a convenience helper that can be used within the `on_invoke` implementation, to automatically deserialize the input and serialize the output to JSON.

```rust
impl actor::Actor for MyActor {

fn on_activate(&mut self) -> Result<(), ActorError> {
async fn on_activate(&mut self) -> Result<(), ActorError> {
println!("on_activate {}", self.id);
Ok(())
}
Expand All @@ -23,18 +21,32 @@ impl actor::Actor for MyActor {
}

fn on_invoke(&mut self, method: &str, data: Vec<u8>) -> Result<Vec<u8>, actor::ActorError> {
println!("on_invoke {} {:?}", method, from_utf8(&data));

match method {
"do_stuff" => actor::invoke_method_json(self, &MyActor::do_stuff, data),
"do_stuff" => {
let args = serde_json::from_slice::<MyRequest>(&data);
if args.is_err() {
return Err(ActorError::SerializationError());
}

match self.do_stuff(args.unwrap()).await {
Ok(r) => Ok(serde_json::to_vec(&r).unwrap()),
Err(e) => Err(e)
}
}
_ => Err(actor::ActorError::MethodNotFound)
}
}

}

impl MyActor {
fn new(id: &str) -> Self {
fn new(actor_type: &str, id: &str, client: Box<ActorContextClient<TonicClient>>) -> Self {
println!("creating actor {} {}", id, actor_type);
MyActor {
id: id.to_string(),
client
}
}

Expand All @@ -45,11 +57,11 @@ impl MyActor {
}
```

An actor host requires an Http server to recieve callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes the actor type name and a factory to construct a new instance of that actor type when one is required to be activated by the runtime.
An actor host requires an Http server to recieve callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes the actor type name and a factory to construct a new instance of that actor type when one is required to be activated by the runtime. The parameters passed to the factory will be the actor type, actor ID, and a Dapr client for managing state, timers and reminders for the actor.

```rust
let mut dapr_server = dapr::server::DaprHttpServer::new();
dapr_server.register_actor("MyActor", Box::new(|id| Arc::new(Mutex::new(MyActor::new(id)))));
dapr_server.register_actor("MyActor", Box::new(|actor_type, id, client| Arc::new(Mutex::new(MyActor::new(actor_type, id, client)))));
dapr_server.start(None, None).await?;
```

Expand Down
55 changes: 2 additions & 53 deletions examples/actors/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{collections::HashMap, str::from_utf8};

use dapr::server::actor::context_client::ActorStateOperation;

use std::{collections::HashMap};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -12,7 +9,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
std::thread::sleep(std::time::Duration::new(2, 0));

// Get the Dapr port and create a connection
let port: u16 = 60128; //std::env::var("DAPR_GRPC_PORT")?.parse()?;
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);

// Create the client
Expand All @@ -30,59 +27,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(r) => {
let s = String::from_utf8(r.data);
println!("Response: {:#?}", s);


},
Err(e) => {
println!("Error: {:#?}", e);
}
}

let addr = format!("https://127.0.0.1:{}", port);
let mut actor_client = dapr::server::actor::context_client::ActorContextClient::<dapr::client::TonicClient>::connect(addr, "MyActor".to_string(), "a1".to_string()).await?;

// let timer_data = "someData";
// let timer_data = timer_data.as_bytes().to_vec();

// //let period = Duration::from_secs(5);
// match actor_client.register_actor_reminder("remind3", Some(std::time::Duration::from_secs(5)), Some(std::time::Duration::from_secs(10)), timer_data, None).await {
// Ok(r) => {
// println!("reminder registered: {:#?}", r);
// },
// Err(e) => {
// println!("Error: {:#?}", e);
// }
// }

//actor_client.unregister_actor_reminder("remind3").await;

//let s = prost_types::Any::from("test".as_bytes().to_vec());

// let ops = vec![
// ActorStateOperation::Upsert { key: "key1".to_string(), value: Some("value1".as_bytes().to_vec()) }
// ];

// match actor_client.execute_actor_state_transaction(ops).await {
// Ok(r) => {
// println!("state transaction: {:#?}", r);
// },
// Err(e) => {
// println!("Error: {:#?}", e);
// }
// }

match actor_client.get_actor_state("key1").await {
Ok(r) => {

let hs = String::from_utf8(r.data);
println!("data: {:#?}", hs);
//println!("state transaction: {:#?}", r);
},
Err(e) => {
println!("Error: {:#?}", e);
}
}


Ok(())
}
51 changes: 31 additions & 20 deletions examples/actors/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::{Arc, Mutex}, str::from_utf8};
use dapr::{server::actor::{self, ActorError, context_client::{ActorContextClient, DaprActorInterface}}, dapr::dapr::proto::runtime::v1::dapr_client::DaprClient, client::{DaprInterface, TonicClient}};
use async_trait::async_trait;
use dapr::{server::actor::{self, ActorError, context_client::{ActorContextClient, GrpcDaprClient}, ActorFactory, ActorInstance}, client::{DaprInterface, TonicClient}};
use serde::{Serialize, Deserialize};
use tonic::transport::Channel;

#[derive(Serialize, Deserialize, Debug)]
pub struct MyResponse {
Expand All @@ -14,54 +14,59 @@ pub struct MyRequest {
}

struct MyActor {
actor_type: String,
id: String,
client: Box<ActorContextClient<TonicClient>>
}

#[async_trait]
impl actor::Actor for MyActor {

fn on_activate(&mut self) -> Result<(), ActorError> {
async fn on_activate(&mut self) -> Result<(), ActorError> {
println!("on_activate {}", self.id);
Ok(())
}

fn on_deactivate(&mut self) -> Result<(), ActorError> {
async fn on_deactivate(&mut self) -> Result<(), ActorError> {
println!("on_deactivate");
Ok(())
}

fn on_invoke(&mut self, method: &str, data: Vec<u8>) -> Result<Vec<u8>, actor::ActorError> {
println!("on_invoke {}", method);

async fn on_invoke(&mut self, method: &str, data: Vec<u8>) -> Result<Vec<u8>, actor::ActorError> {
println!("on_invoke {} {:?}", method, from_utf8(&data));
match method {
"do_stuff" => actor::invoke_method_json(self, &MyActor::do_stuff, data),
"do_stuff" => {
let args = serde_json::from_slice::<MyRequest>(&data);
if args.is_err() {
return Err(ActorError::SerializationError());
}

match self.do_stuff(args.unwrap()).await {
Ok(r) => Ok(serde_json::to_vec(&r).unwrap()),
Err(e) => Err(e)
}
}
_ => Err(actor::ActorError::MethodNotFound)
}
}

fn on_reminder(&mut self, reminder_name: &str, data: Vec<u8>) -> Result<(), actor::ActorError> {
async fn on_reminder(&mut self, reminder_name: &str, data: Vec<u8>) -> Result<(), actor::ActorError> {
println!("on_reminder {} {:?}", reminder_name, from_utf8(&data));
Ok(())
}

fn on_timer(&mut self, timer_name: &str, data: Vec<u8>) -> Result<(), actor::ActorError> {
async fn on_timer(&mut self, timer_name: &str, data: Vec<u8>) -> Result<(), actor::ActorError> {
println!("on_timer {} {:?}", timer_name, from_utf8(&data));
Ok(())
}

}

impl MyActor {
fn new(id: &str, actor_type: &str, client: Box<ActorContextClient<TonicClient>>) -> Self {
println!("creating actor {} {}", id, actor_type);
MyActor {
id: id.to_string(),
client
}
}
async fn do_stuff(&mut self, data: MyRequest) -> Result<MyResponse, actor::ActorError> {
async fn do_stuff(&mut self, data: MyRequest) -> Result<MyResponse, actor::ActorError> {
println!("doing stuff with {}", data.name);
let r = self.client.get_actor_state("key1").await?;
let r = self.client.get_actor_state("key1").await;
println!("get_actor_state {:?}", r);
Ok(MyResponse { available: true })
}
Expand All @@ -71,7 +76,13 @@ impl MyActor {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let mut dapr_server = dapr::server::DaprHttpServer::new().await;
dapr_server.register_actor("MyActor", Box::new(|id, actor_type, client| Arc::new(Mutex::new(MyActor::new(id, actor_type, client)))));
dapr_server.register_actor("MyActor", Box::new(
|actor_type, id, client| Arc::new(Mutex::new(MyActor{
actor_type: actor_type.to_string(),
id: id.to_string(),
client
}))));

dapr_server.start(None, None).await?;

Ok(())
Expand Down
14 changes: 7 additions & 7 deletions src/server/actor/context_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::dapr::dapr::proto::{runtime::v1 as dapr_v1};
use crate::error::Error as DaprError;


pub type GrpcActorClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;
pub type GrpcDaprClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;

pub enum ActorStateOperation {
Upsert {
Expand Down Expand Up @@ -92,12 +92,12 @@ pub struct ActorContextClient<T>{

impl<T: DaprActorInterface> ActorContextClient<T> {

pub async fn connect(addr: String, actor_type: String, actor_id: String) -> Result<Self, DaprError> {
Ok(ActorContextClient{
client: T::connect(addr).await?,
actor_type: actor_type,
actor_id: actor_id,
})
pub fn new(client: T, actor_type: &str, actor_id: &str) -> Self {
ActorContextClient{
client,
actor_type: actor_type.to_string(),
actor_id: actor_id.to_string(),
}
}

pub async fn get_actor_state<K>(&mut self, key: K) -> Result<GetActorStateResponse, DaprError>
Expand Down
56 changes: 33 additions & 23 deletions src/server/actor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{sync::Arc, sync::Mutex, error::Error};
use std::{sync::Arc, sync::Mutex, error::Error, future::{Future}};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};

//use self::context_client::{DaprActorInterface};
use self::context_client::{ActorContextClient, DaprActorInterface};

pub mod context_client;
pub mod runtime;

pub type ActorInstance = Arc<Mutex<dyn Actor>>;
pub type ActorFactory<TActorClient> = Box<dyn Fn(&str, &str, Box<TActorClient>) -> ActorInstance>;
pub type ActorFactory<TActorClient> = Box<dyn Fn(&str, &str, Box<ActorContextClient<TActorClient>>) -> ActorInstance>;

#[derive(Debug)]
pub enum ActorError {
Expand All @@ -19,28 +20,37 @@ pub enum ActorError {
SerializationError()
}

#[async_trait]
pub trait Actor {
fn on_activate(&mut self) -> Result<(), ActorError> { Ok(()) }
fn on_deactivate(&mut self) -> Result<(), ActorError> { Ok(()) }
fn on_invoke(&mut self, method: &str, data: Vec<u8>) -> Result<Vec<u8>, ActorError>;
fn on_reminder(&mut self, _reminder_name: &str, _data : Vec<u8>) -> Result<(), ActorError> { Ok(()) }
fn on_timer(&mut self, _timer_name: &str, _data : Vec<u8>) -> Result<(), ActorError> { Ok(()) }
async fn on_activate(&mut self) -> Result<(), ActorError>;
async fn on_deactivate(&mut self) -> Result<(), ActorError>;
async fn on_invoke(&mut self, method: &str, data: Vec<u8>) -> Result<Vec<u8>, ActorError>;
async fn on_reminder(&mut self, _reminder_name: &str, _data : Vec<u8>) -> Result<(), ActorError>;
async fn on_timer(&mut self, _timer_name: &str, _data : Vec<u8>) -> Result<(), ActorError>;
}

pub fn invoke_method_json<TActor, TInput, TOutput>(actor: &mut TActor, method: &dyn Fn(&mut TActor, TInput) -> Result<TOutput, ActorError>, data: Vec<u8>) -> Result<Vec<u8>, ActorError>
where TActor: Actor, TInput: for<'a> Deserialize<'a>, TOutput: Serialize
{
let args = serde_json::from_slice::<TInput>(&data);
if args.is_err() {
return Err(ActorError::SerializationError());
}
match method(actor, args.unwrap()) {
Ok(r) => {
let serialized = serde_json::to_vec(&r).unwrap();
Ok(serialized)
},
Err(e) => Err(e)
}
pub trait ActorBuilder<T: DaprActorInterface> {
fn build(&self, actor_type: &str, id: &str, client: Box<ActorContextClient<T>>) -> ActorInstance;
}


// pub async fn invoke_method_json<TActor, TInput, TOutput, TMethod, TFuture>(actor: &mut TActor, method: TMethod, data: Vec<u8>) -> Result<Vec<u8>, ActorError>
// where
// TActor: Actor,
// TInput: for<'a> Deserialize<'a>,
// TOutput: Serialize,
// TMethod: Fn(&mut TActor, TInput) -> TFuture,
// TFuture: Future<Output = Result<TOutput, ActorError>>
// {
// let args = serde_json::from_slice::<TInput>(&data);
// if args.is_err() {
// return Err(ActorError::SerializationError());
// }

// match method(actor, args.unwrap()).await {
// Ok(r) => {
// let serialized = serde_json::to_vec(&r).unwrap();
// Ok(serialized)
// },
// Err(e) => Err(e)
// }
// }
Loading