Implement scenario experiments
praith-dsg authored and phip123 committed Jul 4, 2022
1 parent 2c6fc1e commit e6bed3b
Showing 3 changed files with 38 additions and 28 deletions.
16 changes: 2 additions & 14 deletions galileoexperiments/experiment/run.py
@@ -1,6 +1,5 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

from galileoexperiments.api.model import ProfilingExperimentConfiguration, ScenarioExperimentConfiguration, \
@@ -14,19 +13,8 @@
def run_profiling_experiment(config: ProfilingExperimentConfiguration):
run_experiment(config.exp_run_config, config.app_workload_config.requests)

# TODO test profiling again and scenario

def run_scenario_experiment(config: ScenarioExperimentConfiguration):
# make callable that calls all AppWorkloadConfigurations and waits for all to end
async_requests = []
for app in config.apps:
async_requests.append(app.requests)

def requests():
with ThreadPoolExecutor(max_workers=len(async_requests)) as executor:
for async_request in async_requests:
executor.submit(async_request)

def run_scenario_experiment(config: ScenarioExperimentConfiguration, requests: Callable):
return run_experiment(config.exp_run_config, requests)


@@ -67,7 +55,7 @@ def run_experiment(config: ExperimentRunConfiguration, requests: Callable):
# set requests
logger.info("start requests")
requests()

time.sleep(5)

except Exception as e:
logger.error(e)
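With this change, run_scenario_experiment no longer builds the request fan-out itself; the caller supplies a single requests callable (run_scenario_workload in scenario/run.py below now does this). For callers that still want to fire several per-app request functions concurrently, here is a minimal sketch of the removed ThreadPoolExecutor pattern; the make_concurrent_requests helper and its names are illustrative, not part of this commit:

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def make_concurrent_requests(app_requests: List[Callable[[], None]]) -> Callable[[], None]:
    # Build a single callable that runs every per-app request function in
    # parallel and only returns once all of them have finished.
    def requests():
        with ThreadPoolExecutor(max_workers=len(app_requests)) as executor:
            futures = [executor.submit(fn) for fn in app_requests]
            for future in futures:
                future.result()  # re-raises any exception from a worker
    return requests
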
48 changes: 35 additions & 13 deletions galileoexperiments/experiment/scenario/run.py
@@ -10,7 +10,7 @@
import time
from typing import List, Dict, Tuple, Callable

from galileo.shell.shell import RoutingTableHelper, Galileo, ClientGroup
from galileo.shell.shell import RoutingTableHelper, ClientGroup

from galileoexperiments.api.model import ScenarioWorkloadConfiguration, Pod, ScenarioExperimentConfiguration, \
ExperimentRunConfiguration, AppWorkloadConfiguration
@@ -37,7 +37,8 @@ def spawn_pods_for_config(workload_config: ScenarioWorkloadConfiguration) -> Lis
profiling_app = workload_config.profiling_apps[image]
names = spawn_pods(image, name, host, labels, no_pods, profiling_app.pod_factory)
pod_names.extend(names)

# TODO remove sleep and implement approach to continuously poll for pods till their IP is available
time.sleep(5)
return get_pods(pod_names)


@@ -80,8 +81,9 @@ def set_rtbl(fns: List[str], load_balancers: Dict[str, str], rtbl: RoutingTableH
return services


def prepare_client_groups_for_services(workload_config: ScenarioWorkloadConfiguration) -> List[
Tuple[str, str, ClientGroup, Callable]]:
def prepare_client_groups_for_services(workload_config: ScenarioWorkloadConfiguration) -> Tuple[List[
Tuple[
str, str, ClientGroup]], Callable]:
client_groups = []
for zone, values in workload_config.profiles.items():
for image, profiles in values.items():
@@ -103,12 +105,26 @@ def prepare_client_groups_for_services(workload_config: ScenarioWorkloadConfigur
clear_list(client.client_id, rds)
read_and_save_profile(profile_path, client, rds)

def requests():
client_group.request(ia=('prerecorded', 'ran')).wait()
client_group.close()
client_groups.append((image, zone, client_group))

def requests():
for idx, group in enumerate(client_groups):
if idx == len(client_groups) - 1:
# TODO the wait here might return sooner than other profiles => implement solution that waits for all
group[2].request(ia=('prerecorded', 'ran')).wait()
else:
group[2].request(ia=('prerecorded', 'ran'))

return client_groups, requests


client_groups.append((image, zone, client_group, requests))
return client_groups
def set_params(workload_config: ScenarioWorkloadConfiguration):
workload_config.params['app_params'] = workload_config.app_params
workload_config.params['profiles'] = workload_config.profiles
workload_config.params['lb_ips'] = workload_config.lb_ips
workload_config.params['zone_mapping'] = workload_config.zone_mapping
workload_config.params['services'] = workload_config.services
workload_config.params['app_names'] = workload_config.app_names


def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration):
@@ -118,12 +134,16 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration):
etcd_service_keys = []
creator = workload_config.creator
master_node = workload_config.master_node
client_groups = []
try:
client_groups = prepare_client_groups_for_services(workload_config)
client_groups, requests = prepare_client_groups_for_services(workload_config)
pods = spawn_pods_for_config(workload_config)
etcd_service_keys = set_loadbalancer_weights(_map_pods_to_dict(pods))
rtbl_services = set_rtbl(list(workload_config.app_names.values()), workload_config.lb_ips, rtbl)


set_params(workload_config)

exp_run_config = ExperimentRunConfiguration(
creator=creator,
master_node=master_node,
@@ -132,10 +152,10 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration):
)
app_configs = []

for (image, zone, client_group, requests) in client_groups:
for (image, zone, client_group) in client_groups:
app_workload_config = AppWorkloadConfiguration(
app_container_image=image,
requests=requests,
requests=lambda x: None,
pod_factory=workload_config.profiling_apps[image].pod_factory
)
app_configs.append(app_workload_config)
@@ -144,7 +164,7 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration):
apps=app_configs,
exp_run_config=exp_run_config
)
run_scenario_experiment(scenario_experiment_config)
run_scenario_experiment(scenario_experiment_config, requests)
except Exception as e:
logger.error(e)
finally:
@@ -157,3 +177,5 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration):
client = EtcdClient.from_env()
for key in etcd_service_keys:
client.remove(key)
for c_group in client_groups:
c_group[2].close()
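The TODO in spawn_pods_for_config above asks for polling until every pod has an IP instead of the fixed five-second sleep. A rough sketch under the assumption (hypothetical here) that get_pods returns objects whose ip attribute stays empty until Kubernetes assigns an address:

def wait_for_pod_ips(pod_names, timeout: float = 60.0, interval: float = 1.0):
    # Poll get_pods until every requested pod reports an IP, instead of sleeping
    # for a fixed interval; raise if the pods do not come up in time.
    deadline = time.time() + timeout
    while time.time() < deadline:
        pods = get_pods(pod_names)
        if len(pods) == len(pod_names) and all(p.ip for p in pods):
            return pods
        time.sleep(interval)
    raise TimeoutError(f'pods did not get an IP within {timeout}s: {pod_names}')
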
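The TODO inside requests() notes that waiting only on the last client group may return before the other profiles have finished. A sketch of a wait-for-all variant, assuming each ClientGroup.request(ia=('prerecorded', 'ran')) call returns a handle with a wait() method, as used above:

def requests():
    # Start every client group's prerecorded workload first, then wait for all
    # of them, so the callable only returns once the slowest profile is done.
    handles = [group[2].request(ia=('prerecorded', 'ran')) for group in client_groups]
    for handle in handles:
        handle.wait()
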
2 changes: 1 addition & 1 deletion galileoexperiments/utils/arrivalprofile.py
@@ -17,7 +17,7 @@ def read_and_save_profile(profile_path: str, client_desc: ClientDescription, rds
# prevent using 0 because it may lead to a crash
ias[index] = 0.00000000001
list_key = client_desc.client_id
print('clear list')
print(f'clear list, with key: {list_key}')
clear_list(list_key, rds)
print('list cleared, start push')
rds.lpush(list_key, *ias[0:])
