Skip to content

Commit

Permalink
added FromActorRef
Browse files Browse the repository at this point in the history
  • Loading branch information
anabyv042 committed Dec 23, 2024
1 parent 3c94f0f commit 8479a89
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 0 deletions.
59 changes: 59 additions & 0 deletions ractor/src/actor/from_actor_ref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! [FromActorRef] wraps an [ActorCell] to send messages that can be converted
//! to its accepted type using [From]
use crate::{ActorCell, ActorRef, Message, MessagingErr};

/// [FromActorRef] wraps an [ActorCell] to send messages that can be converted
/// into its accepted type using [From]. [FromActorRef] allows to create isolation
/// between actors by hiding the actual message type.
pub struct FromActorRef<TFrom> {
converter: Box<dyn Fn(TFrom) -> Result<(), MessagingErr<TFrom>>>,
cell: ActorCell,
}

impl<TFrom> std::fmt::Debug for FromActorRef<TFrom> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FromActorRef")
.field("cell", &self.cell)
.finish()
}

Check warning on line 24 in ractor/src/actor/from_actor_ref.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/from_actor_ref.rs#L20-L24

Added lines #L20 - L24 were not covered by tests
}

impl<TFrom> FromActorRef<TFrom> {
/// Casts the message to the target message type of [ActorCell] and sends it
///
/// * `message` - The message to send
///
/// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
pub fn send_message(&self, message: TFrom) -> Result<(), MessagingErr<TFrom>> {
(self.converter)(message)
}

/// Retrieve a cloned [ActorCell] representing this [FromActorRef]
pub fn get_cell(&self) -> ActorCell {
self.cell.clone()
}

Check warning on line 40 in ractor/src/actor/from_actor_ref.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/from_actor_ref.rs#L38-L40

Added lines #L38 - L40 were not covered by tests
}

impl<TMessage: Message> ActorRef<TMessage> {
/// Constructs the [FromActorRef] for a specific type from [ActorRef]. Consumes
/// the [ActorRef].
pub fn from_ref<TFrom: Into<TMessage> + Clone>(self) -> FromActorRef<TFrom> {
let actor_ref = self.clone();
let cast_and_send = move |msg: TFrom| match actor_ref.send_message(msg.clone().into()) {
Ok(_) => Ok(()),
Err(MessagingErr::SendErr(_)) => Err(MessagingErr::SendErr(msg)),
Err(MessagingErr::ChannelClosed) => Err(MessagingErr::ChannelClosed),
Err(MessagingErr::InvalidActorType) => Err(MessagingErr::InvalidActorType),

Check warning on line 52 in ractor/src/actor/from_actor_ref.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/from_actor_ref.rs#L50-L52

Added lines #L50 - L52 were not covered by tests
};
FromActorRef::<TFrom> {
converter: Box::new(cast_and_send),
cell: self.get_cell(),
}
}
}
1 change: 1 addition & 0 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub mod actor_cell;
pub mod actor_id;
pub(crate) mod actor_properties;
pub mod actor_ref;
pub mod from_actor_ref;
mod supervision;

#[cfg(test)]
Expand Down
78 changes: 78 additions & 0 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::{
};

use crate::{
actor::from_actor_ref::FromActorRef,
common_test::periodic_check,
concurrency::{sleep, Duration},
MessagingErr, RactorErr,
Expand Down Expand Up @@ -1197,3 +1198,80 @@ async fn wait_for_death() {
actor.stop(None);
handle.await.unwrap();
}

#[crate::concurrency::test]
#[tracing_test::traced_test]

Check warning on line 1203 in ractor/src/actor/tests/mod.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/actor/tests/mod.rs#L1203

Added line #L1203 was not covered by tests
async fn from_actor_ref() {
let result_counter = Arc::new(AtomicU32::new(0));

struct TestActor {
counter: Arc<AtomicU32>,
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for TestActor {
type Msg = u32;
type Arguments = ();
type State = ();

async fn pre_start(
&self,
_this_actor: crate::ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
self.counter.fetch_add(message, Ordering::Relaxed);
Ok(())
}
}

let (actor, handle) = Actor::spawn(
None,
TestActor {
counter: result_counter.clone(),
},
(),
)
.await
.expect("Actor failed to start");

let mut sum: u32 = 0;

let from_u8: FromActorRef<u8> = actor.clone().from_ref();
let u8_message: u8 = 1;
sum += u8_message as u32;
from_u8
.send_message(u8_message)
.expect("Failed to send message to actor");

periodic_check(
|| result_counter.load(Ordering::Relaxed) == sum,
Duration::from_millis(500),
)
.await;

let from_u16: FromActorRef<u16> = actor.clone().from_ref();
let u16_message: u16 = 2;
sum += u16_message as u32;
from_u16
.send_message(u16_message)
.expect("Failed to send message to actor");

periodic_check(
|| result_counter.load(Ordering::Relaxed) == sum,
Duration::from_millis(500),
)
.await;

// cleanup
actor.stop(None);
handle.await.unwrap();
}

0 comments on commit 8479a89

Please sign in to comment.