Skip to content

Commit

Permalink
Add multithreaded queue benches
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Hafskjold Thoresen committed Nov 21, 2017
1 parent e364d15 commit eb941f2
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ lazy_static = "*"
[dev-dependencies]
rand = "*"
bencher = "*"
crossbeam = "*"

[profile.bench]
debug = true
Expand Down
209 changes: 206 additions & 3 deletions benches/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[macro_use]
extern crate bencher;
extern crate comere;
extern crate crossbeam;

use bencher::Bencher;

Expand Down Expand Up @@ -54,6 +55,9 @@ mod hp {
use super::Bencher;
use comere::hp::queue::Queue;

use std::sync::{Arc, Condvar, Mutex};
use std::mem::drop;

pub fn push(b: &mut Bencher) {
const N: u64 = 1024 * 1024;
b.bench_n(N, |_b| {
Expand All @@ -75,13 +79,70 @@ mod hp {
});
});
}

pub fn transfer_n(b: &mut Bencher, n_threads: usize) {
b.bench_n(1, |_b| {
const NUM_ELEMENTS: usize = 256 * 256;
let source = Arc::new(Queue::new());
for i in 0..NUM_ELEMENTS {
source.push(i);
}
let sink = Arc::new(Queue::new());
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let mut threads = Vec::with_capacity(n_threads);
for _ in 0..n_threads {
let p = pair.clone();
let source = source.clone();
let sink = sink.clone();
let handle = ::std::thread::spawn(move || {
let &(ref lock, ref cvar) = &*p;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
drop(started);
while let Some(i) = source.pop() {
sink.push(i);
}
});
threads.push(handle);
}
_b.iter(|| {
let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
drop(started);
cvar.notify_all();
for i in (0..n_threads).rev() {
let t = threads.remove(i);
let _ = t.join();
}
});

});
}

macro_rules! transfer_ {
($name:ident, $n:expr) => {
pub fn $name(b: &mut Bencher) { transfer_n(b, $n); }
}
}
transfer_!(transfer_1, 1);
transfer_!(transfer_2, 2);
transfer_!(transfer_4, 4);
transfer_!(transfer_8, 8);
transfer_!(transfer_16, 16);
transfer_!(transfer_32, 32);
}

mod ebr {
use super::Bencher;
use comere::ebr::queue::Queue;
use comere::ebr::pin;

use std::sync::{Arc, Condvar, Mutex};
use std::mem::drop;

pub fn push(b: &mut Bencher) {
let queue = Queue::new();
b.iter(|| { pin(|pin| { queue.push(0usize, pin); }); })
Expand Down Expand Up @@ -116,9 +177,151 @@ mod ebr {
})
});
}

pub fn transfer_n(b: &mut Bencher, n_threads: usize) {
b.bench_n(1, |_b| {
const NUM_ELEMENTS: usize = 256 * 256;
let source = Arc::new(Queue::new());
pin(|pin| for i in 0..NUM_ELEMENTS {
source.push(i, pin);
});
let sink = Arc::new(Queue::new());
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let mut threads = Vec::with_capacity(n_threads);
for i in 0..n_threads {
let p = pair.clone();
let source = source.clone();
let sink = sink.clone();
let handle = ::std::thread::spawn(move || {
let &(ref lock, ref cvar) = &*p;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
drop(started);
while let Some(i) = pin(|pin| source.pop(pin)) {
pin(|pin| sink.push(i, pin));
}
});
threads.push(handle);
}
_b.iter(|| {
let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
drop(started);
cvar.notify_all();
for i in (0..n_threads).rev() {
let t = threads.remove(i);
let _ = t.join();
}
});

});
}
macro_rules! transfer_ {
($name:ident, $n:expr) => {
pub fn $name(b: &mut Bencher) { transfer_n(b, $n); }
}
}
transfer_!(transfer_1, 1);
transfer_!(transfer_2, 2);
transfer_!(transfer_4, 4);
transfer_!(transfer_8, 8);
transfer_!(transfer_16, 16);
transfer_!(transfer_32, 32);
}
mod crossbeam_bench {
use super::Bencher;
use crossbeam::sync::MsQueue;

use std::sync::{Arc, Condvar, Mutex};
use std::mem::drop;

pub fn transfer_n(b: &mut Bencher, n_threads: usize) {
b.bench_n(1, |_b| {
const NUM_ELEMENTS: usize = 256 * 256;
let source = Arc::new(MsQueue::new());
for i in 0..NUM_ELEMENTS {
source.push(i);
}
let sink = Arc::new(MsQueue::new());
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let mut threads = Vec::with_capacity(n_threads);
for _ in 0..n_threads {
let p = pair.clone();
let source = source.clone();
let sink = sink.clone();
let handle = ::std::thread::spawn(move || {
let &(ref lock, ref cvar) = &*p;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
drop(started);
while let Some(i) = source.try_pop() {
sink.push(i);
}
});
threads.push(handle);
}
_b.iter(|| {
let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
drop(started);
cvar.notify_all();
for i in (0..n_threads).rev() {
let t = threads.remove(i);
let _ = t.join();
}
});

});
}

macro_rules! transfer_ {
($name:ident, $n:expr) => {
pub fn $name(b: &mut Bencher) { transfer_n(b, $n); }
}
}

transfer_!(transfer_1, 1);
transfer_!(transfer_2, 2);
transfer_!(transfer_4, 4);
transfer_!(transfer_8, 8);
transfer_!(transfer_16, 16);
transfer_!(transfer_32, 32);
}

benchmark_group!(nothing_queue, nothing::push, nothing::pop);
benchmark_group!(hp_queue, hp::push, hp::pop);
benchmark_group!(ebr_queue, ebr::push, ebr::pop, ebr::pop_pin_outer);
benchmark_main!(hp_queue, ebr_queue, nothing_queue);
benchmark_group!(
hp_queue,
hp::push,
hp::pop,
hp::transfer_1,
hp::transfer_2,
hp::transfer_4 // hp::transfer_8,
// hp::transfer_16,
// hp::transfer_32
);
benchmark_group!(
ebr_queue,
ebr::push,
ebr::pop,
ebr::pop_pin_outer,
ebr::transfer_1,
ebr::transfer_2,
ebr::transfer_4 // ebr::transfer_8,
// ebr::transfer_16,
// ebr::transfer_32
);
benchmark_group!(
crossbeam_bench,
crossbeam_bench::transfer_1,
crossbeam_bench::transfer_2,
crossbeam_bench::transfer_4 // crossbeam_bench::transfer_8,
// crossbeam_bench::transfer_16,
// crossbeam_bench::transfer_32
);
benchmark_main!(hp_queue, ebr_queue, nothing_queue, crossbeam_bench);
27 changes: 0 additions & 27 deletions src/nothing/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,31 +225,4 @@ mod test {
Self { b: [0; 1024 * 4] }
}
}

// This test confirms that the queue leaks memory.
// #[test]
// fn memory_usage() {
// let mut q: Queue<LargeStruct> = Queue::new();
// // This will leak
// for i in 0..(1024 * 1024) {
// q.push(LargeStruct::new());
// q.pop();
// }
// }
}

mod bench {
extern crate test;

#[bench]
fn queue_push(b: &mut test::Bencher) {
let q = super::Queue::new();
b.iter(|| { q.push(0, None); });
}

#[bench]
fn queue_enqueue(b: &mut test::Bencher) {
let q = super::Queue::new();
b.iter(|| { q.enqueue(0); });
}
}

0 comments on commit eb941f2

Please sign in to comment.