From 3a4ab27b767707e23b17b1c99bd57eb6072ee372 Mon Sep 17 00:00:00 2001
From: Sean Lawlor
Date: Tue, 10 Jan 2023 21:39:01 -0500
Subject: [PATCH] Initial sketches of how distributed nodes might look, based
 heavily on the Erlang protocol. This is a collection of TCP-managing actors
 and session-management logic for automated session handling

Related issue: #16
---
 .github/workflows/ci.yaml                     |   6 +
 Cargo.toml                                    |   1 +
 ractor-cluster/Cargo.toml                     |  38 ++
 ractor-cluster/src/build.rs                   |  31 ++
 ractor-cluster/src/hash.rs                    |  25 +
 .../mod.rs => ractor-cluster/src/lib.rs       |  43 +-
 ractor-cluster/src/macros.rs                  | 150 ++++++
 ractor-cluster/src/net/listener.rs            |  97 ++++
 ractor-cluster/src/net/mod.rs                 |  21 +
 ractor-cluster/src/net/session.rs             | 377 ++++++++++++++
 ractor-cluster/src/node/auth.rs               | 168 ++++++
 ractor-cluster/src/node/client.rs             |  73 +++
 ractor-cluster/src/node/mod.rs                | 378 ++++++++++++++
 ractor-cluster/src/node/node_session.rs       | 488 ++++++++++++++++++
 ractor-cluster/src/protocol/auth.proto        | 116 +++++
 ractor-cluster/src/protocol/meta.proto        |  23 +
 ractor-cluster/src/protocol/mod.rs            |  24 +
 ractor-cluster/src/protocol/node.proto        |  56 ++
 ractor-cluster/src/remote_actor/mod.rs        | 118 +++++
 ractor/Cargo.toml                             |   7 +-
 ractor/benches/actor.rs                       |   4 +-
 ractor/examples/philosophers.rs               |   2 +-
 .../src/actor/actor_cell/actor_properties.rs  |  40 +-
 ractor/src/actor/actor_cell/mod.rs            |  75 ++-
 ractor/src/actor/messages.rs                  |  53 +-
 ractor/src/actor/mod.rs                       |  91 +++-
 ractor/src/actor_id.rs                        |  33 +-
 ractor/src/lib.rs                             |  34 +-
 ractor/src/message.rs                         | 156 ++++++
 ractor/src/port/mod.rs                        |  22 +-
 ractor/src/registry/mod.rs                    |  34 ++
 ractor/src/remote/mod.rs                      |  35 ++
 ractor/src/rpc/mod.rs                         |  18 +-
 ractor/src/time/tests.rs                      |   2 +-
 34 files changed, 2707 insertions(+), 132 deletions(-)
 create mode 100644 ractor-cluster/Cargo.toml
 create mode 100644 ractor-cluster/src/build.rs
 create mode 100644 ractor-cluster/src/hash.rs
 rename ractor/src/distributed/mod.rs => ractor-cluster/src/lib.rs (53%)
 create mode 100644 ractor-cluster/src/macros.rs
 create mode 100644 ractor-cluster/src/net/listener.rs
 create mode 100644 ractor-cluster/src/net/mod.rs
 create mode 100644 ractor-cluster/src/net/session.rs
 create mode 100644 ractor-cluster/src/node/auth.rs
 create mode 100644 ractor-cluster/src/node/client.rs
 create mode 100644 ractor-cluster/src/node/mod.rs
 create mode 100644 ractor-cluster/src/node/node_session.rs
 create mode 100644 ractor-cluster/src/protocol/auth.proto
 create mode 100644 ractor-cluster/src/protocol/meta.proto
 create mode 100644 ractor-cluster/src/protocol/mod.rs
 create mode 100644 ractor-cluster/src/protocol/node.proto
 create mode 100644 ractor-cluster/src/remote_actor/mod.rs
 create mode 100644 ractor/src/message.rs
 create mode 100644 ractor/src/remote/mod.rs

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 3ed601d0..1d9f0a2b 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -17,6 +17,12 @@ jobs:
         - name: Run the default tests
           package: ractor
           # flags:
+        - name: Test ractor with the `cluster` feature
+          package: ractor
+          flags: -F cluster
+        - name: Test ractor-cluster
+          package: ractor-cluster
+          # flags:
 
     steps:
       - uses: actions/checkout@main
diff --git a/Cargo.toml b/Cargo.toml
index 0515a6f6..eae5cf38 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
 
 members = [
     "ractor",
+    "ractor-cluster",
     "ractor-playground",
     "xtask"
 ]
diff --git a/ractor-cluster/Cargo.toml b/ractor-cluster/Cargo.toml
new file mode 100644
index 00000000..6ec01c54
--- /dev/null
+++ b/ractor-cluster/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "ractor-cluster"
+version = "0.4.0"
+authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
+description = "Distributed cluster environment of Ractor actors"
+documentation = "https://docs.rs/ractor"
+license = "MIT"
+edition = "2018"
+keywords = ["actor", "ractor", "cluster"]
+repository = "https://github.com/slawlor/ractor"
+readme = "../README.md"
+homepage = "https://github.com/slawlor/ractor"
+categories = ["actor", "erlang"]
+build = "src/build.rs"
+
+[build-dependencies]
+protobuf-src = "1"
+prost-build = { version = "0.11" }
+
+[dependencies]
+## Required dependencies
+async-trait = "0.1"
+bytes = { version = "1" }
+# dashmap = "5"
+# futures = "0.3"
+log = "0.4"
+# once_cell = "1"
+prost = { version = "0.11" }
+ractor = { version = "0.4", features = ["cluster"], path = "../ractor" }
+rand = "0.8"
+sha2 = "0.10"
+tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util"]}
+
+## Optional dependencies
+# tokio-rustls = { version = "0.23", optional = true }
+
+[dev-dependencies]
+tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] }
diff --git a/ractor-cluster/src/build.rs b/ractor-cluster/src/build.rs
new file mode 100644
index 00000000..d1b4fc48
--- /dev/null
+++ b/ractor-cluster/src/build.rs
@@ -0,0 +1,31 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! This is the pre-compilation build script for the crate `ractor` when running in distributed
+//! mode. It's used to compile protobuf into Rust code prior to compilation.
+
+/// The shared path for all protobuf specifications
+const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol";
+/// The list of protobuf files to generate, inside PROTOBUF_BASE_DIRECTORY
+const PROTOBUF_FILES: [&str; 3] = ["meta", "node", "auth"];
+
+fn build_protobufs() {
+    std::env::set_var("PROTOC", protobuf_src::protoc());
+
+    let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());
+
+    for file in PROTOBUF_FILES.iter() {
+        let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file);
+        println!("cargo:rerun-if-changed={}", proto_file);
+        protobuf_files.push(proto_file);
+    }
+
+    prost_build::compile_protos(&protobuf_files, &[PROTOBUF_BASE_DIRECTORY]).unwrap();
+}
+
+fn main() {
+    // compile the spec files into Rust code
+    build_protobufs();
+}
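As an aside on how the generated code gets consumed: `prost_build` writes the bindings into `OUT_DIR`, and a crate typically pulls them in with `include!`. A minimal sketch (the actual `ractor-cluster/src/protocol/mod.rs` is not shown in this diff, and the package names here are assumptions based on the file names):

```rust
// Hypothetical include of the prost-generated code; assumes each .proto file
// declares a protobuf package matching its file name (meta, node, auth).
pub mod meta {
    include!(concat!(env!("OUT_DIR"), "/meta.rs"));
}
pub mod node {
    include!(concat!(env!("OUT_DIR"), "/node.rs"));
}
pub mod auth {
    include!(concat!(env!("OUT_DIR"), "/auth.rs"));
}
```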
diff --git a/ractor-cluster/src/hash.rs b/ractor-cluster/src/hash.rs
new file mode 100644
index 00000000..fc03b11e
--- /dev/null
+++ b/ractor-cluster/src/hash.rs
@@ -0,0 +1,25 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Hashing utilities, mainly used for challenge computation
+
+pub(crate) const DIGEST_BYTES: usize = 32;
+pub(crate) type Digest = [u8; DIGEST_BYTES];
+
+/// Compute a challenge digest
+pub(crate) fn challenge_digest(secret: &'_ str, challenge: u32) -> Digest {
+    use sha2::Digest;
+
+    let secret_bytes = secret.as_bytes();
+    // zero-fill the buffer up front; `Vec::with_capacity` alone would leave the
+    // vector empty and make the `copy_from_slice` calls below panic
+    let mut data = vec![0u8; secret_bytes.len() + 4];
+
+    let challenge_bytes = challenge.to_be_bytes();
+    data[..4].copy_from_slice(&challenge_bytes);
+    data[4..].copy_from_slice(secret_bytes);
+
+    let hash = sha2::Sha256::digest(&data);
+
+    hash.into()
+}
diff --git a/ractor/src/distributed/mod.rs b/ractor-cluster/src/lib.rs
similarity index 53%
rename from ractor/src/distributed/mod.rs
rename to ractor-cluster/src/lib.rs
index acdf01ea..ed43f0a6 100644
--- a/ractor/src/distributed/mod.rs
+++ b/ractor-cluster/src/lib.rs
@@ -4,33 +4,36 @@
 // LICENSE-MIT file in the root directory of this source tree.
 
 //! Support for remote nodes in a distributed cluster.
-//! 
+//!
 //! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html)
 //! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes.
-//! 
+//!
 //! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool.
 //! When nodes connect, they identify all of the nodes the remote node is also connected to and additionally connect
 //! to them as well. They merge registries and pg groups together in order to create larger clusters of services.
-//! 
-//! For messages to be transmittable across the [Node] boundaries to other [Node]s in the pool, they need to be
-//! serializable to a binary format (say protobuf)
+//!
+//! We have chosen protobuf for our inter-node protocol definition; however, you can choose whatever medium you like
+//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it
+//! over the wire for you
 
-use dashmap::DashMap;
+// #![deny(warnings)]
+#![warn(unused_imports)]
+#![warn(unsafe_code)]
+#![warn(missing_docs)]
+#![warn(unused_crate_dependencies)]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+#![feature(min_specialization)]
 
-/// Represents messages that can cross the node boundary which can be serialized and sent over the wire
-pub trait NodeSerializableMessage {
-    /// Serialize the message to binary
-    fn serialize(&self) -> &[u8];
+mod hash;
+mod net;
+pub mod node;
+pub(crate) mod protocol;
+pub(crate) mod remote_actor;
 
-    /// Deserialize from binary back into the message type
-    fn deserialize(&self, data: &[u8]) -> Self;
-}
+pub mod macros;
 
-/// The identifier of a node is a globally unique u64
-pub type NodeId = u64;
+// Re-exports
+pub use node::NodeServer;
 
-/// A node in the distributed compute pool.
-pub struct Node {
-    node_id: u64,
-    other_nodes: DashMap<NodeId, Node>,
-}
\ No newline at end of file
+/// Nodes are represented by an integer id
+pub type NodeId = u64;
diff --git a/ractor-cluster/src/macros.rs b/ractor-cluster/src/macros.rs
new file mode 100644
index 00000000..e2189345
--- /dev/null
+++ b/ractor-cluster/src/macros.rs
@@ -0,0 +1,150 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Macro helpers for remote actors
+
+/// `serialized_rpc_forward!` converts a traditional RPC port to a port which receives a serialized
+/// [Vec<_>] message and can rebuild the reply. This is necessary for RPCs which can occur over the network.
+///
+/// However, when defining the serialized logic, the cost will ONLY be incurred for actors which live
+/// on another `node()`, never locally. Local actors will always use the local [ractor::message::BoxedMessage]
+/// representation.
+///
+/// An example usage is
+/// ```rust
+/// #![feature(min_specialization)]
+/// use ractor::concurrency::Duration;
+/// use ractor::{RpcReplyPort, Message};
+/// use ractor::message::SerializedMessage;
+/// use ractor_cluster::serialized_rpc_forward;
+///
+/// enum MessageType {
+///     #[allow(unused)]
+///     Cast(String),
+///     #[allow(unused)]
+///     Call(String, RpcReplyPort<String>),
+/// }
+///
+/// impl Message for MessageType {
+///     fn serializable() -> bool {
+///         true
+///     }
+///
+///     fn serialize(self) -> SerializedMessage {
+///         match self {
+///             Self::Cast(args) => SerializedMessage::Cast(args.into_bytes()),
+///             Self::Call(args, reply) => {
+///                 let tx = serialized_rpc_forward!(reply, |bytes| String::from_utf8(bytes).unwrap());
+///                 SerializedMessage::Call(args.into_bytes(), tx.into())
+///             }
+///         }
+///     }
+/// }
+/// ```
+#[macro_export]
+macro_rules! serialized_rpc_forward {
+    ($typed_port:expr, $converter:expr) => {{
+        let (tx, rx) = ractor::concurrency::oneshot();
+        ractor::concurrency::spawn(async move {
+            match $typed_port.get_timeout() {
+                Some(timeout) => {
+                    if let Ok(Ok(result)) = ractor::concurrency::timeout(timeout, rx).await {
+                        let _ = $typed_port.send($converter(result));
+                    }
+                }
+                None => {
+                    if let Ok(result) = rx.await {
+                        let _ = $typed_port.send($converter(result));
+                    }
+                }
+            }
+        });
+        tx
+    }};
+}
+
+#[cfg(test)]
+mod tests {
+    use ractor::concurrency::Duration;
+    use ractor::message::SerializedMessage;
+    use ractor::{Message, RpcReplyPort};
+
+    enum MessageType {
+        #[allow(unused)]
+        Cast(String),
+        #[allow(unused)]
+        Call(String, RpcReplyPort<String>),
+    }
+
+    impl Message for MessageType {
+        fn serializable() -> bool {
+            true
+        }
+
+        fn serialize(self) -> SerializedMessage {
+            match self {
+                Self::Cast(args) => SerializedMessage::Cast(args.into_bytes()),
+                Self::Call(args, reply) => {
+                    let tx =
+                        serialized_rpc_forward!(reply, |bytes| String::from_utf8(bytes).unwrap());
+                    SerializedMessage::Call(args.into_bytes(), tx.into())
+                }
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn no_timeout_rpc() {
+        let (tx, rx) = ractor::concurrency::oneshot();
+        let no_timeout = MessageType::Call("test".to_string(), tx.into());
+        let no_timeout_serialized = no_timeout.serialize();
+        match no_timeout_serialized {
+            SerializedMessage::Call(args, reply) => {
+                let _ = reply.send(args);
+            }
+            _ => panic!("Invalid"),
+        }
+
+        let no_timeout_reply = rx.await.expect("Receive error");
+        assert_eq!(no_timeout_reply, "test".to_string());
+    }
+
+    #[tokio::test]
+    async fn with_timeout_rpc() {
+        let (tx, rx) = ractor::concurrency::oneshot();
+        let duration = Duration::from_millis(10);
+        let with_timeout = MessageType::Call("test".to_string(), (tx, duration).into());
+
+        let with_timeout_serialized = with_timeout.serialize();
+        match with_timeout_serialized {
+            SerializedMessage::Call(args, reply) => {
+                let _ = reply.send(args);
+            }
+            _ => panic!("Invalid"),
+        }
+
+        let with_timeout_reply = rx.await.expect("Receive error");
+        assert_eq!(with_timeout_reply, "test".to_string());
+    }
+
+    #[tokio::test]
+    async fn timeout_rpc() {
+        let (tx, rx) = ractor::concurrency::oneshot();
+        let duration = Duration::from_millis(10);
+        let with_timeout = MessageType::Call("test".to_string(), (tx, duration).into());
+
+        let with_timeout_serialized = with_timeout.serialize();
+        match with_timeout_serialized {
+            SerializedMessage::Call(args, reply) => {
+                ractor::concurrency::sleep(Duration::from_millis(50)).await;
+                let _ = reply.send(args);
+            }
+            _ => panic!("Invalid"),
+        }
+
+        let result = rx.await;
+        assert!(matches!(result, Err(_)));
+    }
+}
diff --git a/ractor-cluster/src/net/listener.rs b/ractor-cluster/src/net/listener.rs
new file mode 100644
index 00000000..a61667be
--- /dev/null
+++ b/ractor-cluster/src/net/listener.rs
@@ -0,0 +1,97 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! TCP Server to accept incoming sessions
+
+use ractor::cast;
+use ractor::{Actor, ActorRef};
+use tokio::net::TcpListener;
+
+use crate::node::SessionManagerMessage;
+
+/// A TCP socket [Listener] responsible for accepting new connections and spawning [super::session::Session]s
+/// which handle the message sending and receiving over the socket.
+///
+/// The [Listener] supervises all of the TCP [super::session::Session] actors and is responsible for logging
+/// connects and disconnects as well as tracking the current open [super::session::Session] actors.
+pub struct Listener {
+    port: super::NetworkPort,
+    session_manager: ActorRef<crate::node::NodeServer>,
+}
+
+impl Listener {
+    /// Create a new `Listener`
+    pub fn new(
+        port: super::NetworkPort,
+        session_manager: ActorRef<crate::node::NodeServer>,
+    ) -> Self {
+        Self {
+            port,
+            session_manager,
+        }
+    }
+}
+
+/// The Node listener's state
+pub struct ListenerState {
+    listener: Option<TcpListener>,
+}
+
+#[async_trait::async_trait]
+impl Actor for Listener {
+    type Msg = ();
+
+    type State = ListenerState;
+
+    async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
+        let addr = format!("0.0.0.0:{}", self.port);
+        let listener = match TcpListener::bind(&addr).await {
+            Ok(l) => l,
+            Err(err) => {
+                panic!("Error listening to socket: {}", err);
+            }
+        };
+
+        // startup the event processing loop by sending an initial msg
+        let _ = myself.cast(());
+
+        // create the initial state
+        Self::State {
+            listener: Some(listener),
+        }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
+        // close the listener properly, in case anyone else has handles to the actor stopping
+        // total droppage
+        drop(state.listener.take());
+    }
+
+    async fn handle(&self, myself: ActorRef<Self>, _message: Self::Msg, state: &mut Self::State) {
+        if let Some(listener) = &mut state.listener {
+            match listener.accept().await {
+                Ok((stream, addr)) => {
+                    let _ = cast!(
+                        self.session_manager,
+                        SessionManagerMessage::ConnectionOpened {
+                            stream,
+                            is_server: true
+                        }
+                    );
+                    log::info!("TCP Session opened for {}", addr);
+                }
+                Err(socket_accept_error) => {
+                    log::warn!(
+                        "Error accepting socket {} on Node server",
+                        socket_accept_error
+                    );
+                }
+            }
+        }
+
+        // continue accepting new sockets
+        let _ = myself.cast(());
+    }
+}
diff --git a/ractor-cluster/src/net/mod.rs b/ractor-cluster/src/net/mod.rs
new file mode 100644
index 00000000..33f4c3bb
--- /dev/null
+++ b/ractor-cluster/src/net/mod.rs
@@ -0,0 +1,21 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! TCP server and session actors which transmit [prost::Message] encoded messages
+
+// TODO: we need a way to identify which session messages are coming from + going to. Therefore
+// we should actually have a notification when a new session is launched, which can be used
+// to match which session is tied to which actor id
+
+pub mod listener;
+pub mod session;
+
+/// A trait which implements [prost::Message], [Default], and has a static lifetime
+/// denoting protobuf-encoded messages which can be transmitted over the wire
+pub trait NetworkMessage: prost::Message + Default + 'static {}
+impl<T: prost::Message + Default + 'static> NetworkMessage for T {}
+
+/// A network port
+pub type NetworkPort = u16;
diff --git a/ractor-cluster/src/net/session.rs b/ractor-cluster/src/net/session.rs
new file mode 100644
index 00000000..a54ce9b8
--- /dev/null
+++ b/ractor-cluster/src/net/session.rs
@@ -0,0 +1,377 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! TCP session actor which manages the specific communication to a node
+
+// TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs
+
+use std::marker::PhantomData;
+use std::net::SocketAddr;
+
+use bytes::Bytes;
+use prost::Message;
+use ractor::{Actor, ActorCell, ActorRef};
+use ractor::{SpawnErr, SupervisionEvent};
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+use tokio::io::ErrorKind;
+use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
+use tokio::net::TcpStream;
+
+use super::NetworkMessage;
+
+/// Helper method to read exactly `len` bytes from the stream into a pre-allocated buffer
+/// of bytes
+async fn read_n_bytes(stream: &mut OwnedReadHalf, len: usize) -> Result<Vec<u8>, tokio::io::Error> {
+    // zero-fill the buffer up front; `Vec::with_capacity` alone would give a
+    // zero-length slice and the read loop below would never make progress
+    let mut buf = vec![0u8; len];
+    let mut c_len = 0;
+    while c_len < len {
+        let n = stream.read(&mut buf[c_len..]).await?;
+        if n == 0 {
+            // the remote closed the stream; surface EOF instead of spinning forever
+            return Err(tokio::io::Error::new(
+                ErrorKind::UnexpectedEof,
+                "stream closed while reading",
+            ));
+        }
+        c_len += n;
+    }
+    Ok(buf)
+}
+
+// ========================= Node Session actor ========================= //
+
+/// Represents a bi-directional tcp connection along with send + receive operations
+///
+/// The [Session] actor supervises two child actors, [SessionReader] and [SessionWriter]. Should
+/// either the reader or writer exit, they will terminate the entire session.
+pub struct Session {
+    pub(crate) handler: ActorRef<crate::node::NodeSession>,
+    pub(crate) addr: SocketAddr,
+}
+
+impl Session {
+    pub(crate) async fn spawn_linked(
+        handler: ActorRef<crate::node::NodeSession>,
+        stream: TcpStream,
+        addr: SocketAddr,
+        supervisor: ActorCell,
+    ) -> Result<ActorRef<Session>, SpawnErr> {
+        match Actor::spawn_linked(None, Session { handler, addr }, supervisor).await {
+            Err(err) => {
+                log::error!("Failed to spawn session writer actor: {}", err);
+                Err(err)
+            }
+            Ok((a, _)) => {
+                // initialize this actor & its children
+                let _ = a.cast(SessionMessage::SetStream(stream));
+                // return the actor handle
+                Ok(a)
+            }
+        }
+    }
+}
+
+/// The node connection messages
+pub enum SessionMessage {
+    /// Set the session's tcp stream, which initializes all underlying states
+    SetStream(TcpStream),
+
+    /// Send a message over the channel
+    Send(crate::protocol::NetworkMessage),
+
+    /// An object was received on the channel
+    ObjectAvailable(crate::protocol::NetworkMessage),
+}
+
+/// The node session's state
+pub struct SessionState {
+    writer: ActorRef<SessionWriter<crate::protocol::NetworkMessage>>,
+    reader: ActorRef<SessionReader>,
+}
+
+#[async_trait::async_trait]
+impl Actor for Session {
+    type Msg = SessionMessage;
+    type State = SessionState;
+
+    async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
+        // spawn writer + reader child actors
+        let (writer, _) = Actor::spawn_linked(
+            None,
+            SessionWriter::<crate::protocol::NetworkMessage> {
+                _phantom: PhantomData,
+            },
+            myself.get_cell(),
+        )
+        .await
+        .expect("Failed to start session writer");
+        let (reader, _) = Actor::spawn_linked(
+            None,
+            SessionReader {
+                session: myself.clone(),
+            },
+            myself.get_cell(),
+        )
+        .await
+        .expect("Failed to start session reader");
+
+        Self::State { writer, reader }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, _state: &mut Self::State) {
+        log::info!("TCP Session closed for {}", self.addr);
+    }
+
+    async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::SetStream(stream) => {
+                let (read, write) = stream.into_split();
+                // initialize the writer & reader states
+                let _ = state.writer.cast(SessionWriterMessage::SetStream(write));
+                let _ = state.reader.cast(SessionReaderMessage::SetStream(read));
+            }
+            Self::Msg::Send(msg) => {
+                let _ = state.writer.cast(SessionWriterMessage::WriteObject(msg));
+            }
+            Self::Msg::ObjectAvailable(msg) => {
+                let _ = self
+                    .handler
+                    .cast(crate::node::SessionMessage::MessageReceived(msg));
+            }
+        }
+    }
+
+    async fn handle_supervisor_evt(
+        &self,
+        myself: ActorRef<Self>,
+        message: SupervisionEvent,
+        state: &mut Self::State,
+    ) {
+        // sockets open, they close, the world goes round... If a reader or writer exits for any reason, we'll start the shutdown procedure
+        // which requires that all actors exit
+        match message {
+            SupervisionEvent::ActorPanicked(actor, panic_msg) => {
+                if actor.get_id() == state.reader.get_id() {
+                    log::error!("TCP Session's reader panicked with '{}'", panic_msg);
+                } else if actor.get_id() == state.writer.get_id() {
+                    log::error!("TCP Session's writer panicked with '{}'", panic_msg);
+                } else {
+                    log::error!("TCP Session received a child panic from an unknown child actor ({}) - '{}'", actor.get_id(), panic_msg);
+                }
+                myself.stop(Some("child_panic".to_string()));
+            }
+            SupervisionEvent::ActorTerminated(actor, _, exit_reason) => {
+                if actor.get_id() == state.reader.get_id() {
+                    log::debug!("TCP Session's reader exited");
+                } else if actor.get_id() == state.writer.get_id() {
+                    log::debug!("TCP Session's writer exited");
+                } else {
+                    log::warn!("TCP Session received a child exit from an unknown child actor ({}) - '{:?}'", actor.get_id(), exit_reason);
+                }
+                myself.stop(Some("child_terminate".to_string()));
+            }
+            _ => {
+                // all ok
+            }
+        }
+    }
+}
+
+// ========================= Node Session writer ========================= //
+
+struct SessionWriter<TMsg>
+where
+    TMsg: NetworkMessage,
+{
+    _phantom: PhantomData<TMsg>,
+}
+
+struct SessionWriterState {
+    writer: Option<OwnedWriteHalf>,
+}
+
+enum SessionWriterMessage<TMsg>
+where
+    TMsg: NetworkMessage,
+{
+    /// Set the stream, providing a [TcpStream]'s write half to
+    /// utilize for this node's connection
+    SetStream(OwnedWriteHalf),
+
+    /// Write an object over the wire
+    WriteObject(TMsg),
+}
+
+#[async_trait::async_trait]
+impl<TMsg> Actor for SessionWriter<TMsg>
+where
+    TMsg: NetworkMessage,
+{
+    type Msg = SessionWriterMessage<TMsg>;
+
+    type State = SessionWriterState;
+
+    async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
+        // OK we've established connection, now we can process requests
+
+        Self::State { writer: None }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
+        // drop the channel to close it should we be exiting
+        drop(state.writer.take());
+    }
+
+    async fn handle(&self, _myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::SetStream(stream) if state.writer.is_none() => {
+                state.writer = Some(stream);
+            }
+            Self::Msg::WriteObject(msg) if state.writer.is_some() => {
+                if let Some(stream) = &mut state.writer {
+                    stream.writable().await.unwrap();
+
+                    let length = msg.encoded_len();
+
+                    // encode the length into a buffer and first write those bytes
+                    let mut length_buf = Vec::with_capacity(10);
+                    prost::encode_length_delimiter(length, &mut length_buf).unwrap();
+                    // pad the varint out to exactly 10 bytes, since the reader
+                    // always pulls a fixed 10-byte length header off the wire
+                    length_buf.resize(10, 0);
+                    if let Err(write_err) = stream.write_all(&length_buf).await {
+                        log::warn!("Error writing to the stream: '{}'", write_err);
+                    } else {
+                        // Serialize the full object and write it over the wire
+                        let mut buf = Vec::with_capacity(length);
+                        msg.encode(&mut buf).unwrap();
+                        if let Err(write_err) = stream.write_all(&buf).await {
+                            log::warn!("Error writing to the stream: '{}'", write_err);
+                        }
+                    }
+                }
+            }
+            _ => {
+                // no-op, wait for next send request
+            }
+        }
+    }
+}
+
+// ========================= Node Session reader ========================= //
+
+struct SessionReader {
+    session: ActorRef<Session>,
+}
+
+/// The node connection messages
+pub enum SessionReaderMessage {
+    /// Set the stream, providing a [TcpStream]'s read half to
+    /// utilize for this node's connection
+    SetStream(OwnedReadHalf),
+
+    /// Wait for an object from the stream
+    WaitForObject,
+
+    /// Read next object off the stream
+    ReadObject(usize),
+}
+
+struct SessionReaderState {
+    reader: Option<OwnedReadHalf>,
+}
+
+#[async_trait::async_trait]
+impl Actor for SessionReader {
+    type Msg = SessionReaderMessage;
+
+    type State = SessionReaderState;
+
+    async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
+        Self::State { reader: None }
+    }
+
+    async fn post_stop(&self, _myself: ActorRef<Self>, state: &mut Self::State) {
+        // drop the channel to close it should we be exiting
+        drop(state.reader.take());
+    }
+
+    async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::SetStream(stream) if state.reader.is_none() => {
+                state.reader = Some(stream);
+                // wait for an incoming object
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+            Self::Msg::WaitForObject if state.reader.is_some() => {
+                if let Some(stream) = &mut state.reader {
+                    stream.readable().await.unwrap();
+                    match read_n_bytes(stream, 10).await {
+                        Ok(buf) => {
+                            let bytes = Bytes::from(buf);
+                            match prost::decode_length_delimiter(bytes) {
+                                Ok(protobuf_len) => {
+                                    let _ =
+                                        myself.cast(SessionReaderMessage::ReadObject(protobuf_len));
+                                    return;
+                                }
+                                Err(decode_err) => {
+                                    log::warn!(
+                                        "Failed to decode protobuf object length with {}",
+                                        decode_err
+                                    );
+                                    // continue processing
+                                }
+                            }
+                        }
+                        Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
+                            // EOF, close the stream by dropping the stream
+                            drop(state.reader.take());
+                            myself.stop(Some("channel_closed".to_string()));
+                        }
+                        Err(_other_err) => {
+                            // some other TCP error, more handling necessary
+                        }
+                    }
+                }
+
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+            Self::Msg::ReadObject(length) if state.reader.is_some() => {
+                if let Some(stream) = &mut state.reader {
+                    match read_n_bytes(stream, length).await {
+                        Ok(buf) => {
+                            // NOTE: Our implementation writes 2 messages when sending something over the wire, the first
+                            // is exactly 10 bytes which constitute the length of the payload message, followed by the payload.
+                            // This tells our TCP reader how much data to read off the wire
+
+                            // [buf] here should contain the exact amount of data to decode an object properly.
+                            let bytes = Bytes::from(buf);
+                            match crate::protocol::NetworkMessage::decode(bytes) {
+                                Ok(msg) => {
+                                    // we decoded a message, pass it up the chain
+                                    let _ = self.session.cast(SessionMessage::ObjectAvailable(msg));
+                                }
+                                Err(decode_err) => {
+                                    log::error!(
+                                        "Error decoding network message: '{}'. Discarding",
+                                        decode_err
+                                    );
+                                }
+                            }
+                        }
+                        Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
+                            // EOF, close the stream by dropping the stream
+                            drop(state.reader.take());
+                            myself.stop(Some("channel_closed".to_string()));
+                            return;
+                        }
+                        Err(_other_err) => {
+                            // TODO: some other TCP error, more handling necessary
+                        }
+                    }
+                }
+
+                // we've read the object, now wait for next object
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+            _ => {
+                // no stream is available, keep looping until one is available
+                let _ = myself.cast(SessionReaderMessage::WaitForObject);
+            }
+        }
+    }
+}
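For reference, the framing implemented by `SessionWriter` and `SessionReader` above is a varint length delimiter padded out to a fixed 10-byte header, followed by the protobuf-encoded payload. A standalone sketch of that frame layout (illustrative only, not part of the patch):

```rust
use prost::Message;

// Frame a message: a varint length delimiter padded to exactly 10 bytes,
// then the encoded payload. The fixed-width header is what lets the reader
// always pull 10 bytes off the wire before deciding how much payload follows.
fn frame<M: Message>(msg: &M) -> Vec<u8> {
    let len = msg.encoded_len();
    let mut out = Vec::with_capacity(10 + len);
    prost::encode_length_delimiter(len, &mut out).unwrap();
    out.resize(10, 0); // zero-pad the varint to the fixed header size
    msg.encode(&mut out).unwrap();
    out
}

// Recover the payload length from a 10-byte header; the varint decoder
// stops at the terminating byte and ignores the zero padding after it.
fn payload_len(header: [u8; 10]) -> usize {
    prost::decode_length_delimiter(&header[..]).unwrap()
}
```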
diff --git a/ractor-cluster/src/node/auth.rs b/ractor-cluster/src/node/auth.rs
new file mode 100644
index 00000000..4d45cea4
--- /dev/null
+++ b/ractor-cluster/src/node/auth.rs
@@ -0,0 +1,168 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Defines a node's authentication process between peers. The definition
+//! can be found in [Erlang's handshake](https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html)
+
+use rand::RngCore;
+
+use crate::hash::Digest;
+use crate::protocol::auth as proto;
+
+/// Server authentication FSM
+pub(crate) enum ServerAuthenticationProcess {
+    /// (1) Client initiates handshake by sending their peer name
+    WaitingOnPeerName,
+
+    /// (2) We have the peer name, and have replied with our own [proto::ServerStatus]
+    /// reply
+    HavePeerName(proto::NameMessage),
+
+    /// (2B) Waiting on the client's [proto::ClientStatus] (true/false), sent when our
+    /// [proto::ServerStatus] reply was `alive`
+    WaitingOnClientStatus,
+
+    /// (3) Waiting on the client's reply to the [proto::Challenge] from the server.
+    ///
+    /// Arguments are the challenge sent to the client and the expected digest we should get back
+    WaitingOnClientChallengeReply(u32, Digest),
+
+    /// (4) We processed the client challenge value, and replied and we're ok with the channel.
+    /// The client has the final decision after they check our challenge computation which we send
+    /// with [proto::ChallengeAck]
+    ///
+    /// Argument is the digest to send to the client
+    Ok(Digest),
+
+    /// Close
+    Close,
+}
+
+impl ServerAuthenticationProcess {
+    /// Initialize the FSM state
+    pub fn init() -> Self {
+        Self::WaitingOnPeerName
+    }
+
+    pub fn start_challenge(&self, cookie: &'_ str) -> Self {
+        if matches!(self, Self::WaitingOnClientStatus | Self::HavePeerName(_)) {
+            let challenge = rand::thread_rng().next_u32();
+            let digest = crate::hash::challenge_digest(cookie, challenge);
+            Self::WaitingOnClientChallengeReply(challenge, digest)
+        } else {
+            Self::Close
+        }
+    }
+
+    /// Implement the FSM state transitions
+    pub fn next(&self, auth_message: proto::AuthenticationMessage, cookie: &'_ str) -> Self {
+        if let Some(msg) = auth_message.msg {
+            match msg {
+                proto::authentication_message::Msg::Name(name) => {
+                    if let Self::WaitingOnPeerName = &self {
+                        return Self::HavePeerName(name);
+                    }
+                }
+                proto::authentication_message::Msg::ClientStatus(status) => {
+                    if let Self::WaitingOnClientStatus = &self {
+                        // client says to not continue the session
+                        if !status.status {
+                            return Self::Close;
+                        } else {
+                            return self.start_challenge(cookie);
+                        }
+                    }
+                }
+                proto::authentication_message::Msg::ClientChallenge(challenge_reply) => {
+                    if let Self::WaitingOnClientChallengeReply(_, digest) = &self {
+                        if digest.to_vec() == challenge_reply.digest {
+                            let reply_digest =
+                                crate::hash::challenge_digest(cookie, challenge_reply.challenge);
+                            return Self::Ok(reply_digest);
+                        } else {
+                            // digests don't match!
+                            return Self::Close;
+                        }
+                    }
+                }
+                _ => {}
+            }
+        }
+        // received either an empty message or an out-of-order message. The node can't be trusted
+        Self::Close
+    }
+}
+
+/// Client authentication FSM
+pub(crate) enum ClientAuthenticationProcess {
+    /// (1) After the client has sent their peer name
+    /// they wait for the [proto::ServerStatus] from the server
+    WaitingForServerStatus,
+
+    /// (2) We've potentially sent our client status. Either way
+    /// we're waiting for the [proto::Challenge] from the server
+    WaitingForServerChallenge(proto::ServerStatus),
+
+    /// (3) We've sent our challenge to the server, and we're waiting
+    /// on the server's calculation to determine if we should open the
+    /// channel.
+    ///
+    /// Arguments are the server's challenge, our digest reply to the server, our own
+    /// challenge value, and the digest we expect the server to return
+    WaitingForServerChallengeAck(proto::Challenge, Digest, u32, Digest),
+
+    /// (4) We've validated the server's challenge digest and agree
+    /// that the channel is now open for node inter-communication
+    Ok,
+
+    /// Close
+    Close,
+}
+
+impl ClientAuthenticationProcess {
+    /// Initialize the FSM state
+    pub fn init() -> Self {
+        Self::WaitingForServerStatus
+    }
+
+    /// Implement the client FSM transitions
+    pub fn next(&self, auth_message: proto::AuthenticationMessage, cookie: &'_ str) -> Self {
+        if let Some(msg) = auth_message.msg {
+            match msg {
+                proto::authentication_message::Msg::ServerStatus(status) => {
+                    if let Self::WaitingForServerStatus = &self {
+                        return Self::WaitingForServerChallenge(status);
+                    }
+                }
+                proto::authentication_message::Msg::ServerChallenge(challenge_msg) => {
+                    if let Self::WaitingForServerChallenge(_) = &self {
+                        let server_digest =
+                            crate::hash::challenge_digest(cookie, challenge_msg.challenge);
+                        let challenge = rand::thread_rng().next_u32();
+                        let expected_digest = crate::hash::challenge_digest(cookie, challenge);
+                        return Self::WaitingForServerChallengeAck(
+                            challenge_msg,
+                            server_digest,
+                            challenge,
+                            expected_digest,
+                        );
+                    }
+                }
+                proto::authentication_message::Msg::ServerAck(challenge_ack) => {
+                    if let Self::WaitingForServerChallengeAck(_, _, _, expected_digest) = &self {
+                        if expected_digest.to_vec() == challenge_ack.digest {
+                            return Self::Ok;
+                        } else {
+                            return Self::Close;
+                        }
+                    }
+                }
+                _ => {}
+            }
+        }
+        // received either an empty message or an out-of-order message. The node can't be trusted
+        Self::Close
+    }
+}
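The two FSMs above reduce to mutual proof of a shared cookie: each side issues a random `u32` challenge and checks that the peer can produce `sha256(challenge_be_bytes || cookie)` for it. A self-contained sketch of one such round trip (illustrative only; mirrors `crate::hash::challenge_digest`):

```rust
use rand::RngCore;
use sha2::{Digest, Sha256};

// sha256 over the big-endian challenge bytes followed by the cookie bytes
fn challenge_digest(cookie: &str, challenge: u32) -> [u8; 32] {
    let mut hasher = Sha256::new();
    hasher.update(challenge.to_be_bytes());
    hasher.update(cookie.as_bytes());
    hasher.finalize().into()
}

fn handshake_roundtrip() {
    let cookie = "shared_secret_cookie";
    // the issuer picks a random challenge and records the expected digest
    let challenge = rand::thread_rng().next_u32();
    let expected = challenge_digest(cookie, challenge);
    // the peer computes its reply from the received challenge and its own cookie
    let reply = challenge_digest(cookie, challenge);
    // the digests agree iff both sides hold the same cookie
    assert_eq!(expected, reply);
}
```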
diff --git a/ractor-cluster/src/node/client.rs b/ractor-cluster/src/node/client.rs
new file mode 100644
index 00000000..a3680011
--- /dev/null
+++ b/ractor-cluster/src/node/client.rs
@@ -0,0 +1,73 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! This module contains the logic for initiating client requests to other [super::NodeServer]s
+
+use ractor::{cast, ActorRef, MessagingErr, SpawnErr};
+use tokio::net::TcpStream;
+
+/// Client connection error types
+pub enum ClientConnectError {
+    /// Socket failed to bind, returning the underlying tokio error
+    Socket(tokio::io::Error),
+    /// Error communicating to the [super::NodeServer] actor. Actor receiving port is
+    /// closed
+    Messaging(MessagingErr),
+    /// A timeout in trying to start a new [NodeSession]
+    Timeout,
+    /// Error spawning the tcp session actor supervision tree
+    TcpSpawn(SpawnErr),
+}
+
+impl From<tokio::io::Error> for ClientConnectError {
+    fn from(value: tokio::io::Error) -> Self {
+        Self::Socket(value)
+    }
+}
+
+impl From<MessagingErr> for ClientConnectError {
+    fn from(value: MessagingErr) -> Self {
+        Self::Messaging(value)
+    }
+}
+
+impl From<SpawnErr> for ClientConnectError {
+    fn from(value: SpawnErr) -> Self {
+        Self::TcpSpawn(value)
+    }
+}
+
+/// Connect to another [super::NodeServer] instance
+///
+/// * `host` - The hostname to connect to
+/// * `port` - The host's port to connect to
+///
+/// Returns: [Ok(())] if the connection was successful and the [NodeSession] was started. The handshake will continue
+/// automatically. Results in an [Err(ClientConnectError)] if any part of the process failed to initiate
+pub async fn connect(
+    node_server: ActorRef<super::NodeServer>,
+    host: &'static str,
+    port: crate::net::NetworkPort,
+) -> Result<(), ClientConnectError> {
+    // connect to the socket
+    let stream = TcpStream::connect(format!("{host}:{port}")).await?;
+
+    // Startup the TCP handler, linked to the newly created `NodeSession`
+    let addr = stream.peer_addr()?;
+
+    let _ = cast!(
+        node_server,
+        super::SessionManagerMessage::ConnectionOpened {
+            stream,
+            is_server: false
+        }
+    );
+
+    log::info!("TCP Session opened for {}", addr);
+
+    Ok(())
+}
diff --git a/ractor-cluster/src/node/mod.rs b/ractor-cluster/src/node/mod.rs
new file mode 100644
index 00000000..317fbfc0
--- /dev/null
+++ b/ractor-cluster/src/node/mod.rs
@@ -0,0 +1,378 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Erlang `node()` host communication for managing remote actor communication in
+//! a cluster
+//!
+//! The supervision tree is the following
+//!
+//! [NodeServer] supervises
+//! 1. The server-socket TCP [crate::net::listener::Listener]
+//! 2. All of the individual [NodeSession]s
+//!
+//! Each [NodeSession] supervises
+//! 1. The TCP [crate::net::session::Session] connection
+//! 2. (todo) All of the remote referenced actors, so that if the overall node session closes
+//!    (due to a TCP error, for example) we lose connectivity to all of the remote actors
+//!
+//! Each [crate::net::session::Session] supervises
+//! 1. A TCP writer actor (`crate::net::session::SessionWriter`)
+//! 2. A TCP reader actor (`crate::net::session::SessionReader`)
+//!    -> If either child actor closes, then it will terminate the overall [crate::net::session::Session] which in
+//!    turn will terminate the [NodeSession] and the [NodeServer] will de-register the [NodeSession] from its
+//!    internal state
+//!
+
+/*
+TODO:
+
+Overview:
+
+A `NodeServer` handles opening the TCP listener and managing incoming and outgoing `NodeSession` requests. `NodeSession`s
+will represent a remote server locally.
+
+Additionally, you can open a session as a "client" by requesting a new session from the NodeServer
+after initially connecting a [TcpStream] to the desired endpoint and then attaching the NodeSession
+to the TcpStream (and linking the actor). (See src/node/client.rs)
+
+What's there to do?
+1. The inter-node messaging protocol -> Based heavily on https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html#protocol-between-connected-nodes
+2. Having a [NodeSession] manage child actors from the remote system
+3. Remote-supportive actors, which support serializing their payloads over the wire
+4. Populating the global + pg registries with remote registered actors
+5. Adjustments in the default Message type which allow messages to be serializable (with the `cluster` feature enabled)
+6. Allow actor IDs to be set for remote actors, tied to a specific node session
+
+*/
+
+pub mod auth;
+pub mod client;
+pub mod node_session;
+pub use node_session::NodeSession;
+use tokio::net::TcpStream;
+
+use std::collections::HashMap;
+use std::{cmp::Ordering, collections::hash_map::Entry};
+
+use ractor::{cast, Actor, ActorId, ActorRef, RpcReplyPort, SupervisionEvent};
+
+use crate::protocol::auth as auth_protocol;
+
+const PROTOCOL_VERSION: u32 = 1;
+
+/// Reply to a [SessionManagerMessage::CheckSession] message
+pub enum SessionCheckReply {
+    /// There is no other connection with this peer
+    NoOtherConnection,
+    /// There is another connection with this peer, and it
+    /// should continue. Shutdown this connection.
+    OtherConnectionContinues,
+    /// There is another connection with this peer, but
+    /// this connection should take over. Terminating the other
+    /// connection
+    ThisConnectionContinues,
+    /// There is another connection with the peer,
+    /// in the same format as this attempted connection.
+    /// Perhaps the other connection is dying or the peer is
+    /// confused
+    DuplicateConnection,
+}
+
+impl From<SessionCheckReply> for auth_protocol::server_status::Status {
+    fn from(value: SessionCheckReply) -> Self {
+        match value {
+            SessionCheckReply::NoOtherConnection => Self::Ok,
+            SessionCheckReply::ThisConnectionContinues => Self::OkSimultaneous,
+            SessionCheckReply::OtherConnectionContinues => Self::NotOk,
+            SessionCheckReply::DuplicateConnection => Self::Alive,
+        }
+    }
+}
+
+/// Messages to/from the session aggregator
+pub enum SessionManagerMessage {
+    /// Notifies the session manager that a new incoming (`is_server = true`) or outgoing (`is_server = false`)
+    /// [TcpStream] was accepted
+    ConnectionOpened {
+        /// The [TcpStream] for this network connection
+        stream: TcpStream,
+        /// Flag denoting if it's a server (incoming) connection when [true], [false] for outgoing
+        is_server: bool,
+    },
+
+    /// A request to check if a session is currently open, and if it is, whether the ordering is such
+    /// that we should reject the incoming request
+    ///
+    /// i.e. if A is connected to B and A.name > B.name, but then B connects to A, B's request to connect
+    /// to A should be rejected
+    CheckSession {
+        /// The peer's name to investigate
+        peer_name: auth_protocol::NameMessage,
+        /// Reply channel for RPC
+        reply: RpcReplyPort<SessionCheckReply>,
+    },
+
+    /// A request to update the session mapping with this now known node's name
+    UpdateSession {
+        /// The ID of the [NodeSession] actor
+        actor_id: ActorId,
+        /// The node's name (now that we've received it)
+        name: auth_protocol::NameMessage,
+    },
+}
+
+/// Message from the TCP [session::Session] actor and the
+/// monitoring Session actor
+pub enum SessionMessage {
+    /// The Session actor is setting its handle
+    SetTcpStream(TcpStream),
+
+    /// A network message was received from the network
+    MessageReceived(crate::protocol::NetworkMessage),
+
+    /// Send a message over the node channel to the remote `node()`
+    SendMessage(crate::protocol::node::NodeMessage),
+}
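Putting the pieces together, a minimal usage sketch of hosting a `NodeServer` and dialing a peer (illustrative only; the ports, names, and the `Actor::spawn(None, ...)` calling convention are assumptions against the API introduced in this patch):

```rust
use ractor::Actor;
use ractor_cluster::NodeServer;

async fn start_and_connect() {
    // host a node server on port 8198 with a shared cookie
    let server = NodeServer::new(
        8198,
        "cookie".to_string(),
        "node_a".to_string(),
        "localhost".to_string(),
    );
    let (server_ref, _handle) = Actor::spawn(None, server)
        .await
        .expect("Failed to start the NodeServer");

    // dial a peer NodeServer assumed to be listening on port 8199; the
    // authentication handshake then proceeds automatically over the session
    if ractor_cluster::node::client::connect(server_ref, "localhost", 8199)
        .await
        .is_err()
    {
        log::error!("Failed to open a session to the peer node");
    }
}
```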
+
+/// Represents the server which is managing all node session instances
+///
+/// The [NodeServer] supervises a single [crate::net::listener::Listener] actor which is
+/// responsible for hosting a server port for incoming `node()` connections. It also supervises
+/// all of the [NodeSession] actors which are tied to tcp sessions and manage the FSM around `node()`s
+/// establishing interconnections.
+pub struct NodeServer {
+    port: crate::net::NetworkPort,
+    cookie: String,
+    node_name: String,
+    hostname: String,
+}
+
+impl NodeServer {
+    /// Create a new node server instance
+    pub fn new(
+        port: crate::net::NetworkPort,
+        cookie: String,
+        node_name: String,
+        hostname: String,
+    ) -> Self {
+        Self {
+            port,
+            cookie,
+            node_name,
+            hostname,
+        }
+    }
+}
+
+struct NodeServerSessionInformation {
+    actor: ActorRef<NodeSession>,
+    peer_name: Option<auth_protocol::NameMessage>,
+    is_server: bool,
+    node_id: u64,
+}
+
+impl NodeServerSessionInformation {
+    fn new(actor: ActorRef<NodeSession>, node_id: u64, is_server: bool) -> Self {
+        Self {
+            actor,
+            peer_name: None,
+            is_server,
+            node_id,
+        }
+    }
+
+    fn update(&mut self, peer_name: auth_protocol::NameMessage) {
+        self.peer_name = Some(peer_name);
+    }
+}
+
+/// The state of the node server
+pub struct NodeServerState {
+    listener: ActorRef<crate::net::listener::Listener>,
+    node_sessions: HashMap<ActorId, NodeServerSessionInformation>,
+    node_id_counter: u64,
+    this_node_name: auth_protocol::NameMessage,
+}
+
+impl NodeServerState {
+    fn check_peers(&self, new_peer: auth_protocol::NameMessage) -> SessionCheckReply {
+        for (_key, value) in self.node_sessions.iter() {
+            if let Some(existing_peer) = &value.peer_name {
+                if existing_peer.name == new_peer.name {
+                    match (
+                        existing_peer.name.cmp(&self.this_node_name.name),
+                        value.is_server,
+                    ) {
+                        // the peer's name is > this node's name and they connected to us
+                        // or
+                        // the peer's name is < this node's name and we connected to them
+                        (Ordering::Greater, true) | (Ordering::Less, false) => {
+                            value.actor.stop(Some("duplicate_connection".to_string()));
+                            return SessionCheckReply::OtherConnectionContinues;
+                        }
+                        (Ordering::Greater, false) | (Ordering::Less, true) => {
+                            // the inverse of the first two conditions, terminate the other
+                            // connection and let this one continue
+                            return SessionCheckReply::ThisConnectionContinues;
+                        }
+                        _ => {
+                            // something funky is going on...
+                            return SessionCheckReply::DuplicateConnection;
+                        }
+                    }
+                }
+            }
+        }
+        SessionCheckReply::NoOtherConnection
+    }
+}
+
+#[async_trait::async_trait]
+impl Actor for NodeServer {
+    type Msg = SessionManagerMessage;
+    type State = NodeServerState;
+    async fn pre_start(&self, myself: ActorRef<Self>) -> Self::State {
+        let listener = crate::net::listener::Listener::new(self.port, myself.clone());
+
+        let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
+            .await
+            .expect("Failed to start listener");
+
+        Self::State {
+            node_sessions: HashMap::new(),
+            listener: actor_ref,
+            node_id_counter: 0,
+            this_node_name: auth_protocol::NameMessage {
+                flags: Some(auth_protocol::NodeFlags {
+                    version: PROTOCOL_VERSION,
+                }),
+                name: format!("{}@{}", self.node_name, self.hostname),
+            },
+        }
+    }
+
+    async fn handle(&self, myself: ActorRef<Self>, message: Self::Msg, state: &mut Self::State) {
+        match message {
+            Self::Msg::ConnectionOpened { stream, is_server } => {
+                let node_id = state.node_id_counter;
+                if let Ok((actor, _)) = Actor::spawn_linked(
+                    None,
+                    NodeSession::new(
+                        node_id,
+                        is_server,
+                        self.cookie.clone(),
+                        myself.clone(),
+                        state.this_node_name.clone(),
+                    ),
+                    myself.get_cell(),
+                )
+                .await
+                {
+                    let _ = cast!(actor, SessionMessage::SetTcpStream(stream));
+                    state.node_sessions.insert(
+                        actor.get_id(),
+                        NodeServerSessionInformation::new(actor.clone(), node_id, is_server),
+                    );
+                    state.node_id_counter += 1;
+                } else {
+                    // failed to startup actor, drop the socket
+                    log::warn!("Failed to startup `NodeSession`, dropping connection");
+                    drop(stream);
+                }
+            }
+            Self::Msg::UpdateSession { actor_id, name } => {
+                if let Some(entry) = state.node_sessions.get_mut(&actor_id) {
+                    entry.update(name);
+                }
+            }
+            Self::Msg::CheckSession { peer_name, reply } => {
+                let _ = reply.send(state.check_peers(peer_name));
+            }
+        }
+    }
+
+    async fn handle_supervisor_evt(
+        &self,
+        myself: ActorRef<Self>,
+        message: SupervisionEvent,
+        state: &mut Self::State,
+    ) {
+        match message {
+            SupervisionEvent::ActorPanicked(actor, msg) => {
+                if state.listener.get_id() == actor.get_id() {
+                    log::error!(
+                        "The Node server's TCP listener failed with '{}'. Respawning!",
+                        msg
+                    );
+
+                    // try to re-create the listener. If it's a port-bind issue, we will have already panicked on
+                    // trying to start the NodeServer
+                    let listener = crate::net::listener::Listener::new(self.port, myself.clone());
+
+                    let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
+                        .await
+                        .expect("Failed to start listener");
+                    state.listener = actor_ref;
+                } else {
+                    match state.node_sessions.entry(actor.get_id()) {
+                        Entry::Occupied(o) => {
+                            log::warn!(
+                                "Node session {:?} panicked with '{}'",
+                                o.get().peer_name,
+                                msg
+                            );
+                            o.remove();
+                        }
+                        Entry::Vacant(_) => {
+                            log::warn!(
+                                "An unknown actor ({:?}) panicked with '{}'",
+                                actor.get_id(),
+                                msg
+                            );
+                        }
+                    }
+                }
+            }
+            SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => {
+                if state.listener.get_id() == actor.get_id() {
+                    log::error!(
+                        "The Node server's TCP listener exited with '{:?}'. Respawning!",
+                        maybe_reason
+                    );
+
+                    // try to re-create the listener. If it's a port-bind issue, we will have already panicked on
+                    // trying to start the NodeServer
+                    let listener = crate::net::listener::Listener::new(self.port, myself.clone());
+
+                    let (actor_ref, _) = Actor::spawn_linked(None, listener, myself.get_cell())
+                        .await
+                        .expect("Failed to start listener");
+                    state.listener = actor_ref;
+                } else {
+                    match state.node_sessions.entry(actor.get_id()) {
+                        Entry::Occupied(o) => {
+                            log::warn!(
+                                "Node session {:?} exited with '{:?}'",
+                                o.get().peer_name,
+                                maybe_reason
+                            );
+                            o.remove();
+                        }
+                        Entry::Vacant(_) => {
+                            log::warn!(
+                                "An unknown actor ({:?}) exited with '{:?}'",
+                                actor.get_id(),
+                                maybe_reason
+                            );
+                        }
+                    }
+                }
+            }
+            _ => {
+                //no-op
+            }
+        }
+    }
+}
diff --git a/ractor-cluster/src/node/node_session.rs b/ractor-cluster/src/node/node_session.rs
new file mode 100644
index 00000000..46bd7fdf
--- /dev/null
+++ b/ractor-cluster/src/node/node_session.rs
@@ -0,0 +1,488 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! A [NodeSession] is an individual connection between a specific pair of
+//! `node()`s and all of its authentication and communication for that
+//! pairing
+
+use std::collections::HashMap;
+
+use ractor::message::SerializedMessage;
+use ractor::rpc::CallResult;
+use ractor::{Actor, ActorId, ActorRef, SupervisionEvent};
+use tokio::time::Duration;
+
+use super::{auth, NodeServer};
+use crate::net::session::SessionMessage;
+use crate::protocol::auth as auth_protocol;
+use crate::protocol::node as node_protocol;
+use crate::remote_actor::RemoteActor;
+
+enum AuthenticationState {
+    AsClient(auth::ClientAuthenticationProcess),
+    AsServer(auth::ServerAuthenticationProcess),
+}
+
+impl AuthenticationState {
+    fn is_ok(&self) -> bool {
+        match self {
+            Self::AsClient(c) => matches!(c, auth::ClientAuthenticationProcess::Ok),
+            Self::AsServer(s) => matches!(s, auth::ServerAuthenticationProcess::Ok(_)),
+        }
+    }
+
+    fn is_close(&self) -> bool {
+        match self {
+            Self::AsClient(c) => matches!(c, auth::ClientAuthenticationProcess::Close),
+            Self::AsServer(s) => matches!(s, auth::ServerAuthenticationProcess::Close),
+        }
+    }
+}
+
+/// Represents a session with a specific node
+pub struct NodeSession {
+    node_id: u64,
+    is_server: bool,
+    cookie: String,
+    node_server: ActorRef<NodeServer>,
+    node_name: auth_protocol::NameMessage,
+}
+
+impl NodeSession {
+    /// Construct a new [NodeSession] with the supplied
+    /// arguments
+    pub fn new(
+        node_id: u64,
+        is_server: bool,
+        cookie: String,
+        node_server: ActorRef<NodeServer>,
+        node_name: auth_protocol::NameMessage,
+    ) -> Self {
+        Self {
+            node_id,
+            is_server,
+            cookie,
+            node_server,
+            node_name,
+        }
+    }
+}
+
+impl NodeSession {
+    async fn handle_auth(
+        &self,
+        state: &mut NodeSessionState,
+        message: auth_protocol::AuthenticationMessage,
+        myself: ActorRef<Self>,
+    ) {
+        if state.auth.is_ok() {
+            // nothing to do, we're already authenticated
+            return;
+        }
+        if state.auth.is_close() {
+            // we need to shutdown, the session needs to be terminated
+            myself.stop(Some("auth_fail".to_string()));
+            if let Some(tcp) = &state.tcp {
+                tcp.stop(Some("auth_fail".to_string()));
+            }
+        }
+
+        match &state.auth {
+            AuthenticationState::AsClient(client_auth) => {
+                let mut next = client_auth.next(message, &self.cookie);
+                match &next {
+                    auth::ClientAuthenticationProcess::WaitingForServerChallenge(server_status) => {
+                        match server_status.status() {
+                            auth_protocol::server_status::Status::Ok => {
+                                // this handshake will continue
+                            }
+                            auth_protocol::server_status::Status::OkSimultaneous => {
+                                // this handshake will continue, but there is another handshake underway
+                                // that will be shut down (i.e. this was a server connection and we're currently trying
+                                // a client connection)
+                            }
+                            auth_protocol::server_status::Status::NotOk => {
+                                // The handshake will not continue, as there's already another client handshake underway
+                                // which itself initiated (Simultaneous connect where the other connection's name is > this node
+                                // name)
+                                next = auth::ClientAuthenticationProcess::Close;
+                            }
+                            auth_protocol::server_status::Status::NotAllowed => {
+                                // unspecified auth reason
+                                next = auth::ClientAuthenticationProcess::Close;
+                            }
+                            auth_protocol::server_status::Status::Alive => {
+                                // A connection to the node is already alive, which means either the
+                                // node is confused in its connection state or the previous TCP connection is
+                                // breaking down. Send ClientStatus
+                                // TODO: check the status properly
+                                state.tcp_send_auth(auth_protocol::AuthenticationMessage {
+                                    msg: Some(
+                                        auth_protocol::authentication_message::Msg::ClientStatus(
+                                            auth_protocol::ClientStatus { status: true },
+                                        ),
+                                    ),
+                                });
+                            }
+                        }
+                    }
+                    auth::ClientAuthenticationProcess::WaitingForServerChallengeAck(
+                        server_challenge_value,
+                        reply_to_server,
+                        our_challenge,
+                        _expected_digest,
+                    ) => {
+                        // record the name
+                        state.name = Some(auth_protocol::NameMessage {
+                            name: server_challenge_value.name.clone(),
+                            flags: server_challenge_value.flags.clone(),
+                        });
+                        // tell the node server that we now know this peer's name information
+                        let _ =
+                            self.node_server
+                                .cast(super::SessionManagerMessage::UpdateSession {
+                                    actor_id: myself.get_id(),
+                                    name: self.node_name.clone(),
+                                });
+                        // send the client challenge to the server
+                        let reply = auth_protocol::AuthenticationMessage {
+                            msg: Some(auth_protocol::authentication_message::Msg::ClientChallenge(
+                                auth_protocol::ChallengeReply {
+                                    digest: reply_to_server.to_vec(),
+                                    challenge: *our_challenge,
+                                },
+                            )),
+                        };
+                        state.tcp_send_auth(reply);
+                    }
+                    _ => {
+                        // no message to send
+                    }
+                }
+
+                if let auth::ClientAuthenticationProcess::Close = &next {
+                    myself.stop(Some("auth_fail".to_string()));
+                }
+                state.auth = AuthenticationState::AsClient(next);
+            }
+            AuthenticationState::AsServer(server_auth) => {
+                let mut next = server_auth.next(message, &self.cookie);
+
+                match &next {
+                    auth::ServerAuthenticationProcess::HavePeerName(peer_name) => {
+                        // store the peer node's name in the session state
+                        state.name = Some(peer_name.clone());
+
+                        // send the status message, followed by the server's challenge
+                        let server_status_result = self
+                            .node_server
+                            .call(
+                                |tx| super::SessionManagerMessage::CheckSession {
+                                    peer_name: peer_name.clone(),
+                                    reply: tx,
+                                },
+                                Some(Duration::from_millis(500)),
+                            )
+                            .await;
+                        match server_status_result {
+                            Err(_) | Ok(CallResult::Timeout) | Ok(CallResult::SenderError) => {
+                                next = auth::ServerAuthenticationProcess::Close;
+                            }
+                            Ok(CallResult::Success(reply)) => {
+                                let server_status: auth_protocol::server_status::Status =
+                                    reply.into();
+                                // Send the server's status message
+                                let status_msg = auth_protocol::AuthenticationMessage {
+                                    msg: Some(
+                                        auth_protocol::authentication_message::Msg::ServerStatus(
+                                            auth_protocol::ServerStatus {
+                                                status: server_status.into(),
+                                            },
+                                        ),
+                                    ),
+                                };
+                                state.tcp_send_auth(status_msg);
+
+                                match server_status {
+                                    auth_protocol::server_status::Status::Ok
+                                    | auth_protocol::server_status::Status::OkSimultaneous => {
+                                        // Good to proceed, start a challenge
+                                        next = next.start_challenge(&self.cookie);
+                                        if let auth::ServerAuthenticationProcess::WaitingOnClientChallengeReply(
+                                            challenge,
+                                            _digest,
+                                        ) = &next
+                                        {
+                                            let challenge_msg = auth_protocol::AuthenticationMessage {
+                                                msg: Some(
+                                                    auth_protocol::authentication_message::Msg::ServerChallenge(
+                                                        auth_protocol::Challenge {
+                                                            name: self.node_name.name.clone(),
+                                                            flags: self.node_name.flags.clone(),
+                                                            challenge: *challenge,
+                                                        },
+                                                    ),
+                                                ),
+                                            };
+                                            state.tcp_send_auth(challenge_msg);
+                                        }
+                                    }
+                                    auth_protocol::server_status::Status::NotOk
+                                    | auth_protocol::server_status::Status::NotAllowed => {
+                                        next = auth::ServerAuthenticationProcess::Close;
+                                    }
+                                    auth_protocol::server_status::Status::Alive => {
+                                        // we sent the `Alive` status, so we're waiting on the client to confirm their status
+                                        // before continuing
+                                        next = auth::ServerAuthenticationProcess::WaitingOnClientStatus;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    auth::ServerAuthenticationProcess::Ok(digest) => {
+                        let client_challenge_reply = auth_protocol::AuthenticationMessage {
+                            msg: Some(auth_protocol::authentication_message::Msg::ServerAck(
+                                auth_protocol::ChallengeAck {
+                                    digest: digest.to_vec(),
+                                },
+                            )),
+                        };
+                        state.tcp_send_auth(client_challenge_reply);
+                    }
+                    _ => {
+                        // no message to send
+                    }
+                }
+
+                if let auth::ServerAuthenticationProcess::Close = &next {
+                    myself.stop(Some("auth_fail".to_string()));
+                }
+                state.auth = AuthenticationState::AsServer(next);
+            }
+        }
+    }
+
+    fn handle_node(
+        &self,
+        state: &mut NodeSessionState,
+        message: node_protocol::NodeMessage,
+        myself: ActorRef<Self>,
+    ) {
+        if let Some(msg) = message.msg {
+            match msg {
+                node_protocol::node_message::Msg::Cast(cast_args) => {
+                    if let Some(actor) = ractor::registry::get_pid(ActorId::from_pid(cast_args.to))
+                    {
+                        let _ = actor.send_serialized(SerializedMessage::Cast(cast_args.what));
+                    }
+                }
+                node_protocol::node_message::Msg::Call(call_args) => {
+                    let to = call_args.to;
+                    let tag = call_args.tag;
+                    if let Some(actor) = ractor::registry::get_pid(ActorId::from_pid(call_args.to))
+                    {
+                        let (tx, rx) = ractor::concurrency::oneshot();
+
+                        // send off the transmission in the serialized format, letting the message's own deserialization handle
+                        // the conversion
+                        let maybe_timeout = call_args.timeout_ms.map(Duration::from_millis);
+                        if let Some(timeout) = maybe_timeout {
+                            let _ = actor.send_serialized(SerializedMessage::Call(
+                                call_args.what,
+                                (tx, timeout).into(),
+                            ));
+                        } else {
+                            let _ = actor.send_serialized(SerializedMessage::Call(
+                                call_args.what,
+                                tx.into(),
+                            ));
+                        }
+
+                        // kick off a background task to reply to the channel request, threading the tag and who to reply to
+                        let _ = ractor::concurrency::spawn(async move {
+                            if let Some(timeout) = maybe_timeout {
+                                if let Ok(Ok(result)) =
+                                    ractor::concurrency::timeout(timeout, rx).await
+                                {
+                                    let reply = node_protocol::node_message::Msg::Reply(
+                                        node_protocol::CallReply {
+                                            tag,
+                                            to,
+                                            what: result,
+                                        },
+                                    );
+                                    let _ = ractor::cast!(
+                                        myself,
+                                        super::SessionMessage::SendMessage(
+                                            node_protocol::NodeMessage { msg: Some(reply) }
+                                        )
+                                    );
+                                }
+                            } else if let Ok(result) = rx.await {
+                                let reply = node_protocol::node_message::Msg::Reply(
+                                    node_protocol::CallReply {
+                                        tag,
+                                        to,
+                                        what: result,
+                                    },
+                                );
+                                let _ = ractor::cast!(
+                                    myself,
+                                    super::SessionMessage::SendMessage(
+                                        node_protocol::NodeMessage { msg: Some(reply) }
+                                    )
+                                );
+                            }
+                        });
+                    }
+                }
+                node_protocol::node_message::Msg::Reply(call_reply_args) => {
+                    if let Some(actor) = state.remote_actors.get(&call_reply_args.to) {
state.remote_actors.get(&call_reply_args.to) { + let _ = actor.send_serialized(SerializedMessage::CallReply( + call_reply_args.tag, + call_reply_args.what, + )); + } + } + } + } + } +} + +/// The state of the node session +pub struct NodeSessionState { + tcp: Option>, + name: Option, + auth: AuthenticationState, + remote_actors: HashMap>, +} + +impl NodeSessionState { + fn is_tcp_actor(&self, actor: ActorId) -> bool { + self.tcp + .as_ref() + .map(|t| t.get_id() == actor) + .unwrap_or(false) + } + + fn tcp_send_auth(&self, msg: auth_protocol::AuthenticationMessage) { + if let Some(tcp) = &self.tcp { + let net_msg = crate::protocol::NetworkMessage { + message: Some(crate::protocol::meta::network_message::Message::Auth(msg)), + }; + let _ = tcp.cast(SessionMessage::Send(net_msg)); + } + } + + fn tcp_send_node(&self, msg: node_protocol::NodeMessage) { + if let Some(tcp) = &self.tcp { + let net_msg = crate::protocol::NetworkMessage { + message: Some(crate::protocol::meta::network_message::Message::Node(msg)), + }; + let _ = tcp.cast(SessionMessage::Send(net_msg)); + } + } +} + +#[async_trait::async_trait] +impl Actor for NodeSession { + type Msg = super::SessionMessage; + type State = NodeSessionState; + async fn pre_start(&self, _myself: ActorRef) -> Self::State { + Self::State { + tcp: None, + name: None, + auth: if self.is_server { + AuthenticationState::AsServer(auth::ServerAuthenticationProcess::init()) + } else { + AuthenticationState::AsClient(auth::ClientAuthenticationProcess::init()) + }, + remote_actors: HashMap::new(), + } + } + + async fn handle(&self, myself: ActorRef, message: Self::Msg, state: &mut Self::State) { + match message { + Self::Msg::SetTcpStream(stream) if state.tcp.is_none() => { + let addr = stream.peer_addr().expect("Failed to get peer address"); + // startup the TCP socket handler for message write + reading + let actor = crate::net::session::Session::spawn_linked( + myself.clone(), + stream, + addr, + myself.get_cell(), + ) + .await + .expect("Failed to spawn TCP session"); + state.tcp = Some(actor); + } + Self::Msg::MessageReceived(maybe_network_message) if state.tcp.is_some() => { + if let Some(network_message) = maybe_network_message.message { + match network_message { + crate::protocol::meta::network_message::Message::Auth(auth_message) => { + self.handle_auth(state, auth_message, myself).await; + } + crate::protocol::meta::network_message::Message::Node(node_message) => { + self.handle_node(state, node_message, myself); + } + } + } + } + Self::Msg::SendMessage(node_message) if state.tcp.is_some() => { + state.tcp_send_node(node_message); + } + _ => { + // no-op, ignore + } + } + } + + async fn handle_supervisor_evt( + &self, + myself: ActorRef, + message: SupervisionEvent, + state: &mut Self::State, + ) { + match message { + SupervisionEvent::ActorPanicked(actor, msg) => { + if state.is_tcp_actor(actor.get_id()) { + log::error!( + "Node session {:?}'s TCP session panicked with '{}'", + state.name, + msg + ); + myself.stop(Some("tcp_session_err".to_string())); + } else { + // TODO: handle other actors + log::warn!( + "Node session {:?} recieved an unknown child panic of '{}'", + state.name, + msg + ); + } + } + SupervisionEvent::ActorTerminated(actor, _, maybe_reason) => { + if state.is_tcp_actor(actor.get_id()) { + log::info!("Connection closed to node {:?}", state.name); + myself.stop(Some("tcp_session_closed".to_string())); + } else { + // TODO: handle other actors + log::debug!( + "Node session {:?} received a child exit with reason '{:?}'", + state.name, 
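The digest math that both authentication state machines rely on lives in `auth.rs` (not part of this excerpt). As a minimal sketch of the Erlang-style scheme: the Erlang distribution handshake combines the shared cookie with the peer's 32-bit challenge and hashes the result. The Erlang protocol uses MD5; this crate pulls in `sha2`, so a SHA-256 digest and the helper name `gen_digest` are assumptions for illustration only:

```rust
use sha2::{Digest, Sha256};

/// Hypothetical helper: hash the shared cookie together with a peer's
/// 32-bit challenge to produce the digest echoed back in `ChallengeReply`.
fn gen_digest(cookie: &str, challenge: u32) -> Vec<u8> {
    let mut hasher = Sha256::new();
    hasher.update(cookie.as_bytes());
    hasher.update(challenge.to_be_bytes());
    hasher.finalize().to_vec()
}

// Both sides compute the same digest, so authentication succeeds exactly
// when the cookies match:
// assert_eq!(gen_digest("cookie", 42), gen_digest("cookie", 42));
```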
diff --git a/ractor-cluster/src/protocol/auth.proto b/ractor-cluster/src/protocol/auth.proto
new file mode 100644
index 00000000..05081cca
--- /dev/null
+++ b/ractor-cluster/src/protocol/auth.proto
@@ -0,0 +1,116 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum
+// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for
+// the syntax and guide
+
+// The protocol messages defined here roughly follow the Erlang distributed systems guide
+// found at: https://www.erlang.org/doc/apps/erts/erl_dist_protocol.html#distribution-handshake
+
+syntax = "proto3";
+
+package auth;
+
+// Placeholder to represent a node's flags
+message NodeFlags {
+  // The node version
+  uint32 version = 1;
+}
+
+// A message containing the node's name
+message NameMessage {
+  // The node's full name
+  // Format: `node_name@hostname`
+  string name = 1;
+
+  // The node's capability flags
+  NodeFlags flags = 2;
+}
+
+// Server -> Client: `SendStatus` is the server replying with the handshake status to the client
+message ServerStatus {
+  // Status types
+  enum Status {
+    // The handshake will continue
+    OK = 0;
+    // The handshake will continue, but A is informed that B has another ongoing
+    // connection attempt that will be shut down (simultaneous connect where A's
+    // name is greater than B's name, compared literally).
+    OK_SIMULTANEOUS = 1;
+    // The handshake will not continue, as B already has an ongoing handshake, which
+    // it itself has initiated (simultaneous connect where B's name is greater than A's).
+    NOT_OK = 2;
+    // The connection is disallowed for some (unspecified) security reason.
+    NOT_ALLOWED = 3;
+    // A connection to the node is already active, which either means that node A is confused
+    // or that the TCP connection breakdown of a previous node with this name has not yet
+    // reached node B.
+    ALIVE = 4;
+
+    // Skipped NAMED = 5;
+  }
+
+  // The status
+  Status status = 1;
+}
+
+// The client's status reply if the `ServerStatus` was ALIVE
+//
+// If status was alive, node A answers with another status message containing either true,
+// which means that the connection is to continue (the old connection from this node is
+// broken), or false, which means that the connection is to be closed (the connection
+// attempt was a mistake).
+message ClientStatus {
+  // The status
+  bool status = 1;
+}
+
+// The server's initial challenge request
+message Challenge {
+  // The server's name
+  string name = 1;
+  // The node's capability flags
+  NodeFlags flags = 2;
+  // The challenge value
+  uint32 challenge = 3;
+}
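prost generates Rust structs and enums for these messages. A sketch of building the opening handshake frame with those generated types (module layout per `protocol/mod.rs` below; the node name is illustrative):

```rust
// Sketch: the first frame a connecting client sends is its name, wrapped in
// the AuthenticationMessage oneof.
use crate::protocol::auth;

fn name_frame() -> auth::AuthenticationMessage {
    auth::AuthenticationMessage {
        // prost maps `oneof msg` to an Option-wrapped Rust enum
        msg: Some(auth::authentication_message::Msg::Name(auth::NameMessage {
            name: "node_a@localhost".to_string(),
            // nested message fields become Options in prost
            flags: Some(auth::NodeFlags { version: 1 }),
        })),
    }
}
```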
+// The reply to the server's challenge.
+message ChallengeReply {
+  // The client's own challenge for the server to handle
+  uint32 challenge = 1;
+  // An MD5 digest that the client constructed from the server's
+  // challenge value
+  bytes digest = 2;
+}
+
+// The server's reply to the client about their own
+// challenge
+message ChallengeAck {
+  // Another MD5 digest that the server constructed from the
+  // client's challenge value
+  bytes digest = 1;
+}
+
+// An authentication message
+message AuthenticationMessage {
+  // The inner message type
+  oneof msg {
+    // Send the name
+    NameMessage name = 1;
+    // Send the status
+    ServerStatus server_status = 2;
+    // Send the client status
+    ClientStatus client_status = 3;
+    // Server's challenge to the client
+    Challenge server_challenge = 4;
+    // Client's reply to server's challenge and
+    // client's own challenge to the server
+    ChallengeReply client_challenge = 5;
+    // Server's reply to the client's challenge
+    ChallengeAck server_ack = 6;
+  }
+}
diff --git a/ractor-cluster/src/protocol/meta.proto b/ractor-cluster/src/protocol/meta.proto
new file mode 100644
index 00000000..155a7391
--- /dev/null
+++ b/ractor-cluster/src/protocol/meta.proto
@@ -0,0 +1,23 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+
+syntax = "proto3";
+
+import "auth.proto";
+import "node.proto";
+
+package meta;
+
+// Represents a message over the network
+message NetworkMessage {
+  // The inner message
+  oneof message {
+    // An authentication message
+    auth.AuthenticationMessage auth = 1;
+    // An inter-node message
+    node.NodeMessage node = 2;
+  }
+}
diff --git a/ractor-cluster/src/protocol/mod.rs b/ractor-cluster/src/protocol/mod.rs
new file mode 100644
index 00000000..c0f61983
--- /dev/null
+++ b/ractor-cluster/src/protocol/mod.rs
@@ -0,0 +1,24 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Protobuf specifications for over-the-wire intercommunication
+//! between nodes. Generated via [prost]
+
+/// Node authentication protocol
+pub mod auth {
+    include!(concat!(env!("OUT_DIR"), "/auth.rs"));
+}
+
+/// Node inter-communication protocol
+pub mod node {
+    include!(concat!(env!("OUT_DIR"), "/node.rs"));
+}
+
+/// Meta types which include all base network protocol message types
+pub mod meta {
+    include!(concat!(env!("OUT_DIR"), "/meta.rs"));
+}
+
+pub use meta::NetworkMessage;
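Every frame on the wire is a `meta.NetworkMessage`, so the TCP session only ever needs to encode and decode one top-level type. A sketch of that framing with the `prost::Message` encode/decode API and the `bytes` crate (both are dependencies of this crate); error handling is elided:

```rust
use prost::Message as _;

// Sketch: encode a NetworkMessage to bytes and decode it back, as the TCP
// session actor would on either side of the link.
fn roundtrip(msg: crate::protocol::NetworkMessage) -> crate::protocol::NetworkMessage {
    let mut buf = Vec::with_capacity(msg.encoded_len());
    msg.encode(&mut buf).expect("vector buffers grow on demand");
    crate::protocol::NetworkMessage::decode(bytes::Bytes::from(buf))
        .expect("a just-encoded message decodes cleanly")
}
```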
diff --git a/ractor-cluster/src/protocol/node.proto b/ractor-cluster/src/protocol/node.proto
new file mode 100644
index 00000000..5ab32b45
--- /dev/null
+++ b/ractor-cluster/src/protocol/node.proto
@@ -0,0 +1,56 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+// NOTE: We make heavy use of oneof syntax here in order to deal with a rust-like enum
+// type. You can see https://developers.google.com/protocol-buffers/docs/proto3#oneof for
+// the syntax and guide
+
+syntax = "proto3";
+
+package node;
+
+// Represents a cast to a remote actor
+message Cast {
+  // `to` is the intended actor
+  uint64 to = 1;
+  // `what` is the payload for the cast operation
+  bytes what = 2;
+}
+
+message Call {
+  // `to` is the intended actor
+  uint64 to = 1;
+  // `what` is the serialized arguments to the call
+  bytes what = 2;
+  // `tag` is a unique request tag which the RemoteActor applied in order
+  // to match requests back up to replies
+  uint64 tag = 3;
+  // `timeout_ms` is the timeout in milliseconds for the call to complete
+  optional uint64 timeout_ms = 4;
+}
+
+message CallReply {
+  // `to` is the intended RemoteActor
+  uint64 to = 1;
+  // `tag` is a unique request tag which the RemoteActor applied in order
+  // to match requests back up to replies
+  uint64 tag = 2;
+  // `what` is the payload for the call reply
+  bytes what = 3;
+}
+
+// A placeholder message representing an
+// authenticated inter-node message
+message NodeMessage {
+  // The message payload
+  oneof msg {
+    // A cast to a remote actor
+    Cast cast = 1;
+    // A call to a remote actor
+    Call call = 2;
+    // A reply to a call from the remote actor
+    CallReply reply = 3;
+  }
+}
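The `tag` field is what makes calls work over a single multiplexed link: the caller stamps a unique tag on each `Call`, and the remote side echoes it in the matching `CallReply`. A short sketch with the generated types (pids, tags, and payloads are illustrative values):

```rust
use crate::protocol::node;

// Sketch of a request and the reply that would correlate with it.
fn call_and_reply() -> (node::Call, node::CallReply) {
    let request = node::Call {
        to: 42,                            // target actor's pid on the peer
        what: b"serialized args".to_vec(), // opaque serialized arguments
        tag: 7,                            // unique per in-flight request
        timeout_ms: Some(500),
    };
    let reply = node::CallReply {
        to: 42,                            // caller-side RemoteActor awaiting this
        tag: request.tag,                  // echoed tag links reply to request
        what: b"serialized result".to_vec(),
    };
    (request, reply)
}
```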
diff --git a/ractor-cluster/src/remote_actor/mod.rs b/ractor-cluster/src/remote_actor/mod.rs
new file mode 100644
index 00000000..5658928a
--- /dev/null
+++ b/ractor-cluster/src/remote_actor/mod.rs
@@ -0,0 +1,118 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! A [RemoteActor] is an actor which handles serialized messages, and represents an actor
+//! running on a remote `node()` server. See [crate::node::NodeServer] for more on inter-node
+//! protocols
+
+use std::collections::HashMap;
+
+use ractor::cast;
+use ractor::concurrency::JoinHandle;
+use ractor::message::SerializedMessage;
+use ractor::{Actor, ActorCell, ActorId, ActorName, ActorRef, RpcReplyPort, SpawnErr};
+
+use crate::node::SessionMessage;
+
+/// A RemoteActor is an actor which represents an actor on another node
+pub(crate) struct RemoteActor {
+    /// The owning node session
+    session: ActorRef<crate::node::node_session::NodeSession>,
+}
+
+impl RemoteActor {
+    /// Spawn an actor of this type with a supervisor, automatically starting the actor
+    ///
+    /// * `name`: A name to give the actor. Useful for global referencing or debug printing
+    /// * `handler`: The implementation of Self
+    /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
+    ///
+    /// Returns [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
+    /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
+    /// the actor failed to start
+    pub(crate) async fn spawn_linked(
+        name: Option<ActorName>,
+        handler: Self,
+        id: ActorId,
+        supervisor: ActorCell,
+    ) -> Result<(ActorRef<Self>, JoinHandle<()>), SpawnErr> {
+        ractor::ActorRuntime::<_, _, Self>::spawn_linked_remote(name, handler, id, supervisor).await
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct RemoteActorState {
+    tag: u64,
+    pending_requests: HashMap<u64, RpcReplyPort<Vec<u8>>>,
+}
+
+impl RemoteActorState {
+    fn next_tag(&mut self) -> u64 {
+        self.tag += 1;
+        self.tag
+    }
+}
+
+#[async_trait::async_trait]
+impl Actor for RemoteActor {
+    type Msg = ();
+    type State = RemoteActorState;
+
+    async fn pre_start(&self, _myself: ActorRef<Self>) -> Self::State {
+        Self::State::default()
+    }
+
+    async fn handle(&self, _myself: ActorRef<Self>, _message: Self::Msg, _state: &mut Self::State) {
+        panic!("Remote actors cannot handle local messages!");
+    }
+
+    async fn handle_serialized(
+        &self,
+        myself: ActorRef<Self>,
+        message: SerializedMessage,
+        state: &mut Self::State,
+    ) {
+        // get the local pid on the remote system
+        let to = myself.get_id().pid();
+        // messages should be forwarded over the network link (i.e. sent through the node session) to the intended
+        // target node's relevant actor. The receiving runtime NodeSession will decode the message and pass it up
+        // to the parent
+        match message {
+            SerializedMessage::Call(args, reply) => {
+                let tag = state.next_tag();
+                let node_msg = crate::protocol::node::NodeMessage {
+                    msg: Some(crate::protocol::node::node_message::Msg::Call(
+                        crate::protocol::node::Call {
+                            to,
+                            tag,
+                            what: args,
+                            timeout_ms: reply.get_timeout().map(|t| t.as_millis() as u64),
+                        },
+                    )),
+                };
+                state.pending_requests.insert(tag, reply);
+                let _ = cast!(self.session, SessionMessage::SendMessage(node_msg));
+            }
+            SerializedMessage::Cast(args) => {
+                let node_msg = crate::protocol::node::NodeMessage {
+                    msg: Some(crate::protocol::node::node_message::Msg::Cast(
+                        crate::protocol::node::Cast { to, what: args },
+                    )),
+                };
+                let _ = cast!(self.session, SessionMessage::SendMessage(node_msg));
+            }
+            SerializedMessage::CallReply(message_tag, reply_data) => {
+                match state.pending_requests.remove(&message_tag) {
+                    Some(port) => {
+                        let _ = port.send(reply_data);
+                    }
+                    _ => {
+                        // ignore
+                    }
+                }
+            }
+        }
+    }
+}
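The `NodeSession` is expected to materialize one `RemoteActor` per remote pid it learns about. A hypothetical sketch of that wiring from inside the `node` module; the helper name and exact call site are assumptions, not part of this patch:

```rust
use ractor::{ActorCell, ActorRef, SpawnErr};

use crate::node::node_session::NodeSession;
use crate::remote_actor::RemoteActor;

// Hypothetical helper: called by a NodeSession when it learns that `pid`
// exists on the connected peer identified by `node_id`.
async fn track_remote_actor(
    session: ActorRef<NodeSession>,
    supervisor: ActorCell,
    node_id: u64,
    pid: u64,
) -> Result<ActorRef<RemoteActor>, SpawnErr> {
    // remote actors carry Remote ids so the runtime routes their messages
    // through handle_serialized rather than the typed handle
    let id = ractor::actor_id::new_remote_id(node_id, pid);
    let (actor, _join_handle) =
        RemoteActor::spawn_linked(None, RemoteActor { session }, id, supervisor).await?;
    Ok(actor)
}
```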
diff --git a/ractor/Cargo.toml b/ractor/Cargo.toml
index b44f723b..fa49c0b5 100644
--- a/ractor/Cargo.toml
+++ b/ractor/Cargo.toml
@@ -12,16 +12,19 @@ readme = "../README.md"
 homepage = "https://github.com/slawlor/ractor"
 categories = ["actor", "erlang"]
 
+[features]
 # WIP
-# [features]
 # tokio_runtime = ["tokio/time"]
 # async_std_runtime = ["async-std"]
 # default = ["tokio_runtime"]
 # default = ["async_std_runtime"]
 
+cluster = []
+default = []
+
 [dependencies]
-async-std = { version = "1", optional = true }
+## Required dependencies
 async-trait = "0.1"
 dashmap = "5"
 futures = "0.3"
diff --git a/ractor/benches/actor.rs b/ractor/benches/actor.rs
index 4dc71e0f..f3fd6036 100644
--- a/ractor/benches/actor.rs
+++ b/ractor/benches/actor.rs
@@ -95,7 +95,7 @@ fn schedule_work(c: &mut Criterion) {
             })
         },
         |mut handles| {
-            runtime.block_on(async move { while let Some(_) = handles.join_next().await {} })
+            runtime.block_on(async move { while handles.join_next().await.is_some() {} })
         },
         BatchSize::PerIteration,
     );
@@ -118,7 +118,7 @@ fn schedule_work(c: &mut Criterion) {
             })
         },
         |mut handles| {
-            runtime.block_on(async move { while let Some(_) = handles.join_next().await {} })
+            runtime.block_on(async move { while handles.join_next().await.is_some() {} })
         },
         BatchSize::PerIteration,
     );
diff --git a/ractor/examples/philosophers.rs b/ractor/examples/philosophers.rs
index 1b456a0f..1f3de975 100644
--- a/ractor/examples/philosophers.rs
+++ b/ractor/examples/philosophers.rs
@@ -494,7 +494,7 @@ async fn main() {
     }
 
     // wait for everything to shut down
-    while let Some(_) = all_handles.join_next().await {}
+    while all_handles.join_next().await.is_some() {}
 
     // print metrics
     println!("Simulation results");
diff --git a/ractor/src/actor/actor_cell/actor_properties.rs b/ractor/src/actor/actor_cell/actor_properties.rs
index 50c0830b..9471f860 100644
--- a/ractor/src/actor/actor_cell/actor_properties.rs
+++ b/ractor/src/actor/actor_cell/actor_properties.rs
@@ -6,14 +6,17 @@
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
 
-use crate::concurrency as mpsc;
+use crate::{concurrency as mpsc, Message};
 
-use crate::actor::messages::{BoxedMessage, StopMessage};
+use crate::actor::messages::StopMessage;
 use crate::actor::supervision::SupervisionTree;
 use crate::concurrency::{
     MpscReceiver as BoundedInputPortReceiver, MpscSender as BoundedInputPort,
     MpscUnboundedReceiver as InputPortReceiver, MpscUnboundedSender as InputPort,
 };
+use crate::message::BoxedMessage;
+#[cfg(feature = "cluster")]
+use crate::message::SerializedMessage;
 use crate::{Actor, ActorId, ActorName, ActorStatus, MessagingErr, Signal, SupervisionEvent};
 
 // The inner-properties of an Actor
@@ -39,6 +42,22 @@ impl ActorProperties {
         InputPortReceiver<SupervisionEvent>,
         InputPortReceiver<BoxedMessage>,
     )
+    where
+        TActor: Actor,
+    {
+        Self::new_remote::<TActor>(name, crate::actor_id::get_new_local_id())
+    }
+
+    pub fn new_remote<TActor>(
+        name: Option<ActorName>,
+        id: ActorId,
+    ) -> (
+        Self,
+        BoundedInputPortReceiver<Signal>,
+        BoundedInputPortReceiver<StopMessage>,
+        InputPortReceiver<SupervisionEvent>,
+        InputPortReceiver<BoxedMessage>,
+    )
     where
         TActor: Actor,
     {
@@ -48,7 +67,7 @@ impl ActorProperties {
         let (tx_message, rx_message) = mpsc::mpsc_unbounded();
         (
             Self {
-                id: crate::actor_id::get_new_local_id(),
+                id,
                 name,
                 status: Arc::new(AtomicU8::new(ActorStatus::Unstarted as u8)),
                 signal: tx_signal,
@@ -92,11 +111,22 @@ impl ActorProperties {
     where
         TActor: Actor,
     {
-        if self.type_id != std::any::TypeId::of::<TActor::Msg>() {
+        // Only type-check messages of local actors, remote actors send serialized
+        // payloads
+        if self.id.is_local() && self.type_id != std::any::TypeId::of::<TActor::Msg>() {
             return Err(MessagingErr::InvalidActorType);
         }
 
-        let boxed = BoxedMessage::new(message);
+        let boxed = message.box_message(&self.id);
+        self.message.send(boxed).map_err(|e| e.into())
+    }
+
+    #[cfg(feature = "cluster")]
+    pub fn send_serialized(&self, message: SerializedMessage) -> Result<(), MessagingErr> {
+        let boxed = BoxedMessage {
+            msg: None,
+            serialized_msg: Some(message),
+        };
         self.message.send(boxed).map_err(|e| e.into())
     }
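The send path above now branches on locality: local actors keep the strict `TypeId` check, while remote actors skip it because their payloads arrive already serialized. A reduced sketch of just that predicate; the function is illustrative, not part of the crate's API:

```rust
use std::any::TypeId;

// Illustrative only: a send is accepted when the target is remote (payload
// already serialized, nothing to type-check) or when the concrete message
// type matches the actor's declared message type.
fn send_allowed(target_is_local: bool, expected: TypeId, actual: TypeId) -> bool {
    !target_is_local || expected == actual
}
```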
diff --git a/ractor/src/actor/actor_cell/mod.rs b/ractor/src/actor/actor_cell/mod.rs
index f7266243..269a258c 100644
--- a/ractor/src/actor/actor_cell/mod.rs
+++ b/ractor/src/actor/actor_cell/mod.rs
@@ -12,13 +12,16 @@
 use std::sync::Arc;
 
 use super::errors::MessagingErr;
-use super::messages::{BoxedMessage, Signal, StopMessage};
-
+use super::messages::{Signal, StopMessage};
 use super::SupervisionEvent;
 use crate::concurrency::{
     MpscReceiver as BoundedInputPortReceiver, MpscUnboundedReceiver as InputPortReceiver,
 };
-use crate::{Actor, ActorId, ActorName, SpawnErr};
+use crate::message::BoxedMessage;
+#[cfg(feature = "cluster")]
+use crate::message::SerializedMessage;
+use crate::ActorId;
+use crate::{Actor, ActorName, SpawnErr};
 
 pub mod actor_ref;
 pub use actor_ref::ActorRef;
@@ -52,17 +55,26 @@ pub const ACTIVE_STATES: [ActorStatus; 3] = [
 ];
 
 /// The collection of ports an actor needs to listen to
-pub(crate) struct ActorPortSet {
-    pub(crate) signal_rx: BoundedInputPortReceiver<Signal>,
-    pub(crate) stop_rx: BoundedInputPortReceiver<StopMessage>,
-    pub(crate) supervisor_rx: InputPortReceiver<SupervisionEvent>,
-    pub(crate) message_rx: InputPortReceiver<BoxedMessage>,
+pub struct ActorPortSet {
+    /// The inner signal port
+    pub signal_rx: BoundedInputPortReceiver<Signal>,
+    /// The inner stop port
+    pub stop_rx: BoundedInputPortReceiver<StopMessage>,
+    /// The inner supervisor port
+    pub supervisor_rx: InputPortReceiver<SupervisionEvent>,
+    /// The inner message port
+    pub message_rx: InputPortReceiver<BoxedMessage>,
 }
 
-pub(crate) enum ActorPortMessage {
+/// Messages that come in off an actor's port, with associated priority
+pub enum ActorPortMessage {
+    /// A signal message
     Signal(Signal),
+    /// A stop message
     Stop(StopMessage),
+    /// A supervision message
     Supervision(SupervisionEvent),
+    /// A regular message
     Message(BoxedMessage),
 }
 
@@ -152,6 +164,8 @@ impl ActorCell {
         let cell = Self {
             inner: Arc::new(props),
         };
+        #[cfg(feature = "cluster")]
+        crate::registry::register_pid(cell.get_id(), cell.clone());
         if let Some(r_name) = name {
             crate::registry::register(r_name, cell.clone())?;
         }
@@ -167,6 +181,37 @@ impl ActorCell {
         ))
     }
 
+    /// Create a new remote actor, to be called from the `ractor-cluster` crate
+    #[cfg(feature = "cluster")]
+    pub(crate) fn new_remote<TActor>(
+        name: Option<ActorName>,
+        id: ActorId,
+    ) -> Result<(Self, ActorPortSet), SpawnErr>
+    where
+        TActor: Actor,
+    {
+        if id.is_local() {
+            panic!("Cannot create a new remote actor handler without the actor id being marked as a remote actor!");
+        }
+
+        let (props, rx1, rx2, rx3, rx4) = ActorProperties::new_remote::<TActor>(name, id);
+        let cell = Self {
+            inner: Arc::new(props),
+        };
+        if let Some(r_name) = name {
+            crate::registry::register(r_name, cell.clone())?;
+        }
+        Ok((
+            cell,
+            ActorPortSet {
+                signal_rx: rx1,
+                stop_rx: rx2,
+                supervisor_rx: rx3,
+                message_rx: rx4,
+            },
+        ))
+    }
+
     /// Retrieve the [super::Actor]'s unique identifier [ActorId]
     pub fn get_id(&self) -> ActorId {
         self.inner.id
@@ -193,6 +238,8 @@ impl ActorCell {
     pub(crate) fn set_status(&self, status: ActorStatus) {
         // The actor is shut down
         if status == ActorStatus::Stopped || status == ActorStatus::Stopping {
+            #[cfg(feature = "cluster")]
+            crate::registry::unregister_pid(self.get_id());
             // If it's enrolled in the registry, remove it
             if let Some(name) = self.get_name() {
                 crate::registry::unregister(name);
@@ -279,6 +326,16 @@ impl ActorCell {
         self.inner.send_message::<TActor>(message)
     }
 
+    /// Send a serialized binary message to the actor.
+    ///
+    /// * `message` - The message to send
+    ///
+    /// Returns [Ok(())] on successful message send, [Err(MessagingErr)] otherwise
+    #[cfg(feature = "cluster")]
+    pub fn send_serialized(&self, message: SerializedMessage) -> Result<(), MessagingErr> {
+        self.inner.send_serialized(message)
+    }
+
     /// Notify the supervisors that a supervision event occurred
     ///
     /// * `evt` - The event to send to this [super::Actor]'s supervisors
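A short sketch of what `send_serialized` unlocks for the cluster crate (requires the `cluster` feature): any `ActorCell` can now accept a pre-serialized payload without the sender knowing the concrete message type. The byte payload here is illustrative:

```rust
use ractor::message::SerializedMessage;
use ractor::ActorCell;

// Sketch: forward raw bytes, e.g. as a NodeSession would after decoding a
// network frame destined for a local actor.
fn forward_cast(cell: &ActorCell, payload: Vec<u8>) {
    // ignore send failures for the sketch; real code would log them
    let _ = cell.send_serialized(SerializedMessage::Cast(payload));
}
```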
diff --git a/ractor/src/actor/messages.rs b/ractor/src/actor/messages.rs
index 79d6372f..0d23b9a5 100644
--- a/ractor/src/actor/messages.rs
+++ b/ractor/src/actor/messages.rs
@@ -5,56 +5,15 @@
 
 //! Messages which are built-in for `ractor`'s processing routines
 //!
-//! Additionally contains definitions for [BoxedMessage] and [BoxedState]
-//! which are used to handle strongly-typed messages and states in a
+//! Additionally contains definitions for [BoxedState]
+//! which are used to handle strongly-typed states in a
 //! generic way without having to know the strong type in the underlying framework
 
 use std::any::Any;
 use std::fmt::Debug;
 
-use crate::{Message, State};
-
-/// An error downcasting a boxed item to a strong type
-#[derive(Debug)]
-pub struct BoxedDowncastErr;
-
-/// A "boxed" message denoting a strong-type message
-/// but generic so it can be passed around without type
-/// constraints
-pub struct BoxedMessage {
-    /// The message value
-    pub msg: Option<Box<dyn Any + Send>>,
-}
-
-impl BoxedMessage {
-    /// Create a new [BoxedMessage] from a strongly-typed message
-    pub fn new<T>(msg: T) -> Self
-    where
-        T: Message,
-    {
-        Self {
-            msg: Some(Box::new(msg)),
-        }
-    }
-
-    /// Try and take the resulting message as a specific type, consumes
-    /// the boxed message
-    pub fn take<T>(&mut self) -> Result<T, BoxedDowncastErr>
-    where
-        T: Message,
-    {
-        match self.msg.take() {
-            Some(m) => {
-                if m.is::<T>() {
-                    Ok(*m.downcast::<T>().unwrap())
-                } else {
-                    Err(BoxedDowncastErr)
-                }
-            }
-            None => Err(BoxedDowncastErr),
-        }
-    }
-}
+use crate::message::BoxedDowncastErr;
+use crate::State;
 
 /// A "boxed" message denoting a strong-type message
 /// but generic so it can be passed around without type
@@ -95,8 +54,8 @@ impl BoxedState {
 }
 
 /// Messages to stop an actor
-pub(crate) enum StopMessage {
-    // Normal stop
+pub enum StopMessage {
+    /// Normal stop
     Stop,
     /// Stop with a reason
     Reason(String),
Only for use + /// by `ractor_cluster::node::NodeSession` + /// + /// * `name`: A name to give the actor. Useful for global referencing or debug printing + /// * `handler` The [Actor] defining the logic for this actor + /// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor + /// + /// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference + /// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if + /// the actor failed to start + #[cfg(feature = "cluster")] + pub async fn spawn_linked_remote( + name: Option, + handler: THandler, + id: ActorId, + supervisor: ActorCell, + ) -> Result<(ActorRef, JoinHandle<()>), SpawnErr> { + if !id.is_local() { + Err(SpawnErr::StartupPanic( + "Cannot spawn a remote actor when the identifier is not remote!".to_string(), + )) + } else { + let (actor_cell, ports) = actor_cell::ActorCell::new_remote::(name, id)?; + + let (actor, ports) = ( + Self { + base: actor_cell.into(), + handler: Arc::new(handler), + }, + ports, + ); + actor.start(ports, Some(supervisor)).await + } + } + /// Create a new actor with some handler implementation and initial state /// /// * `name`: A name to give the actor. Useful for global referencing or debug printing @@ -361,7 +416,7 @@ where match ports.listen_in_priority().await { Ok(actor_port_message) => match actor_port_message { actor_cell::ActorPortMessage::Signal(signal) => { - (true, Self::handle_signal(myself, signal).await) + (true, Self::handle_signal(myself, signal)) } actor_cell::ActorPortMessage::Stop(stop_message) => { let exit_reason = match stop_message { @@ -380,7 +435,7 @@ where let new_state = ports.run_with_signal(new_state_future).await; match new_state { Ok(()) => (false, None), - Err(signal) => (true, Self::handle_signal(myself, signal).await), + Err(signal) => (true, Self::handle_signal(myself, signal)), } } actor_cell::ActorPortMessage::Message(msg) => { @@ -389,7 +444,7 @@ where let new_state = ports.run_with_signal(new_state_future).await; match new_state { Ok(()) => (false, None), - Err(signal) => (true, Self::handle_signal(myself, signal).await), + Err(signal) => (true, Self::handle_signal(myself, signal)), } } }, @@ -411,10 +466,28 @@ where myself: ActorRef, state: &mut TState, handler: Arc, - mut msg: BoxedMessage, + msg: crate::message::BoxedMessage, ) { // panic in order to kill the actor - let typed_msg = match msg.take() { + #[cfg(feature = "cluster")] + { + if !myself.get_id().is_local() { + match msg.serialized_msg { + Some(serialized_msg) => { + handler + .handle_serialized(myself, serialized_msg, state) + .await; + return; + } + None => { + panic!("Failed to read serialized message from `BoxedMessage`"); + } + } + } + } + + // panic in order to kill the actor + let typed_msg = match TMsg::from_boxed(msg) { Ok(m) => m, Err(_) => { panic!( @@ -427,7 +500,7 @@ where handler.handle(myself, typed_msg, state).await } - async fn handle_signal(myself: ActorRef, signal: Signal) -> Option { + fn handle_signal(myself: ActorRef, signal: Signal) -> Option { match &signal { Signal::Kill => { myself.terminate(); diff --git a/ractor/src/actor_id.rs b/ractor/src/actor_id.rs index cf386fcd..12b2d1da 100644 --- a/ractor/src/actor_id.rs +++ b/ractor/src/actor_id.rs @@ -15,7 +15,12 @@ pub enum ActorId { Local(u64), /// A remote actor on another system (system, id) - Remote(u64, u64), + Remote { + /// The remote node id + node_id: u64, + /// The local id on the remote system + pid: u64, + }, } 
diff --git a/ractor/src/actor_id.rs b/ractor/src/actor_id.rs
index cf386fcd..12b2d1da 100644
--- a/ractor/src/actor_id.rs
+++ b/ractor/src/actor_id.rs
@@ -15,7 +15,12 @@ pub enum ActorId {
     Local(u64),
 
     /// A remote actor on another system (system, id)
-    Remote(u64, u64),
+    Remote {
+        /// The remote node id
+        node_id: u64,
+        /// The local id on the remote system
+        pid: u64,
+    },
 }
 
 impl ActorId {
@@ -25,13 +30,27 @@ impl ActorId {
     pub fn is_local(&self) -> bool {
         matches!(self, ActorId::Local(_))
     }
+
+    /// Retrieve the actor's PID
+    pub fn pid(&self) -> u64 {
+        match self {
+            ActorId::Local(pid) => *pid,
+            ActorId::Remote { pid, .. } => *pid,
+        }
+    }
+
+    /// Build an actor id from just its pid. Assumes LOCAL only
+    #[cfg(feature = "cluster")]
+    pub fn from_pid(pid: u64) -> ActorId {
+        ActorId::Local(pid)
+    }
 }
 
 impl Display for ActorId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             ActorId::Local(id) => write!(f, "0.{}", id),
-            ActorId::Remote(system_id, id) => write!(f, "{}.{}", system_id, id),
+            ActorId::Remote { node_id, pid } => write!(f, "{}.{}", node_id, pid),
         }
     }
 }
@@ -39,17 +58,23 @@ impl Display for ActorId {
 /// The local id allocator for actors
 static ACTOR_ID_ALLOCATOR: AtomicU64 = AtomicU64::new(0u64);
 
-/// Retreiev a new local id
+/// Retrieve a new local id
 pub(crate) fn get_new_local_id() -> ActorId {
     ActorId::Local(ACTOR_ID_ALLOCATOR.fetch_add(1, std::sync::atomic::Ordering::AcqRel))
 }
 
+/// Create a new actor id for an actor on a remote `node()`
+#[cfg(feature = "cluster")]
+pub fn new_remote_id(node_id: u64, pid: u64) -> ActorId {
+    ActorId::Remote { node_id, pid }
+}
+
 impl ActorId {
     /// Retrieve the PID of the actor, ignoring local/remote properties
     pub fn get_pid(&self) -> u64 {
         match self {
             ActorId::Local(pid) => *pid,
-            ActorId::Remote(_, pid) => *pid,
+            ActorId::Remote { pid, .. } => *pid,
         }
     }
 }
diff --git a/ractor/src/lib.rs b/ractor/src/lib.rs
index 08b88ebf..405cf059 100644
--- a/ractor/src/lib.rs
+++ b/ractor/src/lib.rs
@@ -134,8 +134,7 @@
 #![warn(missing_docs)]
 #![warn(unused_crate_dependencies)]
 #![cfg_attr(docsrs, feature(doc_cfg))]
-
-use std::any::Any;
+#![feature(min_specialization)]
 
 /// An actor's name, equivalent to an [Erlang `atom()`](https://www.erlang.org/doc/reference_manual/data_types.html#atom)
 pub type ActorName = &'static str;
@@ -147,12 +146,16 @@ pub mod actor;
 pub mod actor_id;
 pub mod concurrency;
 pub mod macros;
+pub mod message;
 pub mod pg;
 pub mod port;
 pub mod registry;
 pub mod rpc;
 pub mod time;
 
+// #[cfg(feature = "cluster")]
+// pub mod remote;
+
 #[cfg(test)]
 mod tests;
 
@@ -161,40 +164,15 @@ use criterion as _;
 #[cfg(test)]
 use rand as _;
 
-// WIP
-// #[cfg(feature = "remote")]
-// pub mod distributed;
-
 // re-exports
 pub use actor::actor_cell::{ActorCell, ActorRef, ActorStatus, ACTIVE_STATES};
 pub use actor::errors::{ActorErr, MessagingErr, SpawnErr};
 pub use actor::messages::{Signal, SupervisionEvent};
 pub use actor::{Actor, ActorRuntime};
 pub use actor_id::ActorId;
+pub use message::{Message, State};
 pub use port::{OutputMessage, OutputPort, RpcReplyPort};
 
-/// Message type for an actor. Generally an enum
-/// which muxes the various types of inner-messages the actor
-/// supports
-///
-/// ## Example
-///
-/// ```rust
-/// pub enum MyMessage {
-///     /// Record the name to the actor state
-///     RecordName(String),
-///     /// Print the recorded name from the state to command line
-///     PrintName,
-/// }
-/// ```
-pub trait Message: Any + Send + 'static {}
-impl<T: Any + Send + 'static> Message for T {}
-
-/// Represents the state of an actor. Must be safe
-/// to send between threads (same bounds as a [Message])
-pub trait State: Message {}
-impl<T: Message> State for T {}
-
 /// Error types which can result from Ractor processes
 #[derive(Debug)]
 pub enum RactorErr {
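The structured `Remote` variant keeps Erlang's `node.pid` display convention, with the local node rendering as node `0`. A quick sketch of the two variants in practice:

```rust
use ractor::ActorId;

fn id_examples() {
    let local = ActorId::Local(42);
    let remote = ActorId::Remote { node_id: 3, pid: 42 };

    assert_eq!(format!("{}", local), "0.42");  // local node renders as 0
    assert_eq!(format!("{}", remote), "3.42"); // node_id.pid
    assert!(local.is_local());
    assert!(!remote.is_local());
    assert_eq!(local.get_pid(), remote.get_pid()); // pid ignores locality
}
```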
diff --git a/ractor/src/message.rs b/ractor/src/message.rs
new file mode 100644
index 00000000..1d23f606
--- /dev/null
+++ b/ractor/src/message.rs
@@ -0,0 +1,156 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Message trait definition for inter-actor messaging
+
+use std::any::Any;
+
+use crate::ActorId;
+#[cfg(feature = "cluster")]
+use crate::RpcReplyPort;
+
+/// An error downcasting a boxed item to a strong type
+#[derive(Debug)]
+pub struct BoxedDowncastErr;
+
+/// Represents a serialized call or cast message
+#[cfg(feature = "cluster")]
+pub enum SerializedMessage {
+    /// A cast (one-way) with the serialized payload
+    Cast(Vec<u8>),
+    /// A call (remote procedure call, waiting on a reply) with the
+    /// serialized arguments and reply channel
+    Call(Vec<u8>, RpcReplyPort<Vec<u8>>),
+    /// A serialized reply from a call operation. The format is
+    /// (`message_tag`, `reply_data`). It is never produced by the
+    /// [Message::serialize] function, and is only generated
+    /// by the `NodeSession`
+    CallReply(u64, Vec<u8>),
+}
+
+/// A "boxed" message denoting a strong-type message
+/// but generic so it can be passed around without type
+/// constraints
+pub struct BoxedMessage {
+    pub(crate) msg: Option<Box<dyn Any + Send>>,
+    /// A serialized message for a remote actor, accessed only by the `RemoteActorRuntime`
+    #[cfg(feature = "cluster")]
+    pub serialized_msg: Option<SerializedMessage>,
+}
+
+/// Message type for an actor. Generally an enum
+/// which muxes the various types of inner-messages the actor
+/// supports
+///
+/// ## Example
+///
+/// ```rust
+/// pub enum MyMessage {
+///     /// Record the name to the actor state
+///     RecordName(String),
+///     /// Print the recorded name from the state to command line
+///     PrintName,
+/// }
+/// ```
+pub trait Message: Any + Send + Sized + 'static {
+    /// Convert a [BoxedMessage] to this concrete type
+    #[cfg(feature = "cluster")]
+    fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
+        if m.msg.is_some() {
+            match m.msg.take() {
+                Some(m) => {
+                    if m.is::<Self>() {
+                        Ok(*m.downcast::<Self>().unwrap())
+                    } else {
+                        Err(BoxedDowncastErr)
+                    }
+                }
+                _ => Err(BoxedDowncastErr),
+            }
+        } else if m.serialized_msg.is_some() {
+            match m.serialized_msg.take() {
+                Some(m) => Self::deserialize(m),
+                _ => Err(BoxedDowncastErr),
+            }
+        } else {
+            Err(BoxedDowncastErr)
+        }
+    }
+
+    /// Convert a [BoxedMessage] to this concrete type
+    #[cfg(not(feature = "cluster"))]
+    fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
+        match m.msg.take() {
+            Some(m) => {
+                if m.is::<Self>() {
+                    Ok(*m.downcast::<Self>().unwrap())
+                } else {
+                    Err(BoxedDowncastErr)
+                }
+            }
+            _ => Err(BoxedDowncastErr),
+        }
+    }
+
+    /// Convert this message to a [BoxedMessage]
+    #[cfg(feature = "cluster")]
+    fn box_message(self, pid: &ActorId) -> BoxedMessage {
+        if Self::serializable() && !pid.is_local() {
+            // it's a message to a remote actor, serialize it and send it over the wire!
+            BoxedMessage {
+                msg: None,
+                serialized_msg: Some(self.serialize()),
+            }
+        } else if pid.is_local() {
+            BoxedMessage {
+                msg: Some(Box::new(self)),
+                serialized_msg: None,
+            }
+        } else {
+            panic!("Cannot send message to remote actor without the message being serializable");
+        }
+    }
+
+    /// Convert this message to a [BoxedMessage]
+    #[cfg(not(feature = "cluster"))]
+    fn box_message(self, _pid: &ActorId) -> BoxedMessage {
+        BoxedMessage {
+            msg: Some(Box::new(self)),
+        }
+    }
+
+    /// Determines if this type is serializable
+    #[cfg(feature = "cluster")]
+    fn serializable() -> bool;
+
+    /// Serializes this message (if supported)
+    #[cfg(feature = "cluster")]
+    fn serialize(self) -> SerializedMessage;
+
+    /// Deserialize binary data to this message type
+    #[cfg(feature = "cluster")]
+    fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr>;
+}
+impl<T: Any + Send + Sized + 'static> Message for T {
+    #[cfg(feature = "cluster")]
+    default fn serializable() -> bool {
+        false
+    }
+
+    #[cfg(feature = "cluster")]
+    default fn serialize(self) -> SerializedMessage {
+        SerializedMessage::Cast(vec![])
+    }
+
+    #[cfg(feature = "cluster")]
+    default fn deserialize(_bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
+        Err(BoxedDowncastErr)
+    }
+}
+
+/// Represents the state of an actor. Must be safe
+/// to send between threads (same bounds as a [Message])
+pub trait State: Message {}
+impl<T: Message> State for T {}
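With `min_specialization`, every plain type picks up the non-serializable defaults above, while a wire-capable type can specialize them (this is what the `remote` module below does for prost types). A minimal sketch of a hand-rolled serializable message, assuming a nightly toolchain with the same feature enabled; the type and its encoding are invented for illustration:

```rust
use ractor::message::{BoxedDowncastErr, Message, SerializedMessage};

// Hypothetical message carrying a single counter value.
struct Ping(u64);

impl Message for Ping {
    fn serializable() -> bool {
        true
    }

    fn serialize(self) -> SerializedMessage {
        // encode the counter as 8 big-endian bytes
        SerializedMessage::Cast(self.0.to_be_bytes().to_vec())
    }

    fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
        match bytes {
            SerializedMessage::Cast(data) => data
                .try_into()
                .map(|arr| Ping(u64::from_be_bytes(arr)))
                .map_err(|_| BoxedDowncastErr),
            _ => Err(BoxedDowncastErr),
        }
    }
}
```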
diff --git a/ractor/src/port/mod.rs b/ractor/src/port/mod.rs
index d26ac051..7516afbb 100644
--- a/ractor/src/port/mod.rs
+++ b/ractor/src/port/mod.rs
@@ -24,9 +24,17 @@ pub use output::*;
 /// consistent error type
 pub struct RpcReplyPort<TMsg> {
     port: concurrency::OneshotSender<TMsg>,
+    timeout: Option<concurrency::Duration>,
 }
 
 impl<TMsg> RpcReplyPort<TMsg> {
+    /// Read the timeout of this RPC reply port
+    ///
+    /// Returns [Some(concurrency::Duration)] if a timeout is set, [None] otherwise
+    pub fn get_timeout(&self) -> Option<concurrency::Duration> {
+        self.timeout
+    }
+
     /// Send a message to the Rpc reply port. This consumes the port
     ///
     /// * `msg` - The message to send
@@ -48,6 +56,18 @@ impl<TMsg> RpcReplyPort<TMsg> {
 
 impl<TMsg> From<concurrency::OneshotSender<TMsg>> for RpcReplyPort<TMsg> {
     fn from(value: concurrency::OneshotSender<TMsg>) -> Self {
-        Self { port: value }
+        Self {
+            port: value,
+            timeout: None,
+        }
+    }
+}
+
+impl<TMsg> From<(concurrency::OneshotSender<TMsg>, concurrency::Duration)> for RpcReplyPort<TMsg> {
+    fn from((value, timeout): (concurrency::OneshotSender<TMsg>, concurrency::Duration)) -> Self {
+        Self {
+            port: value,
+            timeout: Some(timeout),
+        }
     }
 }
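The tuple `From` impl is what lets the RPC plumbing thread a timeout through to the reply port (used by `rpc/mod.rs` below and by the serialized `Call` path in the cluster crate). A small sketch of both constructions:

```rust
use ractor::concurrency::{self, Duration};
use ractor::RpcReplyPort;

fn make_ports() {
    let (tx, _rx) = concurrency::oneshot::<u64>();
    let no_timeout: RpcReplyPort<u64> = tx.into();
    assert!(no_timeout.get_timeout().is_none());

    let (tx, _rx) = concurrency::oneshot::<u64>();
    // the (sender, duration) tuple carries the deadline into the port
    let with_timeout: RpcReplyPort<u64> = (tx, Duration::from_millis(100)).into();
    assert_eq!(with_timeout.get_timeout(), Some(Duration::from_millis(100)));
}
```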
diff --git a/ractor/src/registry/mod.rs b/ractor/src/registry/mod.rs
index d4a39506..5445a36a 100644
--- a/ractor/src/registry/mod.rs
+++ b/ractor/src/registry/mod.rs
@@ -38,6 +38,8 @@ use dashmap::mapref::entry::Entry::{Occupied, Vacant};
 use dashmap::DashMap;
 use once_cell::sync::OnceCell;
 
+#[cfg(feature = "cluster")]
+use crate::ActorId;
 use crate::{ActorCell, ActorName};
 
 #[cfg(test)]
@@ -51,11 +53,17 @@ pub enum ActorRegistryErr {
 
 /// The name'd actor registry
 static ACTOR_REGISTRY: OnceCell<Arc<DashMap<ActorName, ActorCell>>> = OnceCell::new();
+#[cfg(feature = "cluster")]
+static PID_REGISTRY: OnceCell<Arc<DashMap<ActorId, ActorCell>>> = OnceCell::new();
 
 /// Retrieve the named actor registry handle
 fn get_actor_registry<'a>() -> &'a Arc<DashMap<ActorName, ActorCell>> {
     ACTOR_REGISTRY.get_or_init(|| Arc::new(DashMap::new()))
 }
+#[cfg(feature = "cluster")]
+fn get_pid_registry<'a>() -> &'a Arc<DashMap<ActorId, ActorCell>> {
+    PID_REGISTRY.get_or_init(|| Arc::new(DashMap::new()))
+}
 
 /// Put an actor into the registry
 pub(crate) fn register(name: ActorName, actor: ActorCell) -> Result<(), ActorRegistryErr> {
@@ -67,6 +75,12 @@ pub(crate) fn register(name: ActorName, actor: ActorCell) -> Result<(), ActorReg
         }
     }
 }
+#[cfg(feature = "cluster")]
+pub(crate) fn register_pid(id: ActorId, actor: ActorCell) {
+    if id.is_local() {
+        get_pid_registry().insert(id, actor);
+    }
+}
 
 /// Remove an actor from the registry given it's actor name
 pub(crate) fn unregister(name: ActorName) {
@@ -74,6 +88,12 @@ pub(crate) fn unregister(name: ActorName) {
         let _ = reg.remove(&name);
     }
 }
+#[cfg(feature = "cluster")]
+pub(crate) fn unregister_pid(id: ActorId) {
+    if id.is_local() {
+        let _ = get_pid_registry().remove(&id);
+    }
+}
 
 /// Try and retrieve an actor from the registry
 ///
@@ -94,3 +114,17 @@ pub fn registered() -> Vec<ActorName> {
     let reg = get_actor_registry();
     reg.iter().map(|kvp| *kvp.key()).collect::<Vec<_>>()
 }
+
+/// Retrieve an actor from the global registry of all local actors
+///
+/// * `id` - The **local** id of the actor to retrieve
+///
+/// Returns [Some(_)] if the actor exists locally, [None] otherwise
+#[cfg(feature = "cluster")]
+pub fn get_pid(id: ActorId) -> Option<ActorCell> {
+    if id.is_local() {
+        get_pid_registry().get(&id).map(|v| v.value().clone())
+    } else {
+        None
+    }
+}
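A sketch of how the cluster layer is expected to use the new pid registry when a frame arrives for a local actor; this mirrors the lookup in `NodeSession::handle_node` above and requires the `cluster` feature:

```rust
use ractor::message::SerializedMessage;
use ractor::{registry, ActorId};

// Sketch: deliver an incoming serialized cast to the local actor with `pid`,
// silently dropping it if that actor has already stopped.
fn deliver(pid: u64, payload: Vec<u8>) {
    if let Some(cell) = registry::get_pid(ActorId::from_pid(pid)) {
        let _ = cell.send_serialized(SerializedMessage::Cast(payload));
    }
}
```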
diff --git a/ractor/src/remote/mod.rs b/ractor/src/remote/mod.rs
new file mode 100644
index 00000000..1c5147d7
--- /dev/null
+++ b/ractor/src/remote/mod.rs
@@ -0,0 +1,35 @@
+// Copyright (c) Sean Lawlor
+//
+// This source code is licensed under both the MIT license found in the
+// LICENSE-MIT file in the root directory of this source tree.
+
+//! Remote actor support
+
+use std::any::Any;
+
+use crate::{
+    actor::messages::{MessageChannelTrait, LocalMessage, BoxedDowncastErr},
+    ActorId,
+};
+
+// Automatically implement the [Message] trait for
+// prost (protobuf) encoded messages
+impl<T: prost::Message> Message for T {
+    fn serializable() -> bool {
+        true
+    }
+
+    fn serialize(&self) -> Vec<u8> {
+        let length = self.encoded_len();
+        let mut buf = Vec::with_capacity(length);
+        self.encode(&mut buf)
+            .expect("Failed to encode message to binary bytes");
+        buf
+    }
+
+    fn deserialize(buf: Vec<u8>) -> Result<Self, String> {
+        let bytes = bytes::Bytes::from(buf);
+        T::decode(bytes)
+            .map_err(|prost_err| format!("Failed to decode message with error '{}'", prost_err))
+    }
+}
diff --git a/ractor/src/rpc/mod.rs b/ractor/src/rpc/mod.rs
index f408165f..2a9f43de 100644
--- a/ractor/src/rpc/mod.rs
+++ b/ractor/src/rpc/mod.rs
@@ -53,7 +53,11 @@ where
     TMsgBuilder: FnOnce(RpcReplyPort<TReply>) -> TActor::Msg,
 {
     let (tx, rx) = concurrency::oneshot();
-    actor.send_message::<TActor>(msg_builder(tx.into()))?;
+    let port: RpcReplyPort<TReply> = match timeout_option {
+        Some(duration) => (tx, duration).into(),
+        None => tx.into(),
+    };
+    actor.send_message::<TActor>(msg_builder(port))?;
 
     // wait for the reply
     Ok(if let Some(duration) = timeout_option {
@@ -95,7 +99,11 @@ where
     // send to all actors
     for actor in actors {
         let (tx, rx) = concurrency::oneshot();
-        actor.send_message::<TActor>(msg_builder(tx.into()))?;
+        let port: RpcReplyPort<TReply> = match timeout_option {
+            Some(duration) => (tx, duration).into(),
+            None => tx.into(),
+        };
+        actor.send_message::<TActor>(msg_builder(port))?;
         rx_ports.push(rx);
     }
 
@@ -168,7 +176,11 @@ where
     FwdMapFn: FnOnce(TReply) -> TForwardActor::Msg + Send + 'static,
 {
     let (tx, rx) = concurrency::oneshot();
-    actor.send_message::<TActor>(msg_builder(tx.into()))?;
+    let port: RpcReplyPort<TReply> = match timeout_option {
+        Some(duration) => (tx, duration).into(),
+        None => tx.into(),
+    };
+    actor.send_message::<TActor>(msg_builder(port))?;
 
     // wait for the reply
     Ok(crate::concurrency::spawn(async move {
diff --git a/ractor/src/time/tests.rs b/ractor/src/time/tests.rs
index c84b163a..9cd24c15 100644
--- a/ractor/src/time/tests.rs
+++ b/ractor/src/time/tests.rs
@@ -55,7 +55,7 @@ async fn test_intervals() {
     // therefore the counter should be empty
     assert_eq!(0, counter.load(Ordering::Relaxed));
 
-    crate::concurrency::sleep(Duration::from_millis(120)).await;
+    crate::concurrency::sleep(Duration::from_millis(150)).await;
 
     // kill the actor
     actor_ref.stop(None);
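With the timeout now carried inside the reply port, a caller-side RPC with a deadline reads naturally. A sketch modeled on the `NodeServer` call in `node_session.rs` above; the counter actor, its message enum, and the omitted `Actor` impl are invented for illustration:

```rust
use ractor::concurrency::Duration;
use ractor::rpc::CallResult;
use ractor::{ActorRef, RpcReplyPort};

// Invented for the sketch: a counter actor exposing one RPC.
struct Counter;
enum CounterMsg {
    Get(RpcReplyPort<u64>),
}

async fn read_counter(counter: ActorRef<Counter>) -> Option<u64> {
    // the Some(..) timeout is threaded into the RpcReplyPort via the
    // (sender, duration) From impl added in port/mod.rs
    match counter
        .call(|tx| CounterMsg::Get(tx), Some(Duration::from_millis(100)))
        .await
    {
        Ok(CallResult::Success(value)) => Some(value),
        // deadline elapsed, the actor dropped the reply port, or send failed
        _ => None,
    }
}
```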