diff --git a/osbenchmark/workload_generator/config.py b/osbenchmark/workload_generator/config.py new file mode 100644 index 000000000..a3ddff370 --- /dev/null +++ b/osbenchmark/workload_generator/config.py @@ -0,0 +1,30 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +from dataclasses import dataclass, field +from typing import List + +@dataclass +class Index: + name: str = None + document_frequency: int = 1 + number_of_docs: int = None + settings_and_mappings: dict = field(default_factory=dict) + +@dataclass +class CustomWorkload: + workload_name: str = None + output_path: str = None + workload_path: str = None + operations_path: str = None + test_procedures_path: str = None + indices: List[Index] = field(default_factory=list) + extracted_indices: List[str] = field(default_factory=list) + failed_indices: List[str] = field(default_factory=list) + corpora: List[dict] = field(default_factory=list) + queries: List[dict] = field(default_factory=list) diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py deleted file mode 100644 index 7512d5c77..000000000 --- a/osbenchmark/workload_generator/corpus.py +++ /dev/null @@ -1,112 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import bz2 -import json -import logging -import os - -from osbenchmark.utils import console - - -DOCS_COMPRESSOR = bz2.BZ2Compressor -COMP_EXT = ".bz2" - - -def template_vars(index_name, out_path, doc_count): - comp_outpath = out_path + COMP_EXT - return { - "index_name": index_name, - "filename": os.path.basename(comp_outpath), - "path": comp_outpath, - "doc_count": doc_count, - "uncompressed_bytes": os.path.getsize(out_path), - "compressed_bytes": os.path.getsize(comp_outpath) - } - - -def get_doc_outpath(outdir, name, suffix=""): - return os.path.join(outdir, f"{name}-documents{suffix}.json") - - -def extract(client, output_path, index, number_of_docs_requested=None): - """ - Scroll an index with a match-all query, dumping document source to ``outdir/documents.json``. 
- - :param client: OpenSearch client used to extract data - :param output_path: Destination directory for corpus dump - :param index: Name of index to dump - :return: dict of properties describing the corpus for templates - """ - - logger = logging.getLogger(__name__) - - number_of_docs = client.count(index=index)["count"] - - total_docs = number_of_docs if not number_of_docs_requested else min(number_of_docs, number_of_docs_requested) - - if total_docs > 0: - logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", number_of_docs, index, total_docs) - docs_path = get_doc_outpath(output_path, index) - dump_documents(client, index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), " for test mode") - dump_documents(client, index, docs_path, total_docs) - return template_vars(index, docs_path, total_docs) - else: - logger.info("Skipping corpus extraction fo index [%s] as it contains no documents.", index) - return None - - -def dump_documents(client, index, out_path, number_of_docs, progress_message_suffix=""): - # pylint: disable=import-outside-toplevel - from opensearchpy import helpers - - logger = logging.getLogger(__name__) - freq = max(1, number_of_docs // 1000) - - progress = console.progress() - compressor = DOCS_COMPRESSOR() - comp_outpath = out_path + COMP_EXT - with open(out_path, "wb") as outfile: - with open(comp_outpath, "wb") as comp_outfile: - logger.info("Dumping corpus for index [%s] to [%s].", index, out_path) - query = {"query": {"match_all": {}}} - for n, doc in enumerate(helpers.scan(client, query=query, index=index)): - if n >= number_of_docs: - break - data = (json.dumps(doc["_source"], separators=(",", ":")) + "\n").encode("utf-8") - - outfile.write(data) - comp_outfile.write(compressor.compress(data)) - - render_progress(progress, progress_message_suffix, index, n + 1, number_of_docs, freq) - - comp_outfile.write(compressor.flush()) - progress.finish() - - -def render_progress(progress, progress_message_suffix, index, cur, total, freq): - if cur % freq == 0 or total - cur < freq: - msg = f"Extracting documents for index [{index}]{progress_message_suffix}..." - percent = (cur * 100) / total - progress.print(msg, f"{cur}/{total} docs [{percent:.1f}% done]") diff --git a/osbenchmark/workload_generator/extractors.py b/osbenchmark/workload_generator/extractors.py new file mode 100644 index 000000000..a92dc0f7d --- /dev/null +++ b/osbenchmark/workload_generator/extractors.py @@ -0,0 +1,227 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. 
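+
+# Extraction layer for create-workload: IndexExtractor captures index settings
+# and mappings, while SequentialCorpusExtractor dumps per-index document
+# corpora to bz2-compressed JSON via a scroll-based match-all query.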
+
+import bz2
+import json
+import logging
+import os
+from abc import ABC, abstractmethod
+
+from opensearchpy import OpenSearchException
+
+from osbenchmark.utils import console
+from osbenchmark.workload_generator.config import CustomWorkload
+
+DOCS_COMPRESSOR = bz2.BZ2Compressor
+COMP_EXT = ".bz2"
+
+class IndexExtractor:
+    def __init__(self, custom_workload, client):
+        self.custom_workload: CustomWorkload = custom_workload
+        self.client = client
+
+        self.INDEX_SETTINGS_EPHEMERAL_KEYS = ["uuid",
+                                              "creation_date",
+                                              "version",
+                                              "provided_name",
+                                              "store"]
+        self.INDEX_SETTINGS_PARAMETERS = {
+            "number_of_replicas": "{{{{number_of_replicas | default({orig})}}}}",
+            "number_of_shards": "{{{{number_of_shards | default({orig})}}}}"
+        }
+        self.logger = logging.getLogger(__name__)
+
+    def extract_indices(self, workload_path):
+        extracted_indices, failed_indices = [], []
+        for index in self.custom_workload.indices:
+            try:
+                extracted_indices += self.extract(workload_path, index.name)
+            except OpenSearchException:
+                self.logger.error("Failed to extract index [%s]", index.name)
+                failed_indices.append(index.name)
+
+        return extracted_indices, failed_indices
+
+    def extract(self, outdir, index_pattern):
+        """
+        Extracts and writes index settings and
+        mappings to "<index>.json" in a workload
+        :param outdir: destination directory
+        :param index_pattern: name of index or index pattern
+        :return: list of template variables corresponding to the
+        specified index / indices
+        """
+        results = []
+
+        index_obj = self.extract_index_mapping_and_settings(index_pattern)
+        for index, details in index_obj.items():
+            filename = f"{index}.json"
+            outpath = os.path.join(outdir, filename)
+            with open(outpath, "w") as outfile:
+                json.dump(details, outfile, indent=4, sort_keys=True)
+                outfile.write("\n")
+            results.append({
+                "name": index,
+                "path": outpath,
+                "filename": filename,
+            })
+        return results
+
+    def extract_index_mapping_and_settings(self, index_pattern):
+        """
+        Uses the client to retrieve mappings and settings, filtering out
+        ephemeral index settings. The results are used to re-create the
+        index / indices.
+        :param index_pattern: name of index or index pattern
+        :return: dictionary of index / indices mappings and settings
+        """
+        results = {}
+        logger = logging.getLogger(__name__)
+        # the response might contain multiple indices if a wildcard was provided
+        response = self.client.indices.get(index_pattern)
+        for index, details in response.items():
+            valid, reason = self.is_valid_index(index)
+            if valid:
+                mappings = details["mappings"]
+                index_settings = self.filter_ephemeral_index_settings(details["settings"]["index"])
+                self.update_index_setting_parameters(index_settings)
+                results[index] = {
+                    "mappings": mappings,
+                    "settings": {
+                        "index": index_settings
+                    }
+                }
+            else:
+                logger.info("Skipping index [%s] (reason: %s).", index, reason)
+
+        return results
+
+    def filter_ephemeral_index_settings(self, settings):
+        """
+        Some of the 'settings' (like uuid, creation_date, etc.)
+        published by OpenSearch for an index are
+        ephemeral values, not useful for re-creating the index.
+        :param settings: Index settings published by index.get()
+        :return: settings with ephemeral keys removed
+        """
+        filtered = dict(settings)
+        for s in self.INDEX_SETTINGS_EPHEMERAL_KEYS:
+            filtered.pop(s, None)
+        return filtered
+
+    def update_index_setting_parameters(self, settings):
+        for s, param in self.INDEX_SETTINGS_PARAMETERS.items():
+            if s in settings:
+                orig_value = settings[s]
+                settings[s] = param.format(orig=orig_value)
+
+    def is_valid_index(self, index_name):
+        if len(index_name) == 0:
+            return False, "Index name is empty"
+        if index_name.startswith("."):
+            return False, f"Index [{index_name}] is hidden"
+        return True, None
+
+
+class CorpusExtractor(ABC):
+
+    @abstractmethod
+    def extract_documents(self, index, documents_limit=None):
+        pass
+
+
+class SequentialCorpusExtractor(CorpusExtractor):
+    def __init__(self, custom_workload, client):
+        self.custom_workload: CustomWorkload = custom_workload
+        self.client = client
+
+    def template_vars(self, index_name, out_path, doc_count):
+        comp_outpath = out_path + COMP_EXT
+        return {
+            "index_name": index_name,
+            "filename": os.path.basename(comp_outpath),
+            "path": comp_outpath,
+            "doc_count": doc_count,
+            "uncompressed_bytes": os.path.getsize(out_path),
+            "compressed_bytes": os.path.getsize(comp_outpath)
+        }
+
+    def _get_doc_outpath(self, outdir, name, suffix=""):
+        return os.path.join(outdir, f"{name}-documents{suffix}.json")
+
+    def extract_documents(self, index, documents_limit=None):
+        """
+        Scroll an index with a match-all query, dumping document source to ``outdir/documents.json``.
+
+        :param index: Name of index to dump
+        :param documents_limit: The number of documents to extract. Must be equal
+        to or less than the total number of documents that exist in the index
+        :return: dict of properties describing the corpus for templates
+        """
+
+        logger = logging.getLogger(__name__)
+
+        total_documents = self.client.count(index=index)["count"]
+
+        documents_to_extract = total_documents if not documents_limit else min(total_documents, documents_limit)
+
+        if documents_to_extract > 0:
+            logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", total_documents, index, documents_to_extract)
+            docs_path = self._get_doc_outpath(self.custom_workload.workload_path, index)
+            # Create test mode corpora
+            self.dump_documents(
+                self.client,
+                index,
+                self._get_doc_outpath(self.custom_workload.workload_path, index, "-1k"),
+                min(documents_to_extract, 1000),
+                " for test mode")
+            # Create full corpora
+            self.dump_documents(self.client, index, docs_path, documents_to_extract)
+
+            return self.template_vars(index, docs_path, documents_to_extract)
+        else:
+            logger.info("Skipping corpus extraction for index [%s] as it contains no documents.", index)
+            return None
+
+    def dump_documents(self, client, index, out_path, number_of_docs, progress_message_suffix=""):
+        # pylint: disable=import-outside-toplevel
+        from opensearchpy import helpers
+
+        logger = logging.getLogger(__name__)
+        freq = max(1, number_of_docs // 1000)
+
+        progress = console.progress()
+        compressor = DOCS_COMPRESSOR()
+        comp_outpath = out_path + COMP_EXT
+        with open(out_path, "wb") as outfile:
+            with open(comp_outpath, "wb") as comp_outfile:
+                logger.info("Dumping corpus for index [%s] to [%s].", index, out_path)
+                query = {"query": {"match_all": {}}}
+                for i, doc in enumerate(helpers.scan(client, query=query, index=index)):
+                    if i >= number_of_docs:
+                        break
+                    data = (json.dumps(doc["_source"], separators=(",", ":")) + "\n").encode("utf-8")
+
+                    outfile.write(data)
+                    comp_outfile.write(compressor.compress(data))
+
+                    self.render_progress(progress, progress_message_suffix, index, i + 1, number_of_docs, freq)
+
+                comp_outfile.write(compressor.flush())
+        progress.finish()
+
+    def render_progress(self, progress, progress_message_suffix, index, cur, total, freq):
+        if cur % freq == 0 or total - cur < freq:
+            msg = f"Extracting documents for index [{index}]{progress_message_suffix}..."
+            percent = (cur * 100) / total
+            progress.print(msg, f"{cur}/{total} docs [{percent:.1f}% done]")
diff --git a/osbenchmark/workload_generator/helpers.py b/osbenchmark/workload_generator/helpers.py
new file mode 100644
index 000000000..7a03e47d7
--- /dev/null
+++ b/osbenchmark/workload_generator/helpers.py
@@ -0,0 +1,159 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
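+
+# Helpers for writing a generated workload to disk: CustomWorkloadWriter
+# materializes the output directory and renders the Jinja2 templates, while
+# QueryProcessor normalizes user-provided custom queries into a list.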
+ +import json +import os +import logging +import sys +import shutil + +from jinja2 import Environment, FileSystemLoader, select_autoescape + +from osbenchmark import exceptions +from osbenchmark.utils import io, console +from osbenchmark.workload_generator.config import CustomWorkload, Index + + +BASE_WORKLOAD = "base-workload" +CUSTOM_OPERATIONS = "custom-operations" +CUSTOM_TEST_PROCEDURES = "custom-test-procedures" +DEFAULT_OPERATIONS = "default-operations" +DEFAULT_TEST_PROCEDURES = "default-test-procedures" +TEMPLATE_EXT = ".json.j2" + +class CustomWorkloadWriter: + + def __init__(self, custom_workload: CustomWorkload, templates_path: str): + self.custom_workload = custom_workload + self.templates_path = templates_path + + self.custom_workload.workload_path = os.path.abspath( + os.path.join(io.normalize_path(self.custom_workload.output_path), + self.custom_workload.workload_name)) + self.custom_workload.operations_path = os.path.join(self.custom_workload.workload_path, "operations") + self.custom_workload.test_procedures_path = os.path.join(self.custom_workload.workload_path, "test_procedures") + self.logger = logging.getLogger(__name__) + + def make_workload_directory(self): + if not self._has_write_permission(self.custom_workload.workload_path): + error_suggestion = "Workload output path does not have write permissions. " \ + + "Please update the permissions for the specified output path or choose a different output path." + self.logger.error(error_suggestion) + console.error(error_suggestion) + + # Check if a workload of the same name already exists in output path + if os.path.exists(self.custom_workload.workload_path): + try: + input_text = f"A workload already exists at {self.custom_workload.workload_path}. " \ + + "Would you like to remove it? (y/n): " + user_decision = input(input_text) + while user_decision not in ('y', 'n'): + user_decision = input("Provide y for yes or n for no. " + input_text) + + if user_decision == "y": + self.logger.info("Removing existing workload [%s] in path [%s]", + self.custom_workload.workload_name, self.custom_workload.workload_path) + console.info("Removing workload of the same name.") + shutil.rmtree(self.custom_workload.workload_path) + elif user_decision == "n": + logging_info = "Keeping workload of the same name at existing path. Cancelling create-workload." 
+                    self.logger.info(logging_info)
+                    console.println("")
+                    console.info(logging_info)
+                    sys.exit(0)
+
+            except OSError:
+                self.logger.error("Failed to remove existing workload [%s] in path [%s]",
+                                  self.custom_workload.workload_name, self.custom_workload.workload_path)
+
+        io.ensure_dir(self.custom_workload.workload_path)
+        io.ensure_dir(self.custom_workload.operations_path)
+        io.ensure_dir(self.custom_workload.test_procedures_path)
+
+    def _has_write_permission(self, directory):
+        """
+        Verify that the output directory for the workload has write permissions
+        """
+        return os.access(directory, os.W_OK)
+
+    def render_templates(self, template_vars: dict, custom_queries: dict):
+        workload_file_path = os.path.join(self.custom_workload.workload_path, "workload.json")
+        operations_file_path = os.path.join(self.custom_workload.operations_path, "default.json")
+        test_procedures_file_path = os.path.join(self.custom_workload.test_procedures_path, "default.json")
+
+        self._write_template(template_vars, BASE_WORKLOAD, workload_file_path)
+
+        if custom_queries:
+            self._write_template(template_vars, CUSTOM_OPERATIONS, operations_file_path)
+            self._write_template(template_vars, CUSTOM_TEST_PROCEDURES, test_procedures_file_path)
+        else:
+            self._write_template(template_vars, DEFAULT_OPERATIONS, operations_file_path)
+            self._write_template(template_vars, DEFAULT_TEST_PROCEDURES, test_procedures_file_path)
+
+    def _write_template(self, template_vars: dict, template_file: str, output_path: str):
+        template = self._get_default_template(template_file)
+        with open(output_path, "w") as f:
+            f.write(template.render(template_vars))
+
+    def _get_default_template(self, template_file: str):
+        template_file_name = template_file + TEMPLATE_EXT
+
+        env = Environment(loader=FileSystemLoader(self.templates_path), autoescape=select_autoescape(['html', 'xml']))
+
+        return env.get_template(template_file_name)
+
+class QueryProcessor:
+    def __init__(self, queries: str):
+        self.queries = queries
+
+    def process_queries(self):
+        if not self.queries:
+            return []
+
+        with self.queries as queries:
+            try:
+                processed_queries = json.load(queries)
+                if isinstance(processed_queries, dict):
+                    processed_queries = [processed_queries]
+            except ValueError as err:
+                raise exceptions.SystemSetupError(f"Ensure JSON schema is valid and queries are contained in a list: {err}")
+
+        return processed_queries
+
+def process_indices(indices, document_frequency, number_of_docs):
+    processed_indices = []
+    for index_name in indices:
+        # Resolve the per-index document limit from the <index>:<doc_count> map
+        docs_for_index = None
+        if number_of_docs and index_name in number_of_docs:
+            docs_for_index = int(number_of_docs[index_name])
+        index = Index(
+            name=index_name,
+            document_frequency=document_frequency,
+            number_of_docs=docs_for_index
+        )
+        processed_indices.append(index)
+
+    return processed_indices
+
+
+def validate_index_documents_map(indices, indices_docs_map):
+    logger = logging.getLogger(__name__)
+    logger.info("Indices Docs Map: [%s]", indices_docs_map)
+    documents_limited = indices_docs_map is not None and len(indices_docs_map) > 0
+    if not documents_limited:
+        return
+
+    if len(indices) < len(indices_docs_map):
+        raise exceptions.SystemSetupError(
+            "Number of <index>:<doc_count> pairs exceeds number of indices in --indices. " +
+            "Ensure number of <index>:<doc_count> pairs is less than or equal to number of indices in --indices."
+        )
+
+    for index_name in indices_docs_map:
+        if index_name not in indices:
+            raise exceptions.SystemSetupError(
+                "Index from <index>:<doc_count> pair was not found in --indices. " +
+                "Ensure that indices from all <index>:<doc_count> pairs exist in --indices."
+ ) diff --git a/osbenchmark/workload_generator/index.py b/osbenchmark/workload_generator/index.py deleted file mode 100644 index 71301101d..000000000 --- a/osbenchmark/workload_generator/index.py +++ /dev/null @@ -1,120 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import json -import logging -import os - -INDEX_SETTINGS_EPHEMERAL_KEYS = ["uuid", - "creation_date", - "version", - "provided_name", - "store"] -INDEX_SETTINGS_PARAMETERS = { - "number_of_replicas": "{{{{number_of_replicas | default({orig})}}}}", - "number_of_shards": "{{{{number_of_shards | default({orig})}}}}" -} - - -def filter_ephemeral_index_settings(settings): - """ - Some of the 'settings' published by OpenSearch for an index are - ephemeral values, not useful for re-creating the index. 
- :param settings: Index settings published by index.get() - :return: settings with ephemeral keys removed - """ - filtered = dict(settings) - for s in INDEX_SETTINGS_EPHEMERAL_KEYS: - filtered.pop(s, None) - return filtered - - -def update_index_setting_parameters(settings): - for s, param in INDEX_SETTINGS_PARAMETERS.items(): - if s in settings: - orig_value = settings[s] - settings[s] = param.format(orig=orig_value) - - -def is_valid(index_name): - if len(index_name) == 0: - return False, "Index name is empty" - if index_name.startswith("."): - return False, f"Index [{index_name}] is hidden" - return True, None - - -def extract_index_mapping_and_settings(client, index_pattern): - """ - Calls index GET to retrieve mapping + settings, filtering settings - so they can be used to re-create this index - :param client: OpenSearch client - :param index_pattern: name of index - :return: index creation dictionary - """ - results = {} - logger = logging.getLogger(__name__) - # the response might contain multiple indices if a wildcard was provided - response = client.indices.get(index_pattern) - for index, details in response.items(): - valid, reason = is_valid(index) - if valid: - mappings = details["mappings"] - index_settings = filter_ephemeral_index_settings(details["settings"]["index"]) - update_index_setting_parameters(index_settings) - results[index] = { - "mappings": mappings, - "settings": { - "index": index_settings - } - } - else: - logger.info("Skipping index [%s] (reason: %s).", index, reason) - - return results - - -def extract(client, outdir, index_pattern): - """ - Request index information to format in "index.json" for Benchmark - :param client: OpenSearch client - :param outdir: destination directory - :param index_pattern: name of index - :return: Dict of template variables representing the index for use in workload - """ - results = [] - - index_obj = extract_index_mapping_and_settings(client, index_pattern) - for index, details in index_obj.items(): - filename = f"{index}.json" - outpath = os.path.join(outdir, filename) - with open(outpath, "w") as outfile: - json.dump(details, outfile, indent=4, sort_keys=True) - outfile.write("\n") - results.append({ - "name": index, - "path": outpath, - "filename": filename, - }) - return results diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index 60163701e..41adac508 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -5,203 +5,88 @@ # compatible open source license. # Modifications Copyright OpenSearch Contributors. See # GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
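+# Orchestrates create-workload: validates user inputs, extracts index
+# metadata and document corpora from a running cluster, and renders the
+# workload, operations, and test-procedure templates.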
 import logging
 import os
-import shutil
-import json
-
-from opensearchpy import OpenSearchException
-from jinja2 import Environment, FileSystemLoader, select_autoescape
 
 from osbenchmark import PROGRAM_NAME, exceptions
 from osbenchmark.client import OsClientFactory
-from osbenchmark.workload_generator import corpus, index
+from osbenchmark.workload_generator.config import CustomWorkload
+from osbenchmark.workload_generator.helpers import QueryProcessor, CustomWorkloadWriter, process_indices, validate_index_documents_map
+from osbenchmark.workload_generator.extractors import IndexExtractor, SequentialCorpusExtractor
 from osbenchmark.utils import io, opts, console
 
-def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested):
-    if not docs_were_requested:
-        return
-
-    if len(indices) < len(indices_docs_map):
-        raise exceptions.SystemSetupError(
-            "Number of <index>:<doc_count> pairs exceeds number of indices in --indices. " +
-            "Ensure number of <index>:<doc_count> pairs is less than or equal to number of indices in --indices."
-        )
-
-    for index_name in indices_docs_map:
-        if index_name not in indices:
-            raise exceptions.SystemSetupError(
-                "Index from <index>:<doc_count> pair was not found in --indices. " +
-                "Ensure that indices from all <index>:<doc_count> pairs exist in --indices."
-            )
-
-def extract_mappings_and_corpora(client, output_path, indices_to_extract, indices_docs_map):
-    indices = []
-    corpora = []
-    docs_were_requested = indices_docs_map is not None and len(indices_docs_map) > 0
-
-    validate_indices_docs_map(indices_to_extract, indices_docs_map, docs_were_requested)
-
-    # first extract index metadata (which is cheap) and defer extracting data to reduce the potential for
-    # errors due to invalid index names late in the process.
-    for index_name in indices_to_extract:
-        try:
-            indices += index.extract(client, output_path, index_name)
-        except OpenSearchException:
-            logging.getLogger(__name__).exception("Failed to extract index [%s]", index_name)
-
-    # That list only contains valid indices (with index patterns already resolved)
-    # For each index, check if docs were requested. If so, extract the number of docs from the map
-    for i in indices:
-        custom_docs_to_extract = None
-
-        if docs_were_requested and i["name"] in indices_docs_map:
-            try:
-                custom_docs_to_extract = int(indices_docs_map.get(i["name"]))
-            except ValueError:
-                raise exceptions.InvalidSyntax(
-                    f"The string [{indices_docs_map.get(i['name'])}] in <index>:<doc_count> pair cannot be converted to an integer."
- ) - - logging.getLogger(__name__).info("Extracting [%s] docs for index [%s]", custom_docs_to_extract, i["name"]) - c = corpus.extract(client, output_path, i["name"], custom_docs_to_extract) - if c: - corpora.append(c) - - return indices, corpora - -def process_custom_queries(custom_queries): - if not custom_queries: - return [] - - with custom_queries as queries: - try: - data = json.load(queries) - if isinstance(data, dict): - data = [data] - except ValueError as err: - raise exceptions.SystemSetupError(f"Ensure JSON schema is valid and queries are contained in a list: {err}") - - return data - -def write_template(template_vars, templates_path, output_path, template_file): - template = get_template(template_file, templates_path) - with open(output_path, "w") as f: - f.write(template.render(template_vars)) - -def get_template(template_file, templates_path): - template_file_name = template_file + ".json.j2" - - env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml'])) - - return env.get_template(template_file_name) - -def render_templates(workload_path, - operations_path, - test_procedures_path, - templates_path, - template_vars, - custom_queries): - write_template(template_vars, templates_path, workload_path, "base-workload") - - if custom_queries: - write_template(template_vars, templates_path, operations_path, "custom-operations") - write_template(template_vars, templates_path, test_procedures_path, "custom-test-procedures") - else: - write_template(template_vars, templates_path, operations_path, "default-operations") - write_template(template_vars, templates_path, test_procedures_path, "default-test-procedures") - def create_workload(cfg): logger = logging.getLogger(__name__) # All inputs provided by user - workload_name = cfg.opts("workload", "workload.name") - indices = cfg.opts("generator", "indices") - root_path = cfg.opts("generator", "output.path") - target_hosts = cfg.opts("client", "hosts") - client_options = cfg.opts("client", "options") - number_of_docs = cfg.opts("generator", "number_of_docs") - unprocessed_custom_queries = cfg.opts("workload", "custom_queries") - templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources") - - # Process custom queries - custom_queries = process_custom_queries(unprocessed_custom_queries) - - logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices) - logger.info("Number of Docs: %s", number_of_docs) - - # Initialize client factory + workload_name: str = cfg.opts("workload", "workload.name") + indices: list = cfg.opts("generator", "indices") + output_path: str = cfg.opts("generator", "output.path") + target_hosts: opts.TargetHosts = cfg.opts("client", "hosts") + client_options: opts.ClientOptions = cfg.opts("client", "options") + # document_frequency: int = cfg.opts("generator", "document_frequency") # Enable later + document_frequency: int = 0 + number_of_docs: dict = cfg.opts("generator", "number_of_docs") + unprocessed_queries: dict = cfg.opts("workload", "custom_queries") + templates_path: str = os.path.join(cfg.opts("node", "benchmark.root"), "resources") + + # Validation + validate_index_documents_map(indices, number_of_docs) + client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create() info = client.info() console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger) - # Establish 
output paths directory
-    output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name))
+    processed_indices = process_indices(indices, document_frequency, number_of_docs)
 
-    operations_path = os.path.join(output_path, "operations")
-    test_procedures_path = os.path.join(output_path, "test_procedures")
+    custom_workload = CustomWorkload(
+        workload_name=workload_name,
+        output_path=output_path,
+        indices=processed_indices,
+    )
+    custom_workload.workload_path = os.path.abspath(os.path.join(io.normalize_path(output_path), workload_name))
+    custom_workload.operations_path = os.path.join(custom_workload.workload_path, "operations")
+    custom_workload.test_procedures_path = os.path.join(custom_workload.workload_path, "test_procedures")
 
-    if os.path.exists(output_path):
-        try:
-            logger.info("Workload already exists. Removing existing workload [%s] in path [%s]", workload_name, output_path)
-            shutil.rmtree(output_path)
-        except OSError:
-            logger.error("Had issues removing existing workload [%s] in path [%s]", workload_name, output_path)
+    query_processor = QueryProcessor(unprocessed_queries)
+    custom_workload_writer = CustomWorkloadWriter(custom_workload, templates_path)
+    index_extractor = IndexExtractor(custom_workload, client)
+    corpus_extractor = SequentialCorpusExtractor(custom_workload, client)
 
-    io.ensure_dir(output_path)
-    io.ensure_dir(operations_path)
-    io.ensure_dir(test_procedures_path)
+    # Process Queries
+    processed_queries = query_processor.process_queries()
+    custom_workload.queries = processed_queries
+    logger.info("Processed custom queries [%s]", custom_workload.queries)
 
-    # Extract Indices and Corpora
-    logger.info("Extracting indices and corpora")
-    indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs)
-    logger.info("Finished extracting indices and corpora")
+    # Create Workload Output Path
+    custom_workload_writer.make_workload_directory()
+    logger.info("Created workload output path at [%s]", custom_workload.workload_path)
 
-    if len(indices) == 0:
-        raise RuntimeError("Failed to extract any indices for workload!")
+    # Extract Index Settings and Mappings
+    custom_workload.extracted_indices, custom_workload.failed_indices = index_extractor.extract_indices(custom_workload.workload_path)
+    logger.info("Extracted index settings and mappings from [%s]", custom_workload.indices)
 
-    # Collect all itmes into dictionary
-    template_vars = {
-        "workload_name": workload_name,
-        "indices": indices,
-        "corpora": corpora,
-        "custom_queries": custom_queries
-    }
+    # Extract Corpora
+    for index in custom_workload.indices:
+        index_corpora = corpus_extractor.extract_documents(index.name, index.number_of_docs)
+        if index_corpora:  # extract_documents returns None for an empty index
+            custom_workload.corpora.append(index_corpora)
+    logger.info("Extracted all corpora [%s]", custom_workload.corpora)
 
-    logger.info("Template Vars: %s", template_vars)
+    if len(custom_workload.corpora) == 0:
+        raise exceptions.BenchmarkError("Failed to extract corpora for any indices for workload!")
 
-    workload_path = os.path.join(output_path, "workload.json")
-    operations_path = os.path.join(operations_path, "default.json")
-    test_procedures_path = os.path.join(test_procedures_path, "default.json")
+    template_vars = {
+        "workload_name": custom_workload.workload_name,
+        "indices": custom_workload.extracted_indices,
+        "corpora": custom_workload.corpora,
+        "custom_queries": custom_workload.queries
+    }
+    logger.info("Template vars [%s]", template_vars)
 
     # Render all templates
-    logger.info("Rendering templates")
-    render_templates(
workload_path, - operations_path, - test_procedures_path, - templates_path, - template_vars, - custom_queries - ) + custom_workload_writer.render_templates(template_vars, custom_workload.queries) console.println("") - console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}") + console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={custom_workload.workload_path}") diff --git a/tests/workload_generator/corpus_test.py b/tests/workload_generator/corpus_test.py index 30f089260..0312a11ff 100644 --- a/tests/workload_generator/corpus_test.py +++ b/tests/workload_generator/corpus_test.py @@ -22,78 +22,88 @@ # specific language governing permissions and limitations # under the License. import json -from unittest import mock -from unittest.mock import call +from unittest import mock, TestCase +from unittest.mock import call, Mock -from osbenchmark.workload_generator import corpus +from osbenchmark.workload_generator.config import CustomWorkload +from osbenchmark.workload_generator.extractors import SequentialCorpusExtractor +class TestSequentialCorpusExtractor(TestCase): -def serialize_doc(doc): - return (json.dumps(doc, separators=(",", ":")) + "\n").encode("utf-8") + def setUp(self): + self.mock_custom_workload = Mock(spec=CustomWorkload) + self.mock_custom_workload.workload_path = "/abs/outpath/to/workloads/" + # pylint: disable=no-value-for-parameter + self.mock_client = self.create_mock_client() + self.corpus_extractor = SequentialCorpusExtractor(self.mock_custom_workload, self.mock_client) + @mock.patch("opensearchpy.OpenSearch") + def create_mock_client(self, client): + return client -@mock.patch("builtins.open", new_callable=mock.mock_open) -@mock.patch("opensearchpy.OpenSearch") -def test_extract(client, mo): - doc = { - "field1": "stuff", - "field2": "things" - } - doc_data = serialize_doc(doc) - client.count.return_value = { - "count": 1001 - } - client.search.return_value = { - "_scroll_id": "uohialjrknf", - "_shards": { - "successful": 1, - "total": 1, - "skipped": 0 - }, - "hits": { - "hits": [ - { - "_index": "test", - "_id": "0", - "_score": 0, - "_source": doc - } - ] + def serialize_doc(self, doc): + return (json.dumps(doc, separators=(",", ":")) + "\n").encode("utf-8") + + @mock.patch("builtins.open", new_callable=mock.mock_open) + def test_extract(self, mo): + doc = { + "field1": "stuff", + "field2": "things" + } + doc_data = self.serialize_doc(doc) + self.mock_client.count.return_value = { + "count": 1001 + } + self.mock_client.search.return_value = { + "_scroll_id": "uohialjrknf", + "_shards": { + "successful": 1, + "total": 1, + "skipped": 0 + }, + "hits": { + "hits": [ + { + "_index": "test", + "_id": "0", + "_score": 0, + "_source": doc + } + ] + } } - } - def set_corp_size(*args, **kwargs): - path = args[0] - mockstat = mock.Mock() - if ".bz2" in path: - mockstat.st_size = 500 - else: - mockstat.st_size = 1000 - return mockstat + def set_corp_size(*args, **kwargs): + path = args[0] + mockstat = mock.Mock() + if ".bz2" in path: + mockstat.st_size = 500 + else: + mockstat.st_size = 1000 + return mockstat - client.scroll.return_value = {} + self.mock_client.scroll.return_value = {} - index = "test" - outdir = "/abs/outpath/to/workloads/" + index = "test" - with mock.patch("os.stat") as osstat: - osstat.side_effect = set_corp_size - res = corpus.extract(client, outdir, index) - assert mo.call_count == 4 - 
mo.assert_has_calls([call("/abs/outpath/to/workloads/test-documents.json", "wb"), - call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"), - call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"), - call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb") - ], any_order=True) + with mock.patch("os.stat") as osstat: + osstat.side_effect = set_corp_size + res = self.corpus_extractor.extract_documents(index) + assert mo.call_count == 4 + mo.assert_has_calls([call("/abs/outpath/to/workloads/test-documents.json", "wb"), + call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb") + ], any_order=True) - assert res == { - "filename": "test-documents.json.bz2", - "path": "/abs/outpath/to/workloads/test-documents.json.bz2", - "compressed_bytes": 500, - "index_name": "test", - "doc_count": 1001, - "uncompressed_bytes": 1000 - } + assert res == { + "filename": "test-documents.json.bz2", + "path": "/abs/outpath/to/workloads/test-documents.json.bz2", + "compressed_bytes": 500, + "index_name": "test", + "doc_count": 1001, + "uncompressed_bytes": 1000 + } - file_mock = mo.return_value - file_mock.assert_has_calls([call.write(doc_data)]) + file_mock = mo.return_value + file_mock.assert_has_calls([call.write(doc_data)]) diff --git a/tests/workload_generator/index_test.py b/tests/workload_generator/index_test.py index 1ab87ea53..b06208895 100644 --- a/tests/workload_generator/index_test.py +++ b/tests/workload_generator/index_test.py @@ -22,153 +22,161 @@ # specific language governing permissions and limitations # under the License. -from unittest import mock -from osbenchmark.workload_generator.index import ( - filter_ephemeral_index_settings, - extract_index_mapping_and_settings, - update_index_setting_parameters) +from unittest import TestCase +from unittest.mock import Mock, patch +from osbenchmark.workload_generator.config import CustomWorkload +from osbenchmark.workload_generator.extractors import IndexExtractor -def test_index_setting_filter(): - unfiltered_index_settings = { - "number_of_shards": "5", - "provided_name": "queries", - "creation_date": "1579230289084", - "requests": { - "cache": { - "enable": "false" - } - }, - "number_of_replicas": "0", - "queries": { - "cache": { - "enabled": "false" - } - }, - "uuid": "jdzVt-dDS1aRlqdZWK4pdA", - "version": { - "created": "7050099" - }, - "store": { - "type": "fs" - } - } - settings = filter_ephemeral_index_settings(unfiltered_index_settings) - assert settings.keys() == {"number_of_shards", "number_of_replicas", "requests", "queries"} +class TestIndexExtractor(TestCase): + def setUp(self): + self.mock_custom_workload = Mock(spec=CustomWorkload()) + # pylint: disable=no-value-for-parameter + self.mock_client = self.create_mock_client() + self.index_extractor = IndexExtractor(self.mock_custom_workload, self.mock_client) -def test_index_setting_parameters(): - settings = { - "number_of_shards": "5", - "provided_name": "queries", - "creation_date": "1579230289084", - "requests": { - "cache": { - "enable": "false" - } - }, - "number_of_replicas": "0", - } - update_index_setting_parameters(settings) - assert settings == { - "number_of_shards": "{{number_of_shards | default(5)}}", - "provided_name": "queries", - "creation_date": "1579230289084", - "requests": { - "cache": { - "enable": "false" - } - }, - "number_of_replicas": "{{number_of_replicas | default(0)}}", - } - # make sure we don't 
explode if the parameterized settings aren't present for some reason - settings.pop("number_of_shards") - settings.pop("number_of_replicas") - update_index_setting_parameters(settings) + @patch("opensearchpy.OpenSearch") + def create_mock_client(self, client): + return client + def test_index_setting_filter(self): + unfiltered_index_settings = { + "number_of_shards": "5", + "provided_name": "queries", + "creation_date": "1579230289084", + "requests": { + "cache": { + "enable": "false" + } + }, + "number_of_replicas": "0", + "queries": { + "cache": { + "enabled": "false" + } + }, + "uuid": "jdzVt-dDS1aRlqdZWK4pdA", + "version": { + "created": "7050099" + }, + "store": { + "type": "fs" + } + } + settings = self.index_extractor.filter_ephemeral_index_settings(unfiltered_index_settings) + assert settings.keys() == {"number_of_shards", "number_of_replicas", "requests", "queries"} -@mock.patch("opensearchpy.OpenSearch") -def test_extract_index_create(client): - client.indices.get.return_value = { - "osmgeopoints": { - "aliases": {}, - "mappings": { - "dynamic": "strict", - "properties": { - "location": { - "type": "geo_point" - } + def test_index_setting_parameters(self): + settings = { + "number_of_shards": "5", + "provided_name": "queries", + "creation_date": "1579230289084", + "requests": { + "cache": { + "enable": "false" + } + }, + "number_of_replicas": "0", + } + self.index_extractor.update_index_setting_parameters(settings) + assert settings == { + "number_of_shards": "{{number_of_shards | default(5)}}", + "provided_name": "queries", + "creation_date": "1579230289084", + "requests": { + "cache": { + "enable": "false" } }, - "settings": { - "index": { - "number_of_shards": "3", - "provided_name": "osmgeopoints", - "creation_date": "1579210032233", - "requests": { - "cache": { - "enable": "false" + "number_of_replicas": "{{number_of_replicas | default(0)}}", + } + # make sure we don't explode if the parameterized settings aren't present for some reason + settings.pop("number_of_shards") + settings.pop("number_of_replicas") + self.index_extractor.update_index_setting_parameters(settings) + + def test_extract_index_create(self): + self.mock_client.indices.get.return_value = { + "osmgeopoints": { + "aliases": {}, + "mappings": { + "dynamic": "strict", + "properties": { + "location": { + "type": "geo_point" + } + } + }, + "settings": { + "index": { + "number_of_shards": "3", + "provided_name": "osmgeopoints", + "creation_date": "1579210032233", + "requests": { + "cache": { + "enable": "false" + } + }, + "number_of_replicas": "2", + "uuid": "vOOsPNfxTJyQekkIo9TjPA", + "version": { + "created": "7050099" + }, + "store": { + "type": "fs" } - }, - "number_of_replicas": "2", - "uuid": "vOOsPNfxTJyQekkIo9TjPA", - "version": { - "created": "7050099" - }, - "store": { - "type": "fs" } } - } - }, - # should be filtered - ".security": { - "mappings": {}, - "settings": { - "index": { - "number_of_shards": "1" + }, + # should be filtered + ".security": { + "mappings": {}, + "settings": { + "index": { + "number_of_shards": "1" + } } - } - }, - "geodata": { - "mappings": {}, - "settings": { - "index": { - "number_of_shards": "1" + }, + "geodata": { + "mappings": {}, + "settings": { + "index": { + "number_of_shards": "1" + } } } } - } - expected = { - "osmgeopoints": { - "mappings": { - "dynamic": "strict", - "properties": { - "location": { - "type": "geo_point" + expected = { + "osmgeopoints": { + "mappings": { + "dynamic": "strict", + "properties": { + "location": { + "type": "geo_point" + } } - } - }, - 
"settings": { - "index": { - "number_of_replicas": "{{number_of_replicas | default(2)}}", - "number_of_shards": "{{number_of_shards | default(3)}}", - "requests": { - "cache": { - "enable": "false" + }, + "settings": { + "index": { + "number_of_replicas": "{{number_of_replicas | default(2)}}", + "number_of_shards": "{{number_of_shards | default(3)}}", + "requests": { + "cache": { + "enable": "false" + } } } } - } - }, - "geodata": { - "mappings": {}, - "settings": { - "index": { - "number_of_shards": "{{number_of_shards | default(1)}}" + }, + "geodata": { + "mappings": {}, + "settings": { + "index": { + "number_of_shards": "{{number_of_shards | default(1)}}" + } } } } - } - res = extract_index_mapping_and_settings(client, "_all") - assert res == expected + res = self.index_extractor.extract_index_mapping_and_settings("_all") + assert res == expected