Skip to content

Commit

Permalink
Generate plot for EBR, HP, and crossbeam, for 1,2,4, and 8 threads
Browse files Browse the repository at this point in the history
The benchmark is `transfer`: move `n` elements from one queue to
another.
  • Loading branch information
Martin Hafskjold Thoresen committed Nov 28, 2017
1 parent 427cb8b commit 0abe7d4
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 29 deletions.
14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,27 @@ authors = ["Martin Hafskjold Thoresen <martinhath@gmail.com>"]
[dependencies]
lazy_static = "*"
bench = { path = './bench' }
crossbeam = "*"

[dev-dependencies]
rand = "*"
bencher = "*"
crossbeam = "*"
time = "*"

[profile.bench]
debug = true

[[bin]]
name = "queue2"
path = "benches/queue2.rs"
name = "queue-ebr"
path = "benches/queue-ebr.rs"

[[bin]]
name = "queue-crossbeam"
path = "benches/queue-crossbeam.rs"

[[bin]]
name = "queue-hp"
path = "benches/queue-hp.rs"

[[bench]]
name = "queue"
Expand Down
35 changes: 27 additions & 8 deletions bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<S> Bencher<S> {
let t0 = time::precise_time_ns();
black_box(f(&state));
let t1 = time::precise_time_ns();
self.samples.push((t1 - t0) / 1000);
self.samples.push(t1 - t0);
(self.between)(&mut state);
}
(self.post)(&mut state);
Expand All @@ -56,20 +56,39 @@ impl<S> Bencher<S> {
let len = self.samples.len() as u64;
let sum = self.samples.iter().cloned().sum::<u64>();
let avg = sum / len;
let var = { let s = self.samples
.iter()
.cloned()
.map(|s| (if s < avg { (avg - s) } else { s - avg }).pow(2))
.sum::<u64>() / len;
let var = {
let s = self.samples
.iter()
.cloned()
.map(|s| (if s < avg { (avg - s) } else { s - avg }).pow(2))
.sum::<u64>() / len;
(s as f32).sqrt() as u64
};
let max = self.samples.iter().cloned().max().unwrap();
let min = self.samples.iter().cloned().min().unwrap();
let above = self.samples.iter().filter(|&&s| s > avg).count();
let below = self.samples.len() - above;
println!(
"Bench: ................ {} ns/iter (+/- {})",
"Bench: ................ {} ns/iter (+/- {}) min={} max={} above={} below={}",
fmt_thousands_sep(avg),
fmt_thousands_sep(var)
fmt_thousands_sep(var),
min,
max,
above,
below
);
}

pub fn output_samples<W: ::std::io::Write>(
&self,
mut writer: W,
) -> Result<(), ::std::io::Error> {
for sample in &self.samples {
writer.write_fmt(format_args!("{}\n", sample))?;
}
Ok(())
}

pub fn pre<F: 'static + Fn(&mut S)>(&mut self, f: F) {
self.pre = Box::new(f);
}
Expand Down
123 changes: 123 additions & 0 deletions benches/queue-crossbeam.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
extern crate crossbeam;
extern crate bench;

use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread::spawn;
use std::cell::UnsafeCell;
use std::env;

use crossbeam::sync::MsQueue;

fn main() {
let num_threads: usize = env::args().nth(1)
.unwrap_or("4".to_string())
.parse()
.unwrap_or(4);
const NUM_ELEMENTS: usize = 256 * 256;
struct BenchState {
state: Arc<Mutex<State>>,
condvar: Arc<Condvar>,
barrier: Arc<Barrier>,
source: UnsafeCell<MsQueue<usize>>,
sink: UnsafeCell<MsQueue<usize>>,
threads: Vec<::std::thread::JoinHandle<()>>,
};
#[derive(Clone, Copy, PartialEq)]
enum State {
Wait,
Run,
Exit,
};
let bench_state = BenchState {
state: Arc::new(Mutex::new(State::Wait)),
condvar: Arc::new(Condvar::new()),
barrier: Arc::new(Barrier::new(num_threads + 1)),
source: UnsafeCell::new(MsQueue::new()),
sink: UnsafeCell::new(MsQueue::new()),
threads: vec![],
};

let mut b = bench::Bencher::<BenchState>::new();

// Before the benchmark, fill the source up with elements, and spawn the threads that are to do
// the work.
b.pre(move |state| {
for i in 0..NUM_ELEMENTS {
unsafe { (*state.source.get()).push(i) };
}
for _ in 0..num_threads {
let bench_state = state.state.clone();
let condvar = state.condvar.clone();
let barrier = state.barrier.clone();
let (source, sink) = unsafe {
let source: &MsQueue<_> = &*state.source.get();
let sink: &MsQueue<_> = &*state.sink.get();
(source, sink)
};
state.threads.push(spawn(move || loop {
let mut started = bench_state.lock().unwrap();
while *started == State::Wait {
started = condvar.wait(started).unwrap();
}
let state = *started;
drop(started);
match state {
State::Exit => {
break;
}
State::Run => {
// BODY BEGINS HERE! ///////////////////////////////

// let mut c = 0;
while let Some(i) = source.try_pop() {
sink.push(i);
// c += 1;
}
// println!("thread {} moved {} elements", i, c);

// BODY END HERE ///////////////////////////////////
}
State::Wait => unreachable!(),
}
barrier.wait();
barrier.wait();
}));
}
});

b.post(|state| {
let mut s = state.state.lock().unwrap();
*s = State::Exit;
});

b.between(|state| {
let (source, sink) = unsafe {
let source: &mut MsQueue<_> = &mut *state.source.get();
let sink: &mut MsQueue<_> = &mut *state.sink.get();
(source, sink)
};
while let Some(e) = source.try_pop() {
sink.push(e);
}
unsafe {
// We know that no other thread is reading this data when we swap it. Therefore,
// this is safe.
::std::ptr::swap(sink, source);
}
});

b.set_n(100);
b.bench(bench_state, |state| {
let mut s = state.state.lock().unwrap();
*s = State::Run;
drop(s);
state.condvar.notify_all();

state.barrier.wait();
*state.state.lock().unwrap() = State::Wait;
state.barrier.wait();
});

let mut f = ::std::fs::File::create(&format!("crossbeam-{}", num_threads)).unwrap();
let _ = b.output_samples(&mut f);
}
13 changes: 10 additions & 3 deletions benches/queue2.rs → benches/queue-ebr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ extern crate bench;
use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread::spawn;
use std::cell::UnsafeCell;
use std::env;

use comere::ebr::pin;
use comere::ebr::queue::Queue;

fn main() {
let num_threads: usize = env::args().nth(1)
.unwrap_or("4".to_string())
.parse()
.unwrap_or(4);
const NUM_ELEMENTS: usize = 256 * 256;
const NUM_THREADS: usize = 4;
struct BenchState {
state: Arc<Mutex<State>>,
condvar: Arc<Condvar>,
Expand All @@ -28,7 +32,7 @@ fn main() {
let bench_state = BenchState {
state: Arc::new(Mutex::new(State::Wait)),
condvar: Arc::new(Condvar::new()),
barrier: Arc::new(Barrier::new(NUM_THREADS + 1)),
barrier: Arc::new(Barrier::new(num_threads + 1)),
source: UnsafeCell::new(Queue::new()),
sink: UnsafeCell::new(Queue::new()),
threads: vec![],
Expand All @@ -42,7 +46,7 @@ fn main() {
pin(|pin| for i in 0..NUM_ELEMENTS {
unsafe { (*state.source.get()).push(i, pin) };
});
for i in 0..NUM_THREADS {
for i in 0..num_threads {
let bench_state = state.state.clone();
let condvar = state.condvar.clone();
let barrier = state.barrier.clone();
Expand Down Expand Up @@ -116,4 +120,7 @@ fn main() {
*state.state.lock().unwrap() = State::Wait;
state.barrier.wait();
});

let mut f = ::std::fs::File::create(&format!("ebr-{}", num_threads)).unwrap();
let _ = b.output_samples(&mut f);
}
124 changes: 124 additions & 0 deletions benches/queue-hp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
extern crate comere;
extern crate bench;

use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread::spawn;
use std::cell::UnsafeCell;
use std::env;

use comere::hp::queue::Queue;

fn main() {
let num_threads: usize = env::args()
.nth(1)
.unwrap_or("4".to_string())
.parse()
.unwrap_or(4);
const NUM_ELEMENTS: usize = 256 * 256;
struct BenchState {
state: Arc<Mutex<State>>,
condvar: Arc<Condvar>,
barrier: Arc<Barrier>,
source: UnsafeCell<Queue<usize>>,
sink: UnsafeCell<Queue<usize>>,
threads: Vec<::std::thread::JoinHandle<()>>,
};
#[derive(Clone, Copy, PartialEq)]
enum State {
Wait,
Run,
Exit,
};
let bench_state = BenchState {
state: Arc::new(Mutex::new(State::Wait)),
condvar: Arc::new(Condvar::new()),
barrier: Arc::new(Barrier::new(num_threads + 1)),
source: UnsafeCell::new(Queue::new()),
sink: UnsafeCell::new(Queue::new()),
threads: vec![],
};

let mut b = bench::Bencher::<BenchState>::new();

// Before the benchmark, fill the source up with elements, and spawn the threads that are to do
// the work.
b.pre(move |state| {
for i in 0..NUM_ELEMENTS {
unsafe { (*state.source.get()).push(i) };
}
for i in 0..num_threads {
let bench_state = state.state.clone();
let condvar = state.condvar.clone();
let barrier = state.barrier.clone();
let (source, sink) = unsafe {
let source: &Queue<_> = &*state.source.get();
let sink: &Queue<_> = &*state.sink.get();
(source, sink)
};
state.threads.push(spawn(move || loop {
let mut started = bench_state.lock().unwrap();
while *started == State::Wait {
started = condvar.wait(started).unwrap();
}
let state = *started;
drop(started);
match state {
State::Exit => {
break;
}
State::Run => {
// BODY BEGINS HERE! ///////////////////////////////

// let mut c = 0;
while let Some(i) = source.pop() {
sink.push(i);
// c += 1;
}
// println!("thread {} moved {} elements", i, c);

// BODY END HERE ///////////////////////////////////
}
State::Wait => unreachable!(),
}
barrier.wait();
barrier.wait();
}));
}
});

b.post(|state| {
let mut s = state.state.lock().unwrap();
*s = State::Exit;
});

b.between(|state| {
let (source, sink) = unsafe {
let source: &mut Queue<_> = &mut *state.source.get();
let sink: &mut Queue<_> = &mut *state.sink.get();
(source, sink)
};
while let Some(e) = source.pop() {
sink.push(e);
}
unsafe {
// We know that no other thread is reading this data when we swap it. Therefore,
// this is safe.
::std::ptr::swap(sink, source);
}
});

b.set_n(100);
b.bench(bench_state, |state| {
let mut s = state.state.lock().unwrap();
*s = State::Run;
drop(s);
state.condvar.notify_all();

state.barrier.wait();
*state.state.lock().unwrap() = State::Wait;
state.barrier.wait();
});

let mut f = ::std::fs::File::create(&format!("hp-{}", num_threads)).unwrap();
let _ = b.output_samples(&mut f);
}
4 changes: 0 additions & 4 deletions benches/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ mod ebr {
transfer_!(transfer_2, 2);
transfer_!(transfer_4, 4);
transfer_!(transfer_8, 8);
transfer_!(transfer_16, 16);
transfer_!(transfer_32, 32);
}

mod crossbeam_bench {
Expand Down Expand Up @@ -345,8 +343,6 @@ mod crossbeam_bench {
transfer_!(transfer_2, 2);
transfer_!(transfer_4, 4);
transfer_!(transfer_8, 8);
transfer_!(transfer_16, 16);
transfer_!(transfer_32, 32);

pub fn aransfer_barrier_1(b: &mut Bencher) {
transfer_n_barrier(b, 1);
Expand Down
Loading

0 comments on commit 0abe7d4

Please sign in to comment.