Skip to content

Commit

Permalink
network prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
govereau committed Nov 29, 2023
1 parent 1041e05 commit 343d2f2
Show file tree
Hide file tree
Showing 14 changed files with 1,109 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ members = [
"riscv-circuit",
"tools",
"prover",
"network",
"msnova",
]
default-members = [
"riscv",
"riscv-circuit",
"tools",
"prover",
"network",
]

[workspace.package]
Expand Down
43 changes: 43 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[package]
name = "nexus-network"
edition.workspace = true
version.workspace = true
publish = false
default-run = "pcdnode"

[dependencies]
clap.workspace = true
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.18", features = [ "env-filter" ] }

async-channel = "1.9.0"
hyper = { version = "0.14", features = ["full"] }
fastwebsockets = { version = "0.4", features = ["upgrade"] }

reqwest = { version = "0.11.22", default-features = false, features = [ "blocking", "json" ] }

snmalloc-rs = { version = "0.3.4", optional = true }

serde.workspace = true
serde_json = "1.0"
postcard = { version = "1.0", features = ["use-std"] }
lz4 = "1.24"

sha2 = "0.10.8"

ark-ff.workspace = true
ark-ec.workspace = true
ark-serialize.workspace = true

nexus-riscv = { path = "../riscv" }
nexus-riscv-circuit = { path = "../riscv-circuit" }
nexus-prover = { path = "../prover" }
hex = "0.4.3"

[dev-dependencies]
nexus-riscv = { path = "../riscv" }

[features]
default = [ "snmalloc" ]
snmalloc = [ "snmalloc-rs" ]
40 changes: 40 additions & 0 deletions network/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
This is a prototype for the PCD network.
To run the code, follow these steps:

Generate the public parameters for parallel nova using the prover crate:

```
prover> cargo run -r -- gen -P
Generating public parameters to nexus-public.zst...
```

This will generate the file `nexus-public.zst`, which you can move into the
network directory for convenience.

Run the initial pcdnode: this node will use the default port of 8080:

```
network> cargo run -r -- -w
```

In separate terminals you can run a number of PCD nodes:

```
network> cargo run -r -- -l 127.0.0.1:0 -p
```

Or MSM nodes:

```
network> cargo run -r -- -l 127.0.0.1:0 -m
```

Once running you can use the basic client program to submit
a program and query its status. Note, the debug version will
connect to localhost, and the release version will try to
connect to the public PCD coordinator running in the cloud.

```
network> cargo run --bin client -- -p elf_file
network> cargo run --bin client -- -q proof_hash
```
38 changes: 38 additions & 0 deletions network/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
pub enum NexusAPI {
Program { account: String, elf: Vec<u8> },
Query { hash: String },
Proof(Proof),
Error(String),
}
pub use NexusAPI::*;

#[derive(Default, Clone, Serialize, Deserialize)]
pub struct Proof {
pub hash: String,
pub total_nodes: u32,
pub complete_nodes: u32,
pub proof: Option<Vec<u8>>,
}

impl std::fmt::Display for Proof {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{} {} {} ",
self.hash, self.total_nodes, self.complete_nodes
)?;
match self.proof {
None => writeln!(f, "incomplete")?,
Some(ref p) => {
for x in p.iter().take(10) {
write!(f, "{:x} ", x)?;
}
writeln!(f)?;
}
}
Ok(())
}
}
96 changes: 96 additions & 0 deletions network/src/bin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::future::Future;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use hyper::{
header::{CONNECTION, UPGRADE},
upgrade::Upgraded,
Body, Client, Request, Response, StatusCode,
};

use super::*;
use super::pcd::*;

const MAX_SIZE: u32 = 40 * 1024 * 1024;

pub async fn read_msg(upgraded: &mut Upgraded) -> Result<NexusMsg> {
let size = upgraded.read_u32().await?;
if size > MAX_SIZE {
println!("read SIZE is {size}");
return Err("bad size".into());
}
let mut b = vec![0; size as usize];
upgraded.read_exact(&mut b).await?;
let t = decode_lz4(&b)?;
Ok(t)
}

pub async fn write_msg(upgraded: &mut Upgraded, msg: &NexusMsg) -> Result<()> {
let v = encode_lz4(msg)?;
let size = v.len() as u32;
if size > MAX_SIZE {
println!("write SIZE is {size}");
return Err("bad size".into());
}
upgraded.write_u32(size).await?;
upgraded.write_all(&v).await?;
Ok(())
}

pub fn upgrade<S, F>(
state: S,
req: Request<Body>,
f: fn(S, Upgraded) -> F,
) -> Result<Response<Body>>
where
S: Send + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
let proto = req.headers().get(UPGRADE);
let proto = proto.ok_or::<DynError>("bad header".into())?;
let proto = proto.clone();

tokio::task::spawn(async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
println!("new nexus connection");
if let Err(e) = f(state, upgraded).await {
eprintln!("io error: {}", e)
};
}
Err(e) => eprintln!("upgrade error: {}", e),
}
});

let res = Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header(CONNECTION, "upgrade")
.header(UPGRADE, proto)
.body(Body::empty())?;
Ok(res)
}

pub async fn client<S, F>(
state: S,
addr: SocketAddr,
path: &str,
f: fn(S, Upgraded) -> F,
) -> Result<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
let req = Request::builder()
.uri(format!("http://{}/{}", addr, path))
.header(UPGRADE, "nexus")
.body(Body::empty())?;

let res = Client::new().request(req).await?;
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err("server refused upgrade".into());
}

let upgraded = hyper::upgrade::on(res).await?;
println!("connected to {}/{}", addr, path);
tokio::spawn(f(state, upgraded));
Ok(())
}
26 changes: 26 additions & 0 deletions network/src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::path::PathBuf;
use clap::Parser;
use nexus_network::client::*;

#[derive(Debug, Parser)]
struct Opts {
#[arg(group = "g", short)]
program: Option<PathBuf>,

#[arg(group = "g", short)]
query: Option<String>,
}

fn main() {
let opts = Opts::parse();

let proof = if opts.program.is_some() {
submit_proof("account".to_string(), &opts.program.unwrap()).unwrap()
} else if opts.query.is_some() {
fetch_proof(&opts.query.unwrap()).unwrap()
} else {
panic!()
};

println!("{}", proof);
}
42 changes: 42 additions & 0 deletions network/src/bin/pcdnode/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use nexus_network::api::Proof;

#[derive(Clone, Default)]
pub struct DB(Arc<Mutex<DBase>>);

#[derive(Default)]
pub struct DBase {
proofs: HashMap<String, Proof>,
}

impl DB {
pub fn new() -> Self {
DB(Arc::new(Mutex::new(DBase { proofs: HashMap::new() })))
}

pub fn new_proof(&mut self, hash: String, total: u32) {
let mut db = self.0.lock().unwrap();
let proof = db.proofs.entry(hash.clone()).or_default();
proof.hash = hash;
proof.total_nodes = total;
}

pub fn query_proof(&mut self, hash: &str) -> Option<Proof> {
let db = self.0.lock().unwrap();
db.proofs.get(hash).cloned() // TODO eliminate clone
}

pub fn update_complete(&mut self, hash: String, complete: u32) {
let mut db = self.0.lock().unwrap();
db.proofs
.entry(hash)
.and_modify(|p| p.complete_nodes += complete);
}

pub fn update_proof(&mut self, hash: String, proof: Vec<u8>) {
let mut db = self.0.lock().unwrap();
db.proofs.entry(hash).and_modify(|p| p.proof = Some(proof));
}
}
Loading

0 comments on commit 343d2f2

Please sign in to comment.