Skip to content

Commit

Permalink
Merge pull request #78 from ldklenner/dev-resource-functions
Browse files Browse the repository at this point in the history
Added VNF resource consumption functions
  • Loading branch information
Stefan Schneider authored Aug 22, 2019
2 parents 74cbcc3 + 269e202 commit a51353c
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ install:
script:
- flake8 src
- nose2
- coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -c params/config/sim_config.yaml
- coord-sim -d 20 -n params/networks/triangle.graphml -sf params/services/abc.yaml -sfr params/services/resource_functions -c params/config/sim_config.yaml
2 changes: 2 additions & 0 deletions params/services/abc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ sf_list:
a:
processing_delay_mean: 5.0
processing_delay_stdev: 0.0
resource_function_id: A
b:
processing_delay_mean: 5.0
processing_delay_stdev: 0.0
resource_function_id: B
c:
processing_delay_mean: 5.0
processing_delay_stdev: 0.0
2 changes: 2 additions & 0 deletions params/services/resource_functions/A.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def resource_function(load):
return load
2 changes: 2 additions & 0 deletions params/services/resource_functions/B.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def resource_function(load):
return load
4 changes: 3 additions & 1 deletion src/coordsim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def main():

# Getting current SFC list, and the SF list of each SFC, and config
sfc_list = reader.get_sfc(args.sf)
sf_list = reader.get_sf(args.sf)
sf_list = reader.get_sf(args.sf, args.sfr)
config = reader.get_config(args.config)

# use dummy placement and schedule for running simulator without algorithm
Expand Down Expand Up @@ -66,6 +66,8 @@ def parse_args():
help="The duration of the simulation (simulates milliseconds).")
parser.add_argument('-sf', '--sf', required=True, dest="sf",
help="VNF file which contains the SFCs and their respective SFs and their properties.")
parser.add_argument('-sfr', '--sfr', required=False, default='', dest='sfr',
help="Path which contains the SF resource consumption functions.")
parser.add_argument('-n', '--network', required=True, dest='network',
help="The GraphML network file that specifies the nodes and edges of the network.")
parser.add_argument('-c', '--config', required=True, dest='config', help="Path to the simulator config file.")
Expand Down
34 changes: 32 additions & 2 deletions src/coordsim/reader/reader.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import networkx as nx
from geopy.distance import distance as dist
import numpy as np
import logging as log
import logging
import yaml
import math
from collections import defaultdict
import importlib

log = logging.getLogger(__name__)

# Disclaimer: Some snippets of the following file were imported/modified from B-JointSP on GitHub.
# Original code can be found on https://github.com/CN-UPB/B-JointSP
Expand Down Expand Up @@ -40,7 +42,21 @@ def get_sfc(sfc_file):
return sfc_list


def get_sf(sf_file):
def load_resource_function(name, path):
try:
spec = importlib.util.spec_from_file_location(name, path + '/' + name + '.py')
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
except Exception:
raise Exception(f'Cannot load file "{name}.py" from specified location "{path}".')

try:
return getattr(module, 'resource_function')
except Exception:
raise Exception(f'There is no "resource_function" defined in file "{name}.py."')


def get_sf(sf_file, resource_functions_path):
"""
Get the list of SFs and their properties from the yaml data.
"""
Expand All @@ -50,6 +66,7 @@ def get_sf(sf_file):
# Configureable default mean and stdev defaults
default_processing_delay_mean = 1.0
default_processing_delay_stdev = 1.0
def default_resource_function(x): return x
sf_list = defaultdict(None)
for sf_name, sf_details in sf_data['sf_list'].items():
sf_list[sf_name] = sf_details
Expand All @@ -58,6 +75,19 @@ def get_sf(sf_file):
default_processing_delay_mean)
sf_list[sf_name]["processing_delay_stdev"] = sf_list[sf_name].get("processing_delay_stdev",
default_processing_delay_stdev)
if 'resource_function_id' in sf_list[sf_name]:
try:
sf_list[sf_name]['resource_function'] = load_resource_function(sf_list[sf_name]['resource_function_id'],
resource_functions_path)
except Exception as ex:
sf_list[sf_name]['resource_function_id'] = 'default'
sf_list[sf_name]['resource_function'] = default_resource_function
log.warning(f'{str(ex)} SF {sf_name} will use default resource function instead.')
else:
sf_list[sf_name]["resource_function_id"] = 'default'
sf_list[sf_name]["resource_function"] = default_resource_function
log.info(
f'No resource function specified for SF {sf_name}. Default resource function will be used instead.')
return sf_list


Expand Down
31 changes: 27 additions & 4 deletions src/coordsim/simulation/flowsimulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,32 @@ def process_flow(self, flow, sfc):
# Add the delay to the flow's end2end delay
metrics.add_processing_delay(processing_delay)
flow.end2end_delay += processing_delay

# Calculate the demanded capacity when the flow is processed at this node
demanded_total_capacity = 0.0
for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items():
if sf == sf_i:
# Include flows data rate in requested sf capacity calculation
demanded_total_capacity += self.params.sf_list[sf]['resource_function'](sf_data['load'] + flow.dr)
else:
demanded_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load'])

# Get node capacities
node_cap = self.params.network.nodes[current_node_id]["cap"]
node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"]
assert node_remaining_cap >= 0, "Remaining node capacity cannot be less than 0 (zero)!"
# Metrics: Add active flow to the SF once the flow has begun processing.
metrics.add_active_flow(flow, current_node_id, current_sf)
if flow.dr <= node_remaining_cap:
if demanded_total_capacity <= node_cap:
log.info(
"Flow {} started proccessing at sf {} at node {}. Time: {}, "
"Flow {} started processing at sf {} at node {}. Time: {}, "
"Processing delay: {}".format(flow.flow_id, current_sf, current_node_id, self.env.now,
processing_delay))

self.params.network.nodes[current_node_id]["remaining_cap"] -= flow.dr
# Add load to sf
self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] += flow.dr
# Set remaining node capacity
self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - demanded_total_capacity
# Just for the sake of keeping lines small, the node_remaining_cap is updated again.
node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"]

Expand All @@ -223,7 +236,17 @@ def process_flow(self, flow, sfc):
.format(flow.flow_id, current_sf, current_node_id, self.env.now))
# Remove the active flow from the SF after it departed the SF
metrics.remove_active_flow(flow, current_node_id, current_sf)
self.params.network.nodes[current_node_id]["remaining_cap"] += flow.dr

# Remove load from sf
self.params.network.nodes[current_node_id]['available_sf'][sf]['load'] -= flow.dr
assert self.params.network.nodes[current_node_id]['available_sf'][sf][
'load'] >= 0, 'SF load cannot be less than 0!'
# Recalculation is necessary because other flows could have already arrived or departed at the node
used_total_capacity = 0.0
for sf_i, sf_data in self.params.network.nodes[current_node_id]['available_sf'].items():
used_total_capacity += self.params.sf_list[sf_i]['resource_function'](sf_data['load'])
# Set remaining node capacity
self.params.network.nodes[current_node_id]['remaining_cap'] = node_cap - used_total_capacity
# Just for the sake of keeping lines small, the node_remaining_cap is updated again.
node_remaining_cap = self.params.network.nodes[current_node_id]["remaining_cap"]

Expand Down
7 changes: 6 additions & 1 deletion src/coordsim/simulation/simulatorparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ def __init__(self, network, ing_nodes, sfc_list, sf_list, config, seed, schedule
self.schedule = schedule
# Placement of SFs in each node: defaultdict(list)
self.sf_placement = sf_placement

# Update which sf is available at which node
for node_id, placed_sf_list in sf_placement.items():
available_sf = {}
for sf in placed_sf_list:
available_sf[sf] = self.network.nodes[node_id]['available_sf'].get(sf, {'load': 0.0})
self.network.nodes[node_id]['available_sf'] = available_sf
# Flow interarrival exponential distribution mean: float
self.inter_arr_mean = config['inter_arrival_mean']
# Flow data rate normal distribution mean: float
Expand Down
10 changes: 8 additions & 2 deletions src/siminterface/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, test_mode=False):
# Create CSV writer
self.writer = ResultWriter(self.test_mode)

def init(self, network_file, service_functions_file, config_file, seed):
def init(self, network_file, service_functions_file, config_file, seed, resource_functions_path=""):

# Initialize metrics, record start time
metrics.reset()
Expand All @@ -31,7 +31,7 @@ def init(self, network_file, service_functions_file, config_file, seed):
# Parse network and SFC + SF file
self.network, self.ing_nodes = reader.read_network(network_file, node_cap=10, link_cap=10)
self.sfc_list = reader.get_sfc(service_functions_file)
self.sf_list = reader.get_sf(service_functions_file)
self.sf_list = reader.get_sf(service_functions_file, resource_functions_path)
self.config = reader.get_config(config_file)

# Generate SimPy simulation environment
Expand Down Expand Up @@ -78,6 +78,12 @@ def apply(self, actions: SimulatorAction):
# Get the new placement from the action passed by the RL agent
# Modify and set the placement parameter of the instantiated simulator object.
self.simulator.params.sf_placement = actions.placement
# Update which sf is available at which node
for node_id, placed_sf_list in actions.placement.items():
available_sf = {}
for sf in placed_sf_list:
available_sf[sf] = self.simulator.params.network.nodes[node_id]['available_sf'].get(sf, {'load': 0.0})
self.simulator.params.network.nodes[node_id]['available_sf'] = available_sf

# Get the new schedule from the SimulatorAction
# Set it in the params of the instantiated simulator object.
Expand Down
3 changes: 2 additions & 1 deletion tests/test_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

NETWORK_FILE = "params/networks/triangle.graphml"
SERVICE_FUNCTIONS_FILE = "params/services/abc.yaml"
RESOURCE_FUNCTION_PATH = "params/services/resource_functions"
CONFIG_FILE = "params/config/sim_config.yaml"
SIMULATION_DURATION = 1000
SEED = 1234
Expand All @@ -30,7 +31,7 @@ def setUp(self):
# Configure simulator parameters
network, ing_nodes = reader.read_network(NETWORK_FILE, node_cap=10, link_cap=10)
sfc_list = reader.get_sfc(SERVICE_FUNCTIONS_FILE)
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE)
sf_list = reader.get_sf(SERVICE_FUNCTIONS_FILE, RESOURCE_FUNCTION_PATH)
config = reader.get_config(CONFIG_FILE)

sf_placement = dummy_data.triangle_placement
Expand Down
5 changes: 3 additions & 2 deletions tests/test_simulatorInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

NETWORK_FILE = "params/networks/triangle.graphml"
SERVICE_FUNCTIONS_FILE = "params/services/3sfcs.yaml"
RESOURCE_FUNCTION_PATH = "params/services/resource_functions"
CONFIG_FILE = "params/config/sim_config.yaml"

SIMULATOR_MODULE_NAME = "siminterface.simulator"
Expand All @@ -27,7 +28,7 @@ def setUp(self):
"""
# TODO: replace SimulatorInterface with implementation
self.simulator = SIMULATOR_CLS(TEST_MODE)
self.simulator.init(NETWORK_FILE, SERVICE_FUNCTIONS_FILE, CONFIG_FILE, 1234)
self.simulator.init(NETWORK_FILE, SERVICE_FUNCTIONS_FILE, CONFIG_FILE, 1234, resource_functions_path=RESOURCE_FUNCTION_PATH)

def test_apply(self):
# test if placement and schedule can be applied
Expand Down Expand Up @@ -246,7 +247,7 @@ def test_apply(self):
}
"""
network_stats = simulator_state.network_stats
self.assertIs(len(network_stats), 5)
self.assertIs(len(network_stats), 7)
self.assertIn('total_flows', network_stats)
self.assertIn('successful_flows', network_stats)
self.assertIn('dropped_flows', network_stats)
Expand Down

0 comments on commit a51353c

Please sign in to comment.