diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..a2b46d1 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,29 @@ +name: Main berserker CI + +on: + push: + branches: + - main + pull_request: + +concurrency: + group: ${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Check code formatting + run: cargo fmt --check + + - name: Run clippy + run: cargo clippy + + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: make diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..209cfdb --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,28 @@ +pub mod worker; + +#[derive(Debug, Copy, Clone)] +pub enum Distribution { + Zipfian, + Uniform, +} + +#[derive(Debug, Copy, Clone)] +pub enum Workload { + Endpoints, + Processes, + Syscalls, +} + +#[derive(Debug, Copy, Clone)] +pub struct WorkloadConfig { + pub restart_interval: u64, + pub endpoints_dist: Distribution, + pub workload: Workload, + pub zipf_exponent: f64, + pub n_ports: u64, + pub uniform_lower: u64, + pub uniform_upper: u64, + pub arrival_rate: f64, + pub departure_rate: f64, + pub random_process: bool, +} diff --git a/src/main.rs b/src/main.rs index f5da3b3..817ad7a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,177 +2,26 @@ extern crate log; extern crate core_affinity; +use std::collections::HashMap; + +use berserker::WorkloadConfig; +use config::Config; use fork::{fork, Fork}; -use std::{thread, time}; -use std::process::Command; -use std::net::{TcpListener}; -use core_affinity::CoreId; use itertools::iproduct; use nix::sys::wait::waitpid; use nix::unistd::Pid; -use config::Config; -use std::collections::HashMap; -use syscalls::{Sysno, syscall}; - use rand::prelude::*; -use rand::{distributions::Alphanumeric, Rng}; -use rand_distr::Zipf; use rand_distr::Uniform; -use rand_distr::Exp; - -#[derive(Debug, Copy, Clone)] -enum Distribution { - Zipfian, - Uniform, -} - -#[derive(Debug, Copy, Clone)] -enum Workload { - Endpoints, - Processes, - Syscalls, -} - -#[derive(Debug, Copy, Clone)] -struct WorkloadConfig { - restart_interval: u64, - endpoints_dist: Distribution, - workload: Workload, - zipf_exponent: f64, - n_ports: u64, - uniform_lower: u64, - uniform_upper: u64, - arrival_rate: f64, - departure_rate: f64, - random_process: bool, -} - -#[derive(Debug, Copy, Clone)] -struct WorkerConfig { - workload: WorkloadConfig, - cpu: CoreId, - process: usize, - lower: usize, - upper: usize, -} - -fn listen(port: usize, sleep: u64) -> std::io::Result<()> { - let addr = format!("127.0.0.1:{}", port); - let listener = TcpListener::bind(addr)?; - - let _res = listener.incoming(); - - thread::sleep(time::Duration::from_secs(sleep)); - Ok(()) -} - -fn spawn_process(config: WorkerConfig, lifetime: u64) -> std::io::Result<()> { - if config.workload.random_process { - let uniq_arg: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - let _res = Command::new("stub").arg(uniq_arg).output().unwrap(); - //info!("Command output: {}", String::from_utf8(res.stdout).unwrap()); - Ok(()) - } else { - match fork() { - Ok(Fork::Parent(child)) => { - info!("Parent: child {}", child); - waitpid(Pid::from_raw(child), None); - Ok(()) - }, - Ok(Fork::Child) => { - info!("{}-{}: Child start, {}", config.cpu.id, config.process, lifetime); - thread::sleep(time::Duration::from_millis(lifetime)); - info!("{}-{}: Child stop", config.cpu.id, config.process); - Ok(()) - }, - Err(_) => { - warn!("Failed"); - Ok(()) - }, - } - } -} - -// Spawn processes with a specified rate -fn process_payload(config: WorkerConfig) -> std::io::Result<()> { - info!("Process {} from {}: {}-{}", - config.process, config.cpu.id, config.lower, config.upper); - - loop { - let lifetime: f64 = thread_rng().sample(Exp::new(config.workload.departure_rate).unwrap()); - - thread::spawn(move || { - spawn_process(config, (lifetime * 1000.0).round() as u64) - }); - - let interval: f64 = thread_rng().sample(Exp::new(config.workload.arrival_rate).unwrap()); - info!("{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}", - config.cpu.id, config.process, - interval, (interval * 1000.0).round() as u64, - lifetime, (lifetime * 1000.0).round() as u64); - thread::sleep(time::Duration::from_millis((interval * 1000.0).round() as u64)); - info!("{}-{}: Continue", config.cpu.id, config.process); - } -} - -fn listen_payload(config: WorkerConfig) -> std::io::Result<()> { - info!("Process {} from {}: {}-{}", - config.process, config.cpu.id, config.lower, config.upper); - - let listeners: Vec<_> = (config.lower..config.upper).map(|port| { - thread::spawn(move || { - listen(port, config.workload.restart_interval) - }) - }).collect(); - - for listener in listeners { - let _res = listener.join().unwrap(); - } - - Ok(()) -} - -fn do_syscall(config: WorkerConfig) -> std::io::Result<()> { - match unsafe { syscall!(Sysno::getpid) } { - Ok(_) => { - Ok(()) - } - Err(err) => { - warn!("Syscall failed: {}", err); - Ok(()) - } - } -} - -fn syscalls_payload(config: WorkerConfig) -> std::io::Result<()> { - info!("Process {} from {}: {}-{}", - config.process, config.cpu.id, config.lower, config.upper); - - loop { - thread::spawn(move || { - do_syscall(config) - }); +use rand_distr::Zipf; - let interval: f64 = thread_rng().sample(Exp::new(config.workload.arrival_rate).unwrap()); - info!("{}-{}: Interval {}, rounded {}", - config.cpu.id, config.process, - interval, (interval * 1000.0).round() as u64); - thread::sleep(time::Duration::from_millis((interval * 1000.0).round() as u64)); - info!("{}-{}: Continue", config.cpu.id, config.process); - } -} +use berserker::{worker::WorkerConfig, Distribution, Workload}; fn main() { // Retrieve the IDs of all active CPU cores. let core_ids = core_affinity::get_core_ids().unwrap(); let settings = Config::builder() // Add in `./Settings.toml` - .add_source(config::File::with_name("/etc/berserker/workload.toml") - .required(false)) + .add_source(config::File::with_name("/etc/berserker/workload.toml").required(false)) .add_source(config::File::with_name("workload.toml").required(false)) // Add in settings from the environment (with a prefix of APP) // Eg.. `WORKLOAD_DEBUG=1 ./target/app` would set the `debug` key @@ -191,19 +40,19 @@ fn main() { "endpoints" => Workload::Endpoints, "processes" => Workload::Processes, "syscalls" => Workload::Syscalls, - _ => Workload::Endpoints, + _ => Workload::Endpoints, }; let endpoints_dist = match settings["endpoints_distribution"].as_str() { - "zipf" => Distribution::Zipfian, - "uniform" => Distribution::Uniform, - _ => Distribution::Zipfian, + "zipf" => Distribution::Zipfian, + "uniform" => Distribution::Uniform, + _ => Distribution::Zipfian, }; - let config: WorkloadConfig = WorkloadConfig{ + let config = WorkloadConfig { restart_interval: settings["restart_interval"].parse::().unwrap(), - endpoints_dist: endpoints_dist, - workload: workload, + endpoints_dist, + workload, zipf_exponent: settings["zipf_exponent"].parse::().unwrap(), n_ports: settings["n_ports"].parse::().unwrap(), arrival_rate: settings["arrival_rate"].parse::().unwrap(), @@ -216,53 +65,55 @@ fn main() { // Create processes for each active CPU core. let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..9) .map(|(cpu, process)| { + match config.endpoints_dist { + Distribution::Zipfian => { + let n_ports: f64 = thread_rng() + .sample(Zipf::new(config.n_ports, config.zipf_exponent).unwrap()); - match config.endpoints_dist { - Distribution::Zipfian => { - let n_ports: f64 = thread_rng().sample(Zipf::new(config.n_ports, config.zipf_exponent).unwrap()); - - lower = upper; - upper += n_ports as usize; - }, - Distribution::Uniform => { - let n_ports = thread_rng().sample(Uniform::new(config.uniform_lower, config.uniform_upper)); + lower = upper; + upper += n_ports as usize; + } + Distribution::Uniform => { + let n_ports = thread_rng() + .sample(Uniform::new(config.uniform_lower, config.uniform_upper)); - lower = upper; - upper += n_ports as usize; + lower = upper; + upper += n_ports as usize; + } } - } - - match fork() { - Ok(Fork::Parent(child)) => {info!("Child {}", child); Some(child)}, - Ok(Fork::Child) => { - if core_affinity::set_for_current(cpu) { - let worker_config: WorkerConfig = WorkerConfig{ - workload: config, - cpu: cpu, - process: process, - lower: lower, - upper: upper, - }; - loop { - let _res = match config.workload { - Workload::Endpoints => listen_payload(worker_config), - Workload::Processes => process_payload(worker_config), - Workload::Syscalls => syscalls_payload(worker_config), - }; - } + match fork() { + Ok(Fork::Parent(child)) => { + info!("Child {}", child); + Some(child) } + Ok(Fork::Child) => { + if core_affinity::set_for_current(cpu) { + let worker_config = WorkerConfig::new(config, cpu, process, lower, upper); + + loop { + let _res = match config.workload { + Workload::Endpoints => worker_config.listen_payload(), + Workload::Processes => worker_config.process_payload(), + Workload::Syscalls => worker_config.syscalls_payload(), + }; + } + } - None - }, - Err(_) => {warn!("Failed"); None}, - } - }).collect(); + None + } + Err(_) => { + warn!("Failed"); + None + } + } + }) + .collect(); info!("In total: {}", upper); - for handle in handles.into_iter().filter_map(|pid| pid) { + for handle in handles.into_iter().flatten() { info!("waitpid: {}", handle); - waitpid(Pid::from_raw(handle), None); + waitpid(Pid::from_raw(handle), None).unwrap(); } } diff --git a/src/worker.rs b/src/worker.rs new file mode 100644 index 0000000..7b8a723 --- /dev/null +++ b/src/worker.rs @@ -0,0 +1,168 @@ +use std::{io::Result, net::TcpListener, process::Command, thread, time}; + +use core_affinity::CoreId; +use fork::{fork, Fork}; +use log::{info, warn}; +use nix::{sys::wait::waitpid, unistd::Pid}; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use rand_distr::Exp; +use syscalls::{syscall, Sysno}; + +use crate::WorkloadConfig; + +#[derive(Debug, Copy, Clone)] +pub struct WorkerConfig { + workload: WorkloadConfig, + cpu: CoreId, + process: usize, + lower: usize, + upper: usize, +} + +impl WorkerConfig { + pub fn new( + workload: WorkloadConfig, + cpu: CoreId, + process: usize, + lower: usize, + upper: usize, + ) -> Self { + WorkerConfig { + workload, + cpu, + process, + lower, + upper, + } + } + + fn spawn_process(&self, lifetime: u64) -> Result<()> { + if self.workload.random_process { + let uniq_arg: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + let _res = Command::new("stub").arg(uniq_arg).output().unwrap(); + Ok(()) + } else { + match fork() { + Ok(Fork::Parent(child)) => { + info!("Parent: child {}", child); + waitpid(Pid::from_raw(child), None).unwrap(); + Ok(()) + } + Ok(Fork::Child) => { + info!( + "{}-{}: Child start, {}", + self.cpu.id, self.process, lifetime + ); + thread::sleep(time::Duration::from_millis(lifetime)); + info!("{}-{}: Child stop", self.cpu.id, self.process); + Ok(()) + } + Err(_) => { + warn!("Failed"); + Ok(()) + } + } + } + } + + // Spawn processes with a specified rate + pub fn process_payload(&self) -> std::io::Result<()> { + info!( + "Process {} from {}: {}-{}", + self.process, self.cpu.id, self.lower, self.upper + ); + + loop { + let lifetime: f64 = + thread_rng().sample(Exp::new(self.workload.departure_rate).unwrap()); + + let worker = *self; + thread::spawn(move || worker.spawn_process((lifetime * 1000.0).round() as u64)); + + let interval: f64 = thread_rng().sample(Exp::new(self.workload.arrival_rate).unwrap()); + info!( + "{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}", + self.cpu.id, + self.process, + interval, + (interval * 1000.0).round() as u64, + lifetime, + (lifetime * 1000.0).round() as u64 + ); + thread::sleep(time::Duration::from_millis( + (interval * 1000.0).round() as u64 + )); + info!("{}-{}: Continue", self.cpu.id, self.process); + } + } + + pub fn listen_payload(&self) -> std::io::Result<()> { + info!( + "Process {} from {}: {}-{}", + self.process, self.cpu.id, self.lower, self.upper + ); + + let restart_interval = self.workload.restart_interval; + + let listeners: Vec<_> = (self.lower..self.upper) + .map(|port| thread::spawn(move || listen(port, restart_interval))) + .collect(); + + for listener in listeners { + let _res = listener.join().unwrap(); + } + + Ok(()) + } + + pub fn syscalls_payload(&self) -> Result<()> { + info!( + "Process {} from {}: {}-{}", + self.process, self.cpu.id, self.lower, self.upper + ); + + loop { + let worker = *self; + thread::spawn(move || { + worker.do_syscall().unwrap(); + }); + + let interval: f64 = thread_rng().sample(Exp::new(self.workload.arrival_rate).unwrap()); + info!( + "{}-{}: Interval {}, rounded {}", + self.cpu.id, + self.process, + interval, + (interval * 1000.0).round() as u64 + ); + thread::sleep(time::Duration::from_millis( + (interval * 1000.0).round() as u64 + )); + info!("{}-{}: Continue", self.cpu.id, self.process); + } + } + + fn do_syscall(&self) -> std::io::Result<()> { + match unsafe { syscall!(Sysno::getpid) } { + Ok(_) => Ok(()), + Err(err) => { + warn!("Syscall failed: {}", err); + Ok(()) + } + } + } +} + +fn listen(port: usize, sleep: u64) -> std::io::Result<()> { + let addr = format!("127.0.0.1:{port}"); + let listener = TcpListener::bind(addr)?; + + let _res = listener.incoming(); + + thread::sleep(time::Duration::from_secs(sleep)); + Ok(()) +}