Skip to content

Commit e8409cc

Browse files
committed
refactor: divide sim env into sub mods.
1 parent fdfea42 commit e8409cc

35 files changed

+1208
-1282
lines changed

serverless_sim/src/algos.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@ impl SimEnv {
2828
&self,
2929
fns_ready_2_schedule: &HashMap<FnId, ContainerMetric>,
3030
) -> Vec<(FnId, ContainerMetric)> {
31-
self.fns
32-
.borrow()
31+
self.core
32+
.fns()
3333
.iter()
3434
.filter(|f| !fns_ready_2_schedule.contains_key(&f.fn_id))
3535
.map(|f| {
3636
(
3737
f.fn_id,
3838
ContainerMetric {
3939
container_count: self.fn_container_cnt(f.fn_id),
40-
scheduled_fn_count: self.fn_2_nodes.borrow().get(&f.fn_id).map_or_else(
40+
scheduled_fn_count: self.core.fn_2_nodes().get(&f.fn_id).map_or_else(
4141
|| 0,
4242
|nodes| {
4343
nodes
@@ -58,7 +58,7 @@ impl SimEnv {
5858
let env = self;
5959
let mut collect_map: BTreeMap<ReqId, VecDeque<FnId>> = BTreeMap::new();
6060
// 对于已经进来的请求,scale up 已经没有前驱的fns
61-
for (&reqid, req) in env.requests.borrow().iter() {
61+
for (&reqid, req) in env.core.requests().iter() {
6262
let req_dag = env.dag(req.dag_i);
6363
let mut walker = req_dag.new_dag_walker();
6464
'outer: while let Some(f) = walker.next(&req_dag.dag_inner) {
@@ -100,7 +100,7 @@ impl SimEnv {
100100
let env = self;
101101
let mut collect_map: HashMap<FnId, ContainerMetric> = HashMap::new();
102102
// 对于已经进来的请求,scale up 已经没有前驱的fns
103-
for (_reqid, req) in env.requests.borrow().iter() {
103+
for (_reqid, req) in env.core.requests().iter() {
104104
let req_dag = env.dag(req.dag_i);
105105
let mut walker = req_dag.new_dag_walker();
106106
'outer: while let Some(f) = walker.next(&req_dag.dag_inner) {
@@ -126,11 +126,11 @@ impl SimEnv {
126126
})
127127
.or_insert(ContainerMetric {
128128
container_count: env
129-
.fn_2_nodes
130-
.borrow()
129+
.core
130+
.fn_2_nodes()
131131
.get(&fnid)
132132
.map_or_else(|| 0, |nodes| nodes.len()),
133-
scheduled_fn_count: env.fn_2_nodes.borrow().get(&fnid).map_or_else(
133+
scheduled_fn_count: env.core.fn_2_nodes().get(&fnid).map_or_else(
134134
|| 0,
135135
|nodes| {
136136
nodes

serverless_sim/src/env_gc.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{ thread, time::{ SystemTime, UNIX_EPOCH, Duration } };
1+
use std::{
2+
thread,
3+
time::{Duration, SystemTime, UNIX_EPOCH},
4+
};
25

36
use crate::network::SIM_ENVS;
47

@@ -16,9 +19,9 @@ pub fn start_gc() {
1619
for e in sim_envs.iter() {
1720
let env = e.1.lock().unwrap();
1821
if now > Duration::from_secs(60) + env.recent_use_time {
19-
let key = env.config.str();
22+
let key = env.help.config().str();
2023
log::warn!("gc env {}", key);
21-
env.metric_record.borrow().flush(&env);
24+
env.help.metric_record().flush(&env);
2225
to_remove.push(key);
2326
}
2427
}

serverless_sim/src/es.rs

Lines changed: 13 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,12 @@ use crate::{
22
actions::{ESActionWrapper, RawAction},
33
algos::ContainerMetric,
44
config::Config,
5-
// es_lass::LassESScaler,
65
fn_dag::FnId,
76
node::NodeId,
87
request::ReqId,
9-
scale_preloader::{least_task::LeastTaskPreLoader, ScalePreLoader},
10-
scaler_ai::AIScaler,
11-
// es_ai::{self, AIScaler},
12-
// es_faas_flow::FaasFlowScheduler,
13-
// es_fnsche::FnScheScaler,
14-
scaler_hpa::HpaESScaler,
15-
scaler_lass::LassESScaler,
16-
scaler_no::ScalerNo,
17-
sche_faasflow::FaasFlowScheduler,
18-
sche_fnsche::FnScheScheduler,
19-
sche_pass::PassScheduler,
20-
sche_pos::PosScheduler,
21-
sche_rule_based::{RuleBasedScheduler, ScheduleRule},
22-
sche_time_aware::TimeScheduler,
23-
schedule::Scheduler,
8+
scale::num::{ai::AIScaleNum, hpa::HpaScaleNum, lass::LassScaleNum, no::NoScaleNum, ScaleNum},
249
sim_env::SimEnv,
10+
sim_run::Scheduler,
2511
};
2612
use enum_as_inner::EnumAsInner;
2713
use std::{
@@ -33,19 +19,6 @@ pub trait ActionEffectStage {
3319
fn prepare_next(&mut self) -> bool;
3420
}
3521

36-
pub trait ESScaler {
37-
/// return (action, action_is_done)
38-
/// - action_is_done: need prepare next state and wait for new action
39-
fn scale_for_fn(
40-
&mut self,
41-
env: &SimEnv,
42-
fnid: FnId,
43-
metric: &ContainerMetric,
44-
action: &ESActionWrapper,
45-
) -> (f32, bool);
46-
47-
fn fn_available_count(&self, fnid: FnId, env: &SimEnv) -> usize;
48-
}
4922
#[derive(Debug)]
5023
pub struct StageScaleForFns {
5124
current_index: Option<usize>,
@@ -139,7 +112,7 @@ pub struct StageScaleDown {
139112
impl StageScaleDown {
140113
fn new(env: &SimEnv) -> Self {
141114
let mut idle_containers = Vec::new();
142-
let nodes = env.nodes.borrow();
115+
let nodes = env.core.nodes();
143116
for node in nodes.iter() {
144117
for (&fnid, container) in node.fn_containers.borrow().iter() {
145118
if container.is_idle() {
@@ -241,7 +214,7 @@ impl ESState {
241214
}
242215
fn_metrics.extend(fn_all_sched_metrics);
243216

244-
assert_eq!(fn_metrics.len(), env.fns.borrow().len());
217+
assert_eq!(fn_metrics.len(), env.core.fns().len());
245218
self.stage = EFStage::ScaleForFns(StageScaleForFns {
246219
current_index: None,
247220
fn_metrics,
@@ -250,8 +223,8 @@ impl ESState {
250223
});
251224
if self.stage.as_scale_for_fns_mut().unwrap().prepare_next() {
252225
// pre load info of scheduler because scaler need to know the info of scheduler
253-
env.spec_scheduler
254-
.borrow_mut()
226+
env.mechanisms
227+
.spec_scheduler_mut()
255228
.as_mut()
256229
.unwrap()
257230
.prepare_this_turn_will_schedule(env);
@@ -281,57 +254,6 @@ impl ESState {
281254
}
282255
}
283256

284-
pub fn prepare_spec_scheduler(config: &Config) -> Option<Box<dyn Scheduler + Send>> {
285-
if config.es.sche_faas_flow() {
286-
return Some(Box::new(FaasFlowScheduler::new()));
287-
} else if config.es.sche_gofs() {
288-
return Some(Box::new(RuleBasedScheduler {
289-
rule: ScheduleRule::GOFS,
290-
}));
291-
} else if config.es.sche_load_least() {
292-
return Some(Box::new(RuleBasedScheduler {
293-
rule: ScheduleRule::LeastLoad,
294-
}));
295-
} else if config.es.sche_random() {
296-
return Some(Box::new(RuleBasedScheduler {
297-
rule: ScheduleRule::Random,
298-
}));
299-
} else if config.es.sche_round_robin() {
300-
return Some(Box::new(RuleBasedScheduler {
301-
rule: ScheduleRule::RoundRobin(9999),
302-
}));
303-
} else if config.es.sche_pass() {
304-
return Some(Box::new(PassScheduler::new()));
305-
} else if config.es.sche_rule() {
306-
return Some(Box::new(PosScheduler::new()));
307-
} else if config.es.sche_fnsche() {
308-
return Some(Box::new(FnScheScheduler::new()));
309-
} else if config.es.sche_time() {
310-
return Some(Box::new(TimeScheduler::new()));
311-
}
312-
None
313-
}
314-
315-
pub fn prepare_spec_scaler(config: &Config) -> Option<Box<dyn ESScaler + Send>> {
316-
let es = &config.es;
317-
318-
if es.scale_lass() {
319-
return Some(Box::new(LassESScaler::new()));
320-
}
321-
// } else if es.sche_fnsche() {
322-
// return Some(Box::new(FnScheScaler::new()));
323-
// } else
324-
if es.scale_hpa() {
325-
return Some(Box::new(HpaESScaler::new()));
326-
} else if es.scale_ai() {
327-
return Some(Box::new(AIScaler::new(config)));
328-
} else if es.scale_up_no() {
329-
return Some(Box::new(ScalerNo::new()));
330-
}
331-
332-
None
333-
}
334-
335257
impl SimEnv {
336258
// return false if schedule failed
337259
fn step_schedule(&self, raw_action: u32, stage: &mut StageSchedule) -> bool {
@@ -365,8 +287,6 @@ impl SimEnv {
365287
let mut action_done = false;
366288
// 只有确定了下一个action,才会有可以返回的state
367289

368-
let config_es = || &self.config.es;
369-
370290
loop {
371291
if ef_state.stage.is_frame_begin() {
372292
if (self.current_frame() == 0 && ef_state.computed) || self.current_frame() > 0 {
@@ -389,7 +309,7 @@ impl SimEnv {
389309
break;
390310
}
391311
// faas flow don't do scale for fns
392-
if config_es().sche_faas_flow() {
312+
if self.help.config().es.sche_faas_flow() {
393313
ef_state.trans_stage(self);
394314
continue;
395315
}
@@ -398,8 +318,8 @@ impl SimEnv {
398318
// let fnid = stage.current_fnid.unwrap();
399319
let &(fnid, ref metric) = stage.current_fn().unwrap();
400320
let (action_score_, action_done_) = self
401-
.spec_ef_scaler
402-
.borrow_mut()
321+
.mechanisms
322+
.spec_scale_num_mut()
403323
.as_mut()
404324
.unwrap()
405325
.scale_for_fn(self, fnid, metric, &raw_action);
@@ -412,7 +332,7 @@ impl SimEnv {
412332
}
413333
} else if ef_state.stage.is_schedule() {
414334
log::info!("schedule");
415-
if self.config.es.sche_ai() {
335+
if self.help.config().es.sche_ai() {
416336
if action_done {
417337
// next action effect stage is prepared
418338
break;
@@ -441,11 +361,11 @@ impl SimEnv {
441361
// self.try_put_fn(true);
442362
// ef_state.trans_stage(self);
443363
// }
444-
else if let Some(spec_sche) = self.spec_scheduler.borrow_mut().as_mut() {
364+
else if let Some(spec_sche) = self.mechanisms.spec_scheduler_mut().as_mut() {
445365
// let mut spec = self.spec_scheduler.borrow_mut();
446366
spec_sche.schedule_some(self);
447367
ef_state.trans_stage(self);
448-
} else if self.config.es.sche_fnsche() {
368+
} else if self.help.config().es.sche_fnsche() {
449369
// sche is done in scale stage
450370
ef_state.trans_stage(self);
451371
} else {
@@ -458,7 +378,7 @@ impl SimEnv {
458378
ef_state.computed = true;
459379
self.sim_run();
460380
frame_score = self.score();
461-
self.metric_record.borrow_mut().add_frame(self);
381+
self.help.metric_record_mut().add_frame(self);
462382

463383
ef_state.trans_stage(self);
464384
}

0 commit comments

Comments
 (0)