
Commit

Initial sketchings of how distributed nodes might look, based heavily on the Erlang protocol.

This is a collection of TCP-managing actors and session management for automated session handling.

Related issue: #16
slawlor committed Jan 23, 2023
1 parent 59e6ff8 commit 3a4ab27
Showing 34 changed files with 2,707 additions and 132 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
@@ -17,6 +17,12 @@ jobs:
- name: Run the default tests
  package: ractor
  # flags:
- name: Test ractor with the `cluster` feature
  package: ractor
  flags: -F cluster
- name: Test ractor-cluster
  package: ractor-cluster
  # flags:

steps:
- uses: actions/checkout@main
1 change: 1 addition & 0 deletions Cargo.toml
@@ -2,6 +2,7 @@

members = [
"ractor",
"ractor-cluster",
"ractor-playground",
"xtask"
]
38 changes: 38 additions & 0 deletions ractor-cluster/Cargo.toml
@@ -0,0 +1,38 @@
[package]
name = "ractor-cluster"
version = "0.4.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
license = "MIT"
edition = "2018"
keywords = ["actor", "ractor", "cluster"]
repository = "https://github.com/slawlor/ractor"
readme = "../README.md"
homepage = "https://github.com/slawlor/ractor"
categories = ["actor", "erlang"]
build = "src/build.rs"

[build-dependencies]
protobuf-src = "1"
prost-build = { version = "0.11" }

[dependencies]
## Required dependencies
async-trait = "0.1"
bytes = { version = "1" }
# dashmap = "5"
# futures = "0.3"
log = "0.4"
# once_cell = "1"
prost = { version = "0.11" }
ractor = { version = "0.4", features = ["cluster"], path = "../ractor" }
rand = "0.8"
sha2 = "0.10"
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "net", "io-util"]}

## Optional dependencies
# tokio-rustls = { version = "0.23", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] }
31 changes: 31 additions & 0 deletions ractor-cluster/src/build.rs
@@ -0,0 +1,31 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! This is the pre-compilation build script for the crate `ractor` when running in distributed
//! mode. It's used to compile protobuf into Rust code prior to compilation.

/// The shared-path for all protobuf specifications
const PROTOBUF_BASE_DIRECTORY: &str = "src/protocol";
/// The list of protobuf files to generate inside PROTOBUF_BASE_DIRECTORY
const PROTOBUF_FILES: [&str; 3] = ["meta", "node", "auth"];

fn build_protobufs() {
std::env::set_var("PROTOC", protobuf_src::protoc());

let mut protobuf_files = Vec::with_capacity(PROTOBUF_FILES.len());

for file in PROTOBUF_FILES.iter() {
let proto_file = format!("{}/{}.proto", PROTOBUF_BASE_DIRECTORY, file);
println!("cargo:rerun-if-changed={}", proto_file);
protobuf_files.push(proto_file);
}

prost_build::compile_protos(&protobuf_files, &[PROTOBUF_BASE_DIRECTORY]).unwrap();
}

fn main() {
// compile the spec files into Rust code
build_protobufs();
}
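
For context (not part of this diff): prost-build writes the generated Rust for each protobuf package into `OUT_DIR`, and a crate conventionally pulls it in from its protocol module with `include!`. A minimal sketch of what `ractor-cluster/src/protocol` might do, assuming each `.proto` file declares a package named after the file:

```rust
// Sketch only: module and package names are assumptions based on the
// PROTOBUF_FILES list above ("meta", "node", "auth").
pub(crate) mod meta {
    include!(concat!(env!("OUT_DIR"), "/meta.rs"));
}
pub(crate) mod node {
    include!(concat!(env!("OUT_DIR"), "/node.rs"));
}
pub(crate) mod auth {
    include!(concat!(env!("OUT_DIR"), "/auth.rs"));
}
```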
25 changes: 25 additions & 0 deletions ractor-cluster/src/hash.rs
@@ -0,0 +1,25 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Hashing utilities mainly used around challenge computation

pub(crate) const DIGEST_BYTES: usize = 32;
pub(crate) type Digest = [u8; DIGEST_BYTES];

/// Compute a challenge digest
pub(crate) fn challenge_digest(secret: &'_ str, challenge: u32) -> Digest {
use sha2::Digest;

let secret_bytes = secret.as_bytes();
let mut data = Vec::with_capacity(secret_bytes.len() + 4);

// Append the 4 big-endian challenge bytes first, then the shared secret
data.extend_from_slice(&challenge.to_be_bytes());
data.extend_from_slice(secret_bytes);

let hash = sha2::Sha256::digest(&data);

hash.into()
}
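
As a usage note (not part of this commit): the digest would presumably be recomputed locally and compared against the peer's answer during the authentication handshake. A minimal sketch, where `validate_challenge` is a hypothetical helper name:

```rust
use crate::hash::{challenge_digest, Digest};

/// Hypothetical helper: recompute the digest for a challenge we issued and
/// compare it with the digest the peer sent back.
pub(crate) fn validate_challenge(secret: &str, challenge: u32, peer_digest: &Digest) -> bool {
    let expected: Digest = challenge_digest(secret, challenge);
    expected == *peer_digest
}
```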
43 changes: 23 additions & 20 deletions ractor/src/distributed/mod.rs → ractor-cluster/src/lib.rs
@@ -4,33 +4,36 @@
// LICENSE-MIT file in the root directory of this source tree.

//! Support for remote nodes in a distributed cluster.
//!
//! A node is the same as [Erlang's definition](https://www.erlang.org/doc/reference_manual/distributed.html)
//! for distributed Erlang, in that it's a remote "hosting" process in the distributed pool of processes.
//!
//! In this realization, nodes are simply actors which handle an external connection to the other nodes in the pool.
//! When nodes connect, they identify all of the nodes the remote node is also connected to and connect
//! to them as well. They merge registries and pg groups together to create larger clusters of services.
//!
//! For messages to be transmittable across the [Node] boundaries to other [Node]s in the pool, they need to be
//! serializable to a binary format (say protobuf).
//!
//! We have chosen protobuf for our inter-node protocol definition; however, you can choose whatever medium you like
//! for binary serialization + deserialization. The "remote" actor will simply encode your message type and send it
//! over the wire for you.

-use dashmap::DashMap;
+// #![deny(warnings)]
+#![warn(unused_imports)]
+#![warn(unsafe_code)]
+#![warn(missing_docs)]
+#![warn(unused_crate_dependencies)]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+#![feature(min_specialization)]

-/// Represents messages that can cross the node boundary which can be serialized and sent over the wire
-pub trait NodeSerializableMessage {
-    /// Serialize the message to binary
-    fn serialize(&self) -> &[u8];
+mod hash;
+mod net;
+pub mod node;
+pub(crate) mod protocol;
+pub(crate) mod remote_actor;

-    /// Deserialize from binary back into the message type
-    fn deserialize(&self, data: &[u8]) -> Self;
-}
+pub mod macros;

-/// The identifier of a node is a globally unique u64
-pub type NodeId = u64;
+// Re-exports
+pub use node::NodeServer;

-/// A node in the distributed compute pool.
-pub struct Node {
-    node_id: u64,
-    other_nodes: DashMap<u64, String>,
-}
+/// Nodes are represented by an integer id
+pub type NodeId = u64;
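
To illustrate the module docs above (this example is not part of the commit): any binary encoding can cross the node boundary, and with the `prost` dependency a message round-trips to and from bytes as shown below. The `Ping` type and its field are hypothetical; the real wire types are generated from the `.proto` files under `src/protocol`:

```rust
use prost::Message;

// Hypothetical wire message, standing in for the generated protobuf types.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Ping {
    #[prost(uint64, tag = "1")]
    pub node_id: u64,
}

fn roundtrip() {
    // Encode to the binary wire format, then decode it back.
    let bytes = Ping { node_id: 42 }.encode_to_vec();
    let decoded = Ping::decode(bytes.as_slice()).expect("decode failed");
    assert_eq!(decoded.node_id, 42);
}
```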
150 changes: 150 additions & 0 deletions ractor-cluster/src/macros.rs
@@ -0,0 +1,150 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Macro helpers for remote actors

/// `serialized_rpc_forward!` converts a traditional RPC port to a port which receives a serialized
/// [Vec<_>] message and can rebuild the reply. This is necessary for RPCs which can occur over the network.
///
/// Note, however, that when defining the serialization logic, the cost will ONLY be incurred for actors which live
/// on another `node()`, never locally. Local actors will always use the local [ractor::message::BoxedMessage]
/// notation.
///
/// An example usage is
/// ```rust
/// #![feature(min_specialization)]
/// use ractor::concurrency::Duration;
/// use ractor::{RpcReplyPort, Message};
/// use ractor::message::SerializedMessage;
/// use ractor_cluster::serialized_rpc_forward;
///
/// enum MessageType {
/// #[allow(unused)]
/// Cast(String),
/// #[allow(unused)]
/// Call(String, RpcReplyPort<String>),
/// }
///
/// impl Message for MessageType {
/// fn serializable() -> bool {
/// true
/// }
///
/// fn serialize(self) -> SerializedMessage {
/// match self {
/// Self::Cast(args) => SerializedMessage::Cast(args.into_bytes()),
/// Self::Call(args, reply) => {
/// let tx = serialized_rpc_forward!(reply, |bytes| String::from_utf8(bytes).unwrap());
/// SerializedMessage::Call(args.into_bytes(), tx.into())
/// }
/// }
/// }
/// }
/// ```
#[macro_export]
macro_rules! serialized_rpc_forward {
($typed_port:expr, $converter:expr) => {{
let (tx, rx) = ractor::concurrency::oneshot();
ractor::concurrency::spawn(async move {
match $typed_port.get_timeout() {
Some(timeout) => {
if let Ok(Ok(result)) = ractor::concurrency::timeout(timeout, rx).await {
let _ = $typed_port.send($converter(result));
}
}
None => {
if let Ok(result) = rx.await {
let _ = $typed_port.send($converter(result));
}
}
}
});
tx
}};
}

#[cfg(test)]
mod tests {
use ractor::concurrency::Duration;
use ractor::message::SerializedMessage;
use ractor::{Message, RpcReplyPort};

enum MessageType {
#[allow(unused)]
Cast(String),
#[allow(unused)]
Call(String, RpcReplyPort<String>),
}

impl Message for MessageType {
fn serializable() -> bool {
true
}

fn serialize(self) -> SerializedMessage {
match self {
Self::Cast(args) => SerializedMessage::Cast(args.into_bytes()),
Self::Call(args, reply) => {
let tx =
serialized_rpc_forward!(reply, |bytes| String::from_utf8(bytes).unwrap());
SerializedMessage::Call(args.into_bytes(), tx.into())
}
}
}
}

#[tokio::test]
async fn no_timeout_rpc() {
let (tx, rx) = ractor::concurrency::oneshot();
let no_timeout = MessageType::Call("test".to_string(), tx.into());
let no_timeout_serialized = no_timeout.serialize();
match no_timeout_serialized {
SerializedMessage::Call(args, reply) => {
let _ = reply.send(args);
}
_ => panic!("Invalid"),
}

let no_timeout_reply = rx.await.expect("Receive error");
assert_eq!(no_timeout_reply, "test".to_string());
}

#[tokio::test]
async fn with_timeout_rpc() {
let (tx, rx) = ractor::concurrency::oneshot();
let duration = Duration::from_millis(10);
let with_timeout = MessageType::Call("test".to_string(), (tx, duration).into());

let with_timeout_serialized = with_timeout.serialize();
match with_timeout_serialized {
SerializedMessage::Call(args, reply) => {
let _ = reply.send(args);
}
_ => panic!("Invalid"),
}

let with_timeout_reply = rx.await.expect("Receive error");
assert_eq!(with_timeout_reply, "test".to_string());
}

#[tokio::test]
async fn timeout_rpc() {
let (tx, rx) = ractor::concurrency::oneshot();
let duration = Duration::from_millis(10);
let with_timeout = MessageType::Call("test".to_string(), (tx, duration).into());

let with_timeout_serialized = with_timeout.serialize();
match with_timeout_serialized {
SerializedMessage::Call(args, reply) => {
ractor::concurrency::sleep(Duration::from_millis(50)).await;
let _ = reply.send(args);
}
_ => panic!("Invalid"),
}

let result = rx.await;
assert!(matches!(result, Err(_)));
}
}
