Skip to content

Commit

Permalink
align the LocalExecutorBuilder API with the pooled version.
Browse files Browse the repository at this point in the history
The pooled version is presently strictly superior to the non-pooled one
because it uses the `PoolPlacement` logic. It is therefore  possible to
spawn a single executor that's fenced to a collection of CPUs using the
pooled API, but not with the non-pooled one.

With the existing `LocalExecutorBuilder`, an executor is either pinned
to a specific CPU, or to none at all.

To remedy this, this commit creates a new `Placement` enum that contains
a subset of placement strategies that make sense for a single executor.
In turn, `LocalExecutorBuilder::new` now takes a placement strategy and
the `pin_to_cpu` API is removed.
  • Loading branch information
HippoBaro committed Sep 28, 2021
1 parent 3a5094d commit 310e418
Show file tree
Hide file tree
Showing 29 changed files with 331 additions and 266 deletions.
2 changes: 1 addition & 1 deletion examples/cooperative_preempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn main() {
// be used in situations where .await is illegal. For instance, if we are
// holding a borrow. For those, one can call need_preempt() which will tell
// you if yielding is needed, and then explicitly yield with later().
let handle = LocalExecutorBuilder::new()
let handle = LocalExecutorBuilder::default()
.spawn(|| async move {
let tq1 = Local::create_task_queue(
Shares::default(),
Expand Down
3 changes: 1 addition & 2 deletions examples/deadline_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ async fn read_int() -> Result<usize, <usize as std::str::FromStr>::Err> {
}

fn main() {
let handle = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let handle = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let cpuhog_tq =
Local::create_task_queue(Shares::Static(1000), Latency::NotImportant, "cpuhog");
Expand Down
2 changes: 1 addition & 1 deletion examples/defer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn main() {
println!("Executor is done!");
}

let handle = LocalExecutorBuilder::new()
let handle = LocalExecutorBuilder::default()
.spawn(|| async move {
defer! {
println!("This will print after the timer");
Expand Down
5 changes: 2 additions & 3 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ async fn server(conns: usize) -> Result<()> {
// need for sleep or retry), but it also demonstrates how a more complex
// application may not necessarily spawn all executors at once running
// symmetrical code.
let client_handle = LocalExecutorBuilder::new()
.pin_to_cpu(2)
let client_handle = LocalExecutorBuilder::new(Placement::Exact(2))
.name("client")
.spawn(move || async move { client(conns).await })?;

Expand Down Expand Up @@ -95,7 +94,7 @@ fn main() -> Result<()> {
// Skip CPU0 because that is commonly used to host interrupts. That depends on
// system configuration and most modern systems will balance it, but that it is
// still common enough that it is worth excluding it in this benchmark
let builder = LocalExecutorBuilder::new().pin_to_cpu(1);
let builder = LocalExecutorBuilder::new(Placement::Exact(1));
let server_handle = builder.name("server").spawn(|| async move {
// If you try `top` during the execution of the first batch, you
// will see that the CPUs should not be at 100%. A single connection will
Expand Down
4 changes: 2 additions & 2 deletions examples/hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ fn main() -> Result<()> {
// There are two ways to create an executor, demonstrated in this example.
//
// We can create it in the current thread, and run it separately later...
let ex = LocalExecutorBuilder::new().pin_to_cpu(0).make()?;
let ex = LocalExecutorBuilder::new(Placement::Exact(0)).make()?;

// Or we can spawn a new thread with an executor inside.
let builder = LocalExecutorBuilder::new().pin_to_cpu(1);
let builder = LocalExecutorBuilder::new(Placement::Exact(1));
let handle = builder.name("hello").spawn(|| async move {
hello().await;
})?;
Expand Down
2 changes: 1 addition & 1 deletion examples/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {
let mesh = MeshBuilder::full(nr_shards, 1024);

let shards = (0..nr_shards).map(|_| {
LocalExecutorBuilder::new().spawn(enclose!((mesh) move || async move {
LocalExecutorBuilder::default().spawn(enclose!((mesh) move || async move {
let handler = RequestHandler { nr_shards };
let mut sharded = Sharded::new(mesh, get_shard_for, handler).await.unwrap();
let me = sharded.shard_id();
Expand Down
4 changes: 2 additions & 2 deletions examples/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use glommio::{
},
Local,
LocalExecutorBuilder,
Placement,
};
use pretty_bytes::converter;
use std::{
Expand Down Expand Up @@ -330,8 +331,7 @@ fn main() {

let random = total_memory / 10;

let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spin_before_park(Duration::from_millis(10))
.spawn(move || async move {
let mut dio_filename = dir.path.clone();
Expand Down
12 changes: 4 additions & 8 deletions glommio/benches/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ fn main() {
let vec = Arc::new(Mutex::new(Vec::with_capacity(10)));
for _ in 0..runs {
let t = Instant::now();
let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(enclose! { (vec) move || async move {
let mut v = vec.lock().unwrap();
v.push(t.elapsed());
Expand All @@ -29,8 +28,7 @@ fn main() {

let t = Instant::now();
for _ in 0..runs {
let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(move || async move {})
.unwrap();
local_ex.join().unwrap();
Expand All @@ -40,8 +38,7 @@ fn main() {
t.elapsed() / runs
);

let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let runs: u32 = 10_000_000;
let t = Instant::now();
Expand All @@ -53,8 +50,7 @@ fn main() {
.unwrap();
local_ex.join().unwrap();

let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let runs: u32 = 10_000_000;
let tq1 = Local::create_task_queue(Shares::Static(1000), Latency::NotImportant, "tq1");
Expand Down
6 changes: 2 additions & 4 deletions glommio/benches/local_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use glommio::{channels::local_channel, prelude::*};
use std::time::Instant;

fn main() {
let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let runs: u32 = 10_000_000;
let (sender, receiver) = local_channel::new_bounded(1);
Expand All @@ -27,8 +26,7 @@ fn main() {
.unwrap();
local_ex.join().unwrap();

let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let runs: u32 = 10_000_000;
let (sender, receiver) = local_channel::new_bounded(10_000_000);
Expand Down
6 changes: 4 additions & 2 deletions glommio/benches/nop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Benchmark the performance of the submission ring by timing tasks which
//! submit and wait on noop requests.
use glommio::{LocalExecutorBuilder, Task};
use glommio::{LocalExecutorBuilder, Placement, Task};
use std::{
fmt,
time::{Duration, Instant},
Expand Down Expand Up @@ -59,7 +59,9 @@ fn main() {
let num_bench_runs = 5;
for bench in BENCH_RUNS {
for _ in 0..num_bench_runs {
let ex = LocalExecutorBuilder::new().pin_to_cpu(0).make().unwrap();
let ex = LocalExecutorBuilder::new(Placement::Exact(0))
.make()
.unwrap();
let measurement = ex.run(run_bench_tasks(bench.num_tasks, bench.num_events));

println!("{}", measurement);
Expand Down
3 changes: 1 addition & 2 deletions glommio/benches/preempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use glommio::prelude::*;
use std::time::Instant;

fn main() {
let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let mut runs = 0;
let t = Instant::now();
Expand Down
6 changes: 2 additions & 4 deletions glommio/benches/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use glommio::{enclose, prelude::*, sync::Semaphore};
use std::{cell::Cell, rc::Rc, time::Instant};

fn main() {
let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let runs: u32 = 10_000_000;
let s = Rc::new(Semaphore::new(10_000_000));
Expand All @@ -19,8 +18,7 @@ fn main() {
.unwrap();
local_ex.join().unwrap();

let local_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let local_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(|| async move {
let runs: u32 = 10_000_000;
let s = Rc::new(Semaphore::new(0));
Expand Down
29 changes: 13 additions & 16 deletions glommio/benches/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,21 @@ fn main() {

let n = 400_000_000;

let shards = (0..nr_shards).map(|i| {
LocalExecutorBuilder::new()
.pin_to_cpu(i)
.spin_before_park(Duration::from_millis(10))
.spawn(enclose!((mesh) move || async move {
let handler = RequestHandler { nr_shards };
let mut sharded = Sharded::new(mesh, get_shard_for, handler).await.unwrap();
if sharded.shard_id() == 0 {
sharded.handle(repeat(1).take(n)).unwrap();
}
sharded.close().await;
}))
});
let shards = LocalExecutorPoolBuilder::new(PoolPlacement::MaxSpread(nr_shards, None))
.spin_before_park(Duration::from_millis(10))
.on_all_shards(enclose!((mesh) move || async move {
let handler = RequestHandler { nr_shards };
let mut sharded = Sharded::new(mesh, get_shard_for, handler).await.unwrap();
if sharded.shard_id() == 0 {
sharded.handle(repeat(1).take(n)).unwrap();
}
sharded.close().await;
}))
.unwrap();

let t = Instant::now();
for s in shards.collect::<Vec<_>>() {
s.unwrap().join().unwrap();
}
shards.join_all();

println!(
"elapsed: {:?}, average cost: {:?}",
t.elapsed(),
Expand Down
91 changes: 38 additions & 53 deletions glommio/benches/shared_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ fn test_spsc(capacity: usize) {
let runs: u32 = 10_000_000;
let (sender, receiver) = shared_channel::new_bounded(capacity);

let sender = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let sender = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(move || async move {
let sender = sender.connect().await;
let t = Instant::now();
Expand All @@ -23,8 +22,7 @@ fn test_spsc(capacity: usize) {
})
.unwrap();

let receiver = LocalExecutorBuilder::new()
.pin_to_cpu(1)
let receiver = LocalExecutorBuilder::new(Placement::Exact(1))
.spawn(move || async move {
let receiver = receiver.connect().await;
let t = Instant::now();
Expand All @@ -47,8 +45,7 @@ fn test_rust_std(capacity: usize) {
let runs: u32 = 10_000_000;
let (sender, receiver) = sync_channel::<u8>(capacity);

let sender = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let sender = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(move || async move {
let t = Instant::now();
for _ in 0..runs {
Expand All @@ -63,8 +60,7 @@ fn test_rust_std(capacity: usize) {
})
.unwrap();

let receiver = LocalExecutorBuilder::new()
.pin_to_cpu(1)
let receiver = LocalExecutorBuilder::new(Placement::Exact(1))
.spawn(move || async move {
let t = Instant::now();
for _ in 0..runs {
Expand All @@ -86,8 +82,7 @@ fn test_tokio_mpsc(capacity: usize) {
let runs: u32 = 10_000_000;
let (sender, mut receiver) = tokio::sync::mpsc::channel(capacity);

let sender = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let sender = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(move || async move {
let t = Instant::now();
for _ in 0..runs {
Expand All @@ -102,8 +97,7 @@ fn test_tokio_mpsc(capacity: usize) {
})
.unwrap();

let receiver = LocalExecutorBuilder::new()
.pin_to_cpu(1)
let receiver = LocalExecutorBuilder::new(Placement::Exact(1))
.spawn(move || async move {
let t = Instant::now();
for _ in 0..runs {
Expand All @@ -125,50 +119,41 @@ fn test_mesh_mpmc(capacity: usize, peers: usize) {
let runs: u32 = 10_000_000;

let mesh = glommio::channels::channel_mesh::MeshBuilder::full(peers, capacity);
let mut execs = vec![];

let t = Instant::now();
for _ in 0..peers {
execs.push(
LocalExecutorBuilder::new()
.spawn(enclose! {(mesh) move || async move {
let (sender, mut receiver) = mesh.join().await.unwrap();

let sender_task = async move {
let mut rng = rand::thread_rng();
for _ in 0..runs {
let mut peer = rng.gen_range(0..sender.nr_consumers());
if peer == sender.producer_id().unwrap() && peer == 0 {
peer += 1;
} else if peer == sender.producer_id().unwrap() {
peer-=1;
}

sender.send_to(peer, 1).await.unwrap();
}

drop(sender);
};

let mut recvs = vec![];
for (_, recv) in receiver.streams() {
recvs.push(Local::local(async move {
while recv.recv().await.is_some(){};
}));
LocalExecutorPoolBuilder::new(PoolPlacement::MaxSpread(peers, None))
.on_all_shards(enclose! {(mesh) move || async move {
let (sender, mut receiver) = mesh.join().await.unwrap();

let sender_task = async move {
let mut rng = rand::thread_rng();
for _ in 0..runs {
let mut peer = rng.gen_range(0..sender.nr_consumers());
if peer == sender.producer_id().unwrap() && peer == 0 {
peer += 1;
} else if peer == sender.producer_id().unwrap() {
peer-=1;
}

sender_task.await;
for x in recvs {
x.await;
}
}})
.unwrap(),
);
}

for x in execs {
x.join().unwrap();
}
sender.send_to(peer, 1).await.unwrap();
}

drop(sender);
};

let mut recvs = vec![];
for (_, recv) in receiver.streams() {
recvs.push(Local::local(async move {
while recv.recv().await.is_some(){};
}));
}

sender_task.await;
for x in recvs {
x.await;
}
}})
.unwrap()
.join_all();

println!(
"cost of mesh message shared channel: {:#?}, peers {}, capacity {}",
Expand Down
6 changes: 2 additions & 4 deletions glommio/benches/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ fn main() {
let coord = Arc::new(Mutex::new(0));

let server = coord.clone();
let _server_ex = LocalExecutorBuilder::new()
.pin_to_cpu(0)
let _server_ex = LocalExecutorBuilder::new(Placement::Exact(0))
.spawn(move || async move {
let bw = Local::local(async move {
let mut handles = Vec::new();
Expand Down Expand Up @@ -77,8 +76,7 @@ fn main() {

while *(coord.lock().unwrap()) != 1 {}

let client_ex = LocalExecutorBuilder::new()
.pin_to_cpu(1)
let client_ex = LocalExecutorBuilder::new(Placement::Exact(1))
.spawn(move || async move {
let t = Instant::now();
for _ in 0..runs {
Expand Down
Loading

0 comments on commit 310e418

Please sign in to comment.