Skip to content

Adding the eggress node handling functionality #137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Simulate flow-level, inter-node network coordination including scaling and place

## Citing this work

If you are using this work in whole or in part in your project, please cite it as follows:
If you are using this work in whole or in part in your project, please cite it as follows:

```
@inproceedings{schneider2020coordination,
Expand All @@ -54,9 +54,9 @@ pip install -r requirements.txt

## Usage

Type `coord-sim -h` for help using the simulator. For now, this should print
Type `coord-sim -h` for help using the simulator. For now, this should print

```
```
$ coord-sim -h
usage: coord-sim [-h] -d DURATION -sf SF [-sfr SFR] -n NETWORK -c CONFIG
[-t TRACE] [-s SEED]
Expand Down Expand Up @@ -85,7 +85,7 @@ optional arguments:

You can use the following command as an example (run from the root project folder)

```bash
```bash
coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml
```
This will run a simulation on a provided GraphML network file and a YAML placement file for a duration of 20 timesteps.
Expand All @@ -95,7 +95,7 @@ This will run a simulation on a provided GraphML network file and a YAML placeme

By default, all SFs have a node resource consumption, which exactly equals the aggregated traffic that they have to handle.

It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a
It is possible to specify arbitrary other resource consumption models simply by implementing a python module with a
function `resource_function(load)` (see examples [here](https://github.com/RealVNF/coordination-simulation/tree/master/params/services/resource_functions)).

To use these modules, they need to be referenced in the service file:
Expand All @@ -113,6 +113,15 @@ And the path to the folder with the Python modules needs to be passed via the `-
See PR https://github.com/RealVNF/coordination-simulation/pull/78 for details.


### Egress nodes

- A node can be set to be a `Egress` node in the `NodeType` attribute of the network file
- If some nodes are set as `Egress` then only the simulator will randomly choose one of them as the Egress node for each flow in the network
- If some nodes are set to be Egress then once the flow is processed we check if for the flow, `current node == egress node` . If Yes then we depart , otherwise we forward the flow to the egress_node using the shortest_path routing.
- **Todo**: Ideally the coordination algorithms should keep the path(Ingress to Egress) of the flow in view while creating the schedule/placement.

See [PR 137](https://github.com/RealVNF/coord-sim/pull/137) for details.

## Tests

```bash
Expand Down
2 changes: 1 addition & 1 deletion params/networks/triangle.graphml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<data key="d31">1</data>
<data key="d32">-87.65005</data>
<data key="d33">Chicago</data>
<data key="d39">Normal</data>
<data key="d39">Egress</data>
<data key="d40">10</data>
</node>
<node id="2">
Expand Down
8 changes: 4 additions & 4 deletions src/coordsim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def main():
random.seed(args.seed)
numpy.random.seed(args.seed)

# Parse network and get NetworkX object and ingress network list
network, ing_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)
# Parse network, get NetworkX object ,ingress network list, and egress nodes list
network, ing_nodes, eg_nodes = reader.read_network(args.network, node_cap=10, link_cap=10)

# use dummy placement and schedule for running simulator without algorithm
# TODO: make configurable via CLI
Expand All @@ -44,8 +44,8 @@ def main():
metrics = Metrics(network, sf_list)

# Create the simulator parameters object with the provided args
params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, metrics, sf_placement=sf_placement,
schedule=schedule)
params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics,
sf_placement=sf_placement, schedule=schedule)
log.info(params)

if 'trace_path' in config:
Expand Down
8 changes: 6 additions & 2 deletions src/coordsim/network/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

class Flow:

def __init__(self, flow_id, sfc, dr, size, creation_time,
destination=None, current_sf=None, current_node_id=None, current_position=0, end2end_delay=0.0):
def __init__(self, flow_id, sfc, dr, size, creation_time, destination=None, egress_node_id=None, current_sf=None,
current_node_id=None, current_position=0, end2end_delay=0.0):

# Flow ID: Unique ID string
self.flow_id = flow_id
Expand All @@ -24,6 +24,10 @@ def __init__(self, flow_id, sfc, dr, size, creation_time,
self.current_sf = current_sf
# The current node that the flow is being processed in
self.current_node_id = current_node_id
# The specified ingress node of the flow. The flow will spawn at the ingress node.
self.ingress_node_id = current_node_id
# The specified egress node of the flow. The flow will depart at the egress node. Might be non-existent.
self.egress_node_id = egress_node_id
# The duration of the flow calculated in ms.
self.duration = (float(size) / float(dr)) * 1000 # Converted flow duration to ms
# Current flow position within the SFC
Expand Down
7 changes: 5 additions & 2 deletions src/coordsim/reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,16 @@ def read_network(file, node_cap=None, link_cap=None):
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
shortest_paths(networkx_network)

# Filter ingress nodes
# Filter ingress and egress (if any) nodes
ing_nodes = []
eg_nodes = []
for node in networkx_network.nodes.items():
if node[1]["type"] == "Ingress":
ing_nodes.append(node)
if node[1]["type"] == "Egress":
eg_nodes.append(node[0])

return networkx_network, ing_nodes
return networkx_network, ing_nodes, eg_nodes


def reset_cap(network):
Expand Down
42 changes: 35 additions & 7 deletions src/coordsim/simulation/flowsimulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def start(self):
# Setting the all-pairs shortest path in the NetworkX network as a graph attribute
log.info("Using nodes list {}\n".format(list(self.params.network.nodes.keys())))
log.info("Total of {} ingress nodes available\n".format(len(self.params.ing_nodes)))
if self.params.eg_nodes:
log.info("Total of {} egress nodes available\n".format(len(self.params.eg_nodes)))
for node in self.params.ing_nodes:
node_id = node[0]
self.env.process(self.generate_flow(node_id))
Expand Down Expand Up @@ -66,9 +68,13 @@ def generate_flow(self, node_id):
flow_sfc = np.random.choice([sfc for sfc in self.params.sfc_list.keys()])
# Get the flow's creation time (current environment time)
creation_time = self.env.now
# Set the egress node for the flow if some are specified in the network file
flow_egress_node = None
if self.params.eg_nodes:
flow_egress_node = random.choice(self.params.eg_nodes)
# Generate flow based on given params
flow = Flow(str(self.total_flow_count), flow_sfc, flow_dr, flow_size, creation_time,
current_node_id=node_id)
current_node_id=node_id, egress_node_id=flow_egress_node)
# Update metrics for the generated flow
self.params.metrics.generated_flow(flow, node_id)
# Generate flows and schedule them at ingress node
Expand Down Expand Up @@ -230,10 +236,31 @@ def process_flow(self, flow, sfc):
log.info("Flow {} started departing sf {} at node {}. Time {}"
.format(flow.flow_id, current_sf, current_node_id, self.env.now))

# Check if flow is currently in last SF, if so, then depart flow.
if (flow.current_position == len(sfc) - 1):
yield self.env.timeout(flow.duration)
self.depart_flow(flow)
# Check if flow is currently in last SF, if so, then:
# - Check if the flow has some Egress node set or not. If not then just depart. If Yes then:
# - check if the current node is the egress node. If Yes then depart. If No then forward the flow to
# the egress node using the shortest_path

if flow.current_position == len(sfc) - 1:
if flow.current_node_id == flow.egress_node_id:
# Flow is processed and resides at egress node: depart flow
yield self.env.timeout(flow.duration)
self.depart_flow(flow)
elif flow.egress_node_id is None:
# Flow is processed and no egress node specified: depart flow
log.info(f'Flow {flow.flow_id} has no egress node, will depart from'
f' current node {flow.current_node_id}. Time {self.env.now}.')
yield self.env.timeout(flow.duration)
self.depart_flow(flow)
else:
# Remove the active flow from the SF after it departed the SF on current node towards egress
self.params.metrics.remove_active_flow(flow, current_node_id, current_sf)
# Forward flow to the egress node and then depart from there
yield self.env.process(self.forward_flow(flow, flow.egress_node_id))
yield self.env.timeout(flow.duration)
# In this situation the last sf was never active for the egress node,
# so we should not remove it from the metrics
self.depart_flow(flow, remove_active_flow=False)
else:
# Increment the position of the flow within SFC
flow.current_position += 1
Expand Down Expand Up @@ -278,13 +305,14 @@ def process_flow(self, flow, sfc):
self.params.metrics.dropped_flow(flow)
self.env.exit()

def depart_flow(self, flow):
def depart_flow(self, flow, remove_active_flow=True):
"""
Process the flow at the requested SF of the current node.
"""
# Update metrics for the processed flow
self.params.metrics.completed_flow()
self.params.metrics.add_end2end_delay(flow.end2end_delay)
self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
if remove_active_flow:
self.params.metrics.remove_active_flow(flow, flow.current_node_id, flow.current_sf)
log.info("Flow {} was processed and departed the network from {}. Time {}"
.format(flow.flow_id, flow.current_node_id, self.env.now))
4 changes: 3 additions & 1 deletion src/coordsim/simulation/simulatorparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@


class SimulatorParams:
def __init__(self, network, ing_nodes, sfc_list, sf_list, config, metrics, prediction=False,
def __init__(self, network, ing_nodes, eg_nodes, sfc_list, sf_list, config, metrics, prediction=False,
schedule=None, sf_placement=None):
# NetworkX network object: DiGraph
self.network = network
# Ingress nodes of the network (nodes at which flows arrive): list
self.ing_nodes = ing_nodes
# Egress nodes of the network (nodes at which flows may leave the network): list
self.eg_nodes = eg_nodes
# List of available SFCs and their child SFs: defaultdict(None)
self.sfc_list = sfc_list
# List of every SF and it's properties (e.g. processing_delay): defaultdict(None)
Expand Down
6 changes: 3 additions & 3 deletions src/siminterface/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
# Create CSV writer
self.writer = ResultWriter(self.test_mode, self.test_dir)
# init network, sfc, sf, and config files
self.network, self.ing_nodes = reader.read_network(self.network_file)
self.network, self.ing_nodes, self.eg_nodes = reader.read_network(self.network_file)
self.sfc_list = reader.get_sfc(service_functions_file)
self.sf_list = reader.get_sf(service_functions_file, resource_functions_path)
self.config = reader.get_config(config_file)
Expand All @@ -38,8 +38,8 @@ def __init__(self, network_file, service_functions_file, config_file, resource_
# Check if future ingress traffic setting is enabled
if 'future_traffic' in self.config and self.config['future_traffic']:
self.prediction = True
self.params = SimulatorParams(self.network, self.ing_nodes, self.sfc_list, self.sf_list, self.config,
self.metrics, prediction=self.prediction)
self.params = SimulatorParams(self.network, self.ing_nodes, self.eg_nodes, self.sfc_list, self.sf_list,
self.config, self.metrics, prediction=self.prediction)
if self.prediction:
self.predictor = TrafficPredictor(self.params)
self.episode = 0
Expand Down
4 changes: 2 additions & 2 deletions tests/test_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def setUp(self):

self.env = simpy.Environment()
# Configure simulator parameters
network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
network, ing_nodes, eg_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE)
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH)
config = reader.get_config(CONFIG_FILE)
Expand All @@ -38,7 +38,7 @@ def setUp(self):
schedule = dummy_data.triangle_schedule

# Initialize Simulator and SimulatoParams objects
self.simulator_params = SimulatorParams(network, ing_nodes, sfc_list, sf_list, config, self.metrics,
self.simulator_params = SimulatorParams(network, ing_nodes, eg_nodes, sfc_list, sf_list, config, self.metrics,
sf_placement=sf_placement, schedule=schedule)
self.flow_simulator = FlowSimulator(self.env, self.simulator_params)
self.flow_simulator.start()
Expand Down