Skip to content

Commit

Permalink
Specify multiple search clients for easier benchmarking (#614)
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Roblin <finnrobl@amazon.com>
  • Loading branch information
finnroblin authored Sep 5, 2024
1 parent 6f0ae23 commit 5403700
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 12 deletions.
55 changes: 47 additions & 8 deletions osbenchmark/results_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io
import logging
import sys
import re
from enum import Enum

import tabulate

Expand All @@ -43,6 +45,11 @@
------------------------------------------------------
"""

class Throughput(Enum):
MEAN = "mean"
MAX = "max"
MIN = "min"
MEDIAN = "median"

def summarize(results, cfg):
SummaryResultsPublisher(results, cfg).publish()
Expand Down Expand Up @@ -126,6 +133,17 @@ def __init__(self, results, config):
"throughput":comma_separated_string_to_number_list(config.opts("workload", "throughput.percentiles", mandatory=False)),
"latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False))
}
self.logger = logging.getLogger(__name__)

def publish_operational_statistics(self, metrics_table: list, warnings: list, record, task):
metrics_table.extend(self._publish_throughput(record, task))
metrics_table.extend(self._publish_latency(record, task))
metrics_table.extend(self._publish_service_time(record, task))
# this is mostly needed for debugging purposes but not so relevant to end users
if self.show_processing_time:
metrics_table.extend(self._publish_processing_time(record, task))
metrics_table.extend(self._publish_error_rate(record, task))
self.add_warnings(warnings, record, task)

def publish(self):
print_header(FINAL_SCORE)
Expand All @@ -145,16 +163,33 @@ def publish(self):

metrics_table.extend(self._publish_transform_stats(stats))

# These variables are used with the clients_list parameter in test_procedures to find the max throughput.
max_throughput = -1
record_with_best_throughput = None

throughput_pattern = r"_(\d+)_clients$"


for record in stats.op_metrics:
task = record["task"]
metrics_table.extend(self._publish_throughput(record, task))
metrics_table.extend(self._publish_latency(record, task))
metrics_table.extend(self._publish_service_time(record, task))
# this is mostly needed for debugging purposes but not so relevant to end users
if self.show_processing_time:
metrics_table.extend(self._publish_processing_time(record, task))
metrics_table.extend(self._publish_error_rate(record, task))
self.add_warnings(warnings, record, task)
is_task_part_of_throughput_testing = re.search(throughput_pattern, task)
if is_task_part_of_throughput_testing:
# assumption: all units are the same and only maximizing throughput over one operation (i.e. not both ingest and search).
# To maximize throughput over multiple operations, would need a list/dictionary of maximum throughputs.
task_throughput = record["throughput"][Throughput.MEAN.value]
self.logger.info("Task %s has throughput %s", task, task_throughput)
if task_throughput > max_throughput:
max_throughput = task_throughput
record_with_best_throughput = record

else:
self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record, task=task)

# The following code is run when the clients_list parameter is specified and publishes the max throughput.
if max_throughput != -1 and record_with_best_throughput is not None:
self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record_with_best_throughput,
task=record_with_best_throughput["task"])
metrics_table.extend(self._publish_best_client_settings(record_with_best_throughput, record_with_best_throughput["task"]))

for record in stats.correctness_metrics:
task = record["task"]
Expand Down Expand Up @@ -217,6 +252,10 @@ def _publish_recall(self, values, task):
self._line("Mean recall@1", task, recall_1_mean, "", lambda v: "%.2f" % v)
)

def _publish_best_client_settings(self, record, task):
num_clients = re.search(r"_(\d+)_clients$", task).group(1)
return self._join(self._line("Number of clients that achieved max throughput", "", num_clients, ""))

def _publish_percentiles(self, name, task, value, unit="ms"):
lines = []
percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES)
Expand Down
34 changes: 30 additions & 4 deletions osbenchmark/workload/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1596,11 +1596,25 @@ def _create_test_procedures(self, workload_spec):
schedule = []

for op in self._r(test_procedure_spec, "schedule", error_ctx=name):
if "parallel" in op:
task = self.parse_parallel(op["parallel"], ops, name)
if "clients_list" in op:
self.logger.info("Clients list specified: %s. Running multiple search tasks, "\
"each scheduled with the corresponding number of clients from the list.", op["clients_list"])
for num_clients in op["clients_list"]:
op["clients"] = num_clients

new_name = self._rename_task_based_on_num_clients(name, num_clients)

new_name = name + "_" + str(num_clients) + "_clients"
new_task = self.parse_task(op, ops, new_name)
new_task.name = new_name
schedule.append(new_task)
else:
task = self.parse_task(op, ops, name)
schedule.append(task)
if "parallel" in op:
task = self.parse_parallel(op["parallel"], ops, name)
else:
task = self.parse_task(op, ops, name)

schedule.append(task)

# verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing).
known_task_names = set()
Expand Down Expand Up @@ -1635,6 +1649,18 @@ def _create_test_procedures(self, workload_spec):
% ", ".join([c.name for c in test_procedures]))
return test_procedures

def _rename_task_based_on_num_clients(self, name: str, num_clients: int) -> str:
has_underscore = "_" in name
has_hyphen = "-" in name
if has_underscore and has_hyphen:
self.logger.warning("The test procedure name %s contains a mix of _ and -. "\
"Consider changing the name to avoid frustrating bugs in the future.", name)
return name + "_" + str(num_clients) + "_clients"
elif has_hyphen:
return name + "-" + str(num_clients) + "-clients"
else:
return name + "_" + str(num_clients) + "_clients"

def _get_test_procedure_specs(self, workload_spec):
schedule = self._r(workload_spec, "schedule", mandatory=False)
test_procedure = self._r(workload_spec, "test_procedure", mandatory=False)
Expand Down
64 changes: 64 additions & 0 deletions tests/workload/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2477,6 +2477,70 @@ def test_parse_unique_task_names(self):
self.assertEqual("search-two-clients", schedule[1].name)
self.assertEqual("search", schedule[1].operation.name)

def test_parse_clients_list(self):
workload_specification = {
"description": "description for unit test",
"operations": [
{
"name": "search",
"operation-type": "search",
"index": "_all"
}
],
"test_procedure": {
"name": "default-test-procedure",
"schedule": [
{
"name": "search-one-client",
"operation": "search",
"clients": 1,
"clients_list": [1,2,3]
},
{
"name": "search-two-clients",
"operation": "search",
"clients": 2
}
]
}
}

reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test-procedure")
resulting_workload = reader("unittest", workload_specification, "/mappings")
self.assertEqual("unittest", resulting_workload.name)
test_procedure = resulting_workload.test_procedures[0]
self.assertTrue(test_procedure.selected)
schedule = test_procedure.schedule
self.assertEqual(4, len(schedule))

self.assertEqual("default-test-procedure_1_clients", schedule[0].name)
self.assertEqual("search", schedule[0].operation.name)
self.assertEqual("default-test-procedure_2_clients", schedule[1].name)
self.assertEqual("search", schedule[1].operation.name)
self.assertEqual("default-test-procedure_3_clients", schedule[2].name)
self.assertEqual("search", schedule[2].operation.name)

self.assertEqual("search-two-clients", schedule[3].name)
self.assertEqual("search", schedule[3].operation.name)
# pylint: disable=W0212
def test_naming_with_clients_list(self):
reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure")
# Test case 1: name contains both "_" and "-"
result = reader._rename_task_based_on_num_clients("test_name-task", 5)
self.assertEqual(result, "test_name-task_5_clients")

# Test case 2: name contains only "-"
result = reader._rename_task_based_on_num_clients("test-name", 3)
self.assertEqual(result, "test-name-3-clients")

# Test case 3: name contains only "_"
result = reader._rename_task_based_on_num_clients("test_name", 2)
self.assertEqual(result, "test_name_2_clients")

# Test case 4: name contains neither "_" nor "-"
result = reader._rename_task_based_on_num_clients("testname", 1)
self.assertEqual(result, "testname_1_clients")

def test_parse_indices_valid_workload_specification(self):
workload_specification = {
"description": "description for unit test",
Expand Down

0 comments on commit 5403700

Please sign in to comment.