Skip to content

Commit

Permalink
Simulator and model clocks refactoring - maintain a single simulation…
Browse files Browse the repository at this point in the history
… clock at the simulator level
  • Loading branch information
ndebuhr committed Apr 3, 2021
1 parent 870e7f4 commit 8d0a58d
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 60 deletions.
8 changes: 3 additions & 5 deletions src/models/exclusive_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ struct PortsOut {
#[serde(rename_all = "camelCase")]
struct State {
event_list: Vec<ScheduledEvent>,
#[serde(default)]
global_time: f64,
}

impl Default for State {
Expand All @@ -60,7 +58,6 @@ impl Default for State {
};
State {
event_list: vec![initalization_event],
global_time: 0.0,
}
}
}
Expand Down Expand Up @@ -134,6 +131,7 @@ impl AsModel for ExclusiveGateway {
&mut self,
uniform_rng: &mut UniformRNG,
incoming_message: ModelMessage,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let port_number = self.port_weights.random_variate(uniform_rng)?;
Expand All @@ -142,7 +140,7 @@ impl AsModel for ExclusiveGateway {
self.snapshot.last_job = Some((
self.ports_out.flow_paths[port_number].clone(),
incoming_message.content.clone(),
self.state.global_time,
global_time,
));
}
if self.need_historical_metrics() {
Expand All @@ -159,6 +157,7 @@ impl AsModel for ExclusiveGateway {
fn events_int(
&mut self,
_uniform_rng: &mut UniformRNG,
_global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let events = self.state.event_list.clone();
self.state.event_list = self
Expand All @@ -184,7 +183,6 @@ impl AsModel for ExclusiveGateway {
.for_each(|scheduled_event| {
scheduled_event.time -= time_delta;
});
self.state.global_time += time_delta;
}

fn until_next_event(&self) -> f64 {
Expand Down
12 changes: 5 additions & 7 deletions src/models/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ struct State {
event_list: Vec<ScheduledEvent>,
jobs: Vec<String>,
phase: Phase,
#[serde(default)]
global_time: f64,
}

impl Default for State {
Expand All @@ -63,7 +61,6 @@ impl Default for State {
event_list: vec![initalization_event],
jobs: Vec::new(),
phase: Phase::Open,
global_time: 0.0,
}
}
}
Expand Down Expand Up @@ -162,13 +159,14 @@ impl AsModel for Gate {
&mut self,
_uniform_rng: &mut UniformRNG,
incoming_message: ModelMessage,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let incoming_port: &str = &incoming_message.port_name;
match &self.ports_in {
PortsIn { activation, .. } if activation == incoming_port => {
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_activation = Some(self.state.global_time);
self.snapshot.last_activation = Some(global_time);
self.snapshot.is_open = Some(true);
}
if self.need_historical_metrics() {
Expand All @@ -180,7 +178,7 @@ impl AsModel for Gate {
PortsIn { deactivation, .. } if deactivation == incoming_port => {
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_deactivation = Some(self.state.global_time);
self.snapshot.last_deactivation = Some(global_time);
self.snapshot.is_open = Some(false);
}
if self.need_historical_metrics() {
Expand All @@ -193,7 +191,7 @@ impl AsModel for Gate {
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_received =
Some((incoming_message.content.clone(), self.state.global_time));
Some((incoming_message.content.clone(), global_time));
}
if self.need_historical_metrics() {
self.history.push(self.snapshot.clone());
Expand All @@ -219,6 +217,7 @@ impl AsModel for Gate {
fn events_int(
&mut self,
_uniform_rng: &mut UniformRNG,
_global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let events = self.state.event_list.clone();
Expand Down Expand Up @@ -270,7 +269,6 @@ impl AsModel for Gate {
.for_each(|scheduled_event| {
scheduled_event.time -= time_delta;
});
self.state.global_time += time_delta;
}

fn until_next_event(&self) -> f64 {
Expand Down
12 changes: 4 additions & 8 deletions src/models/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ struct State {
event_list: Vec<ScheduledEvent>,
until_message_interdeparture: f64,
job_counter: usize,
#[serde(default)]
global_time: f64,
}

impl Default for State {
Expand All @@ -69,7 +67,6 @@ impl Default for State {
event_list: vec![initalization_event],
until_message_interdeparture: INFINITY,
job_counter: 0,
global_time: 0.0,
}
}
}
Expand Down Expand Up @@ -147,13 +144,15 @@ impl AsModel for Generator {
&mut self,
_uniform_rng: &mut UniformRNG,
_incoming_message: ModelMessage,
_global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
Ok(Vec::new())
}

fn events_int(
&mut self,
uniform_rng: &mut UniformRNG,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let events = self.state.event_list.clone();
Expand Down Expand Up @@ -185,8 +184,7 @@ impl AsModel for Generator {
event: Event::BeginGeneration,
});
if let Some(thinning) = self.thinning.clone() {
let thinning_threshold =
thinning.evaluate(self.state.global_time)?;
let thinning_threshold = thinning.evaluate(global_time)?;
let uniform_rn = uniform_rng.rn();
if uniform_rn < thinning_threshold {
self.state.event_list.push(ScheduledEvent {
Expand Down Expand Up @@ -214,8 +212,7 @@ impl AsModel for Generator {
});
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_generation =
Some((generated, self.state.global_time));
self.snapshot.last_generation = Some((generated, global_time));
}
if self.need_historical_metrics() {
self.history.push(self.snapshot.clone());
Expand All @@ -236,7 +233,6 @@ impl AsModel for Generator {
.for_each(|scheduled_event| {
scheduled_event.time -= time_delta;
});
self.state.global_time += time_delta;
}

fn until_next_event(&self) -> f64 {
Expand Down
8 changes: 3 additions & 5 deletions src/models/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ struct State {
event_list: Vec<ScheduledEvent>,
jobs: Vec<String>,
next_port_out: usize,
#[serde(default)]
global_time: f64,
}

impl Default for State {
Expand All @@ -58,7 +56,6 @@ impl Default for State {
event_list: vec![initalization_event],
jobs: Vec::new(),
next_port_out: 0,
global_time: 0.0,
}
}
}
Expand Down Expand Up @@ -131,6 +128,7 @@ impl AsModel for LoadBalancer {
&mut self,
_uniform_rng: &mut UniformRNG,
incoming_message: ModelMessage,
_global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
self.state.jobs.push(incoming_message.content);
self.state.event_list.push(ScheduledEvent {
Expand All @@ -143,6 +141,7 @@ impl AsModel for LoadBalancer {
fn events_int(
&mut self,
_uniform_rng: &mut UniformRNG,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let events = self.state.event_list.clone();
Expand All @@ -164,7 +163,7 @@ impl AsModel for LoadBalancer {
self.snapshot.last_job = Some((
self.ports_out.flow_paths[self.state.next_port_out].clone(),
self.state.jobs[0].clone(),
self.state.global_time,
global_time,
));
}
if self.need_historical_metrics() {
Expand All @@ -189,7 +188,6 @@ impl AsModel for LoadBalancer {
.for_each(|scheduled_event| {
scheduled_event.time -= time_delta;
});
self.state.global_time += time_delta;
}

fn until_next_event(&self) -> f64 {
Expand Down
9 changes: 7 additions & 2 deletions src/models/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,18 @@ impl AsModel for Model {
&mut self,
uniform_rng: &mut UniformRNG,
incoming_message: ModelMessage,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
self.inner.events_ext(uniform_rng, incoming_message)
self.inner
.events_ext(uniform_rng, incoming_message, global_time)
}

fn events_int(
&mut self,
uniform_rng: &mut UniformRNG,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
self.inner.events_int(uniform_rng)
self.inner.events_int(uniform_rng, global_time)
}

fn time_advance(&mut self, time_delta: f64) {
Expand Down Expand Up @@ -90,10 +93,12 @@ pub trait AsModel {
&mut self,
uniform_rng: &mut UniformRNG,
incoming_message: ModelMessage,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError>;
fn events_int(
&mut self,
uniform_rng: &mut UniformRNG,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError>;
fn time_advance(&mut self, time_delta: f64);
fn until_next_event(&self) -> f64;
Expand Down
10 changes: 4 additions & 6 deletions src/models/parallel_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ struct PortsOut {
struct State {
event_list: Vec<ScheduledEvent>,
collections: HashMap<String, usize>,
#[serde(default)]
global_time: f64,
}

impl Default for State {
Expand All @@ -60,7 +58,6 @@ impl Default for State {
State {
event_list: vec![initalization_event],
collections: HashMap::new(),
global_time: 0.0,
}
}
}
Expand Down Expand Up @@ -137,11 +134,11 @@ impl AsModel for ParallelGateway {
&mut self,
_uniform_rng: &mut UniformRNG,
incoming_message: ModelMessage,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_arrival =
Some((incoming_message.content.clone(), self.state.global_time));
self.snapshot.last_arrival = Some((incoming_message.content.clone(), global_time));
}
if self.need_historical_metrics() {
self.history.push(self.snapshot.clone());
Expand All @@ -165,6 +162,7 @@ impl AsModel for ParallelGateway {
fn events_int(
&mut self,
_uniform_rng: &mut UniformRNG,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let events = self.state.event_list.clone();
Expand Down Expand Up @@ -200,7 +198,7 @@ impl AsModel for ParallelGateway {
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_departure =
Some((completed_collection, self.state.global_time));
Some((completed_collection, global_time));
}
if self.need_historical_metrics() {
self.history.push(self.snapshot.clone());
Expand Down
13 changes: 5 additions & 8 deletions src/models/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ struct State {
until_job_completion: f64,
queue: Vec<String>,
phase: Phase,
#[serde(default)]
global_time: f64,
}

impl Default for State {
Expand All @@ -76,7 +74,6 @@ impl Default for State {
until_job_completion: INFINITY,
queue: Vec::new(),
phase: Phase::Passive,
global_time: 0.0,
}
}
}
Expand Down Expand Up @@ -174,6 +171,7 @@ impl AsModel for Processor {
&mut self,
_uniform_rng: &mut UniformRNG,
incoming_message: ModelMessage,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let incoming_port: String = incoming_message.port_name;
Expand All @@ -183,8 +181,7 @@ impl AsModel for Processor {
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.queue_size = self.state.queue.len();
self.snapshot.last_arrival =
Some((incoming_message.content, self.state.global_time));
self.snapshot.last_arrival = Some((incoming_message.content, global_time));
}
if self.need_historical_metrics() {
self.history.push(self.snapshot.clone());
Expand Down Expand Up @@ -241,6 +238,7 @@ impl AsModel for Processor {
fn events_int(
&mut self,
uniform_rng: &mut UniformRNG,
global_time: f64,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let events = self.state.event_list.clone();
Expand Down Expand Up @@ -284,7 +282,7 @@ impl AsModel for Processor {
.first()
.ok_or_else(|| SimulationError::InvalidModelState)?
.to_string(),
self.state.global_time,
global_time,
));
self.snapshot.is_utilized = true;
}
Expand All @@ -304,7 +302,7 @@ impl AsModel for Processor {
.first()
.ok_or_else(|| SimulationError::InvalidModelState)?
.to_string(),
self.state.global_time,
global_time,
));
}
// Use just the job ID from the input message - transform job type
Expand Down Expand Up @@ -351,7 +349,6 @@ impl AsModel for Processor {
.for_each(|scheduled_event| {
scheduled_event.time -= time_delta;
});
self.state.global_time += time_delta;
}

fn until_next_event(&self) -> f64 {
Expand Down
Loading

0 comments on commit 8d0a58d

Please sign in to comment.