Pipeline broadcast socket transmit and blocktree record (solana-labs#7481)

automerge
aeyakovenko authored and solana-grimes committed Dec 17, 2019
1 parent 504adcc commit 97589f7
Showing 13 changed files with 400 additions and 224 deletions.
core/benches/cluster_info.rs (4 changes: 3 additions & 1 deletion)
@@ -9,6 +9,7 @@ use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use std::collections::HashMap;
 use std::net::UdpSocket;
+use std::sync::Arc;
 use test::Bencher;
 
 #[bench]
@@ -31,10 +32,11 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         cluster_info.insert_info(contact_info);
         stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
     }
+    let stakes = Arc::new(stakes);
     bencher.iter(move || {
         let shreds = shreds.clone();
         cluster_info
-            .broadcast_shreds(&socket, shreds, &seeds, Some(&stakes))
+            .broadcast_shreds(&socket, shreds, &seeds, Some(stakes.clone()))
             .unwrap();
     });
 }
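
Note on this bench change: wrapping the stakes map in `Arc` once, before the benchmark loop, turns each per-iteration `stakes.clone()` into a reference-count increment instead of a deep copy of the `HashMap`. A minimal standalone sketch of that pattern (illustrative only; none of the names below come from this repository):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

fn main() {
    // Build the map once...
    let mut stakes: HashMap<u64, u64> = HashMap::new();
    stakes.insert(1, 100);
    stakes.insert(2, 200);
    // ...then share it: each clone() only bumps a refcount.
    let stakes = Arc::new(stakes);

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let stakes = Arc::clone(&stakes);
            thread::spawn(move || stakes.values().sum::<u64>())
        })
        .collect();

    for h in handles {
        assert_eq!(h.join().unwrap(), 300);
    }
}
```
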
core/src/broadcast_stage.rs (154 changes: 106 additions & 48 deletions)
@@ -6,22 +6,25 @@ use crate::cluster_info::{ClusterInfo, ClusterInfoError};
 use crate::poh_recorder::WorkingBankEntry;
 use crate::result::{Error, Result};
 use solana_ledger::blocktree::Blocktree;
+use solana_ledger::shred::Shred;
 use solana_ledger::staking_utils;
 use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
+use solana_sdk::pubkey::Pubkey;
+use std::collections::HashMap;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::{Receiver, RecvTimeoutError};
-use std::sync::{Arc, RwLock};
+use std::sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender};
+use std::sync::{Arc, Mutex, RwLock};
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Instant;
 
+pub const NUM_INSERT_THREADS: usize = 2;
+
 mod broadcast_fake_shreds_run;
 pub(crate) mod broadcast_utils;
 mod fail_entry_verification_broadcast_run;
 mod standard_broadcast_run;
 
-pub const NUM_THREADS: u32 = 10;
-
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub enum BroadcastStageReturnType {
     ChannelDisconnected,
@@ -37,33 +40,31 @@ pub enum BroadcastStageType {
 impl BroadcastStageType {
     pub fn new_broadcast_stage(
         &self,
-        sock: UdpSocket,
+        sock: Vec<UdpSocket>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         receiver: Receiver<WorkingBankEntry>,
         exit_sender: &Arc<AtomicBool>,
         blocktree: &Arc<Blocktree>,
         shred_version: u16,
     ) -> BroadcastStage {
+        let keypair = cluster_info.read().unwrap().keypair.clone();
         match self {
-            BroadcastStageType::Standard => {
-                let keypair = cluster_info.read().unwrap().keypair.clone();
-                BroadcastStage::new(
-                    sock,
-                    cluster_info,
-                    receiver,
-                    exit_sender,
-                    blocktree,
-                    StandardBroadcastRun::new(keypair, shred_version),
-                )
-            }
+            BroadcastStageType::Standard => BroadcastStage::new(
+                sock,
+                cluster_info,
+                receiver,
+                exit_sender,
+                blocktree,
+                StandardBroadcastRun::new(keypair, shred_version),
+            ),
 
             BroadcastStageType::FailEntryVerification => BroadcastStage::new(
                 sock,
                 cluster_info,
                 receiver,
                 exit_sender,
                 blocktree,
-                FailEntryVerificationBroadcastRun::new(shred_version),
+                FailEntryVerificationBroadcastRun::new(keypair, shred_version),
             ),
 
             BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
@@ -72,18 +73,30 @@ impl BroadcastStageType {
                 receiver,
                 exit_sender,
                 blocktree,
-                BroadcastFakeShredsRun::new(0, shred_version),
+                BroadcastFakeShredsRun::new(keypair, 0, shred_version),
             ),
         }
     }
 }
 
+type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
 trait BroadcastRun {
     fn run(
         &mut self,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        blocktree: &Arc<Blocktree>,
         receiver: &Receiver<WorkingBankEntry>,
+        socket_sender: &Sender<TransmitShreds>,
+        blocktree_sender: &Sender<Arc<Vec<Shred>>>,
+    ) -> Result<()>;
+    fn transmit(
+        &self,
+        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
         sock: &UdpSocket,
+    ) -> Result<()>;
+    fn record(
+        &self,
+        receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
         blocktree: &Arc<Blocktree>,
     ) -> Result<()>;
 }
@@ -107,33 +120,43 @@ impl Drop for Finalizer {
     }
 }
 
 pub struct BroadcastStage {
-    thread_hdl: JoinHandle<BroadcastStageReturnType>,
+    thread_hdls: Vec<JoinHandle<BroadcastStageReturnType>>,
 }
 
 impl BroadcastStage {
     #[allow(clippy::too_many_arguments)]
     fn run(
-        sock: &UdpSocket,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntry>,
         blocktree: &Arc<Blocktree>,
+        receiver: &Receiver<WorkingBankEntry>,
+        socket_sender: &Sender<TransmitShreds>,
+        blocktree_sender: &Sender<Arc<Vec<Shred>>>,
         mut broadcast_stage_run: impl BroadcastRun,
     ) -> BroadcastStageReturnType {
         loop {
-            if let Err(e) = broadcast_stage_run.run(&cluster_info, receiver, sock, blocktree) {
-                match e {
-                    Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
-                        return BroadcastStageReturnType::ChannelDisconnected;
-                    }
-                    Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                    Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
-                    _ => {
-                        inc_new_counter_error!("streamer-broadcaster-error", 1, 1);
-                        error!("broadcaster error: {:?}", e);
-                    }
-                }
+            let res = broadcast_stage_run.run(blocktree, receiver, socket_sender, blocktree_sender);
+            let res = Self::handle_error(res);
+            if let Some(res) = res {
+                return res;
             }
         }
     }
+    fn handle_error(r: Result<()>) -> Option<BroadcastStageReturnType> {
+        if let Err(e) = r {
+            match e {
+                Error::RecvTimeoutError(RecvTimeoutError::Disconnected)
+                | Error::SendError
+                | Error::RecvError(RecvError) => {
+                    return Some(BroadcastStageReturnType::ChannelDisconnected);
+                }
+                Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
+                _ => {
+                    inc_new_counter_error!("streamer-broadcaster-error", 1, 1);
+                    error!("broadcaster error: {:?}", e);
+                }
+            }
+        }
+        None
+    }
 
     /// Service to broadcast messages from the leader to layer 1 nodes.
@@ -153,34 +176,69 @@ impl BroadcastStage {
     /// completing the cycle.
     #[allow(clippy::too_many_arguments)]
     fn new(
-        sock: UdpSocket,
+        socks: Vec<UdpSocket>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         receiver: Receiver<WorkingBankEntry>,
         exit_sender: &Arc<AtomicBool>,
         blocktree: &Arc<Blocktree>,
-        broadcast_stage_run: impl BroadcastRun + Send + 'static,
+        broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
     ) -> Self {
-        let blocktree = blocktree.clone();
-        let exit_sender = exit_sender.clone();
+        let btree = blocktree.clone();
+        let exit = exit_sender.clone();
+        let (socket_sender, socket_receiver) = channel();
+        let (blocktree_sender, blocktree_receiver) = channel();
+        let bs_run = broadcast_stage_run.clone();
         let thread_hdl = Builder::new()
             .name("solana-broadcaster".to_string())
             .spawn(move || {
-                let _finalizer = Finalizer::new(exit_sender);
-                Self::run(
-                    &sock,
-                    &cluster_info,
-                    &receiver,
-                    &blocktree,
-                    broadcast_stage_run,
-                )
+                let _finalizer = Finalizer::new(exit);
+                Self::run(&btree, &receiver, &socket_sender, &blocktree_sender, bs_run)
             })
             .unwrap();
+        let mut thread_hdls = vec![thread_hdl];
+        let socket_receiver = Arc::new(Mutex::new(socket_receiver));
+        for sock in socks.into_iter() {
+            let socket_receiver = socket_receiver.clone();
+            let bs_transmit = broadcast_stage_run.clone();
+            let cluster_info = cluster_info.clone();
+            let t = Builder::new()
+                .name("solana-broadcaster-transmit".to_string())
+                .spawn(move || loop {
+                    let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock);
+                    let res = Self::handle_error(res);
+                    if let Some(res) = res {
+                        return res;
+                    }
+                })
+                .unwrap();
+            thread_hdls.push(t);
+        }
+        let blocktree_receiver = Arc::new(Mutex::new(blocktree_receiver));
+        for _ in 0..NUM_INSERT_THREADS {
+            let blocktree_receiver = blocktree_receiver.clone();
+            let bs_record = broadcast_stage_run.clone();
+            let btree = blocktree.clone();
+            let t = Builder::new()
+                .name("solana-broadcaster-record".to_string())
+                .spawn(move || loop {
+                    let res = bs_record.record(&blocktree_receiver, &btree);
+                    let res = Self::handle_error(res);
+                    if let Some(res) = res {
+                        return res;
+                    }
+                })
+                .unwrap();
+            thread_hdls.push(t);
+        }
 
-        Self { thread_hdl }
+        Self { thread_hdls }
     }
 
     pub fn join(self) -> thread::Result<BroadcastStageReturnType> {
-        self.thread_hdl.join()
+        for thread_hdl in self.thread_hdls.into_iter() {
+            let _ = thread_hdl.join();
+        }
+        Ok(BroadcastStageReturnType::ChannelDisconnected)
     }
 }
 
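
The reworked `BroadcastStage::new` above pipelines the stage: one generator thread runs `BroadcastRun::run` and feeds two channels, one transmit thread per socket drains `socket_receiver`, and `NUM_INSERT_THREADS` record threads drain `blocktree_receiver`. Since an mpsc `Receiver` is single-consumer, each receiver is shared among the workers behind `Arc<Mutex<...>>`. A minimal sketch of that fan-out pattern, with plain integers standing in for shred batches (an assumed simplification, not code from this commit):

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let (sender, receiver) = channel::<u64>();
    // mpsc receivers are single-consumer, so share one behind Arc<Mutex<..>>.
    let receiver = Arc::new(Mutex::new(receiver));

    let workers: Vec<_> = (0..2)
        .map(|i| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The lock is held across recv(), mirroring the simple
                // sharing scheme used by the stage above.
                let item = receiver.lock().unwrap().recv();
                match item {
                    Ok(v) => println!("worker {} processed {}", i, v),
                    Err(_) => break, // channel closed: shut down
                }
            })
        })
        .collect();

    for v in 0..10 {
        sender.send(v).unwrap();
    }
    drop(sender); // closing the channel stops the workers

    for w in workers {
        w.join().unwrap();
    }
}
```
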
core/src/broadcast_stage/broadcast_fake_shreds_run.rs (70 changes: 47 additions & 23 deletions)
@@ -2,37 +2,40 @@ use super::*;
 use solana_ledger::entry::Entry;
 use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE};
 use solana_sdk::hash::Hash;
+use solana_sdk::signature::Keypair;
 
+#[derive(Clone)]
 pub(super) struct BroadcastFakeShredsRun {
     last_blockhash: Hash,
     partition: usize,
     shred_version: u16,
+    keypair: Arc<Keypair>,
 }
 
 impl BroadcastFakeShredsRun {
-    pub(super) fn new(partition: usize, shred_version: u16) -> Self {
+    pub(super) fn new(keypair: Arc<Keypair>, partition: usize, shred_version: u16) -> Self {
         Self {
             last_blockhash: Hash::default(),
             partition,
             shred_version,
+            keypair,
         }
     }
 }
 
 impl BroadcastRun for BroadcastFakeShredsRun {
     fn run(
         &mut self,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntry>,
-        sock: &UdpSocket,
         blocktree: &Arc<Blocktree>,
+        receiver: &Receiver<WorkingBankEntry>,
+        socket_sender: &Sender<TransmitShreds>,
+        blocktree_sender: &Sender<Arc<Vec<Shred>>>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
         let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
         let bank = receive_results.bank.clone();
         let last_tick_height = receive_results.last_tick_height;
 
-        let keypair = &cluster_info.read().unwrap().keypair.clone();
         let next_shred_index = blocktree
             .meta(bank.slot())
             .expect("Database error")
@@ -45,7 +48,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
             bank.slot(),
             bank.parent().unwrap().slot(),
             RECOMMENDED_FEC_RATE,
-            keypair.clone(),
+            self.keypair.clone(),
             (bank.tick_height() % bank.ticks_per_slot()) as u8,
             self.shred_version,
         )
@@ -79,29 +82,50 @@ impl BroadcastRun for BroadcastFakeShredsRun {
             self.last_blockhash = Hash::default();
         }
 
-        blocktree.insert_shreds(data_shreds.clone(), None, true)?;
+        let data_shreds = Arc::new(data_shreds);
+        blocktree_sender.send(data_shreds.clone())?;
 
         // 3) Start broadcast step
-        let peers = cluster_info.read().unwrap().tvu_peers();
-        peers.iter().enumerate().for_each(|(i, peer)| {
-            if i <= self.partition {
-                // Send fake shreds to the first N peers
-                fake_data_shreds
-                    .iter()
-                    .chain(fake_coding_shreds.iter())
-                    .for_each(|b| {
+        //some indicates fake shreds
+        socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)))?;
+        socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)))?;
+        //none indicates real shreds
+        socket_sender.send((None, data_shreds))?;
+        socket_sender.send((None, Arc::new(coding_shreds)))?;
+
+        Ok(())
+    }
+    fn transmit(
+        &self,
+        receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        sock: &UdpSocket,
+    ) -> Result<()> {
+        for (stakes, data_shreds) in receiver.lock().unwrap().iter() {
+            let peers = cluster_info.read().unwrap().tvu_peers();
+            peers.iter().enumerate().for_each(|(i, peer)| {
+                if i <= self.partition && stakes.is_some() {
+                    // Send fake shreds to the first N peers
+                    data_shreds.iter().for_each(|b| {
                         sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
                     });
-            } else {
-                data_shreds
-                    .iter()
-                    .chain(coding_shreds.iter())
-                    .for_each(|b| {
+                } else if i > self.partition && stakes.is_none() {
+                    data_shreds.iter().for_each(|b| {
                         sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
                     });
-            }
-        });
-
-        Ok(())
-    }
+                }
+            });
+        }
+        Ok(())
+    }
+    fn record(
+        &self,
+        receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
+        blocktree: &Arc<Blocktree>,
+    ) -> Result<()> {
+        for data_shreds in receiver.lock().unwrap().iter() {
+            blocktree.insert_shreds(data_shreds.to_vec(), None, true)?;
+        }
+        Ok(())
+    }
 }
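
In the reworked broadcast_fake_shreds_run.rs, the first element of a `TransmitShreds` tuple doubles as a routing tag: `Some(stakes)` marks fake shreds, which `transmit` sends only to the first `partition` peers, while `None` marks real shreds for the remaining peers. A small sketch of that Some/None tagging convention (simplified stand-in types; `Shred` below is just `Vec<u8>`, not `solana_ledger::shred::Shred`):

```rust
use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::sync::Arc;

// Simplified stand-ins for the real types.
type Shred = Vec<u8>;
type TransmitShreds = (Option<Arc<HashMap<u64, u64>>>, Arc<Vec<Shred>>);

fn main() {
    let (tx, rx) = channel::<TransmitShreds>();
    // Some(..) tags fake batches, None tags real ones.
    tx.send((Some(Arc::new(HashMap::new())), Arc::new(vec![vec![0u8]])))
        .unwrap();
    tx.send((None, Arc::new(vec![vec![1u8]]))).unwrap();
    drop(tx);

    for (stakes, shreds) in rx.iter() {
        if stakes.is_some() {
            println!("fake batch ({} shreds) -> partitioned peers", shreds.len());
        } else {
            println!("real batch ({} shreds) -> remaining peers", shreds.len());
        }
    }
}
```
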
(The remaining 10 changed files are not shown.)
