Split main.rs into its own module and add basic CI (#1)
* Apply cargo fmt

* Apply clippy suggestions

* Split main into some modules

Moved most structures and enums into lib.rs.
Added the worker module and moved functions related to it to be methods
of the WorkerConfig structure, still needs some more work though.

* Add basic CI

* Fix syscall spawn loop and use references in worker methods
Molter73 authored Jul 7, 2023
1 parent 022db9d commit 0bad201
Showing 4 changed files with 279 additions and 203 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/main.yml
@@ -0,0 +1,29 @@
name: Main berserker CI

on:
  push:
    branches:
      - main
  pull_request:

concurrency:
  group: ${{ github.head_ref || github.run_id }}
  cancel-in-progress: true

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Check code formatting
        run: cargo fmt --check

      - name: Run clippy
        run: cargo clippy

  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - run: make
28 changes: 28 additions & 0 deletions src/lib.rs
@@ -0,0 +1,28 @@
pub mod worker;

#[derive(Debug, Copy, Clone)]
pub enum Distribution {
    Zipfian,
    Uniform,
}

#[derive(Debug, Copy, Clone)]
pub enum Workload {
    Endpoints,
    Processes,
    Syscalls,
}

#[derive(Debug, Copy, Clone)]
pub struct WorkloadConfig {
    pub restart_interval: u64,
    pub endpoints_dist: Distribution,
    pub workload: Workload,
    pub zipf_exponent: f64,
    pub n_ports: u64,
    pub uniform_lower: u64,
    pub uniform_upper: u64,
    pub arrival_rate: f64,
    pub departure_rate: f64,
    pub random_process: bool,
}
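
These types are now public, so the binary (and the new worker module) consumes them through the library crate. A minimal sketch of constructing a WorkloadConfig by hand; the field values here are made up, since the binary normally parses them from workload.toml via the config crate, as the main.rs diff below shows:

use berserker::{Distribution, Workload, WorkloadConfig};

fn main() {
    // Hypothetical hard-coded values for illustration only.
    let config = WorkloadConfig {
        restart_interval: 10,
        endpoints_dist: Distribution::Zipfian,
        workload: Workload::Endpoints,
        zipf_exponent: 1.4,
        n_ports: 100,
        uniform_lower: 1,
        uniform_upper: 100,
        arrival_rate: 1.0,
        departure_rate: 1.0,
        random_process: false,
    };
    println!("{:?}", config);
}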
257 changes: 54 additions & 203 deletions src/main.rs
@@ -2,177 +2,26 @@
extern crate log;
extern crate core_affinity;

use std::collections::HashMap;

use berserker::WorkloadConfig;
use config::Config;
use fork::{fork, Fork};
use std::{thread, time};
use std::process::Command;
use std::net::{TcpListener};
use core_affinity::CoreId;
use itertools::iproduct;
use nix::sys::wait::waitpid;
use nix::unistd::Pid;
use config::Config;
use std::collections::HashMap;
use syscalls::{Sysno, syscall};

use rand::prelude::*;
use rand::{distributions::Alphanumeric, Rng};
use rand_distr::Zipf;
use rand_distr::Uniform;
use rand_distr::Exp;

#[derive(Debug, Copy, Clone)]
enum Distribution {
    Zipfian,
    Uniform,
}

#[derive(Debug, Copy, Clone)]
enum Workload {
    Endpoints,
    Processes,
    Syscalls,
}

#[derive(Debug, Copy, Clone)]
struct WorkloadConfig {
    restart_interval: u64,
    endpoints_dist: Distribution,
    workload: Workload,
    zipf_exponent: f64,
    n_ports: u64,
    uniform_lower: u64,
    uniform_upper: u64,
    arrival_rate: f64,
    departure_rate: f64,
    random_process: bool,
}

#[derive(Debug, Copy, Clone)]
struct WorkerConfig {
    workload: WorkloadConfig,
    cpu: CoreId,
    process: usize,
    lower: usize,
    upper: usize,
}

fn listen(port: usize, sleep: u64) -> std::io::Result<()> {
    let addr = format!("127.0.0.1:{}", port);
    let listener = TcpListener::bind(addr)?;

    let _res = listener.incoming();

    thread::sleep(time::Duration::from_secs(sleep));
    Ok(())
}

fn spawn_process(config: WorkerConfig, lifetime: u64) -> std::io::Result<()> {
    if config.workload.random_process {
        let uniq_arg: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(7)
            .map(char::from)
            .collect();
        let _res = Command::new("stub").arg(uniq_arg).output().unwrap();
        //info!("Command output: {}", String::from_utf8(res.stdout).unwrap());
        Ok(())
    } else {
        match fork() {
            Ok(Fork::Parent(child)) => {
                info!("Parent: child {}", child);
                waitpid(Pid::from_raw(child), None);
                Ok(())
            },
            Ok(Fork::Child) => {
                info!("{}-{}: Child start, {}", config.cpu.id, config.process, lifetime);
                thread::sleep(time::Duration::from_millis(lifetime));
                info!("{}-{}: Child stop", config.cpu.id, config.process);
                Ok(())
            },
            Err(_) => {
                warn!("Failed");
                Ok(())
            },
        }
    }
}

// Spawn processes with a specified rate
fn process_payload(config: WorkerConfig) -> std::io::Result<()> {
info!("Process {} from {}: {}-{}",
config.process, config.cpu.id, config.lower, config.upper);

loop {
let lifetime: f64 = thread_rng().sample(Exp::new(config.workload.departure_rate).unwrap());

thread::spawn(move || {
spawn_process(config, (lifetime * 1000.0).round() as u64)
});

let interval: f64 = thread_rng().sample(Exp::new(config.workload.arrival_rate).unwrap());
info!("{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}",
config.cpu.id, config.process,
interval, (interval * 1000.0).round() as u64,
lifetime, (lifetime * 1000.0).round() as u64);
thread::sleep(time::Duration::from_millis((interval * 1000.0).round() as u64));
info!("{}-{}: Continue", config.cpu.id, config.process);
}
}

fn listen_payload(config: WorkerConfig) -> std::io::Result<()> {
info!("Process {} from {}: {}-{}",
config.process, config.cpu.id, config.lower, config.upper);

let listeners: Vec<_> = (config.lower..config.upper).map(|port| {
thread::spawn(move || {
listen(port, config.workload.restart_interval)
})
}).collect();

for listener in listeners {
let _res = listener.join().unwrap();
}

Ok(())
}

fn do_syscall(config: WorkerConfig) -> std::io::Result<()> {
    match unsafe { syscall!(Sysno::getpid) } {
        Ok(_) => {
            Ok(())
        }
        Err(err) => {
            warn!("Syscall failed: {}", err);
            Ok(())
        }
    }
}

fn syscalls_payload(config: WorkerConfig) -> std::io::Result<()> {
info!("Process {} from {}: {}-{}",
config.process, config.cpu.id, config.lower, config.upper);

loop {
thread::spawn(move || {
do_syscall(config)
});
use rand_distr::Zipf;

        let interval: f64 = thread_rng().sample(Exp::new(config.workload.arrival_rate).unwrap());
        info!("{}-{}: Interval {}, rounded {}",
            config.cpu.id, config.process,
            interval, (interval * 1000.0).round() as u64);
        thread::sleep(time::Duration::from_millis((interval * 1000.0).round() as u64));
        info!("{}-{}: Continue", config.cpu.id, config.process);
    }
}
use berserker::{worker::WorkerConfig, Distribution, Workload};

fn main() {
    // Retrieve the IDs of all active CPU cores.
    let core_ids = core_affinity::get_core_ids().unwrap();
    let settings = Config::builder()
        // Add in `./Settings.toml`
        .add_source(config::File::with_name("/etc/berserker/workload.toml")
            .required(false))
        .add_source(config::File::with_name("/etc/berserker/workload.toml").required(false))
        .add_source(config::File::with_name("workload.toml").required(false))
        // Add in settings from the environment (with a prefix of APP)
        // Eg.. `WORKLOAD_DEBUG=1 ./target/app` would set the `debug` key
@@ -191,19 +40,19 @@ fn main() {
"endpoints" => Workload::Endpoints,
"processes" => Workload::Processes,
"syscalls" => Workload::Syscalls,
_ => Workload::Endpoints,
_ => Workload::Endpoints,
};

let endpoints_dist = match settings["endpoints_distribution"].as_str() {
"zipf" => Distribution::Zipfian,
"uniform" => Distribution::Uniform,
_ => Distribution::Zipfian,
"zipf" => Distribution::Zipfian,
"uniform" => Distribution::Uniform,
_ => Distribution::Zipfian,
};

let config: WorkloadConfig = WorkloadConfig{
let config = WorkloadConfig {
restart_interval: settings["restart_interval"].parse::<u64>().unwrap(),
endpoints_dist: endpoints_dist,
workload: workload,
endpoints_dist,
workload,
zipf_exponent: settings["zipf_exponent"].parse::<f64>().unwrap(),
n_ports: settings["n_ports"].parse::<u64>().unwrap(),
arrival_rate: settings["arrival_rate"].parse::<f64>().unwrap(),
@@ -216,53 +65,55 @@
    // Create processes for each active CPU core.
    let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..9)
        .map(|(cpu, process)| {
            match config.endpoints_dist {
                Distribution::Zipfian => {
                    let n_ports: f64 = thread_rng()
                        .sample(Zipf::new(config.n_ports, config.zipf_exponent).unwrap());

            match config.endpoints_dist {
                Distribution::Zipfian => {
                    let n_ports: f64 = thread_rng().sample(Zipf::new(config.n_ports, config.zipf_exponent).unwrap());

                    lower = upper;
                    upper += n_ports as usize;
                },
                Distribution::Uniform => {
                    let n_ports = thread_rng().sample(Uniform::new(config.uniform_lower, config.uniform_upper));
                    lower = upper;
                    upper += n_ports as usize;
                }
                Distribution::Uniform => {
                    let n_ports = thread_rng()
                        .sample(Uniform::new(config.uniform_lower, config.uniform_upper));

                    lower = upper;
                    upper += n_ports as usize;
                    lower = upper;
                    upper += n_ports as usize;
                }
            }
            }

            match fork() {
                Ok(Fork::Parent(child)) => {info!("Child {}", child); Some(child)},
                Ok(Fork::Child) => {
                    if core_affinity::set_for_current(cpu) {
                        let worker_config: WorkerConfig = WorkerConfig{
                            workload: config,
                            cpu: cpu,
                            process: process,
                            lower: lower,
                            upper: upper,
                        };

                        loop {
                            let _res = match config.workload {
                                Workload::Endpoints => listen_payload(worker_config),
                                Workload::Processes => process_payload(worker_config),
                                Workload::Syscalls => syscalls_payload(worker_config),
                            };
                        }
            match fork() {
                Ok(Fork::Parent(child)) => {
                    info!("Child {}", child);
                    Some(child)
                }
                Ok(Fork::Child) => {
                    if core_affinity::set_for_current(cpu) {
                        let worker_config = WorkerConfig::new(config, cpu, process, lower, upper);

                        loop {
                            let _res = match config.workload {
                                Workload::Endpoints => worker_config.listen_payload(),
                                Workload::Processes => worker_config.process_payload(),
                                Workload::Syscalls => worker_config.syscalls_payload(),
                            };
                        }
                    }

                    None
                },
                Err(_) => {warn!("Failed"); None},
            }
        }).collect();
                    None
                }
                Err(_) => {
                    warn!("Failed");
                    None
                }
            }
        })
        .collect();

    info!("In total: {}", upper);

    for handle in handles.into_iter().filter_map(|pid| pid) {
    for handle in handles.into_iter().flatten() {
        info!("waitpid: {}", handle);
        waitpid(Pid::from_raw(handle), None);
        waitpid(Pid::from_raw(handle), None).unwrap();
    }
}
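
The fourth changed file (presumably src/worker.rs, the new worker module) did not render in this view. Based on the calls in the new main.rs (WorkerConfig::new plus the listen_payload, process_payload, and syscalls_payload methods) and the free functions removed above, here is a rough, hypothetical sketch of what that module likely contains; the exact signatures, visibility, and bodies are assumptions, not the committed code.

// Hypothetical reconstruction of src/worker.rs, not the committed file.
use core_affinity::CoreId;
use log::{info, warn};
use rand::prelude::*;
use rand_distr::Exp;
use syscalls::{syscall, Sysno};

use crate::WorkloadConfig;

#[derive(Debug, Copy, Clone)]
pub struct WorkerConfig {
    workload: WorkloadConfig,
    cpu: CoreId,
    process: usize,
    lower: usize,
    upper: usize,
}

impl WorkerConfig {
    pub fn new(
        workload: WorkloadConfig,
        cpu: CoreId,
        process: usize,
        lower: usize,
        upper: usize,
    ) -> Self {
        WorkerConfig { workload, cpu, process, lower, upper }
    }

    // Former free function do_syscall(), now operating on &self.
    fn do_syscall(&self) -> std::io::Result<()> {
        match unsafe { syscall!(Sysno::getpid) } {
            Ok(_) => Ok(()),
            Err(err) => {
                warn!("Syscall failed: {}", err);
                Ok(())
            }
        }
    }

    // Former syscalls_payload(); the commit message says the spawn loop was
    // fixed, presumably so the interval sampling stays inside the loop.
    pub fn syscalls_payload(&self) -> std::io::Result<()> {
        info!(
            "Process {} from {}: {}-{}",
            self.process, self.cpu.id, self.lower, self.upper
        );

        loop {
            // WorkerConfig is Copy, so a copy can move into the worker thread.
            let worker = *self;
            std::thread::spawn(move || worker.do_syscall());

            let interval: f64 =
                thread_rng().sample(Exp::new(self.workload.arrival_rate).unwrap());
            std::thread::sleep(std::time::Duration::from_millis(
                (interval * 1000.0).round() as u64,
            ));
            info!("{}-{}: Continue", self.cpu.id, self.process);
        }
    }

    // listen_payload() and process_payload() would follow the same shape as the
    // free functions removed from main.rs above, taking &self instead of a value.
}

In the new main.rs these methods are dispatched on config.workload, with one forked worker process per (core, process index) pair.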