Skip to content

refactor: divide sim env into sub mods. #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions serverless_sim/src/algos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ impl SimEnv {
&self,
fns_ready_2_schedule: &HashMap<FnId, ContainerMetric>,
) -> Vec<(FnId, ContainerMetric)> {
self.fns
.borrow()
self.core
.fns()
.iter()
.filter(|f| !fns_ready_2_schedule.contains_key(&f.fn_id))
.map(|f| {
(
f.fn_id,
ContainerMetric {
container_count: self.fn_container_cnt(f.fn_id),
scheduled_fn_count: self.fn_2_nodes.borrow().get(&f.fn_id).map_or_else(
scheduled_fn_count: self.core.fn_2_nodes().get(&f.fn_id).map_or_else(
|| 0,
|nodes| {
nodes
Expand All @@ -58,7 +58,7 @@ impl SimEnv {
let env = self;
let mut collect_map: BTreeMap<ReqId, VecDeque<FnId>> = BTreeMap::new();
// 对于已经进来的请求,scale up 已经没有前驱的fns
for (&reqid, req) in env.requests.borrow().iter() {
for (&reqid, req) in env.core.requests().iter() {
let req_dag = env.dag(req.dag_i);
let mut walker = req_dag.new_dag_walker();
'outer: while let Some(f) = walker.next(&req_dag.dag_inner) {
Expand Down Expand Up @@ -100,7 +100,7 @@ impl SimEnv {
let env = self;
let mut collect_map: HashMap<FnId, ContainerMetric> = HashMap::new();
// 对于已经进来的请求,scale up 已经没有前驱的fns
for (_reqid, req) in env.requests.borrow().iter() {
for (_reqid, req) in env.core.requests().iter() {
let req_dag = env.dag(req.dag_i);
let mut walker = req_dag.new_dag_walker();
'outer: while let Some(f) = walker.next(&req_dag.dag_inner) {
Expand All @@ -126,11 +126,11 @@ impl SimEnv {
})
.or_insert(ContainerMetric {
container_count: env
.fn_2_nodes
.borrow()
.core
.fn_2_nodes()
.get(&fnid)
.map_or_else(|| 0, |nodes| nodes.len()),
scheduled_fn_count: env.fn_2_nodes.borrow().get(&fnid).map_or_else(
scheduled_fn_count: env.core.fn_2_nodes().get(&fnid).map_or_else(
|| 0,
|nodes| {
nodes
Expand Down
9 changes: 6 additions & 3 deletions serverless_sim/src/env_gc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{ thread, time::{ SystemTime, UNIX_EPOCH, Duration } };
use std::{
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use crate::network::SIM_ENVS;

Expand All @@ -16,9 +19,9 @@ pub fn start_gc() {
for e in sim_envs.iter() {
let env = e.1.lock().unwrap();
if now > Duration::from_secs(60) + env.recent_use_time {
let key = env.config.str();
let key = env.help.config().str();
log::warn!("gc env {}", key);
env.metric_record.borrow().flush(&env);
env.help.metric_record().flush(&env);
to_remove.push(key);
}
}
Expand Down
106 changes: 13 additions & 93 deletions serverless_sim/src/es.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,12 @@ use crate::{
actions::{ESActionWrapper, RawAction},
algos::ContainerMetric,
config::Config,
// es_lass::LassESScaler,
fn_dag::FnId,
node::NodeId,
request::ReqId,
scale_preloader::{least_task::LeastTaskPreLoader, ScalePreLoader},
scaler_ai::AIScaler,
// es_ai::{self, AIScaler},
// es_faas_flow::FaasFlowScheduler,
// es_fnsche::FnScheScaler,
scaler_hpa::HpaESScaler,
scaler_lass::LassESScaler,
scaler_no::ScalerNo,
sche_faasflow::FaasFlowScheduler,
sche_fnsche::FnScheScheduler,
sche_pass::PassScheduler,
sche_pos::PosScheduler,
sche_rule_based::{RuleBasedScheduler, ScheduleRule},
sche_time_aware::TimeScheduler,
schedule::Scheduler,
scale::num::{ai::AIScaleNum, hpa::HpaScaleNum, lass::LassScaleNum, no::NoScaleNum, ScaleNum},
sim_env::SimEnv,
sim_run::Scheduler,
};
use enum_as_inner::EnumAsInner;
use std::{
Expand All @@ -33,19 +19,6 @@ pub trait ActionEffectStage {
fn prepare_next(&mut self) -> bool;
}

pub trait ESScaler {
/// return (action, action_is_done)
/// - action_is_done: need prepare next state and wait for new action
fn scale_for_fn(
&mut self,
env: &SimEnv,
fnid: FnId,
metric: &ContainerMetric,
action: &ESActionWrapper,
) -> (f32, bool);

fn fn_available_count(&self, fnid: FnId, env: &SimEnv) -> usize;
}
#[derive(Debug)]
pub struct StageScaleForFns {
current_index: Option<usize>,
Expand Down Expand Up @@ -139,7 +112,7 @@ pub struct StageScaleDown {
impl StageScaleDown {
fn new(env: &SimEnv) -> Self {
let mut idle_containers = Vec::new();
let nodes = env.nodes.borrow();
let nodes = env.core.nodes();
for node in nodes.iter() {
for (&fnid, container) in node.fn_containers.borrow().iter() {
if container.is_idle() {
Expand Down Expand Up @@ -241,7 +214,7 @@ impl ESState {
}
fn_metrics.extend(fn_all_sched_metrics);

assert_eq!(fn_metrics.len(), env.fns.borrow().len());
assert_eq!(fn_metrics.len(), env.core.fns().len());
self.stage = EFStage::ScaleForFns(StageScaleForFns {
current_index: None,
fn_metrics,
Expand All @@ -250,8 +223,8 @@ impl ESState {
});
if self.stage.as_scale_for_fns_mut().unwrap().prepare_next() {
// pre load info of scheduler because scaler need to know the info of scheduler
env.spec_scheduler
.borrow_mut()
env.mechanisms
.spec_scheduler_mut()
.as_mut()
.unwrap()
.prepare_this_turn_will_schedule(env);
Expand Down Expand Up @@ -281,57 +254,6 @@ impl ESState {
}
}

pub fn prepare_spec_scheduler(config: &Config) -> Option<Box<dyn Scheduler + Send>> {
if config.es.sche_faas_flow() {
return Some(Box::new(FaasFlowScheduler::new()));
} else if config.es.sche_gofs() {
return Some(Box::new(RuleBasedScheduler {
rule: ScheduleRule::GOFS,
}));
} else if config.es.sche_load_least() {
return Some(Box::new(RuleBasedScheduler {
rule: ScheduleRule::LeastLoad,
}));
} else if config.es.sche_random() {
return Some(Box::new(RuleBasedScheduler {
rule: ScheduleRule::Random,
}));
} else if config.es.sche_round_robin() {
return Some(Box::new(RuleBasedScheduler {
rule: ScheduleRule::RoundRobin(9999),
}));
} else if config.es.sche_pass() {
return Some(Box::new(PassScheduler::new()));
} else if config.es.sche_rule() {
return Some(Box::new(PosScheduler::new()));
} else if config.es.sche_fnsche() {
return Some(Box::new(FnScheScheduler::new()));
} else if config.es.sche_time() {
return Some(Box::new(TimeScheduler::new()));
}
None
}

pub fn prepare_spec_scaler(config: &Config) -> Option<Box<dyn ESScaler + Send>> {
let es = &config.es;

if es.scale_lass() {
return Some(Box::new(LassESScaler::new()));
}
// } else if es.sche_fnsche() {
// return Some(Box::new(FnScheScaler::new()));
// } else
if es.scale_hpa() {
return Some(Box::new(HpaESScaler::new()));
} else if es.scale_ai() {
return Some(Box::new(AIScaler::new(config)));
} else if es.scale_up_no() {
return Some(Box::new(ScalerNo::new()));
}

None
}

impl SimEnv {
// return false if schedule failed
fn step_schedule(&self, raw_action: u32, stage: &mut StageSchedule) -> bool {
Expand Down Expand Up @@ -365,8 +287,6 @@ impl SimEnv {
let mut action_done = false;
// 只有确定了下一个action,才会有可以返回的state

let config_es = || &self.config.es;

loop {
if ef_state.stage.is_frame_begin() {
if (self.current_frame() == 0 && ef_state.computed) || self.current_frame() > 0 {
Expand All @@ -389,7 +309,7 @@ impl SimEnv {
break;
}
// faas flow don't do scale for fns
if config_es().sche_faas_flow() {
if self.help.config().es.sche_faas_flow() {
ef_state.trans_stage(self);
continue;
}
Expand All @@ -398,8 +318,8 @@ impl SimEnv {
// let fnid = stage.current_fnid.unwrap();
let &(fnid, ref metric) = stage.current_fn().unwrap();
let (action_score_, action_done_) = self
.spec_ef_scaler
.borrow_mut()
.mechanisms
.spec_scale_num_mut()
.as_mut()
.unwrap()
.scale_for_fn(self, fnid, metric, &raw_action);
Expand All @@ -412,7 +332,7 @@ impl SimEnv {
}
} else if ef_state.stage.is_schedule() {
log::info!("schedule");
if self.config.es.sche_ai() {
if self.help.config().es.sche_ai() {
if action_done {
// next action effect stage is prepared
break;
Expand Down Expand Up @@ -441,11 +361,11 @@ impl SimEnv {
// self.try_put_fn(true);
// ef_state.trans_stage(self);
// }
else if let Some(spec_sche) = self.spec_scheduler.borrow_mut().as_mut() {
else if let Some(spec_sche) = self.mechanisms.spec_scheduler_mut().as_mut() {
// let mut spec = self.spec_scheduler.borrow_mut();
spec_sche.schedule_some(self);
ef_state.trans_stage(self);
} else if self.config.es.sche_fnsche() {
} else if self.help.config().es.sche_fnsche() {
// sche is done in scale stage
ef_state.trans_stage(self);
} else {
Expand All @@ -458,7 +378,7 @@ impl SimEnv {
ef_state.computed = true;
self.sim_run();
frame_score = self.score();
self.metric_record.borrow_mut().add_frame(self);
self.help.metric_record_mut().add_frame(self);

ef_state.trans_stage(self);
}
Expand Down
Loading