I will draw a picture of a crab someday
This is a Rust implementation of a distributed messaging system. It uses a simple design inspired by Apache Kafka. It simply records messages to local files.
Current Pilgrimage supports At-least-once.
When using Pilgramage as a Crate, client authentication is implemented, but at present, authentication is not implemented for message sending and receiving from the CLI and web client. You can find a sample of authentication with Crate examples/auth-example.rs
, examples/auth-send-recv.rs
.
- Topic-based pub/sub model
- Scalability through partitioning
- Persistent messages (log file based)
- Leader/Follower Replication
- Fault Detection and Automatic Recovery
- Delivery guaranteed by acknowledgement (ACK)
- Fully implemented leader selection mechanism
- Partition Replication
- Persistent messages
- Schema Registry for managing message schemas and ensuring compatibility
- Automatic Scaling
- Broker Clustering
- Message processing in parallel
- Authentication and Authorization Mechanisms
- Data Encryption
- CLI based console
- WEB based console
- Rust 1.51.0 or later
- Message Queue: Efficient message queue implementation using
Mutex
andVecDeque
. - Broker: Core broker functionality including message handling, node management, and leader election.
- Consumer Groups: Support for consumer groups to allow multiple consumers to read from the same topic.
- Leader Election: Mechanism for electing a leader among brokers to manage partitions and replication.
- Storage: Persistent storage of messages using local files.
- Replication: Replication of messages across multiple brokers for fault tolerance.
- Schema Registry: Management of message schemas to ensure compatibility between producers and consumers.
- Benchmarking: Comprehensive benchmarking tests to measure performance of various components.
- Automatic Scaling: Automatically scale the number of instances based on load.
- Log Compressions: Compress and optimize logs.
use pilgrimage::broker::{Broker, Node};
use std::sync::{Arc, Mutex};
fn main() {
// Broker Creation
let broker = Broker::new("broker1", 3, 2, "storage_path");
// Adding a node
let node = Node {
data: Arc::new(Mutex::new(Vec::new())),
};
broker.add_node("node1".to_string(), node);
// Send a message
broker.send_message("Hello, world!".to_string());
// Message received
if let Some(message) = broker.receive_message() {
println!("Received: {}", message);
}
}
use pilgrimage::broker::Broker;
use std::sync::Arc;
use std::thread;
fn main() {
let broker = Arc::new(Broker::new("broker1", 3, 2, "storage_path"));
let broker_sender = Arc::clone(&broker);
let sender_handle = thread::spawn(move || {
for i in 0..10 {
let message = format!("Message {}", i);
broker_sender.send_message(message);
}
});
let broker_receiver = Arc::clone(&broker);
let receiver_handle = thread::spawn(move || {
for _ in 0..10 {
if let Some(message) = broker_receiver.receive_message() {
println!("Received: {:?}", message);
}
}
});
sender_handle.join().unwrap();
receiver_handle.join().unwrap();
}
The system includes mechanisms for fault detection and automatic recovery. Nodes are monitored using heartbeat signals, and if a fault is detected, the system will attempt to recover automatically.
use pilgrimage::Broker;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let storage = Arc::new(Mutex::new(Storage::new("test_db_path").unwrap()));
let mut broker = Broker::new("broker_id", 1, 1, "test_db_path");
broker.storage = storage.clone();
// Simulating a disability
{
let mut storage_guard = storage.lock().unwrap();
storage_guard.available = false;
}
// Simulating a disability
broker.monitor_nodes();
// Simulating a disability
thread::sleep(Duration::from_millis(100));
let storage_guard = storage.lock().unwrap();
assert!(storage_guard.is_available());
}
- Simple message sending and receiving
- Sending and receiving multiple messages
- Sending and receiving messages in multiple threads
- Authentication processing example
- Sending and receiving messages as an authenticated user
To execute a basic example, use the following command:
cargo run --example simple-send-recv
cargo run --example mulch-send-recv
cargo run --example thread-send-recv
cargo run --example auth-example
cargo run --example auth-send-recv
If the allocated memory is small, it may fail.
cargo bench
Pilgrimage offers a comprehensive Command-Line Interface (CLI) to manage and interact with your message brokers efficiently. Below are the available commands along with their descriptions and usage examples.
Description: Starts the broker with the specified configurations.
Usage:
pilgrimage start --id <BROKER_ID> --partitions <NUMBER_OF_PARTITIONS> --replication <REPLICATION_FACTOR> --storage <STORAGE_PATH> [--test-mode]
Options:
--id
,-i
(required): Sets the broker ID.--partitions
,-p
(required): Sets the number of partitions.--replication
,-r
(required): Sets the replication factor.--storage
,-s
(required): Sets the storage path.--test-mode
: Runs the broker in test mode, which breaks out of the main loop quickly for testing purposes.
Example:
pilgrimage start --id broker1 --partitions 3 --replication 2 --storage /data/broker1 --test-mode
Description: Stops the specified broker.
Usage
pilgrimage stop --id <BROKER_ID>
Options:
--id
,-i
(required): Sets the broker ID.
Example
pilgrimage stop --id broker1
Description:
Sends a message to the specified broker.
Usage
pilgrimage send <BROKER_ID> <MESSAGE>
Arguments:
<BROKER_ID>
(required): The ID of the broker to send the message to.<MESSAGE>
(required): The message content to send.
Example
pilgrimage send broker1 "Hello, World!"
Description:
Consumes messages from the specified broker.
Usage
pilgrimage consume <BROKER_ID>
Arguments:
<BROKER_ID>
(required): The ID of the broker to consume messages from.
Example:
pilgrimage consume broker1
Description:
Checks the status of the specified broker.
Usage:
pilgrimage status --id <BROKER_ID>
Options:
--id
,-i
(required): Sets the broker ID.
Example:
pilgrimage status --id broker1
- Help Command:
To view all available commands and options, use the
help
command:
pilgrimage help
- Version Information: To check the current version of Pilgrimage, use:
pilgrimage --version
To start the web server:
cargo run --bin pilgrimage
Pilgrimage provides a REST API for managing brokers through HTTP requests. The server runs on http://localhost:8080
by default.
Starts a new broker instance.
Endpoint: POST /start
Request:
{
"id": "broker1",
"partitions": 3,
"replication": 2,
"storage": "/tmp/broker1"
}
Example:
curl -X POST http://localhost:8080/start \
-H "Content-Type: application/json" \
-d '{
"id": "broker1",
"partitions": 3,
"replication": 2,
"storage": "/tmp/broker1"
}'
Stops a running broker instance.
Endpoint: POST /stop
Request:
{
"id": "broker1"
}
Example:
curl -X POST http://localhost:8080/stop \
-H "Content-Type: application/json" \
-d '{
"id": "broker1"
}'
Sends a message to the broker.
Endpoint: POST /send
Request:
{
"id": "broker1",
"message": "Hello, World!"
}
Example:
curl -X POST http://localhost:8080/send \
-H "Content-Type: application/json" \
-d '{
"id": "broker1",
"message": "Hello, World!"
}'
Consumes messages from the broker.
Endpoint: POST /consume
Request:
{
"id": "broker1"
}
Example:
curl -X POST http://localhost:8080/consume \
-H "Content-Type: application/json" \
-d '{
"id": "broker1"
}'
Checks the status of the broker.
Endpoint: POST /status
Request:
{
"id": "broker1"
}
Example:
curl -X POST http://localhost:8080/status \
-H "Content-Type: application/json" \
-d '{
"id": "broker1"
}'
To start the web server:
cargo run --bin web
The server will be available at http://localhost:8080
.
- The commit message is parsed and the version of either major, minor or patch is incremented.
- The version of Cargo.toml is updated.
- The updated Cargo.toml is committed and a new tag is created.
- The changes and tag are pushed to the remote repository.
The version is automatically incremented based on the commit message. Here, we treat feat
as minor, fix
as patch, and BREAKING CHANGE
as major.
MIT