diff --git a/misc/python/materialize/docker.py b/misc/python/materialize/docker.py
index 5904267c0d6e..4a40d5b9542a 100644
--- a/misc/python/materialize/docker.py
+++ b/misc/python/materialize/docker.py
@@ -13,6 +13,8 @@
 from materialize.mz_version import MzVersion
 
 EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = dict()
 
+IMAGE_TAG_OF_VERSION_PREFIX = "v"
+IMAGE_TAG_OF_COMMIT_PREFIX = "devel-"
 
 def image_of_release_version_exists(version: MzVersion) -> bool:
@@ -58,8 +60,31 @@ def mz_image_tag_exists(image_tag: str) -> bool:
 
 
 def commit_to_image_tag(commit_hash: str) -> str:
-    return f"devel-{commit_hash}"
+    return f"{IMAGE_TAG_OF_COMMIT_PREFIX}{commit_hash}"
 
 
 def version_to_image_tag(version: MzVersion) -> str:
     return str(version)
+
+
+def is_image_tag_of_version(image_tag: str) -> bool:
+    return image_tag.startswith(IMAGE_TAG_OF_VERSION_PREFIX)
+
+
+def is_image_tag_of_commit(image_tag: str) -> bool:
+    return image_tag.startswith(IMAGE_TAG_OF_COMMIT_PREFIX)
+
+
+def get_version_from_image_tag(image_tag: str) -> str:
+    assert image_tag.startswith(IMAGE_TAG_OF_VERSION_PREFIX)
+    # image tag is equal to the version
+    return image_tag
+
+
+def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
+    return MzVersion.parse_mz(get_version_from_image_tag(image_tag))
+
+
+def get_commit_from_image_tag(image_tag: str) -> str:
+    assert image_tag.startswith(IMAGE_TAG_OF_COMMIT_PREFIX)
+    return image_tag.removeprefix(IMAGE_TAG_OF_COMMIT_PREFIX)
diff --git a/misc/python/materialize/scalability/comparison_outcome.py b/misc/python/materialize/scalability/comparison_outcome.py
index 2b88fc1fbd43..5f856c392ddc 100644
--- a/misc/python/materialize/scalability/comparison_outcome.py
+++ b/misc/python/materialize/scalability/comparison_outcome.py
@@ -14,6 +14,7 @@
     DfTotalsExtended,
     concat_df_totals_extended,
 )
+from materialize.scalability.endpoint import Endpoint
 from materialize.scalability.scalability_change import (
     Regression,
     ScalabilityChange,
@@ -29,6 +30,7 @@ def __init__(
         self.significant_improvements: list[ScalabilityImprovement] = []
         self.regression_df = DfTotalsExtended()
         self.significant_improvement_df = DfTotalsExtended()
+        self.endpoints_with_regressions: set[Endpoint] = set()
 
     def has_regressions(self) -> bool:
         assert len(self.regressions) == self.regression_df.length()
@@ -41,6 +43,9 @@ def has_significant_improvements(self) -> bool:
         )
         return len(self.significant_improvements) > 0
 
+    def has_scalability_changes(self) -> bool:
+        return self.has_regressions() or self.has_significant_improvements()
+
     def __str__(self) -> str:
         return f"{len(self.regressions)} regressions, {len(self.significant_improvements)} significant improvements"
 
@@ -60,17 +65,34 @@ def _to_description(self, entries: Sequence[ScalabilityChange]) -> str:
         return "\n".join(f"* {x}" for x in entries)
 
     def merge(self, other: ComparisonOutcome) -> None:
-        self.regressions.extend(other.regressions)
-        self.significant_improvements.extend(other.significant_improvements)
-        self.append_regression_df(other.regression_df)
-        self.append_significant_improvement_df(other.significant_improvement_df)
+        self.append_regressions(
+            other.regressions,
+            other.significant_improvements,
+            other.regression_df,
+            other.significant_improvement_df,
+        )
+
+    def append_regressions(
+        self,
+        regressions: list[Regression],
+        significant_improvements: list[ScalabilityImprovement],
+        regression_df: DfTotalsExtended,
+        significant_improvement_df: DfTotalsExtended,
+    ) -> None:
+        self.regressions.extend(regressions)
+        self.significant_improvements.extend(significant_improvements)
+        self._append_regression_df(regression_df)
+        self._append_significant_improvement_df(significant_improvement_df)
+
+        for regression in regressions:
+            self.endpoints_with_regressions.add(regression.endpoint)
 
-    def append_regression_df(self, regressions_data: DfTotalsExtended) -> None:
+    def _append_regression_df(self, regressions_data: DfTotalsExtended) -> None:
         self.regression_df = concat_df_totals_extended(
             [self.regression_df, regressions_data]
         )
 
-    def append_significant_improvement_df(
+    def _append_significant_improvement_df(
         self, significant_improvements_data: DfTotalsExtended
     ) -> None:
         self.significant_improvement_df = concat_df_totals_extended(
diff --git a/misc/python/materialize/scalability/endpoint.py b/misc/python/materialize/scalability/endpoint.py
index f66db5dd9138..ed733fc6dd48 100644
--- a/misc/python/materialize/scalability/endpoint.py
+++ b/misc/python/materialize/scalability/endpoint.py
@@ -16,6 +16,9 @@ class Endpoint:
     _version: str | None = None
 
+    def __init__(self, specified_target: str):
+        self._specified_target = specified_target
+
     def sql_connection(self) -> psycopg.connection.Connection[tuple[Any, ...]]:
         conn = psycopg.connect(self.url())
         conn.autocommit = True
@@ -26,6 +29,12 @@ def url(self) -> str:
             f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}"
         )
 
+    def specified_target(self) -> str:
+        return self._specified_target
+
+    def resolved_target(self) -> str:
+        return self.specified_target()
+
     def host(self) -> str:
         raise NotImplementedError
diff --git a/misc/python/materialize/scalability/endpoints.py b/misc/python/materialize/scalability/endpoints.py
index d4369032ea54..9c6ec55aa813 100644
--- a/misc/python/materialize/scalability/endpoints.py
+++ b/misc/python/materialize/scalability/endpoints.py
@@ -17,11 +17,17 @@
 POSTGRES_ENDPOINT_NAME = "postgres"
 
+TARGET_MATERIALIZE_LOCAL = "local"
+TARGET_MATERIALIZE_REMOTE = "remote"
+TARGET_POSTGRES = "postgres"
+TARGET_HEAD = "HEAD"
+
 
 class MaterializeRemote(Endpoint):
     """Connect to a remote Materialize instance using a psql URL"""
 
     def __init__(self, materialize_url: str) -> None:
+        super().__init__(specified_target=TARGET_MATERIALIZE_REMOTE)
         self.materialize_url = materialize_url
 
     def url(self) -> str:
@@ -38,7 +44,7 @@ class PostgresContainer(Endpoint):
     def __init__(self, composition: Composition) -> None:
         self.composition = composition
         self._port: int | None = None
-        super().__init__()
+        super().__init__(specified_target=TARGET_POSTGRES)
 
     def host(self) -> str:
         return "localhost"
@@ -67,6 +73,9 @@ def __str__(self) -> str:
 
 
 class MaterializeNonRemote(Endpoint):
+    def __init__(self, specified_target: str):
+        super().__init__(specified_target)
+
     def host(self) -> str:
         return "localhost"
 
@@ -92,6 +101,9 @@ def lift_limits(self) -> None:
 class MaterializeLocal(MaterializeNonRemote):
     """Connect to a Materialize instance running on the local host"""
 
+    def __init__(self) -> None:
+        super().__init__(specified_target=TARGET_MATERIALIZE_LOCAL)
+
     def port(self) -> int:
         return 6875
 
@@ -110,6 +122,7 @@ def __init__(
         self,
         composition: Composition,
         specified_target: str,
+        resolved_target: str,
         image: str | None = None,
         alternative_image: str | None = None,
     ) -> None:
@@ -119,8 +132,11 @@
             alternative_image if image != alternative_image else None
         )
         self._port: int | None = None
-        self.specified_target = specified_target
-        super().__init__()
+        self._resolved_target = resolved_target
+        super().__init__(specified_target)
+
+    def resolved_target(self) -> str:
+        return self._resolved_target
 
     def port(self) -> int:
         assert self._port is not None
diff --git a/misc/python/materialize/scalability/regression_assessment.py b/misc/python/materialize/scalability/regression_assessment.py
new file mode 100644
index 000000000000..a7b1a3758279
--- /dev/null
+++ b/misc/python/materialize/scalability/regression_assessment.py
@@ -0,0 +1,101 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+from __future__ import annotations
+
+from materialize.docker import (
+    get_mz_version_from_image_tag,
+    is_image_tag_of_version,
+)
+from materialize.mz_version import MzVersion
+from materialize.scalability.comparison_outcome import ComparisonOutcome
+from materialize.scalability.endpoint import Endpoint
+from materialize.version_list import (
+    ANCESTOR_OVERRIDES_FOR_SCALABILITY_REGRESSIONS,
+    get_commits_of_accepted_regressions_between_versions,
+)
+
+
+class RegressionAssessment:
+    def __init__(
+        self,
+        baseline_endpoint: Endpoint | None,
+        comparison_outcome: ComparisonOutcome,
+    ):
+        self.baseline_endpoint = baseline_endpoint
+        self.comparison_outcome = comparison_outcome
+        self.endpoints_with_regressions_and_justifications: dict[
+            Endpoint, str | None
+        ] = {}
+        self.check_targets()
+        assert len(comparison_outcome.endpoints_with_regressions) == len(
+            self.endpoints_with_regressions_and_justifications
+        )
+
+    def has_comparison_target(self) -> bool:
+        return self.baseline_endpoint is not None
+
+    def has_regressions(self) -> bool:
+        return self.comparison_outcome.has_regressions()
+
+    def has_unjustified_regressions(self):
+        return any(
+            justification is None
+            for justification in self.endpoints_with_regressions_and_justifications.values()
+        )
+
+    def check_targets(self) -> None:
+        if self.baseline_endpoint is None:
+            return
+
+        if not self.comparison_outcome.has_regressions():
+            return
+
+        if not self._endpoint_references_release_version(self.baseline_endpoint):
+            # justified regressions require a version as comparison target
+            self._mark_all_targets_with_regressions_as_unjustified()
+            return
+
+        baseline_version = get_mz_version_from_image_tag(
+            self.baseline_endpoint.resolved_target()
+        )
+
+        for endpoint in self.comparison_outcome.endpoints_with_regressions:
+            if not self._endpoint_references_release_version(endpoint):
+                continue
+
+            endpoint_version = get_mz_version_from_image_tag(endpoint.resolved_target())
+
+            if baseline_version >= endpoint_version:
+                # not supported, should not be a relevant case
+                continue
+
+            commits_with_accepted_regressions = (
+                get_commits_of_accepted_regressions_between_versions(
+                    ANCESTOR_OVERRIDES_FOR_SCALABILITY_REGRESSIONS,
+                    since_version_exclusive=baseline_version,
+                    to_version_inclusive=endpoint_version,
+                )
+            )
+
+            if len(commits_with_accepted_regressions) > 0:
+                self.endpoints_with_regressions_and_justifications[
+                    endpoint
+                ] = ", ".join(commits_with_accepted_regressions)
+            else:
+                self.endpoints_with_regressions_and_justifications[endpoint] = None
+
+    def _mark_all_targets_with_regressions_as_unjustified(self) -> None:
+        for endpoint in self.comparison_outcome.endpoints_with_regressions:
+            self.endpoints_with_regressions_and_justifications[endpoint] = None
+
+    def _endpoint_references_release_version(self, endpoint: Endpoint) -> bool:
+        target = endpoint.resolved_target()
+        return is_image_tag_of_version(target) and MzVersion.is_valid_version_string(
+            target
+        )
diff --git a/misc/python/materialize/scalability/result_analyzers.py b/misc/python/materialize/scalability/result_analyzers.py
index 6f9b1c2dd34b..1495e377017f 100644
--- a/misc/python/materialize/scalability/result_analyzers.py
+++ b/misc/python/materialize/scalability/result_analyzers.py
@@ -60,10 +60,10 @@ def perform_comparison_in_workload(
             workload_name,
             other_endpoint,
         )
-        comparison_outcome.regressions.extend(regressions)
-        comparison_outcome.significant_improvements.extend(improvements)
-        comparison_outcome.append_regression_df(entries_worse_than_threshold)
-        comparison_outcome.append_significant_improvement_df(
-            entries_better_than_threshold
+        comparison_outcome.append_regressions(
+            regressions,
+            improvements,
+            entries_worse_than_threshold,
+            entries_better_than_threshold,
         )
         return comparison_outcome
diff --git a/misc/python/materialize/version_list.py b/misc/python/materialize/version_list.py
index fb5f0164958b..fe7f290093f0 100644
--- a/misc/python/materialize/version_list.py
+++ b/misc/python/materialize/version_list.py
@@ -391,6 +391,35 @@ def is_valid_release_image(version: MzVersion) -> bool:
     return docker.image_of_release_version_exists(version)
 
 
+def get_commits_of_accepted_regressions_between_versions(
+    ancestor_overrides: dict[str, MzVersion],
+    since_version_exclusive: MzVersion,
+    to_version_inclusive: MzVersion,
+) -> list[str]:
+    """
+    Get commits of accepted regressions between both versions.
+    :param ancestor_overrides: one of #ANCESTOR_OVERRIDES_FOR_PERFORMANCE_REGRESSIONS, #ANCESTOR_OVERRIDES_FOR_SCALABILITY_REGRESSIONS, #ANCESTOR_OVERRIDES_FOR_CORRECTNESS_REGRESSIONS
+    :return: commits
+    """
+
+    assert since_version_exclusive <= to_version_inclusive
+
+    commits = []
+
+    for (
+        regression_introducing_commit,
+        first_version_with_regression,
+    ) in ancestor_overrides.items():
+        if (
+            since_version_exclusive
+            < first_version_with_regression
+            <= to_version_inclusive
+        ):
+            commits.append(regression_introducing_commit)
+
+    return commits
+
+
 class VersionsFromDocs:
     """Materialize versions as listed in doc/user/content/versions
diff --git a/test/feature-benchmark/mzcompose.py b/test/feature-benchmark/mzcompose.py
index 58e25179f21c..ede89f04785b 100644
--- a/test/feature-benchmark/mzcompose.py
+++ b/test/feature-benchmark/mzcompose.py
@@ -14,8 +14,11 @@
 import uuid
 from textwrap import dedent
 
+from materialize.docker import is_image_tag_of_version
+from materialize.mz_version import MzVersion
 from materialize.version_list import (
     ANCESTOR_OVERRIDES_FOR_PERFORMANCE_REGRESSIONS,
+    get_commits_of_accepted_regressions_between_versions,
     get_latest_published_version,
     resolve_ancestor_image_tag,
 )
@@ -358,12 +361,14 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
 
     # Build the list of scenarios to run
     root_scenario = globals()[args.root_scenario]
-    scenarios = []
+    selected_scenarios = []
 
     if root_scenario.__subclasses__():
-        scenarios = [s for s in all_subclasses(root_scenario) if not s.__subclasses__()]
+        selected_scenarios = [
+            s for s in all_subclasses(root_scenario) if not s.__subclasses__()
+        ]
     else:
-        scenarios = [root_scenario]
+        selected_scenarios = [root_scenario]
 
     dependencies = ["postgres"]
@@ -374,15 +379,17 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
 
     c.up(*dependencies)
 
+    scenarios_to_run = selected_scenarios
+    scenarios_with_regressions = []
     for cycle in range(0, args.max_retries):
         print(
-            f"Cycle {cycle+1} with scenarios: {', '.join([scenario.__name__ for scenario in scenarios])}"
+            f"Cycle {cycle+1} with scenarios: {', '.join([scenario.__name__ for scenario in scenarios_to_run])}"
        )
 
         report = Report()
         scenarios_with_regressions = []
-        for scenario in scenarios:
+        for scenario in scenarios_to_run:
             comparators = run_one_scenario(c, scenario, args)
             report.extend(comparators)
 
@@ -393,12 +400,54 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         print(f"+++ Benchmark Report for cycle {cycle+1}:")
         report.dump()
 
-        scenarios = scenarios_with_regressions
-        if not scenarios:
+        scenarios_to_run = scenarios_with_regressions
+        if len(scenarios_to_run) == 0:
             break
 
-    if scenarios:
+    if len(scenarios_with_regressions) > 0:
+        regressions_justified, comment = _are_regressions_justified(
+            this_tag=args.this_tag, baseline_tag=args.other_tag
+        )
+
+        print("+++ Regressions")
+
         print(
-            f"ERROR: The following scenarios have regressions: {', '.join([scenario.__name__ for scenario in scenarios])}"
+            f"{'INFO' if regressions_justified else 'ERROR'}:"
+            f" The following scenarios have regressions:"
+            f" {', '.join([scenario.__name__ for scenario in scenarios_with_regressions])}"
         )
-        sys.exit(1)
+
+        if regressions_justified:
+            print(f"However, the regressions are accepted. {comment}")
+        else:
+            sys.exit(1)
+
+
+def _are_regressions_justified(this_tag: str, baseline_tag: str) -> tuple[bool, str]:
+    if not _tag_references_release_version(
+        this_tag
+    ) or not _tag_references_release_version(baseline_tag):
+        return False, ""
+
+    this_version = MzVersion.parse_mz(this_tag)
+    baseline_version = MzVersion.parse_mz(baseline_tag)
+
+    commits_with_regressions = get_commits_of_accepted_regressions_between_versions(
+        ANCESTOR_OVERRIDES_FOR_PERFORMANCE_REGRESSIONS,
+        since_version_exclusive=baseline_version,
+        to_version_inclusive=this_version,
+    )
+
+    if len(commits_with_regressions) == 0:
+        return False, ""
+    else:
+        return (
+            True,
+            f"Accepted regressions were introduced with these commits: {commits_with_regressions}",
+        )
+
+
+def _tag_references_release_version(image_tag: str) -> bool:
+    return is_image_tag_of_version(image_tag) and MzVersion.is_valid_version_string(
+        image_tag
+    )
diff --git a/test/scalability/mzcompose.py b/test/scalability/mzcompose.py
index 62d6962cdc8b..3dd4cccff6ef 100644
--- a/test/scalability/mzcompose.py
+++ b/test/scalability/mzcompose.py
@@ -26,6 +26,10 @@
 from materialize.scalability.df.df_totals import DfTotalsExtended
 from materialize.scalability.endpoint import Endpoint
 from materialize.scalability.endpoints import (
+    TARGET_HEAD,
+    TARGET_MATERIALIZE_LOCAL,
+    TARGET_MATERIALIZE_REMOTE,
+    TARGET_POSTGRES,
     MaterializeContainer,
     MaterializeLocal,
     MaterializeRemote,
@@ -38,6 +42,7 @@
     plot_duration_by_endpoints_for_workload,
     plot_tps_per_connections,
 )
+from materialize.scalability.regression_assessment import RegressionAssessment
 from materialize.scalability.result_analyzer import ResultAnalyzer
 from materialize.scalability.result_analyzers import DefaultResultAnalyzer
 from materialize.scalability.schema import Schema, TransactionIsolation
@@ -207,13 +212,20 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     create_plots(benchmark_result, baseline_endpoint)
     upload_plots_to_buildkite()
 
+    regression_assessment = RegressionAssessment(
+        baseline_endpoint,
+        benchmark_result.overall_comparison_outcome,
+    )
+
     report_regression_result(
         baseline_endpoint,
         regression_threshold,
         benchmark_result.overall_comparison_outcome,
     )
 
-    if benchmark_result.overall_comparison_outcome.has_regressions():
+    report_assessment(regression_assessment)
+
+    if regression_assessment.has_unjustified_regressions():
         sys.exit(1)
@@ -239,34 +251,37 @@ def get_baseline_and_other_endpoints(
 ) -> tuple[Endpoint | None, list[Endpoint]]:
     baseline_endpoint: Endpoint | None = None
     other_endpoints: list[Endpoint] = []
-    for i, target in enumerate(args.target):
-        original_target = target
+    for i, specified_target in enumerate(args.target):
         endpoint: Endpoint | None = None
-        if target == "local":
+        if specified_target == TARGET_MATERIALIZE_LOCAL:
             endpoint = MaterializeLocal()
-        elif target == "remote":
+        elif specified_target == TARGET_MATERIALIZE_REMOTE:
             endpoint = MaterializeRemote(materialize_url=args.materialize_url[i])
-        elif target == "postgres":
+        elif specified_target == TARGET_POSTGRES:
             endpoint = PostgresContainer(composition=c)
-        elif target == "HEAD":
+        elif specified_target == TARGET_HEAD:
             endpoint = MaterializeContainer(
-                composition=c, specified_target=original_target
+                composition=c,
+                specified_target=specified_target,
+                resolved_target=specified_target,
             )
         else:
-            if target == "common-ancestor":
-                target = resolve_ancestor_image_tag(
+            resolved_target = specified_target
+            if specified_target == "common-ancestor":
+                resolved_target = resolve_ancestor_image_tag(
                     ANCESTOR_OVERRIDES_FOR_SCALABILITY_REGRESSIONS
                 )
             endpoint = MaterializeContainer(
                 composition=c,
-                specified_target=original_target,
-                image=f"materialize/materialized:{target}",
+                specified_target=specified_target,
+                resolved_target=resolved_target,
+                image=f"materialize/materialized:{resolved_target}",
                 alternative_image="materialize/materialized:latest",
             )
         assert endpoint is not None
-        if original_target == regression_against_target:
+        if specified_target == regression_against_target:
             baseline_endpoint = endpoint
         else:
             other_endpoints.append(endpoint)
@@ -302,7 +317,8 @@ def report_regression_result(
     baseline_desc = endpoint_name_to_description(baseline_endpoint.try_load_version())
 
-    if outcome.has_regressions() or outcome.has_significant_improvements():
+    print("+++ Scalability changes")
+    if outcome.has_scalability_changes():
         print(
             f"{'ERROR' if outcome.has_regressions() else 'INFO'}: "
             f"The following scalability changes were detected "
@@ -318,6 +334,41 @@ def report_regression_result(
         print("No scalability changes were detected.")
 
 
+def report_assessment(regression_assessment: RegressionAssessment):
+    print("+++ Assessment of regressions")
+
+    if not regression_assessment.has_comparison_target():
+        print("No comparison was performed because no baseline was specified")
+        return
+
+    assert regression_assessment.baseline_endpoint is not None
+
+    if not regression_assessment.has_regressions():
+        print("No regressions were detected")
+        return
+
+    baseline_desc = endpoint_name_to_description(
+        regression_assessment.baseline_endpoint.try_load_version()
+    )
+    for (
+        endpoint_with_regression,
+        justification,
+    ) in regression_assessment.endpoints_with_regressions_and_justifications.items():
+        endpoint_desc = endpoint_name_to_description(
+            endpoint_with_regression.try_load_version()
+        )
+
+        if justification is None:
+            print(
+                f"* There are regressions between baseline {baseline_desc} and endpoint {endpoint_desc} that need to be checked."
+            )
+        else:
+            print(
+                f"* Although there are regressions between baseline {baseline_desc} and endpoint {endpoint_desc},"
+                f" they can be explained by the following commits that are marked as accepted regressions: {justification}."
+            )
+
+
 def create_result_analyzer(regression_threshold: float) -> ResultAnalyzer:
     return DefaultResultAnalyzer(max_deviation_as_percent_decimal=regression_threshold)
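
A minimal sketch of the window semantics of the new version_list.py helper: the baseline is an exclusive lower bound and the newer version an inclusive upper bound. The override dict, commit hashes, and version numbers below are hypothetical; only get_commits_of_accepted_regressions_between_versions and MzVersion.parse_mz come from the patch.

# Illustrative only: hypothetical commits and versions, not taken from the patch.
from materialize.mz_version import MzVersion
from materialize.version_list import (
    get_commits_of_accepted_regressions_between_versions,
)

hypothetical_overrides = {
    "aaaaaaaa": MzVersion.parse_mz("v0.83.0"),  # regression first present in v0.83.0
    "bbbbbbbb": MzVersion.parse_mz("v0.85.0"),  # regression first present in v0.85.0
}

# The baseline v0.83.0 is excluded and the target v0.85.0 is included,
# so only the second commit falls inside the accepted window.
commits = get_commits_of_accepted_regressions_between_versions(
    hypothetical_overrides,
    since_version_exclusive=MzVersion.parse_mz("v0.83.0"),
    to_version_inclusive=MzVersion.parse_mz("v0.85.0"),
)
assert commits == ["bbbbbbbb"]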