Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Fixes for serializing entries over blobs and reorg into ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed May 10, 2018
1 parent c9cd813 commit 81dbda7
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 107 deletions.
120 changes: 120 additions & 0 deletions src/ledger.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
//! The `ledger` module provides functions for parallel verification of the
//! Proof of History ledger.
use bincode::{deserialize, serialize_into};
use entry::{next_tick, Entry};
use event::Event;
use hash::Hash;
use rayon::prelude::*;
use std::cmp::min;
use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
use packet;
use std::mem::size_of;
use std::collections::VecDeque;
use std::io::Cursor;

pub trait Block {
/// Verifies the hashes and counts of a slice of events are all consistent.
Expand All @@ -30,10 +38,104 @@ pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec<Entry>
ticks
}

pub fn process_entry_list_into_blobs(
list: &Vec<Entry>,
blob_recycler: &packet::BlobRecycler,
q: &mut VecDeque<SharedBlob>,
) {
let mut start = 0;
let mut end = 0;
while start < list.len() {
let mut entries: Vec<Vec<Entry>> = Vec::new();
let mut total = 0;
for i in &list[start..] {
total += size_of::<Event>() * i.events.len();
total += size_of::<Entry>();
if total >= BLOB_DATA_SIZE {
break;
}
end += 1;
}
// See if we need to split the events
if end <= start {
let mut event_start = 0;
let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
let total_entry_chunks =
(list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob;
trace!(
"splitting events end: {} total_chunks: {}",
end,
total_entry_chunks
);
for _ in 0..total_entry_chunks {
let event_end = min(event_start + num_events_per_blob, list[end].events.len());
trace!("event start: {} end: {}", event_start, event_end);
let mut entry = Entry {
num_hashes: list[end].num_hashes,
id: list[end].id,
events: list[end].events[event_start..event_end].to_vec(),
};
trace!("event[0]: {:?}", entry.events[0]);
entries.push(vec![entry]);
event_start = event_end;
}
end += 1;
} else {
entries.push(list[start..end].to_vec());
}

for entry in entries {
trace!(
"entry.id: {:?} num_hashes: {} num events: {}",
entry[0].id,
entry[0].num_hashes,
entry[0].events.len()
);
let b = blob_recycler.allocate();
let pos = {
let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &entry).expect("failed to serialize output");
out.position() as usize
};
assert!(pos < BLOB_SIZE);
b.write().unwrap().set_size(pos);
q.push_back(b);
}
start = end;
}
}

pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry> {
let mut entries_to_apply: Vec<Entry> = Vec::new();
let mut last_id = Hash::default();
for msgs in blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
for entry in entries {
if entry.id == last_id {
trace!("same id!");
if let Some(last_entry) = entries_to_apply.last_mut() {
last_entry.events.extend(entry.events);
}
} else {
last_id = entry.id;
entries_to_apply.push(entry);
}
}
//TODO respond back to leader with hash of the state
}
entries_to_apply
}

#[cfg(test)]
mod tests {
use super::*;
use entry;
use hash::hash;
use signature::{KeyPair, KeyPairUtil};
use packet::BlobRecycler;
use transaction::Transaction;

#[test]
fn test_verify_slice() {
Expand All @@ -48,6 +150,24 @@ mod tests {
bad_ticks[1].id = one;
assert!(!bad_ticks.verify(&zero)); // inductive step, bad
}

#[test]
fn test_entry_to_blobs() {
let zero = Hash::default();
let one = hash(&zero);
let keypair = KeyPair::new();
let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, one));
let events = vec![tr0.clone(); 10000];
let e0 = entry::create_entry(&zero, 0, events);

let entry_list = vec![e0.clone(); 1];
let blob_recycler = BlobRecycler::default();
let mut blob_q = VecDeque::new();
process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
let entries = reconstruct_entries_from_blobs(&blob_q);

assert_eq!(entry_list, entries);
}
}

#[cfg(all(feature = "unstable", test))]
Expand Down
129 changes: 22 additions & 107 deletions src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
use accountant::Accountant;
use accounting_stage::AccountingStage;
use bincode::{deserialize, serialize, serialize_into};
use bincode::{deserialize, serialize};
use crdt::{Crdt, ReplicatedData};
use ecdsa;
use entry::Entry;
use event::Event;
use ledger;
use packet;
use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
use packet::SharedPackets;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use result::Result;
use serde_json;
use std::collections::VecDeque;
use std::io::sink;
use std::io::{Cursor, Write};
use std::mem::size_of;
use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
Expand Down Expand Up @@ -75,61 +75,6 @@ impl Tpu {
Ok(l)
}

fn process_entry_list_into_blobs(
list: &Vec<Entry>,
blob_recycler: &packet::BlobRecycler,
q: &mut VecDeque<SharedBlob>,
) {
let mut start = 0;
let mut end = 0;
while start < list.len() {
let mut entries: Vec<Vec<Entry>> = Vec::new();
let mut total = 0;
for i in &list[start..] {
total += size_of::<Event>() * i.events.len();
total += size_of::<Entry>();
if total >= BLOB_DATA_SIZE {
break;
}
end += 1;
}
// See if we need to split the events
if end <= start {
trace!("splitting events");
let mut event_start = 0;
let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
let total_entry_chunks = list[end].events.len() / num_events_per_blob;
for _ in 0..total_entry_chunks {
let event_end = event_start + num_events_per_blob;
let mut entry = Entry {
num_hashes: list[end].num_hashes,
id: list[end].id,
events: list[end].events[event_start..event_end].to_vec(),
};
entries.push(vec![entry]);
event_start = event_end;
}
end += 1;
} else {
entries.push(list[start..end].to_vec());
}

for entry in entries {
let b = blob_recycler.allocate();
let pos = {
let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &entry).expect("failed to serialize output");
out.position() as usize
};
assert!(pos < BLOB_SIZE);
b.write().unwrap().set_size(pos);
q.push_back(b);
}
start = end;
}
}

/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
fn run_sync<W: Write>(
Expand All @@ -141,7 +86,7 @@ impl Tpu {
let mut q = VecDeque::new();
let list = Self::receive_all(&obj, writer)?;
trace!("New blobs? {}", list.len());
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if !q.is_empty() {
broadcast.send(q)?;
}
Expand Down Expand Up @@ -381,6 +326,19 @@ impl Tpu {
);
Ok(())
}
fn apply_entries_state(obj: &Tpu, entries: Vec<Entry>) -> Result<()> {
trace!("{} entries to apply", entries.len());
let accountant = &obj.accounting_stage.accountant;
for entry in entries {
trace!("{} events to apply id: {:?}", entry.events.len(), entry.id);
accountant.register_entry_id(&entry.id);
for result in accountant.process_verified_events(entry.events) {
result?;
}
}
Ok(())
}

/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
Expand All @@ -391,18 +349,8 @@ impl Tpu {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
trace!("replicating blobs {}", blobs.len());
for msgs in &blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
let accountant = &obj.accounting_stage.accountant;
for entry in entries {
accountant.register_entry_id(&entry.id);
for result in accountant.process_verified_events(entry.events) {
result?;
}
}
//TODO respond back to leader with hash of the state
}
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
Self::apply_entries_state(obj, entries)?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Expand Down Expand Up @@ -709,7 +657,6 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
mod tests {
use accountant::Accountant;
use accounting_stage::AccountingStage;
use bincode::{deserialize, serialize};
use chrono::prelude::*;
use crdt::Crdt;
use ecdsa;
Expand All @@ -718,7 +665,7 @@ mod tests {
use hash::{hash, Hash};
use logger;
use mint::Mint;
use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
Expand All @@ -728,6 +675,7 @@ mod tests {
use streamer;
use tpu::{test_node, to_packets, Request, Tpu};
use transaction::{memfind, test_tx, Transaction};
use bincode::serialize;

#[test]
fn test_layout() {
Expand Down Expand Up @@ -894,37 +842,4 @@ mod tests {
t_l_listen.join().expect("join");
}

#[test]
fn test_entry_to_blobs() {
let zero = Hash::default();
let keypair = KeyPair::new();
let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
let events = vec![tr0.clone(); 10000];
//let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
let e0 = entry::create_entry(&zero, 0, events);

let entry_list = vec![e0.clone(); 1];
let blob_recycler = BlobRecycler::default();
let mut blob_q = VecDeque::new();
Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
let serialized_entry_list = serialize(&entry_list).unwrap();
let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE;
if serialized_entry_list.len() % BLOB_SIZE != 0 {
num_blobs_ref += 1
}
let mut new_events = Vec::new();
for b in &blob_q {
let blob = b.read().unwrap();
let entries: Vec<entry::Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].num_hashes, e0.num_hashes);
assert_eq!(entries[0].id, e0.id);
new_events.extend(entries[0].events.clone());
}
for (i, e) in new_events.iter().enumerate() {
assert_eq!(*e, e0.events[i]);
}
trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
assert!(blob_q.len() > num_blobs_ref);
}
}

0 comments on commit 81dbda7

Please sign in to comment.