Skip to content

Commit

Permalink
* log timeouts
Browse files Browse the repository at this point in the history
* make examples folder
* more readme docs
  • Loading branch information
liamzebedee committed Sep 17, 2024
1 parent 5e80513 commit dc48228
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 235 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "tendermint"
path = "src/lib.rs"

[dependencies]
tokio = { version = "1", features = ["full", "sync"] }
Expand Down
10 changes: 8 additions & 2 deletions PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ Features:
- [x] add pubkey identities for nodes. add signatures to node messages.
- [ ] fix consensus height + stuff. commit data to log on disk.
- [ ] implement dynamic timeouts to allow network to resolve with backoff.
- [ ] change node to start up on a network interface and listen to messages.
- [x] change node to start up on a network interface and listen to messages.
- [ ] add node sync so it restarts and gets history from other nodes for height before it.
- [ ] check precommits/prevotes are unique.

Demo network:

- [ ] startup node.
- [ ] connect to server listing all validators and their ip's.
- [ ] try listen for other nodes.

Ideas:
- [ ]
- [ ] motomint - tendermint but the proposer set is dictated by POW. Basically

8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ Dependencies:

A consensus protocol consists of a set of processes, which communicate by sending messages to each other in order to agree on a value. Processes may crash, run at arbitrary speeds, and display byzantine failures. The challenge of consensus is building a protocol which can finalise and does so safely and consistently given these assumptions.

Tendermint-rs is a barebones implementation of Tendermint consensus.
The basic Tendermint algorithm is implemented as `Process`. Each `Process` communicates via abstract channels - there is an implementation using just local communication (`examples/standalone-channels`), and an implementation using RPC over HTTP servers (`examples/standalone-http`). Processes emit consensus events via tokio async streams - consumers can subscribe to the process and receive callbacks for new values agreed on by the network (called "decisions"). Each node has an ECDSA keypair it uses to sign messages.


## Status.

Protocol runs and achieves consensus, with rounds, epochs.
Protocol runs and achieves consensus, with rounds, epochs.

See [PLAN](./PLAN.md) for the backlog.

Expand All @@ -29,6 +30,9 @@ See [PLAN](./PLAN.md) for the backlog.

```sh
cargo build

cargo run --example standalone-channels
cargo run --example standalone-http
```

### Using it.
Expand Down
60 changes: 60 additions & 0 deletions examples/demo-network/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::collections::VecDeque;
use tokio::sync::{mpsc, Mutex};
use std::{
time::{SystemTime, UNIX_EPOCH},
};

use tokio_stream::StreamExt;
use tendermint::crypto::ECDSAKeypair;
use tendermint::messages::SignedMessage;
use tendermint::process::Process;
use tendermint::rpc_server::Server;

#[tokio::main]
async fn main() {
// Network configuration:
// - peers: (pubkey,address)[]

// Setup RPC server.
// Setup RPC client for each peer.
// Setup process.
// Run process.

// let peers = [
// ("http://localhost:3001"),
// ];

let keypair = ECDSAKeypair::new();

let mut peer_senders = Vec::new();

let api_server = Server::<SignedMessage>::new(3030);
let receiver = api_server.get_receiver();
tokio::spawn(async move {
api_server.run().await;
});
// tokio::spawn(async move {
// client.start().await;
// });

let get_value = || SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string();

// Define proposer sequence (round-robin)
let proposer_sequence: Vec<usize> = (0..4).collect();
let mut process =
Process::new(0, keypair, receiver, peer_senders, proposer_sequence.clone(), get_value);

// Listen to events from node0.
let mut subscriber1 = process.subscribe();
tokio::spawn(async move {
while let Some(event) = subscriber1.next().await {
println!("Subscriber 1 received: {:?}", event);
}
});

tokio::spawn(async move {
process.run_epoch(None).await;
}).await.unwrap();

println!("Consensus reached.");
}
83 changes: 83 additions & 0 deletions examples/standalone-channels/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::collections::VecDeque;

use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::StreamExt;
use tendermint::crypto::ECDSAKeypair;
use tendermint::params::*;
use tendermint::process::*;


async fn setup_pure_sendreceive() {
// Create channels for each node
let mut senders = Vec::new();
let mut receivers = VecDeque::new();

let get_value = || SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string();

// Separate the creation of senders and receivers
for _ in 0..NODES {
let (tx, rx) = mpsc::channel(100);
senders.push(tx);
receivers.push_back(rx);
}

// Define proposer sequence (round-robin)
let proposer_sequence: Vec<usize> = (0..NODES).collect();

// Initialize nodes
let mut nodes = Vec::new();
for i in 0..NODES {
let mut node_senders = Vec::new();
for j in 0..NODES {
if i != j {
node_senders.push(senders[j].clone());
}
}
let keypair = ECDSAKeypair::new();
let receiver = receivers.pop_front().unwrap();
let node = Process::new(
i,
keypair,
Arc::new(Mutex::new(receiver)),
node_senders,
proposer_sequence.clone(),
get_value,
);
nodes.push(node);
}

// Listen to events from node0.
let mut subscriber1 = nodes[0].subscribe();
tokio::spawn(async move {
while let Some(event) = subscriber1.next().await {
println!("Subscriber 1 received: {:?}", event);
}
});

// Run all nodes
let handles: Vec<_> = nodes
.into_iter()
.map(|mut node| {
tokio::spawn(async move {
node.run_epoch(None).await;
})
})
.collect();

// Wait for all nodes to finish
for handle in handles {
let _ = handle.await;
}

println!("Consensus reached.");
}


#[tokio::main]
async fn main() {
setup_pure_sendreceive().await;
}
89 changes: 89 additions & 0 deletions examples/standalone-http/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::collections::VecDeque;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio_stream::StreamExt;
use tendermint::crypto::ECDSAKeypair;
use tendermint::params::*;
use tendermint::process::*;
use tendermint::rpc_client::RpcClient;
use tendermint::rpc_server::Server;
use tendermint::messages::SignedMessage;

async fn setup_api_servers() {
// Create channels for each node
let mut senders = Vec::new();
let mut receivers = VecDeque::new();

let get_value = || SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string();

// Setup node API servers.
for i in 0..NODES {
let server = Server::<SignedMessage>::new(3030 + i as u16);
receivers.push_back(server.get_receiver());
let client = RpcClient::<SignedMessage>::new(
100,
format!("http://localhost:{}/inbox/", server.port),
);
senders.push(client.get_sender());

tokio::spawn(async move {
server.run().await;
});
tokio::spawn(async move {
client.start().await;
});
}

// Define proposer sequence (round-robin)
let proposer_sequence: Vec<usize> = (0..NODES).collect();

// Initialize nodes
let mut nodes = Vec::new();
for i in 0..NODES {
let mut node_senders = Vec::new();
for j in 0..NODES {
if i != j {
node_senders.push(senders[j].clone());
}
}

let keypair = ECDSAKeypair::new();
let receiver = receivers.pop_front().unwrap();
let node =
Process::new(i, keypair, receiver, node_senders, proposer_sequence.clone(), get_value);
nodes.push(node);
}

// Listen to events from node0.
let mut subscriber1 = nodes[0].subscribe();
tokio::spawn(async move {
while let Some(event) = subscriber1.next().await {
println!("Subscriber 1 received: {:?}", event);
}
});

// Run all nodes
let handles: Vec<_> = nodes
.into_iter()
.map(|mut node| {
tokio::spawn(async move {
node.run_epoch(None).await;
})
})
.collect();

// Wait for all nodes to finish
for handle in handles {
let _ = handle.await.unwrap();
}

println!("Consensus reached.");
}

#[tokio::main]
async fn main() {
println!("Main");
setup_api_servers().await;

// just wait for sigkill
tokio::signal::ctrl_c().await.unwrap();
}
2 changes: 0 additions & 2 deletions examples/standalone.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/algos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use tokio::time::Duration;
/// Gets the proposer for a round.
pub fn get_proposer_for_round(round: u8, proposer_sequence: &[usize]) -> usize {
proposer_sequence[(round - 1) as usize % proposer_sequence.len()]

/*
// Tendermint/CometBFT consensus WIP.
Expand Down
17 changes: 7 additions & 10 deletions src/crypto.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use rand::rngs::OsRng;
use secp256k1::{ecdsa::SerializedSignature, Message, Secp256k1, SecretKey};
use sha3::{Digest, Keccak256};
// use core::slice::SlicePattern;
// use std::io::Read;
use std::str::FromStr;
use std::{
fmt,
fmt::{Display, Formatter},
};
use serde::{Deserialize, Deserializer, Serialize, Serializer};


#[derive(Clone, Debug, Copy)]
pub struct Signature(secp256k1::ecdsa::SerializedSignature);
Expand All @@ -13,6 +17,7 @@ pub struct PublicKey(secp256k1::PublicKey);

pub type Keypair = ECDSAKeypair;

#[derive(Debug)]
pub struct ECDSAKeypair {
secret_key: SecretKey,
public_key: PublicKey,
Expand Down Expand Up @@ -71,10 +76,7 @@ pub fn verify_signature(
secp.verify_ecdsa(&message, &signature.to_signature().unwrap(), &public_key.0).is_ok()
}

use serde::{Deserialize, Deserializer, Serialize, Serializer};

// PublicKey.

// Deserialize.
impl<'de> Deserialize<'de> for PublicKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
Expand Down Expand Up @@ -156,11 +158,6 @@ impl Signature {
}
}

use std::{
fmt,
fmt::{Display, Formatter},
};

impl Display for PublicKey {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "{}", self.0)
Expand Down
30 changes: 1 addition & 29 deletions src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,4 @@ where
pub fn publish(&self, event: T) {
let _ = self.sender.send(event); // Ignore the error for simplicity
}
}

// #[tokio::main]
// async fn main() {
// let event_system = EventSystem::new();

// // Subscriber 1
// let mut subscriber1 = event_system.subscribe();
// tokio::spawn(async move {
// while let Some(event) = subscriber1.next().await {
// println!("Subscriber 1 received: {}", event);
// }
// });

// // Subscriber 2
// let mut subscriber2 = event_system.subscribe();
// tokio::spawn(async move {
// while let Some(event) = subscriber2.next().await {
// println!("Subscriber 2 received: {}", event);
// }
// });

// // Publish events
// event_system.publish("Event 1".to_string());
// event_system.publish("Event 2".to_string());

// // Allow some time for the events to be processed
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// }
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ pub mod events;
pub mod messages;
pub mod params;
pub mod process;
mod rpc_client;
mod rpc_server;
pub mod rpc_client;
pub mod rpc_server;

#[cfg(test)]
mod tests {
Expand Down
Loading

0 comments on commit dc48228

Please sign in to comment.