diff --git a/src/ledger.rs b/src/ledger.rs
index 0056bd54e20894..382e3c83e45451 100644
--- a/src/ledger.rs
+++ b/src/ledger.rs
@@ -1,9 +1,17 @@
 //! The `ledger` module provides functions for parallel verification of the
 //! Proof of History ledger.
 
+use bincode::{deserialize, serialize_into};
 use entry::{next_tick, Entry};
+use event::Event;
 use hash::Hash;
 use rayon::prelude::*;
+use std::cmp::min;
+use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
+use packet;
+use std::mem::size_of;
+use std::collections::VecDeque;
+use std::io::Cursor;
 
 pub trait Block {
     /// Verifies the hashes and counts of a slice of events are all consistent.
@@ -30,10 +38,104 @@ pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec<Entry>
     ticks
 }
 
+pub fn process_entry_list_into_blobs(
+    list: &Vec<Entry>,
+    blob_recycler: &packet::BlobRecycler,
+    q: &mut VecDeque<SharedBlob>,
+) {
+    let mut start = 0;
+    let mut end = 0;
+    while start < list.len() {
+        let mut entries: Vec<Vec<Entry>> = Vec::new();
+        let mut total = 0;
+        for i in &list[start..] {
+            total += size_of::<Event>() * i.events.len();
+            total += size_of::<Entry>();
+            if total >= BLOB_DATA_SIZE {
+                break;
+            }
+            end += 1;
+        }
+        // See if we need to split the events
+        if end <= start {
+            let mut event_start = 0;
+            let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
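+            // Round up so a trailing partial chunk still gets its own blob.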
+            let total_entry_chunks =
+                (list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob;
+            trace!(
+                "splitting events end: {} total_chunks: {}",
+                end,
+                total_entry_chunks
+            );
+            for _ in 0..total_entry_chunks {
+                let event_end = min(event_start + num_events_per_blob, list[end].events.len());
+                trace!("event start: {} end: {}", event_start, event_end);
+                let entry = Entry {
+                    num_hashes: list[end].num_hashes,
+                    id: list[end].id,
+                    events: list[end].events[event_start..event_end].to_vec(),
+                };
+                trace!("event[0]: {:?}", entry.events[0]);
+                entries.push(vec![entry]);
+                event_start = event_end;
+            }
+            end += 1;
+        } else {
+            entries.push(list[start..end].to_vec());
+        }
+
+        for entry in entries {
+            trace!(
+                "entry.id: {:?} num_hashes: {} num events: {}",
+                entry[0].id,
+                entry[0].num_hashes,
+                entry[0].events.len()
+            );
+            let b = blob_recycler.allocate();
+            let pos = {
+                let mut bd = b.write().unwrap();
+                let mut out = Cursor::new(bd.data_mut());
+                serialize_into(&mut out, &entry).expect("failed to serialize output");
+                out.position() as usize
+            };
+            assert!(pos < BLOB_SIZE);
+            b.write().unwrap().set_size(pos);
+            q.push_back(b);
+        }
+        start = end;
+    }
+}
+
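+/// Rebuilds entries from blobs emitted by `process_entry_list_into_blobs`,
+/// merging consecutive entries that share an id (the chunks of a split
+/// entry) back into a single entry.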
+pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry> {
+    let mut entries_to_apply: Vec<Entry> = Vec::new();
+    let mut last_id = Hash::default();
+    for msgs in blobs {
+        let blob = msgs.read().unwrap();
+        let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
+        for entry in entries {
+            if entry.id == last_id {
+                trace!("same id!");
+                if let Some(last_entry) = entries_to_apply.last_mut() {
+                    last_entry.events.extend(entry.events);
+                }
+            } else {
+                last_id = entry.id;
+                entries_to_apply.push(entry);
+            }
+        }
+        //TODO respond back to leader with hash of the state
+    }
+    entries_to_apply
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use entry;
     use hash::hash;
+    use signature::{KeyPair, KeyPairUtil};
+    use packet::BlobRecycler;
+    use transaction::Transaction;
 
     #[test]
     fn test_verify_slice() {
@@ -48,6 +150,24 @@ mod tests {
         bad_ticks[1].id = one;
         assert!(!bad_ticks.verify(&zero)); // inductive step, bad
     }
+
+    #[test]
+    fn test_entry_to_blobs() {
+        let zero = Hash::default();
+        let one = hash(&zero);
+        let keypair = KeyPair::new();
+        let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, one));
+        let events = vec![tr0.clone(); 10000];
+        let e0 = entry::create_entry(&zero, 0, events);
+
+        let entry_list = vec![e0.clone(); 1];
+        let blob_recycler = BlobRecycler::default();
+        let mut blob_q = VecDeque::new();
+        process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
+        let entries = reconstruct_entries_from_blobs(&blob_q);
+
+        assert_eq!(entry_list, entries);
+    }
 }
 
 #[cfg(all(feature = "unstable", test))]
diff --git a/src/tpu.rs b/src/tpu.rs
index 0c04af830158ec..867c6311d5042f 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -3,21 +3,21 @@
 use accountant::Accountant;
 use accounting_stage::AccountingStage;
-use bincode::{deserialize, serialize, serialize_into};
+use bincode::{deserialize, serialize};
 use crdt::{Crdt, ReplicatedData};
 use ecdsa;
 use entry::Entry;
 use event::Event;
+use ledger;
 use packet;
-use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
+use packet::SharedPackets;
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use result::Result;
 use serde_json;
 use std::collections::VecDeque;
 use std::io::sink;
-use std::io::{Cursor, Write};
-use std::mem::size_of;
+use std::io::Write;
 use std::net::{SocketAddr, UdpSocket};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Receiver, Sender};
@@ -75,61 +75,6 @@ impl Tpu {
         Ok(l)
     }
 
-    fn process_entry_list_into_blobs(
-        list: &Vec<Entry>,
-        blob_recycler: &packet::BlobRecycler,
-        q: &mut VecDeque<SharedBlob>,
-    ) {
-        let mut start = 0;
-        let mut end = 0;
-        while start < list.len() {
-            let mut entries: Vec<Vec<Entry>> = Vec::new();
-            let mut total = 0;
-            for i in &list[start..] {
-                total += size_of::<Event>() * i.events.len();
-                total += size_of::<Entry>();
-                if total >= BLOB_DATA_SIZE {
-                    break;
-                }
-                end += 1;
-            }
-            // See if we need to split the events
-            if end <= start {
-                trace!("splitting events");
-                let mut event_start = 0;
-                let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
-                let total_entry_chunks = list[end].events.len() / num_events_per_blob;
-                for _ in 0..total_entry_chunks {
-                    let event_end = event_start + num_events_per_blob;
-                    let mut entry = Entry {
-                        num_hashes: list[end].num_hashes,
-                        id: list[end].id,
-                        events: list[end].events[event_start..event_end].to_vec(),
-                    };
-                    entries.push(vec![entry]);
-                    event_start = event_end;
-                }
-                end += 1;
-            } else {
-                entries.push(list[start..end].to_vec());
-            }
-
-            for entry in entries {
-                let b = blob_recycler.allocate();
-                let pos = {
-                    let mut bd = b.write().unwrap();
-                    let mut out = Cursor::new(bd.data_mut());
-                    serialize_into(&mut out, &entry).expect("failed to serialize output");
-                    out.position() as usize
-                };
-                assert!(pos < BLOB_SIZE);
-                b.write().unwrap().set_size(pos);
-                q.push_back(b);
-            }
-            start = end;
-        }
-    }
-
     /// Process any Entry items that have been published by the Historian.
     /// continuously broadcast blobs of entries out
     fn run_sync(
@@ -141,7 +86,7 @@ impl Tpu {
         let mut q = VecDeque::new();
         let list = Self::receive_all(&obj, writer)?;
         trace!("New blobs? {}", list.len());
-        Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
+        ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
         if !q.is_empty() {
             broadcast.send(q)?;
         }
@@ -381,6 +326,19 @@ impl Tpu {
         );
         Ok(())
     }
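+    /// Registers each entry id with the accountant, then applies the entry's
+    /// already-verified events, failing on the first event that is rejected.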
+    fn apply_entries_state(obj: &Tpu, entries: Vec<Entry>) -> Result<()> {
+        trace!("{} entries to apply", entries.len());
+        let accountant = &obj.accounting_stage.accountant;
+        for entry in entries {
+            trace!("{} events to apply id: {:?}", entry.events.len(), entry.id);
+            accountant.register_entry_id(&entry.id);
+            for result in accountant.process_verified_events(entry.events) {
+                result?;
+            }
+        }
+        Ok(())
+    }
+
     /// Process verified blobs, already in order
     /// Respond with a signed hash of the state
     fn replicate_state(
@@ -391,18 +349,8 @@ impl Tpu {
         let timer = Duration::new(1, 0);
         let blobs = verified_receiver.recv_timeout(timer)?;
         trace!("replicating blobs {}", blobs.len());
-        for msgs in &blobs {
-            let blob = msgs.read().unwrap();
-            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
-            let accountant = &obj.accounting_stage.accountant;
-            for entry in entries {
-                accountant.register_entry_id(&entry.id);
-                for result in accountant.process_verified_events(entry.events) {
-                    result?;
-                }
-            }
-            //TODO respond back to leader with hash of the state
-        }
+        let entries = ledger::reconstruct_entries_from_blobs(&blobs);
+        Self::apply_entries_state(obj, entries)?;
         for blob in blobs {
             blob_recycler.recycle(blob);
         }
@@ -709,7 +657,6 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
 mod tests {
     use accountant::Accountant;
     use accounting_stage::AccountingStage;
-    use bincode::{deserialize, serialize};
     use chrono::prelude::*;
     use crdt::Crdt;
     use ecdsa;
@@ -718,7 +665,7 @@ mod tests {
     use hash::{hash, Hash};
     use logger;
     use mint::Mint;
-    use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
+    use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
     use signature::{KeyPair, KeyPairUtil};
     use std::collections::VecDeque;
     use std::sync::atomic::{AtomicBool, Ordering};
@@ -728,6 +675,7 @@ mod tests {
     use streamer;
     use tpu::{test_node, to_packets, Request, Tpu};
    use transaction::{memfind, test_tx, Transaction};
+    use bincode::serialize;
 
     #[test]
     fn test_layout() {
@@ -894,37 +842,4 @@ mod tests {
 
         t_l_listen.join().expect("join");
     }
-    #[test]
-    fn test_entry_to_blobs() {
-        let zero = Hash::default();
-        let keypair = KeyPair::new();
-        let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
-        let events = vec![tr0.clone(); 10000];
-        //let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
-        let e0 = entry::create_entry(&zero, 0, events);
-
-        let entry_list = vec![e0.clone(); 1];
-        let blob_recycler = BlobRecycler::default();
-        let mut blob_q = VecDeque::new();
-        Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
-        let serialized_entry_list = serialize(&entry_list).unwrap();
-        let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE;
-        if serialized_entry_list.len() % BLOB_SIZE != 0 {
-            num_blobs_ref += 1
-        }
-        let mut new_events = Vec::new();
-        for b in &blob_q {
-            let blob = b.read().unwrap();
-            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
-            assert_eq!(entries.len(), 1);
-            assert_eq!(entries[0].num_hashes, e0.num_hashes);
-            assert_eq!(entries[0].id, e0.id);
-            new_events.extend(entries[0].events.clone());
-        }
-        for (i, e) in new_events.iter().enumerate() {
-            assert_eq!(*e, e0.events[i]);
-        }
-        trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
-        assert!(blob_q.len() > num_blobs_ref);
-    }
 }
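
Note (editorial, not part of the patch): besides relocating the helper, the
ledger.rs version of process_entry_list_into_blobs fixes a truncation bug in
the split path. The removed tpu.rs version computed total_entry_chunks with
floor division and never clamped event_end, so events after the last full
chunk of an oversized entry were silently dropped. The new version rounds up
and clamps the slice bound with min(), which is what lets the rewritten test
assert a lossless round trip. A standalone sketch of that arithmetic, using
made-up sizes rather than the real packet.rs constants:

    // Sketch only: illustrative sizes, not the real constants.
    fn main() {
        let blob_data_size = 4096; // assumed blob payload capacity
        let event_size = 300; // assumed serialized size of one Event
        let num_events = 10_000; // events in a single oversized Entry

        let num_events_per_blob = blob_data_size / event_size; // 13

        // Old behavior: floor division loses the trailing partial chunk.
        let old_chunks = num_events / num_events_per_blob; // 769
        assert_eq!(old_chunks * num_events_per_blob, 9997); // 3 events dropped

        // New behavior: ceiling division; min() clamps the final slice.
        let new_chunks = (num_events + num_events_per_blob - 1) / num_events_per_blob;
        let last_len = num_events - (new_chunks - 1) * num_events_per_blob;
        assert_eq!(new_chunks, 770);
        assert_eq!(last_len, 3);
    }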