Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of actor runtime #99

Merged
merged 36 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e455e80
actor implementation
danielgerlag Apr 29, 2023
ae7aef7
wip
danielgerlag Apr 29, 2023
7db861e
wip
danielgerlag May 2, 2023
2bd95e1
wip
danielgerlag May 2, 2023
ebf701e
wip
danielgerlag May 4, 2023
2e34b22
nits
danielgerlag May 4, 2023
c95a9c0
tests
danielgerlag May 4, 2023
45eab14
make client cloneable
danielgerlag May 10, 2023
3f286d5
logs
danielgerlag May 10, 2023
dc6156f
logging
danielgerlag May 10, 2023
7981005
async methods
danielgerlag May 16, 2023
513b334
shutdown semantics
danielgerlag Jul 18, 2023
6dbe2b7
clone actor client context
danielgerlag Jul 18, 2023
7abc466
actor implementation
danielgerlag Aug 2, 2023
0a0ff9b
wip
danielgerlag Aug 2, 2023
188753d
move tests
danielgerlag Aug 2, 2023
b9cd1f6
actor factory
danielgerlag Aug 2, 2023
8581e82
wip
danielgerlag Aug 2, 2023
d18f355
wip
danielgerlag Aug 2, 2023
b6b6fe3
readme
danielgerlag Aug 3, 2023
6971ab5
pr feedback Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>
danielgerlag Aug 21, 2023
a4241d5
cargo fmt
danielgerlag Aug 21, 2023
8e77624
cargo clippy --fix
danielgerlag Aug 21, 2023
a58192b
proc macro
danielgerlag Aug 22, 2023
6fc06ac
dependency shuffle
danielgerlag Aug 22, 2023
029f482
Update lib.rs
danielgerlag Aug 22, 2023
a908fac
docs
danielgerlag Aug 22, 2023
877efed
enable decorating type alias
danielgerlag Aug 22, 2023
a979a1f
graceful shutdown
danielgerlag Oct 11, 2023
6a3c6d8
Merge remote-tracking branch 'upstream/master' into actors-refactor
danielgerlag Jan 8, 2024
8769c21
merge issues
danielgerlag Jan 8, 2024
0030ddd
cargo fmt
danielgerlag Jan 8, 2024
d0e5384
update rust version
danielgerlag Jan 8, 2024
0c0ee4a
publish macro crate
danielgerlag Jan 8, 2024
f32f4cd
dependency issue
danielgerlag Jan 8, 2024
a2d3494
clippy warnings
danielgerlag Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cargo fmt
Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>
  • Loading branch information
danielgerlag committed Jan 8, 2024
commit 0030dddddbb83449076a282656e9e532d50f39a3
14 changes: 6 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,9 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
}

async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> {
self
.publish_event(Request::new(request))
.await?
.into_inner();
self.publish_event(Request::new(request))
.await?
.into_inner();
Ok(())
}

Expand All @@ -449,10 +448,9 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
}

async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error> {
self
.delete_bulk_state(Request::new(request))
.await?
.into_inner();
self.delete_bulk_state(Request::new(request))
.await?
.into_inner();
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ pub mod server;
pub use client::Client;

extern crate dapr_macros;
pub use dapr_macros::actor;
pub use dapr_macros::actor;
145 changes: 70 additions & 75 deletions src/server/actor/context_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ impl From<ActorStateOperation> for TransactionalActorStateOperation {
operation_type: "upsert".to_string(),
key,
value: value.map(|v| Any {
type_url: "type.googleapis.com/bytes".to_string(),
value: v,
}),
type_url: "type.googleapis.com/bytes".to_string(),
value: v,
}),
metadata: HashMap::new(),
},
ActorStateOperation::Delete { key } => TransactionalActorStateOperation {
Expand Down Expand Up @@ -81,15 +81,14 @@ impl ActorContextClient {
&mut self,
operations: Vec<ActorStateOperation>,
) -> Result<(), DaprError> {
self
.client
.execute_actor_state_transaction(ExecuteActorStateTransactionRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
operations: operations.into_iter().map(|o| o.into()).collect(),
})
.await?
.into_inner();
self.client
.execute_actor_state_transaction(ExecuteActorStateTransactionRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
operations: operations.into_iter().map(|o| o.into()).collect(),
})
.await?
.into_inner();
Ok(())
}

Expand All @@ -112,28 +111,27 @@ impl ActorContextClient {
where
I: Into<String>,
{
self
.client
.register_actor_reminder(RegisterActorReminderRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
due_time: match due_time {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
period: match period {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
data,
ttl: match ttl {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
})
.await?
.into_inner();
self.client
.register_actor_reminder(RegisterActorReminderRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
due_time: match due_time {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
period: match period {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
data,
ttl: match ttl {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
})
.await?
.into_inner();
Ok(())
}

Expand All @@ -145,15 +143,14 @@ impl ActorContextClient {
where
I: Into<String>,
{
self
.client
.unregister_actor_reminder(UnregisterActorReminderRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
})
.await?
.into_inner();
self.client
.unregister_actor_reminder(UnregisterActorReminderRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
})
.await?
.into_inner();
Ok(())
}

Expand All @@ -178,29 +175,28 @@ impl ActorContextClient {
where
I: Into<String>,
{
self
.client
.register_actor_timer(RegisterActorTimerRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
due_time: match due_time {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
period: match period {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
data,
callback: callback.unwrap_or_default(),
ttl: match ttl {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
})
.await?
.into_inner();
self.client
.register_actor_timer(RegisterActorTimerRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
due_time: match due_time {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
period: match period {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
data,
callback: callback.unwrap_or_default(),
ttl: match ttl {
None => "".to_string(),
Some(t) => chrono::Duration::from_std(t).unwrap().to_string(),
},
})
.await?
.into_inner();
Ok(())
}

Expand All @@ -212,15 +208,14 @@ impl ActorContextClient {
where
I: Into<String>,
{
self
.client
.unregister_actor_timer(UnregisterActorTimerRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
})
.await?
.into_inner();
self.client
.unregister_actor_timer(UnregisterActorTimerRequest {
actor_type: self.actor_type.to_string(),
actor_id: self.actor_id.to_string(),
name: name.into(),
})
.await?
.into_inner();
Ok(())
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/server/actor/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ pub struct ActorState {
pub runtime: Arc<ActorRuntime>,
}

type MethodRegistrationMap = HashMap<String, Box<dyn (FnOnce(Router, Arc<ActorRuntime>) -> Router) + Send + Sync>>;
type MethodRegistrationMap =
HashMap<String, Box<dyn (FnOnce(Router, Arc<ActorRuntime>) -> Router) + Send + Sync>>;

/// Describes the registration of an actor type, including the methods that can be invoked on it and the factory to create instances of it.
/// # Example:
Expand Down Expand Up @@ -64,7 +65,7 @@ type MethodRegistrationMap = HashMap<String, Box<dyn (FnOnce(Router, Arc<ActorRu
/// # todo!()
/// # }
/// # }
///
///
/// # async fn main_async() {
/// let mut dapr_server = dapr::server::DaprHttpServer::new().await;
///
Expand Down Expand Up @@ -192,7 +193,7 @@ impl ActorTypeRegistration {

fn create_actor(&self, actor_id: &str, client: TonicClient) -> Arc<dyn Actor> {
let client = ActorContextClient::new(client, &self.name, actor_id);

(self.factory)(&self.name, actor_id, client) as _
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/server/actor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ use std::{
sync::Arc,
};

use dapr::server::{
actor::{runtime::ActorTypeRegistration, Actor, ActorError},
DaprHttpServer,
};
use async_trait::async_trait;
use axum::{Json, Router};
use axum_test::TestServer;
use dapr::server::{
actor::{runtime::ActorTypeRegistration, Actor, ActorError},
DaprHttpServer,
};
use dapr_macros::actor;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::sync::Mutex;
use uuid::Uuid;


#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct MyResponse {
pub actor_id: String,
Expand Down
24 changes: 11 additions & 13 deletions src/server/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
Json, Router,
};
use futures::{Future, FutureExt};
use std::{net::SocketAddr, sync::Arc, pin::Pin};
use std::{net::SocketAddr, pin::Pin, sync::Arc};

use super::super::client::TonicClient;
use super::actor::runtime::{ActorRuntime, ActorTypeRegistration};
Expand Down Expand Up @@ -107,7 +107,8 @@ impl DaprHttpServer {
}

pub fn with_graceful_shutdown<F>(self, signal: F) -> Self
where F: Future<Output = ()> + Send + 'static
where
F: Future<Output = ()> + Send + 'static,
{
DaprHttpServer {
shutdown_signal: Some(signal.boxed()),
Expand Down Expand Up @@ -137,20 +138,19 @@ impl DaprHttpServer {

let addr = SocketAddr::from(([127, 0, 0, 1], port.unwrap_or(default_port)));

let server = axum::Server::bind(&addr)
.serve(app.into_make_service());

let server = axum::Server::bind(&addr).serve(app.into_make_service());

let final_result = match self.shutdown_signal.take() {
Some(signal) => {
server.with_graceful_shutdown(async move {
signal.await;
})
.await
server
.with_graceful_shutdown(async move {
signal.await;
})
.await
}
None => server.await,
};

self.actor_runtime.deactivate_all().await;

Ok(final_result?)
Expand Down Expand Up @@ -182,9 +182,7 @@ impl DaprHttpServer {
put(invoke_timer).with_state(rt.clone()),
);


self
.actor_runtime
self.actor_runtime
.configure_method_routes(app, rt.clone())
.await
}
Expand Down
Loading