-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add automatic testing and aggregation to OSB #655
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,6 @@ | |
import os | ||
import platform | ||
import sys | ||
import threading | ||
import time | ||
import uuid | ||
|
||
|
@@ -615,6 +614,25 @@ def add_workload_source(subparser): | |
f"high values favor the most common queries. " | ||
f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA}).", | ||
default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA) | ||
test_execution_parser.add_argument( | ||
"--test-iterations", | ||
help="The number of times to run the workload (default: 1).", | ||
default=1) | ||
test_execution_parser.add_argument( | ||
"--aggregate", | ||
type=lambda x: (str(x).lower() in ['true', '1', 'yes', 'y']), | ||
help="Aggregate the results of multiple test executions (default: true).", | ||
default=True) | ||
test_execution_parser.add_argument( | ||
"--sleep-timer", | ||
help="Sleep for the specified number of seconds before starting the next test execution (default: 5).", | ||
default=5) | ||
test_execution_parser.add_argument( | ||
"--cancel-on-error", | ||
action="store_true", | ||
help="Stop executing tests if an error occurs in one of the test iterations (default: false).", | ||
default=False | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No need to explicitly set `default=False` here — `action="store_true"` already defaults the value to `False`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Haha oops. Fixed now :) |
||
) | ||
|
||
############################################################################### | ||
# | ||
|
@@ -635,17 +653,7 @@ def add_workload_source(subparser): | |
action="store_true", | ||
default=False) | ||
|
||
auto_aggregate_parser = subparsers.add_parser("auto-aggregate", | ||
add_help=False, | ||
help="Run multiple test executions with the same configuration and aggregate the results", | ||
parents=[test_execution_parser]) | ||
auto_aggregate_parser.add_argument( | ||
"--test-iterations", | ||
type=int, | ||
required=True, | ||
help="Number of test executions to run and aggregate") | ||
|
||
for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, auto_aggregate_parser, | ||
for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, | ||
download_parser, install_parser, start_parser, stop_parser, info_parser, create_workload_parser]: | ||
# This option is needed to support a separate configuration for the integration tests on the same machine | ||
p.add_argument( | ||
|
@@ -874,29 +882,48 @@ def prepare_test_executions_dict(args, cfg): | |
test_executions_dict[execution] = None | ||
return test_executions_dict | ||
|
||
def run_and_aggregate(arg_parser, args, cfg): | ||
semaphore = threading.Semaphore(1) | ||
test_exes = [] | ||
args.subcommand = "execute-test" | ||
aggregate_id = args.test_execution_id | ||
|
||
for _ in range(args.test_iterations): | ||
console.info(f"Running test {_ + 1}...") | ||
args.test_execution_id = str(uuid.uuid4()) # we reuse the same args for each test so need to refresh the id | ||
test_exes.append(args.test_execution_id) | ||
with semaphore: | ||
dispatch_sub_command(arg_parser, args, cfg) | ||
|
||
console.info(f"Test executions: {', '.join(test_exes)}") | ||
console.info("Aggregating results...") | ||
aggregate_args = arg_parser.parse_args([ | ||
"aggregate", | ||
f"--test-executions={','.join(test_exes)}", | ||
f"--test-execution-id={aggregate_id}", | ||
f"--results-file={args.results_file}", | ||
f"--workload-repository={args.workload_repository}" | ||
]) | ||
dispatch_sub_command(arg_parser, aggregate_args, cfg) | ||
def configure_test(arg_parser, args, cfg): | ||
# As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters | ||
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time | ||
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only | ||
# to run the actual benchmark (i.e. generating load). | ||
print_test_execution_id(args) | ||
if args.effective_start_date: | ||
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date) | ||
cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id) | ||
# use the test_execution id implicitly also as the install id. | ||
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id) | ||
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline) | ||
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag) | ||
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling) | ||
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions) | ||
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error) | ||
cfg.add( | ||
config.Scope.applicationOverride, | ||
"worker_coordinator", | ||
"load_worker_coordinator_hosts", | ||
opts.csv_to_list(args.load_worker_coordinator_hosts)) | ||
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode) | ||
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles) | ||
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha) | ||
configure_workload_params(arg_parser, args, cfg) | ||
configure_connection_params(arg_parser, args, cfg) | ||
configure_telemetry_params(args, cfg) | ||
configure_builder_params(args, cfg) | ||
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk) | ||
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision) | ||
cfg.add(config.Scope.applicationOverride, "builder", | ||
"provision_config_instance.plugins", opts.csv_to_list( | ||
args.opensearch_plugins)) | ||
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params)) | ||
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install)) | ||
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check)) | ||
|
||
configure_results_publishing_params(args, cfg) | ||
|
||
def print_test_execution_id(args): | ||
console.info(f"[Test Execution ID]: {args.test_execution_id}") | ||
|
@@ -916,8 +943,6 @@ def dispatch_sub_command(arg_parser, args, cfg): | |
test_executions_dict = prepare_test_executions_dict(args, cfg) | ||
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args) | ||
aggregator_instance.aggregate() | ||
elif sub_command == "auto-aggregate": | ||
run_and_aggregate(arg_parser, args, cfg) | ||
elif sub_command == "list": | ||
cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration) | ||
cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit) | ||
|
@@ -957,49 +982,32 @@ def dispatch_sub_command(arg_parser, args, cfg): | |
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id) | ||
builder.stop(cfg) | ||
elif sub_command == "execute-test": | ||
# As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters | ||
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time | ||
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only | ||
# to run the actual benchmark (i.e. generating load). | ||
print_test_execution_id(args) | ||
if args.effective_start_date: | ||
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date) | ||
cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id) | ||
# use the test_execution id implicitly also as the install id. | ||
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id) | ||
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline) | ||
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag) | ||
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling) | ||
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions) | ||
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error) | ||
cfg.add( | ||
config.Scope.applicationOverride, | ||
"worker_coordinator", | ||
"load_worker_coordinator_hosts", | ||
opts.csv_to_list(args.load_worker_coordinator_hosts)) | ||
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode) | ||
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles) | ||
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n) | ||
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha) | ||
configure_workload_params(arg_parser, args, cfg) | ||
configure_connection_params(arg_parser, args, cfg) | ||
configure_telemetry_params(args, cfg) | ||
configure_builder_params(args, cfg) | ||
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk) | ||
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision) | ||
cfg.add(config.Scope.applicationOverride, "builder", | ||
"provision_config_instance.plugins", opts.csv_to_list( | ||
args.opensearch_plugins)) | ||
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params)) | ||
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install)) | ||
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check)) | ||
|
||
configure_results_publishing_params(args, cfg) | ||
|
||
execute_test(cfg, args.kill_running_processes) | ||
iterations = int(args.test_iterations) | ||
if iterations > 1: | ||
test_exes = [] | ||
for _ in range(iterations): | ||
try: | ||
test_exes.append(args.test_execution_id) | ||
configure_test(arg_parser, args, cfg) | ||
execute_test(cfg, args.kill_running_processes) | ||
time.sleep(int(args.sleep_timer)) | ||
args.test_execution_id = str(uuid.uuid4()) | ||
except Exception as e: | ||
console.error(f"Error occurred during test execution {_+1}: {str(e)}") | ||
if args.cancel_on_error: | ||
console.info("Cancelling remaining test executions.") | ||
break | ||
|
||
if args.aggregate: | ||
args.test_executions = test_exes | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this exception block gets reached, the failed test's ID will already have been appended to the list. Maybe move line 990 to be after line 992, assuming execute_test succeeds, or we can pop from the list in the exception block. On a side note, based on the way […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice catch, I moved line 990 so it only adds the test ID after execute_test is called. |
||
test_executions_dict = prepare_test_executions_dict(args, cfg) | ||
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args) | ||
aggregator_instance.aggregate() | ||
elif args.test_iterations == 1: | ||
configure_test(arg_parser, args, cfg) | ||
execute_test(cfg, args.kill_running_processes) | ||
else: | ||
console.info("Please enter a valid number of test iterations") | ||
Comment on lines
+1008
to
+1009
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will we ever reach this point, since argparse by default sets the value to 1? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. True, I removed this. Although users can potentially enter invalid values. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oops, you're right. Let's add it back |
||
elif sub_command == "create-workload": | ||
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices) | ||
cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, we could assign the user_tag to a k,v pair, something like […]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would help users who are trying to filter for specific aggregated files in the MDS
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't consider this, I fixed it to look like your suggestion above!