diff --git a/galileoexperiments/experiment/run.py b/galileoexperiments/experiment/run.py index 97060a5..f9059bd 100644 --- a/galileoexperiments/experiment/run.py +++ b/galileoexperiments/experiment/run.py @@ -1,6 +1,5 @@ import logging import time -from concurrent.futures import ThreadPoolExecutor from typing import Callable from galileoexperiments.api.model import ProfilingExperimentConfiguration, ScenarioExperimentConfiguration, \ @@ -14,19 +13,8 @@ def run_profiling_experiment(config: ProfilingExperimentConfiguration): run_experiment(config.exp_run_config, config.app_workload_config.requests) -# TODO test profiling again and scenario - -def run_scenario_experiment(config: ScenarioExperimentConfiguration): - # make callable that calls all AppWorkloadConfigurations and waits for all to end - async_requests = [] - for app in config.apps: - async_requests.append(app.requests) - - def requests(): - with ThreadPoolExecutor(max_workers=len(async_requests)) as executor: - for async_request in async_requests: - executor.submit(async_request) +def run_scenario_experiment(config: ScenarioExperimentConfiguration, requests: Callable): return run_experiment(config.exp_run_config, requests) @@ -67,7 +55,7 @@ def run_experiment(config: ExperimentRunConfiguration, requests: Callable): # set requests logger.info("start requests") requests() - + time.sleep(5) except Exception as e: logger.error(e) diff --git a/galileoexperiments/experiment/scenario/run.py b/galileoexperiments/experiment/scenario/run.py index 49814d8..3ff5cb9 100644 --- a/galileoexperiments/experiment/scenario/run.py +++ b/galileoexperiments/experiment/scenario/run.py @@ -10,7 +10,7 @@ import time from typing import List, Dict, Tuple, Callable -from galileo.shell.shell import RoutingTableHelper, Galileo, ClientGroup +from galileo.shell.shell import RoutingTableHelper, ClientGroup from galileoexperiments.api.model import ScenarioWorkloadConfiguration, Pod, ScenarioExperimentConfiguration, \ ExperimentRunConfiguration, AppWorkloadConfiguration @@ -37,7 +37,8 @@ def spawn_pods_for_config(workload_config: ScenarioWorkloadConfiguration) -> Lis profiling_app = workload_config.profiling_apps[image] names = spawn_pods(image, name, host, labels, no_pods, profiling_app.pod_factory) pod_names.extend(names) - + # TODO remove sleep and implement approach to continuously poll for pods till their IP is available + time.sleep(5) return get_pods(pod_names) @@ -80,8 +81,9 @@ def set_rtbl(fns: List[str], load_balancers: Dict[str, str], rtbl: RoutingTableH return services -def prepare_client_groups_for_services(workload_config: ScenarioWorkloadConfiguration) -> List[ - Tuple[str, str, ClientGroup, Callable]]: +def prepare_client_groups_for_services(workload_config: ScenarioWorkloadConfiguration) -> Tuple[List[ + Tuple[ + str, str, ClientGroup]], Callable]: client_groups = [] for zone, values in workload_config.profiles.items(): for image, profiles in values.items(): @@ -103,12 +105,26 @@ def prepare_client_groups_for_services(workload_config: ScenarioWorkloadConfigur clear_list(client.client_id, rds) read_and_save_profile(profile_path, client, rds) - def requests(): - client_group.request(ia=('prerecorded', 'ran')).wait() - client_group.close() + client_groups.append((image, zone, client_group)) + + def requests(): + for idx, group in enumerate(client_groups): + if idx == len(client_groups) - 1: + # TODO the wait here might return sooner than other profiles => implement solution that waits for all + group[2].request(ia=('prerecorded', 'ran')).wait() + else: + group[2].request(ia=('prerecorded', 'ran')) + + return client_groups, requests + - client_groups.append((image, zone, client_group, requests)) - return client_groups +def set_params(workload_config: ScenarioWorkloadConfiguration): + workload_config.params['app_params'] = workload_config.app_params + workload_config.params['profiles'] = workload_config.profiles + workload_config.params['lb_ips'] = workload_config.lb_ips + workload_config.params['zone_mapping'] = workload_config.zone_mapping + workload_config.params['services'] = workload_config.services + workload_config.params['app_names'] = workload_config.app_names def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration): @@ -118,12 +134,16 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration): etcd_service_keys = [] creator = workload_config.creator master_node = workload_config.master_node + client_groups = [] try: - client_groups = prepare_client_groups_for_services(workload_config) + client_groups, requests = prepare_client_groups_for_services(workload_config) pods = spawn_pods_for_config(workload_config) etcd_service_keys = set_loadbalancer_weights(_map_pods_to_dict(pods)) rtbl_services = set_rtbl(list(workload_config.app_names.values()), workload_config.lb_ips, rtbl) + + set_params(workload_config) + exp_run_config = ExperimentRunConfiguration( creator=creator, master_node=master_node, @@ -132,10 +152,10 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration): ) app_configs = [] - for (image, zone, client_group, requests) in client_groups: + for (image, zone, client_group) in client_groups: app_workload_config = AppWorkloadConfiguration( app_container_image=image, - requests=requests, + requests=lambda x: None, pod_factory=workload_config.profiling_apps[image].pod_factory ) app_configs.append(app_workload_config) @@ -144,7 +164,7 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration): apps=app_configs, exp_run_config=exp_run_config ) - run_scenario_experiment(scenario_experiment_config) + run_scenario_experiment(scenario_experiment_config, requests) except Exception as e: logger.error(e) finally: @@ -157,3 +177,5 @@ def run_scenario_workload(workload_config: ScenarioWorkloadConfiguration): client = EtcdClient.from_env() for key in etcd_service_keys: client.remove(key) + for c_group in client_groups: + c_group[2].close() diff --git a/galileoexperiments/utils/arrivalprofile.py b/galileoexperiments/utils/arrivalprofile.py index 4ac2f59..6dbb7b1 100644 --- a/galileoexperiments/utils/arrivalprofile.py +++ b/galileoexperiments/utils/arrivalprofile.py @@ -17,7 +17,7 @@ def read_and_save_profile(profile_path: str, client_desc: ClientDescription, rds # prevents of using 0 because it may lead to crash ias[index] = 0.00000000001 list_key = client_desc.client_id - print('clear list') + print(f'clear list, with key: {list_key}') clear_list(list_key, rds) print('list cleared, start push') rds.lpush(list_key, *ias[0:])