From f887459d1ac844d27458fe2903b2b2d9ba012078 Mon Sep 17 00:00:00 2001 From: Dmitry Ratushnyy <132273757+dmitry-ratushnyy@users.noreply.github.com> Date: Sat, 22 Jul 2023 18:34:03 +0200 Subject: [PATCH] [DPE-1720] Add backups to k8s charm and configure PBM (#172) Co-authored-by: Mehdi Bendriss --- actions.yaml | 16 + lib/charms/data_platform_libs/v0/s3.py | 723 +++++++++++ lib/charms/mongodb/v0/helpers.py | 61 +- lib/charms/mongodb/v0/mongodb_backups.py | 359 ++++++ lib/charms/operator_libs_linux/v1/snap.py | 1065 +++++++++++++++++ metadata.yaml | 3 + src/charm.py | 214 +++- src/config.py | 2 + tests/integration/backup_tests/__init__.py | 2 + tests/integration/backup_tests/helpers.py | 149 +++ .../integration/backup_tests/test_backups.py | 419 +++++++ tests/unit/test_charm.py | 1 + tests/unit/test_mongodb_backups.py | 740 ++++++++++++ tox.ini | 14 +- 14 files changed, 3744 insertions(+), 24 deletions(-) create mode 100644 lib/charms/data_platform_libs/v0/s3.py create mode 100644 lib/charms/mongodb/v0/mongodb_backups.py create mode 100644 lib/charms/operator_libs_linux/v1/snap.py create mode 100644 tests/integration/backup_tests/__init__.py create mode 100644 tests/integration/backup_tests/helpers.py create mode 100644 tests/integration/backup_tests/test_backups.py create mode 100644 tests/unit/test_mongodb_backups.py diff --git a/actions.yaml b/actions.yaml index 32e883f99..a82f6a978 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,22 @@ set-password: password: type: string description: The password will be auto-generated if this option is not specified. + +create-backup: + description: Create a database backup. + S3 credentials are retrieved from a relation with the S3 integrator charm. + +list-backups: + description: List available backup_ids in the S3 bucket and path provided by the S3 integrator charm. + +restore: + description: Restore a database backup. + S3 credentials are retrieved from a relation with the S3 integrator charm. + params: + backup-id: + type: string + description: A backup-id to identify the backup to restore. Format of <%Y-%m-%dT%H:%M:%SZ> + set-tls-private-key: description: Set the privates key, which will be used for certificate signing requests (CSR). Run for each unit separately. params: diff --git a/lib/charms/data_platform_libs/v0/s3.py b/lib/charms/data_platform_libs/v0/s3.py new file mode 100644 index 000000000..9fb518a56 --- /dev/null +++ b/lib/charms/data_platform_libs/v0/s3.py @@ -0,0 +1,723 @@ +# Copyright 2023 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A library for communicating with the S3 credentials providers and consumers. + +This library provides the relevant interface code implementing the communication +specification for fetching, retrieving, triggering, and responding to events related to +the S3 provider charm and its consumers. + +### Provider charm + +The provider is implemented in the `s3-provider` charm which is meant to be deployed +alongside one or more consumer charms. 
The provider charm is serving the s3 credentials and +metadata needed to communicate and work with an S3 compatible backend. + +Example: +```python + +from charms.data_platform_libs.v0.s3 import CredentialRequestedEvent, S3Provider + + +class ExampleProviderCharm(CharmBase): + def __init__(self, *args) -> None: + super().__init__(*args) + self.s3_provider = S3Provider(self, "s3-credentials") + + self.framework.observe(self.s3_provider.on.credentials_requested, + self._on_credential_requested) + + def _on_credential_requested(self, event: CredentialRequestedEvent): + if not self.unit.is_leader(): + return + + # get relation id + relation_id = event.relation.id + + # get bucket name + bucket = event.bucket + + # S3 configuration parameters + desired_configuration = {"access-key": "your-access-key", "secret-key": + "your-secret-key", "bucket": "your-bucket"} + + # update the configuration + self.s3_provider.update_connection_info(relation_id, desired_configuration) + + # or it is possible to set each field independently + + self.s3_provider.set_secret_key(relation_id, "your-secret-key") + + +if __name__ == "__main__": + main(ExampleProviderCharm) + + +### Requirer charm + +The requirer charm is the charm requiring the S3 credentials. +An example of requirer charm is the following: + +Example: +```python + +from charms.data_platform_libs.v0.s3 import ( + CredentialsChangedEvent, + CredentialsGoneEvent, + S3Requirer +) + +class ExampleRequirerCharm(CharmBase): + + def __init__(self, *args): + super().__init__(*args) + + bucket_name = "test-bucket" + # if bucket name is not provided the bucket name will be generated + # e.g., ('relation-{relation.id}') + + self.s3_client = S3Requirer(self, "s3-credentials", bucket_name) + + self.framework.observe(self.s3_client.on.credentials_changed, self._on_credential_changed) + self.framework.observe(self.s3_client.on.credentials_gone, self._on_credential_gone) + + def _on_credential_changed(self, event: CredentialsChangedEvent): + + # access single parameter credential + secret_key = event.secret_key + access_key = event.access_key + + # or as alternative all credentials can be collected as a dictionary + credentials = self.s3_client.get_s3_credentials() + + def _on_credential_gone(self, event: CredentialsGoneEvent): + # credentials are removed + pass + + if __name__ == "__main__": + main(ExampleRequirerCharm) +``` + +""" +import json +import logging +from collections import namedtuple +from typing import Dict, List, Optional + +import ops.charm +import ops.framework +import ops.model +from ops.charm import ( + CharmBase, + CharmEvents, + EventSource, + Object, + ObjectEvents, + RelationBrokenEvent, + RelationChangedEvent, + RelationEvent, + RelationJoinedEvent, +) +from ops.model import Relation + +# The unique Charmhub library identifier, never change it +LIBID = "fca396f6254246c9bfa565b1f85ab528" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 2 + +logger = logging.getLogger(__name__) + +Diff = namedtuple("Diff", "added changed deleted") +Diff.__doc__ = """ +A tuple for storing the diff between two data mappings. + +added - keys that were added +changed - keys that still exist but have new values +deleted - key that were deleted""" + + +def diff(event: RelationChangedEvent, bucket: str) -> Diff: + """Retrieves the diff of the data in the relation changed databag. 
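+
+    The previous state is read from the "data" key of the given databag, and the new
+    state is written back to that key, so each call also persists the baseline for
+    the next comparison.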
+ + Args: + event: relation changed event. + bucket: bucket of the databag (app or unit) + + Returns: + a Diff instance containing the added, deleted and changed + keys from the event relation databag. + """ + # Retrieve the old data from the data key in the application relation databag. + old_data = json.loads(event.relation.data[bucket].get("data", "{}")) + # Retrieve the new data from the event relation databag. + new_data = { + key: value for key, value in event.relation.data[event.app].items() if key != "data" + } + + # These are the keys that were added to the databag and triggered this event. + added = new_data.keys() - old_data.keys() + # These are the keys that were removed from the databag and triggered this event. + deleted = old_data.keys() - new_data.keys() + # These are the keys that already existed in the databag, + # but had their values changed. + changed = {key for key in old_data.keys() & new_data.keys() if old_data[key] != new_data[key]} + + # TODO: evaluate the possibility of losing the diff if some error + # happens in the charm before the diff is completely checked (DPE-412). + # Convert the new_data to a serializable format and save it for a next diff check. + event.relation.data[bucket].update({"data": json.dumps(new_data)}) + + # Return the diff with all possible changes. + return Diff(added, changed, deleted) + + +class BucketEvent(RelationEvent): + """Base class for bucket events.""" + + @property + def bucket(self) -> Optional[str]: + """Returns the bucket was requested.""" + return self.relation.data[self.relation.app].get("bucket") + + +class CredentialRequestedEvent(BucketEvent): + """Event emitted when a set of credential is requested for use on this relation.""" + + +class S3CredentialEvents(CharmEvents): + """Event descriptor for events raised by S3Provider.""" + + credentials_requested = EventSource(CredentialRequestedEvent) + + +class S3Provider(Object): + """A provider handler for communicating S3 credentials to consumers.""" + + on = S3CredentialEvents() + + def __init__( + self, + charm: CharmBase, + relation_name: str, + ): + super().__init__(charm, relation_name) + self.charm = charm + self.local_app = self.charm.model.app + self.local_unit = self.charm.unit + self.relation_name = relation_name + + # monitor relation changed event for changes in the credentials + self.framework.observe(charm.on[relation_name].relation_changed, self._on_relation_changed) + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + """React to the relation changed event by consuming data.""" + if not self.charm.unit.is_leader(): + return + diff = self._diff(event) + # emit on credential requested if bucket is provided by the requirer application + if "bucket" in diff.added: + self.on.credentials_requested.emit(event.relation, app=event.app, unit=event.unit) + + def _load_relation_data(self, raw_relation_data: dict) -> dict: + """Loads relation data from the relation data bag. + + Args: + raw_relation_data: Relation data from the databag + Returns: + dict: Relation data in dict format. + """ + connection_data = dict() + for key in raw_relation_data: + try: + connection_data[key] = json.loads(raw_relation_data[key]) + except (json.decoder.JSONDecodeError, TypeError): + connection_data[key] = raw_relation_data[key] + return connection_data + + # def _diff(self, event: RelationChangedEvent) -> Diff: + # """Retrieves the diff of the data in the relation changed databag. + + # Args: + # event: relation changed event. 
+ + # Returns: + # a Diff instance containing the added, deleted and changed + # keys from the event relation databag. + # """ + # # Retrieve the old data from the data key in the application relation databag. + # old_data = json.loads(event.relation.data[self.local_app].get("data", "{}")) + # # Retrieve the new data from the event relation databag. + # new_data = { + # key: value for key, value in event.relation.data[event.app].items() if key != "data" + # } + + # # These are the keys that were added to the databag and triggered this event. + # added = new_data.keys() - old_data.keys() + # # These are the keys that were removed from the databag and triggered this event. + # deleted = old_data.keys() - new_data.keys() + # # These are the keys that already existed in the databag, + # # but had their values changed. + # changed = { + # key for key in old_data.keys() & new_data.keys() if old_data[key] != new_data[key] + # } + + # # TODO: evaluate the possibility of losing the diff if some error + # # happens in the charm before the diff is completely checked (DPE-412). + # # Convert the new_data to a serializable format and save it for a next diff check. + # event.relation.data[self.local_app].update({"data": json.dumps(new_data)}) + + # # Return the diff with all possible changes. + # return Diff(added, changed, deleted) + + def _diff(self, event: RelationChangedEvent) -> Diff: + """Retrieves the diff of the data in the relation changed databag. + + Args: + event: relation changed event. + + Returns: + a Diff instance containing the added, deleted and changed + keys from the event relation databag. + """ + return diff(event, self.local_app) + + def fetch_relation_data(self) -> dict: + """Retrieves data from relation. + + This function can be used to retrieve data from a relation + in the charm code when outside an event callback. + + Returns: + a dict of the values stored in the relation data bag + for all relation instances (indexed by the relation id). + """ + data = {} + for relation in self.relations: + data[relation.id] = { + key: value for key, value in relation.data[relation.app].items() if key != "data" + } + return data + + def update_connection_info(self, relation_id: int, connection_data: dict) -> None: + """Updates the credential data as set of key-value pairs in the relation. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + connection_data: dict containing the key-value pairs + that should be updated. 
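+
+        Example:
+            # e.g., using values from the module-level provider example above
+            s3_provider.update_connection_info(
+                relation_id, {"access-key": "your-access-key", "secret-key": "your-secret-key"}
+            )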
+ """ + # check and write changes only if you are the leader + if not self.local_unit.is_leader(): + return + + relation = self.charm.model.get_relation(self.relation_name, relation_id) + + if not relation: + return + + # configuration options that are list + s3_list_options = ["attributes", "tls-ca-chain"] + + # update the databag, if connection data did not change with respect to before + # the relation changed event is not triggered + updated_connection_data = {} + for configuration_option, configuration_value in connection_data.items(): + if configuration_option in s3_list_options: + updated_connection_data[configuration_option] = json.dumps(configuration_value) + else: + updated_connection_data[configuration_option] = configuration_value + + relation.data[self.local_app].update(updated_connection_data) + logger.debug(f"Updated S3 connection info: {updated_connection_data}") + + @property + def relations(self) -> List[Relation]: + """The list of Relation instances associated with this relation_name.""" + return list(self.charm.model.relations[self.relation_name]) + + def set_bucket(self, relation_id: int, bucket: str) -> None: + """Sets bucket name in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + bucket: the bucket name. + """ + self.update_connection_info(relation_id, {"bucket": bucket}) + + def set_access_key(self, relation_id: int, access_key: str) -> None: + """Sets access-key value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + access_key: the access-key value. + """ + self.update_connection_info(relation_id, {"access-key": access_key}) + + def set_secret_key(self, relation_id: int, secret_key: str) -> None: + """Sets the secret key value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + secret_key: the value of the secret key. + """ + self.update_connection_info(relation_id, {"secret-key": secret_key}) + + def set_path(self, relation_id: int, path: str) -> None: + """Sets the path value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + path: the path value. + """ + self.update_connection_info(relation_id, {"path": path}) + + def set_endpoint(self, relation_id: int, endpoint: str) -> None: + """Sets the endpoint address in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + endpoint: the endpoint address. + """ + self.update_connection_info(relation_id, {"endpoint": endpoint}) + + def set_region(self, relation_id: int, region: str) -> None: + """Sets the region location in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + region: the region address. 
+ """ + self.update_connection_info(relation_id, {"region": region}) + + def set_s3_uri_style(self, relation_id: int, s3_uri_style: str) -> None: + """Sets the S3 URI style in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + s3_uri_style: the s3 URI style. + """ + self.update_connection_info(relation_id, {"s3-uri-style": s3_uri_style}) + + def set_storage_class(self, relation_id: int, storage_class: str) -> None: + """Sets the storage class in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + storage_class: the storage class. + """ + self.update_connection_info(relation_id, {"storage-class": storage_class}) + + def set_tls_ca_chain(self, relation_id: int, tls_ca_chain: List[str]) -> None: + """Sets the tls_ca_chain value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + tls_ca_chain: the TLS Chain value. + """ + self.update_connection_info(relation_id, {"tls-ca-chain": tls_ca_chain}) + + def set_s3_api_version(self, relation_id: int, s3_api_version: str) -> None: + """Sets the S3 API version in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + s3_api_version: the S3 version value. + """ + self.update_connection_info(relation_id, {"s3-api-version": s3_api_version}) + + def set_attributes(self, relation_id: int, attributes: List[str]) -> None: + """Sets the connection attributes in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + attributes: the attributes value. 
+ """ + self.update_connection_info(relation_id, {"attributes": attributes}) + + +class S3Event(RelationEvent): + """Base class for S3 storage events.""" + + @property + def bucket(self) -> Optional[str]: + """Returns the bucket name.""" + return self.relation.data[self.relation.app].get("bucket") + + @property + def access_key(self) -> Optional[str]: + """Returns the access key.""" + return self.relation.data[self.relation.app].get("access-key") + + @property + def secret_key(self) -> Optional[str]: + """Returns the secret key.""" + return self.relation.data[self.relation.app].get("secret-key") + + @property + def path(self) -> Optional[str]: + """Returns the path where data can be stored.""" + return self.relation.data[self.relation.app].get("path") + + @property + def endpoint(self) -> Optional[str]: + """Returns the endpoint address.""" + return self.relation.data[self.relation.app].get("endpoint") + + @property + def region(self) -> Optional[str]: + """Returns the region.""" + return self.relation.data[self.relation.app].get("region") + + @property + def s3_uri_style(self) -> Optional[str]: + """Returns the s3 uri style.""" + return self.relation.data[self.relation.app].get("s3-uri-style") + + @property + def storage_class(self) -> Optional[str]: + """Returns the storage class name.""" + return self.relation.data[self.relation.app].get("storage-class") + + @property + def tls_ca_chain(self) -> Optional[List[str]]: + """Returns the TLS CA chain.""" + tls_ca_chain = self.relation.data[self.relation.app].get("tls-ca-chain") + if tls_ca_chain is not None: + return json.loads(tls_ca_chain) + return None + + @property + def s3_api_version(self) -> Optional[str]: + """Returns the S3 API version.""" + return self.relation.data[self.relation.app].get("s3-api-version") + + @property + def attributes(self) -> Optional[List[str]]: + """Returns the attributes.""" + attributes = self.relation.data[self.relation.app].get("attributes") + if attributes is not None: + return json.loads(attributes) + return None + + +class CredentialsChangedEvent(S3Event): + """Event emitted when S3 credential are changed on this relation.""" + + +class CredentialsGoneEvent(RelationEvent): + """Event emitted when S3 credential are removed from this relation.""" + + +class S3CredentialRequiresEvents(ObjectEvents): + """Event descriptor for events raised by the S3Provider.""" + + credentials_changed = EventSource(CredentialsChangedEvent) + credentials_gone = EventSource(CredentialsGoneEvent) + + +S3_REQUIRED_OPTIONS = ["access-key", "secret-key"] + + +class S3Requirer(Object): + """Requires-side of the s3 relation.""" + + on = S3CredentialRequiresEvents() + + def __init__(self, charm: ops.charm.CharmBase, relation_name: str, bucket_name: str = None): + """Manager of the s3 client relations.""" + super().__init__(charm, relation_name) + + self.relation_name = relation_name + self.charm = charm + self.local_app = self.charm.model.app + self.local_unit = self.charm.unit + self.bucket = bucket_name + + self.framework.observe( + self.charm.on[self.relation_name].relation_changed, self._on_relation_changed + ) + + self.framework.observe( + self.charm.on[self.relation_name].relation_joined, self._on_relation_joined + ) + + self.framework.observe( + self.charm.on[self.relation_name].relation_broken, + self._on_relation_broken, + ) + + def _generate_bucket_name(self, event: RelationJoinedEvent): + """Returns the bucket name generated from relation id.""" + return f"relation-{event.relation.id}" + + def _on_relation_joined(self, 
event: RelationJoinedEvent) -> None: + """Event emitted when the application joins the s3 relation.""" + if self.bucket is None: + self.bucket = self._generate_bucket_name(event) + self.update_connection_info(event.relation.id, {"bucket": self.bucket}) + + def fetch_relation_data(self) -> dict: + """Retrieves data from relation. + + This function can be used to retrieve data from a relation + in the charm code when outside an event callback. + + Returns: + a dict of the values stored in the relation data bag + for all relation instances (indexed by the relation id). + """ + data = {} + + for relation in self.relations: + data[relation.id] = self._load_relation_data(relation.data[self.charm.app]) + return data + + def update_connection_info(self, relation_id: int, connection_data: dict) -> None: + """Updates the credential data as set of key-value pairs in the relation. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + connection_data: dict containing the key-value pairs + that should be updated. + """ + # check and write changes only if you are the leader + if not self.local_unit.is_leader(): + return + + relation = self.charm.model.get_relation(self.relation_name, relation_id) + + if not relation: + return + + # update the databag, if connection data did not change with respect to before + # the relation changed event is not triggered + # configuration options that are list + s3_list_options = ["attributes", "tls-ca-chain"] + updated_connection_data = {} + for configuration_option, configuration_value in connection_data.items(): + if configuration_option in s3_list_options: + updated_connection_data[configuration_option] = json.dumps(configuration_value) + else: + updated_connection_data[configuration_option] = configuration_value + + relation.data[self.local_app].update(updated_connection_data) + logger.debug(f"Updated S3 credentials: {updated_connection_data}") + + def _load_relation_data(self, raw_relation_data: dict) -> dict: + """Loads relation data from the relation data bag. + + Args: + raw_relation_data: Relation data from the databag + Returns: + dict: Relation data in dict format. + """ + connection_data = dict() + for key in raw_relation_data: + try: + connection_data[key] = json.loads(raw_relation_data[key]) + except (json.decoder.JSONDecodeError, TypeError): + connection_data[key] = raw_relation_data[key] + return connection_data + + def _diff(self, event: RelationChangedEvent) -> Diff: + """Retrieves the diff of the data in the relation changed databag. + + Args: + event: relation changed event. + + Returns: + a Diff instance containing the added, deleted and changed + keys from the event relation databag. 
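+
+        Note: unlike the provider, the requirer caches the previous state in its own
+        unit databag (`self.local_unit`), since non-leader units cannot write to the
+        application databag.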
+ """ + return diff(event, self.local_unit) + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + """Notify the charm about the presence of S3 credentials.""" + # check if the mandatory options are in the relation data + contains_required_options = True + # get current credentials data + credentials = self.get_s3_connection_info() + # records missing options + missing_options = [] + for configuration_option in S3_REQUIRED_OPTIONS: + if configuration_option not in credentials: + contains_required_options = False + missing_options.append(configuration_option) + # emit credential change event only if all mandatory fields are present + if contains_required_options: + self.on.credentials_changed.emit(event.relation, app=event.app, unit=event.unit) + else: + logger.warning( + f"Some mandatory fields: {missing_options} are not present, do not emit credential change event!" + ) + + def get_s3_connection_info(self) -> Dict: + """Return the s3 credentials as a dictionary.""" + relation = self.charm.model.get_relation(self.relation_name) + if not relation: + return {} + return self._load_relation_data(relation.data[relation.app]) + + def _on_relation_broken(self, event: RelationBrokenEvent) -> None: + """Notify the charm about a broken S3 credential store relation.""" + self.on.credentials_gone.emit(event.relation, app=event.app, unit=event.unit) + + @property + def relations(self) -> List[Relation]: + """The list of Relation instances associated with this relation_name.""" + return list(self.charm.model.relations[self.relation_name]) diff --git a/lib/charms/mongodb/v0/helpers.py b/lib/charms/mongodb/v0/helpers.py index 1d7b81604..8d0938b06 100644 --- a/lib/charms/mongodb/v0/helpers.py +++ b/lib/charms/mongodb/v0/helpers.py @@ -7,10 +7,16 @@ import secrets import string import subprocess -from typing import List +from typing import List, Optional, Union from charms.mongodb.v0.mongodb import MongoDBConfiguration, MongoDBConnection -from ops.model import ActiveStatus, BlockedStatus, StatusBase, WaitingStatus +from ops.model import ( + ActiveStatus, + BlockedStatus, + MaintenanceStatus, + StatusBase, + WaitingStatus, +) from pymongo.errors import AutoReconnect, ServerSelectionTimeoutError # The unique Charmhub library identifier, never change it @@ -194,3 +200,54 @@ def copy_licenses_to_unit(): subprocess.check_output( "cp -r /snap/charmed-mongodb/current/licenses/* src/licenses", shell=True ) + + +_StrOrBytes = Union[str, bytes] + + +def process_pbm_error(error_string: Optional[_StrOrBytes]) -> str: + """Parses pbm error string and returns a user friendly message.""" + message = "couldn't configure s3 backup option" + if not error_string: + return message + if type(error_string) == bytes: + error_string = error_string.decode("utf-8") + if "status code: 403" in error_string: # type: ignore + message = "s3 credentials are incorrect." + elif "status code: 404" in error_string: # type: ignore + message = "s3 configurations are incompatible." + elif "status code: 301" in error_string: # type: ignore + message = "s3 configurations are incompatible." 
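+        # a 301 (PermanentRedirect) typically means the bucket lives in a different
+        # region than the configured endpoint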
+ return message + + +def current_pbm_op(pbm_status: str) -> str: + """Parses pbm status for the operation that pbm is running.""" + pbm_status_lines = pbm_status.splitlines() + for i in range(0, len(pbm_status_lines)): + line = pbm_status_lines[i] + + # operation is two lines after the line "Currently running:" + if line == "Currently running:": + return pbm_status_lines[i + 2] + + return "" + + +def process_pbm_status(pbm_status: str) -> StatusBase: + """Parses current pbm operation and returns unit status.""" + # pbm is running resync operation + if "Resync" in current_pbm_op(pbm_status): + return WaitingStatus("waiting to sync s3 configurations.") + + # no operations are currently running with pbm + if "(none)" in current_pbm_op(pbm_status): + return ActiveStatus("") + + if "Snapshot backup" in current_pbm_op(pbm_status): + return MaintenanceStatus("backup started/running") + + if "Snapshot restore" in current_pbm_op(pbm_status): + return MaintenanceStatus("restore started/running") + + return ActiveStatus() diff --git a/lib/charms/mongodb/v0/mongodb_backups.py b/lib/charms/mongodb/v0/mongodb_backups.py new file mode 100644 index 000000000..71e16807f --- /dev/null +++ b/lib/charms/mongodb/v0/mongodb_backups.py @@ -0,0 +1,359 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""In this class, we manage backup configurations and actions. + +Specifically backups are handled with Percona Backup MongoDB (pbm). +A user for PBM is created when MongoDB is first started during the start phase. +This user is named "backup". +""" + +import logging +import subprocess +import time +from typing import Dict + +from charms.data_platform_libs.v0.s3 import CredentialsChangedEvent, S3Requirer +from charms.mongodb.v0.helpers import ( + current_pbm_op, + process_pbm_error, + process_pbm_status, +) +from charms.operator_libs_linux.v1 import snap +from ops.framework import Object +from ops.model import ( + BlockedStatus, + MaintenanceStatus, + ModelError, + StatusBase, + WaitingStatus, +) +from ops.pebble import ExecError +from tenacity import ( + Retrying, + before_log, + retry, + retry_if_exception_type, + stop_after_attempt, + wait_fixed, +) + +# The unique Charmhub library identifier, never change it +LIBID = "9f2b91c6128d48d6ba22724bf365da3b" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + +logger = logging.getLogger(__name__) + +S3_PBM_OPTION_MAP = { + "region": "storage.s3.region", + "bucket": "storage.s3.bucket", + "path": "storage.s3.prefix", + "access-key": "storage.s3.credentials.access-key-id", + "secret-key": "storage.s3.credentials.secret-access-key", + "endpoint": "storage.s3.endpointUrl", + "storage-class": "storage.s3.storageClass", +} +S3_RELATION = "s3-credentials" +REMAPPING_PATTERN = r"\ABackup doesn't match current cluster topology - it has different replica set names. Extra shards in the backup will cause this, for a simple example. The extra/unknown replica set names found in the backup are: ([^,\s]+)([.] 
 Backup has no data for the config server or sole replicaset)?\Z"
+PBM_STATUS_CMD = ["status", "-o", "json"]
+MONGODB_SNAP_DATA_DIR = "/var/snap/charmed-mongodb/current"
+
+
+class ResyncError(Exception):
+    """Raised when pbm is resyncing configurations and is not ready to be used."""
+
+
+class SetPBMConfigError(Exception):
+    """Raised when pbm cannot configure a given option."""
+
+
+class PBMBusyError(Exception):
+    """Raised when PBM is busy and cannot run another operation."""
+
+
+class MongoDBBackups(Object):
+    """Manages MongoDB backups."""
+
+    def __init__(self, charm, substrate):
+        """Manager of MongoDB backups."""
+        super().__init__(charm, "client-relations")
+        self.charm = charm
+        self.substrate = substrate
+
+        # s3 relation handles the config options for s3 backups
+        self.s3_client = S3Requirer(self.charm, S3_RELATION)
+        self.framework.observe(
+            self.s3_client.on.credentials_changed, self._on_s3_credential_changed
+        )
+        self.framework.observe(self.charm.on.create_backup_action, self._on_create_backup_action)
+        self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups_action)
+        self.framework.observe(self.charm.on.restore_action, self._on_restore_action)
+
+    def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
+        """Sets pbm credentials, resyncs if necessary and reports config errors."""
+        # handling PBM configurations requires that MongoDB is running and the pbm snap is
+        # installed.
+        if not self.charm.db_initialised:
+            logger.info("Deferring: set PBM configurations, MongoDB has not yet started.")
+            event.defer()
+            return
+
+        try:
+            # TODO VM charm should implement this method
+            self.charm.get_backup_service()
+        except ModelError:
+            logger.info("Deferring: set PBM configurations, pbm-agent service not found.")
+            event.defer()
+            return
+
+        self._configure_pbm_options(event)
+
+    def _on_create_backup_action(self, event) -> None:
+        if self.model.get_relation(S3_RELATION) is None:
+            event.fail("Relation with s3-integrator charm missing, cannot create backup.")
+            return
+
+        # only leader can create backups. This prevents multiple backups from being attempted at
+        # once.
+        if not self.charm.unit.is_leader():
+            event.fail("The action can be run only on leader unit.")
+            return
+
+        # cannot create backup if pbm is not ready. This could be due to: resyncing, incompatible
+        # options, incorrect credentials, or already creating a backup
+        pbm_status = self._get_pbm_status()
+        self.charm.unit.status = pbm_status
+        if isinstance(pbm_status, MaintenanceStatus):
+            event.fail(
+                "Can only create one backup at a time, please wait for current backup to finish."
+            )
+            return
+        if isinstance(pbm_status, WaitingStatus):
+            event.defer()
+            logger.debug(
+                "Sync-ing configurations needs more time, must wait before creating a backup."
+            )
+            return
+        if isinstance(pbm_status, BlockedStatus):
+            event.fail(f"Cannot create backup: {pbm_status.message}.")
+            return
+
+        # TODO create backup
+
+    def _on_list_backups_action(self, event) -> None:
+        if self.model.get_relation(S3_RELATION) is None:
+            event.fail("Relation with s3-integrator charm missing, cannot list backups.")
+            return
+
+        # cannot list backups if pbm is resyncing, or has incompatible options or incorrect
+        # credentials
+        pbm_status = self._get_pbm_status()
+        self.charm.unit.status = pbm_status
+        if isinstance(pbm_status, WaitingStatus):
+            event.defer()
+            logger.debug(
+                "Sync-ing configurations needs more time, must wait before listing backups."
+            )
+            return
+        if isinstance(pbm_status, BlockedStatus):
+            event.fail(f"Cannot list backups: {pbm_status.message}.")
+            return
+
+        # TODO list backups
+
+    def _on_restore_action(self, event) -> None:
+        if self.model.get_relation(S3_RELATION) is None:
+            event.fail("Relation with s3-integrator charm missing, cannot restore from a backup.")
+            return
+
+        backup_id = event.params.get("backup-id")
+        if not backup_id:
+            event.fail("Missing backup-id to restore.")
+            return
+
+        # only leader can restore backups. This prevents multiple restores from being attempted at
+        # once.
+        if not self.charm.unit.is_leader():
+            event.fail("The action can be run only on leader unit.")
+            return
+
+        # cannot restore backup if pbm is not ready. This could be due to: resyncing, incompatible
+        # options, incorrect credentials, creating a backup, or already performing a restore.
+        pbm_status = self._get_pbm_status()
+        # TODO: check status
+        self.charm.unit.status = pbm_status
+        if isinstance(pbm_status, MaintenanceStatus):
+            event.fail("Please wait for current backup/restore to finish.")
+            return
+        if isinstance(pbm_status, WaitingStatus):
+            event.defer()
+            logger.debug("Sync-ing configurations needs more time, must wait before restoring.")
+            return
+        if isinstance(pbm_status, BlockedStatus):
+            event.fail(f"Cannot restore backup: {pbm_status.message}.")
+            return
+
+        # TODO restore backup
+
+    # BEGIN: helper functions
+
+    def _configure_pbm_options(self, event) -> None:
+        try:
+            self._set_config_options()
+            self._resync_config_options()
+        except SetPBMConfigError:
+            self.charm.unit.status = BlockedStatus("couldn't configure s3 backup options.")
+            return
+        except snap.SnapError as e:
+            logger.error("An exception occurred when starting pbm agent, error: %s.", str(e))
+            self.charm.unit.status = BlockedStatus("couldn't start pbm")
+            return
+        except ResyncError:
+            self.charm.unit.status = WaitingStatus("waiting to sync s3 configurations.")
+            event.defer()
+            logger.info("Deferring: Sync-ing configurations needs more time.")
+            return
+        except PBMBusyError:
+            self.charm.unit.status = WaitingStatus("waiting to sync s3 configurations.")
+            logger.info(
+                "Deferring: Cannot update configs while PBM is running, must wait for PBM action to finish."
+            )
+            event.defer()
+            return
+        except ExecError as e:
+            self.charm.unit.status = BlockedStatus(process_pbm_error(e.stdout))
+            return
+        except subprocess.CalledProcessError as e:
+            logger.error("Syncing configurations failed: %s", str(e))
+
+        self.charm.unit.status = self._get_pbm_status()
+
+    def _set_config_options(self):
+        """Apply the given configurations with pbm."""
+        # TODO VM charm should implement this method
+        self.charm.set_pbm_config_file()
+
+        # the pbm tool can only set one configuration at a time.
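+        # i.e., the loop below issues one `pbm config --set <key>=<value>` per option.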
+        for pbm_key, pbm_value in self._get_pbm_configs().items():
+            try:
+                config_cmd = ["config", "--set", f"{pbm_key}={pbm_value}"]
+                # TODO VM charm should implement this method
+                self.charm.run_pbm_command(config_cmd)
+            except (subprocess.CalledProcessError, ExecError):
+                logger.error(
+                    "Failed to configure the PBM snap option: %s",
+                    pbm_key,
+                )
+                raise SetPBMConfigError
+
+    def _get_pbm_configs(self) -> Dict:
+        """Returns a dictionary of desired PBM configurations."""
+        pbm_configs = {"storage.type": "s3"}
+        credentials = self.s3_client.get_s3_connection_info()
+        for s3_option, s3_value in credentials.items():
+            if s3_option not in S3_PBM_OPTION_MAP:
+                continue
+
+            pbm_configs[S3_PBM_OPTION_MAP[s3_option]] = s3_value
+        return pbm_configs
+
+    def _resync_config_options(self):
+        """Attempts to sync pbm config options and sets status in case of failure."""
+        # TODO VM charm should implement this method
+        self.charm.start_backup_service()
+
+        # pbm has a flaky resync, and it is necessary to wait for no actions to be running before
+        # resync-ing. See: https://jira.percona.com/browse/PBM-1038
+        for attempt in Retrying(
+            stop=stop_after_attempt(20),
+            wait=wait_fixed(5),
+            reraise=True,
+        ):
+            with attempt:
+                pbm_status = self._get_pbm_status()
+                # wait for backup/restore to finish
+                if isinstance(pbm_status, MaintenanceStatus):
+                    raise PBMBusyError
+
+                # if a resync is running restart the service
+                if isinstance(pbm_status, WaitingStatus):
+                    # TODO VM charm should implement this method
+                    self.charm.restart_backup_service()
+                    raise PBMBusyError
+
+        # wait for re-sync and update charm status based on pbm syncing status. Need to wait for
+        # 2 seconds for pbm_agent to receive the resync command before verifying.
+        self.charm.run_pbm_command(["config", "--force-resync"])
+        time.sleep(2)
+        self._wait_pbm_status()
+
+    @retry(
+        stop=stop_after_attempt(20),
+        reraise=True,
+        retry=retry_if_exception_type(ResyncError),
+        before=before_log(logger, logging.DEBUG),
+    )
+    def _wait_pbm_status(self) -> None:
+        """Wait for pbm_agent to resolve errors and finish resyncing.
+
+        The pbm status is set by the pbm_agent daemon, which needs time to both resync and resolve
+        errors in configurations. Resync-ing is a longer process and should take around 5 minutes.
+        Configuration errors generally occur when the configurations change and pbm_agent is
+        updating; this is generally quick and should take <15s. If errors are not resolved in 15s
+        it means there is an incorrect configuration, which will require user intervention.
+
+        Retrying for resync is handled by the decorator; retrying for configuration errors is
+        handled within this function.
+        """
+        # on occasion it takes the pbm_agent daemon time to update its configs, meaning that it
+        # will error for incorrect configurations for <15s before resolving itself.
+
+        for attempt in Retrying(
+            stop=stop_after_attempt(3),
+            wait=wait_fixed(5),
+            reraise=True,
+        ):
+            with attempt:
+                try:
+                    # TODO VM charm should implement this method
+                    pbm_status = self.charm.run_pbm_command(PBM_STATUS_CMD)
+
+                    if "Resync" in current_pbm_op(pbm_status):
+                        # since this process takes several minutes we should let the user know
+                        # immediately.
+                        self.charm.unit.status = WaitingStatus(
+                            "waiting to sync s3 configurations."
+ ) + raise ResyncError + except ExecError as e: + self.charm.unit.status = BlockedStatus(process_pbm_error(e.stdout)) + + def _get_pbm_status(self) -> StatusBase: + """Retrieve pbm status.""" + try: + # TODO VM charm should implement this method + self.charm.get_backup_service() + except ModelError: + return WaitingStatus("waiting for pbm to start") + + try: + # TODO VM charm should implement this method + pbm_status = self.charm.run_pbm_command(PBM_STATUS_CMD) + return process_pbm_status(pbm_status) + except ExecError as e: + logger.error("Failed to get pbm status.") + return BlockedStatus(process_pbm_error(e.stdout)) + except subprocess.CalledProcessError as e: + # pbm pipes a return code of 1, but its output shows the true error code so it is + # necessary to parse the output + return BlockedStatus(process_pbm_error(e.output)) + except Exception as e: + # pbm pipes a return code of 1, but its output shows the true error code so it is + # necessary to parse the output + logger.error(f"Failed to get pbm status: {e}") + return BlockedStatus("PBM error") diff --git a/lib/charms/operator_libs_linux/v1/snap.py b/lib/charms/operator_libs_linux/v1/snap.py new file mode 100644 index 000000000..69f604c93 --- /dev/null +++ b/lib/charms/operator_libs_linux/v1/snap.py @@ -0,0 +1,1065 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Representations of the system's Snaps, and abstractions around managing them. + +The `snap` module provides convenience methods for listing, installing, refreshing, and removing +Snap packages, in addition to setting and getting configuration options for them. + +In the `snap` module, `SnapCache` creates a dict-like mapping of `Snap` objects at when +instantiated. Installed snaps are fully populated, and available snaps are lazily-loaded upon +request. This module relies on an installed and running `snapd` daemon to perform operations over +the `snapd` HTTP API. + +`SnapCache` objects can be used to install or modify Snap packages by name in a manner similar to +using the `snap` command from the commandline. + +An example of adding Juju to the system with `SnapCache` and setting a config value: + +```python +try: + cache = snap.SnapCache() + juju = cache["juju"] + + if not juju.present: + juju.ensure(snap.SnapState.Latest, channel="beta") + juju.set({"some.key": "value", "some.key2": "value2"}) +except snap.SnapError as e: + logger.error("An exception occurred when installing charmcraft. Reason: %s", e.message) +``` + +In addition, the `snap` module provides "bare" methods which can act on Snap packages as +simple function calls. :meth:`add`, :meth:`remove`, and :meth:`ensure` are provided, as +well as :meth:`add_local` for installing directly from a local `.snap` file. These return +`Snap` objects. 
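+
+The same `Snap` objects can also manage a snap's services; a brief sketch (the snap
+and service names here are illustrative):
+
+```python
+cache = snap.SnapCache()
+charmed_mongodb = cache["charmed-mongodb"]
+charmed_mongodb.start(services=["pbm-agent"], enable=True)
+```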
+ +As an example of installing several Snaps and checking details: + +```python +try: + nextcloud, charmcraft = snap.add(["nextcloud", "charmcraft"]) + if nextcloud.get("mode") != "production": + nextcloud.set({"mode": "production"}) +except snap.SnapError as e: + logger.error("An exception occurred when installing snaps. Reason: %s" % e.message) +``` +""" + +import http.client +import json +import logging +import os +import re +import socket +import subprocess +import sys +import urllib.error +import urllib.parse +import urllib.request +from collections.abc import Mapping +from datetime import datetime, timedelta, timezone +from enum import Enum +from subprocess import CalledProcessError, CompletedProcess +from typing import Any, Dict, Iterable, List, Optional, Union + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "05394e5893f94f2d90feb7cbe6b633cd" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 12 + + +# Regex to locate 7-bit C1 ANSI sequences +ansi_filter = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + + +def _cache_init(func): + def inner(*args, **kwargs): + if _Cache.cache is None: + _Cache.cache = SnapCache() + return func(*args, **kwargs) + + return inner + + +# recursive hints seems to error out pytest +JSONType = Union[Dict[str, Any], List[Any], str, int, float] + + +class SnapService: + """Data wrapper for snap services.""" + + def __init__( + self, + daemon: Optional[str] = None, + daemon_scope: Optional[str] = None, + enabled: bool = False, + active: bool = False, + activators: List[str] = [], + **kwargs, + ): + self.daemon = daemon + self.daemon_scope = kwargs.get("daemon-scope", None) or daemon_scope + self.enabled = enabled + self.active = active + self.activators = activators + + def as_dict(self) -> Dict: + """Return instance representation as dict.""" + return { + "daemon": self.daemon, + "daemon_scope": self.daemon_scope, + "enabled": self.enabled, + "active": self.active, + "activators": self.activators, + } + + +class MetaCache(type): + """MetaCache class used for initialising the snap cache.""" + + @property + def cache(cls) -> "SnapCache": + """Property for returning the snap cache.""" + return cls._cache + + @cache.setter + def cache(cls, cache: "SnapCache") -> None: + """Setter for the snap cache.""" + cls._cache = cache + + def __getitem__(cls, name) -> "Snap": + """Snap cache getter.""" + return cls._cache[name] + + +class _Cache(object, metaclass=MetaCache): + _cache = None + + +class Error(Exception): + """Base class of most errors raised by this library.""" + + def __repr__(self): + """Represent the Error class.""" + return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args) + + @property + def name(self): + """Return a string representation of the model plus class.""" + return "<{}.{}>".format(type(self).__module__, type(self).__name__) + + @property + def message(self): + """Return the message passed as an argument.""" + return self.args[0] + + +class SnapAPIError(Error): + """Raised when an HTTP API error occurs talking to the Snapd server.""" + + def __init__(self, body: Dict, code: int, status: str, message: str): + super().__init__(message) # Makes str(e) return message + self.body = body + self.code = code + self.status = status + self._message = message + + def __repr__(self): + 
"""Represent the SnapAPIError class.""" + return "APIError({!r}, {!r}, {!r}, {!r})".format( + self.body, self.code, self.status, self._message + ) + + +class SnapState(Enum): + """The state of a snap on the system or in the cache.""" + + Present = "present" + Absent = "absent" + Latest = "latest" + Available = "available" + + +class SnapError(Error): + """Raised when there's an error running snap control commands.""" + + +class SnapNotFoundError(Error): + """Raised when a requested snap is not known to the system.""" + + +class Snap(object): + """Represents a snap package and its properties. + + `Snap` exposes the following properties about a snap: + - name: the name of the snap + - state: a `SnapState` representation of its install status + - channel: "stable", "candidate", "beta", and "edge" are common + - revision: a string representing the snap's revision + - confinement: "classic" or "strict" + """ + + def __init__( + self, + name, + state: SnapState, + channel: str, + revision: int, + confinement: str, + apps: Optional[List[Dict[str, str]]] = None, + cohort: Optional[str] = "", + ) -> None: + self._name = name + self._state = state + self._channel = channel + self._revision = revision + self._confinement = confinement + self._cohort = cohort + self._apps = apps or [] + self._snap_client = SnapClient() + + def __eq__(self, other) -> bool: + """Equality for comparison.""" + return isinstance(other, self.__class__) and ( + self._name, + self._revision, + ) == (other._name, other._revision) + + def __hash__(self): + """Calculate a hash for this snap.""" + return hash((self._name, self._revision)) + + def __repr__(self): + """Represent the object such that it can be reconstructed.""" + return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__) + + def __str__(self): + """Represent the snap object as a string.""" + return "<{}: {}-{}.{} -- {}>".format( + self.__class__.__name__, + self._name, + self._revision, + self._channel, + str(self._state), + ) + + def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str: + """Perform a snap operation. + + Args: + command: the snap command to execute + optargs: an (optional) list of additional arguments to pass, + commonly confinement or channel + + Raises: + SnapError if there is a problem encountered + """ + optargs = optargs or [] + _cmd = ["snap", command, self._name, *optargs] + try: + return subprocess.check_output(_cmd, universal_newlines=True) + except CalledProcessError as e: + raise SnapError( + "Snap: {!r}; command {!r} failed with output = {!r}".format( + self._name, _cmd, e.output + ) + ) + + def _snap_daemons( + self, + command: List[str], + services: Optional[List[str]] = None, + ) -> CompletedProcess: + """Perform snap app commands. + + Args: + command: the snap command to execute + services: the snap service to execute command on + + Raises: + SnapError if there is a problem encountered + """ + if services: + # an attempt to keep the command constrained to the snap instance's services + services = ["{}.{}".format(self._name, service) for service in services] + else: + services = [self._name] + + _cmd = ["snap", *command, *services] + + try: + return subprocess.run(_cmd, universal_newlines=True, check=True, capture_output=True) + except CalledProcessError as e: + raise SnapError("Could not {} for snap [{}]: {}".format(_cmd, self._name, e.stderr)) + + def get(self, key) -> str: + """Fetch a snap configuration value. 
+ + Args: + key: the key to retrieve + """ + return self._snap("get", [key]).strip() + + def set(self, config: Dict) -> str: + """Set a snap configuration value. + + Args: + config: a dictionary containing keys and values specifying the config to set. + """ + args = ['{}="{}"'.format(key, val) for key, val in config.items()] + + return self._snap("set", [*args]) + + def unset(self, key) -> str: + """Unset a snap configuration value. + + Args: + key: the key to unset + """ + return self._snap("unset", [key]) + + def start(self, services: Optional[List[str]] = None, enable: Optional[bool] = False) -> None: + """Start a snap's services. + + Args: + services (list): (optional) list of individual snap services to start (otherwise all) + enable (bool): (optional) flag to enable snap services on start. Default `false` + """ + args = ["start", "--enable"] if enable else ["start"] + self._snap_daemons(args, services) + + def stop(self, services: Optional[List[str]] = None, disable: Optional[bool] = False) -> None: + """Stop a snap's services. + + Args: + services (list): (optional) list of individual snap services to stop (otherwise all) + disable (bool): (optional) flag to disable snap services on stop. Default `False` + """ + args = ["stop", "--disable"] if disable else ["stop"] + self._snap_daemons(args, services) + + def logs(self, services: Optional[List[str]] = None, num_lines: Optional[int] = 10) -> str: + """Fetch a snap services' logs. + + Args: + services (list): (optional) list of individual snap services to show logs from + (otherwise all) + num_lines (int): (optional) integer number of log lines to return. Default `10` + """ + args = ["logs", "-n={}".format(num_lines)] if num_lines else ["logs"] + return self._snap_daemons(args, services).stdout + + def connect( + self, plug: str, service: Optional[str] = None, slot: Optional[str] = None + ) -> None: + """Connect a plug to a slot. + + Args: + plug (str): the plug to connect + service (str): (optional) the snap service name to plug into + slot (str): (optional) the snap service slot to plug in to + + Raises: + SnapError if there is a problem encountered + """ + command = ["connect", "{}:{}".format(self._name, plug)] + + if service and slot: + command = command + ["{}:{}".format(service, slot)] + elif slot: + command = command + [slot] + + _cmd = ["snap", *command] + try: + subprocess.run(_cmd, universal_newlines=True, check=True, capture_output=True) + except CalledProcessError as e: + raise SnapError("Could not {} for snap [{}]: {}".format(_cmd, self._name, e.stderr)) + + def hold(self, duration: Optional[timedelta] = None) -> None: + """Add a refresh hold to a snap. + + Args: + duration: duration for the hold, or None (the default) to hold this snap indefinitely. + """ + hold_str = "forever" + if duration is not None: + seconds = round(duration.total_seconds()) + hold_str = f"{seconds}s" + self._snap("refresh", [f"--hold={hold_str}"]) + + def unhold(self) -> None: + """Remove the refresh hold of a snap.""" + self._snap("refresh", ["--unhold"]) + + def restart( + self, services: Optional[List[str]] = None, reload: Optional[bool] = False + ) -> None: + """Restarts a snap's services. + + Args: + services (list): (optional) list of individual snap services to show logs from. + (otherwise all) + reload (bool): (optional) flag to use the service reload command, if available. 
+ Default `False` + """ + args = ["restart", "--reload"] if reload else ["restart"] + self._snap_daemons(args, services) + + def _install( + self, + channel: Optional[str] = "", + cohort: Optional[str] = "", + revision: Optional[int] = None, + ) -> None: + """Add a snap to the system. + + Args: + channel: the channel to install from + cohort: optional, the key of a cohort that this snap belongs to + revision: optional, the revision of the snap to install + """ + cohort = cohort or self._cohort + + args = [] + if self.confinement == "classic": + args.append("--classic") + if channel: + args.append('--channel="{}"'.format(channel)) + if revision: + args.append('--revision="{}"'.format(revision)) + if cohort: + args.append('--cohort="{}"'.format(cohort)) + + self._snap("install", args) + + def _refresh( + self, + channel: Optional[str] = "", + cohort: Optional[str] = "", + revision: Optional[int] = None, + leave_cohort: Optional[bool] = False, + ) -> None: + """Refresh a snap. + + Args: + channel: the channel to install from + cohort: optionally, specify a cohort. + revision: optionally, specify the revision of the snap to refresh + leave_cohort: leave the current cohort. + """ + args = [] + if channel: + args.append('--channel="{}"'.format(channel)) + + if revision: + args.append('--revision="{}"'.format(revision)) + + if not cohort: + cohort = self._cohort + + if leave_cohort: + self._cohort = "" + args.append("--leave-cohort") + elif cohort: + args.append('--cohort="{}"'.format(cohort)) + + self._snap("refresh", args) + + def _remove(self) -> str: + """Remove a snap from the system.""" + return self._snap("remove") + + @property + def name(self) -> str: + """Returns the name of the snap.""" + return self._name + + def ensure( + self, + state: SnapState, + classic: Optional[bool] = False, + channel: Optional[str] = "", + cohort: Optional[str] = "", + revision: Optional[int] = None, + ): + """Ensure that a snap is in a given state. + + Args: + state: a `SnapState` to reconcile to. + classic: an (Optional) boolean indicating whether classic confinement should be used + channel: the channel to install from + cohort: optional. Specify the key of a snap cohort. + revision: optional. the revision of the snap to install/refresh + + While both channel and revision could be specified, the underlying snap install/refresh + command will determine which one takes precedence (revision at this time) + + Raises: + SnapError if an error is encountered + """ + self._confinement = "classic" if classic or self._confinement == "classic" else "" + + if state not in (SnapState.Present, SnapState.Latest): + # We are attempting to remove this snap. + if self._state in (SnapState.Present, SnapState.Latest): + # The snap is installed, so we run _remove. + self._remove() + else: + # The snap is not installed -- no need to do anything. + pass + else: + # We are installing or refreshing a snap. + if self._state not in (SnapState.Present, SnapState.Latest): + # The snap is not installed, so we install it. + self._install(channel, cohort, revision) + else: + # The snap is installed, but we are changing it (e.g., switching channels). 
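+                # a refresh handles channel switches, revision pins, and cohort changes alike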
+ self._refresh(channel, cohort, revision) + + self._update_snap_apps() + self._state = state + + def _update_snap_apps(self) -> None: + """Update a snap's apps after snap changes state.""" + try: + self._apps = self._snap_client.get_installed_snap_apps(self._name) + except SnapAPIError: + logger.debug("Unable to retrieve snap apps for {}".format(self._name)) + self._apps = [] + + @property + def present(self) -> bool: + """Report whether or not a snap is present.""" + return self._state in (SnapState.Present, SnapState.Latest) + + @property + def latest(self) -> bool: + """Report whether the snap is the most recent version.""" + return self._state is SnapState.Latest + + @property + def state(self) -> SnapState: + """Report the current snap state.""" + return self._state + + @state.setter + def state(self, state: SnapState) -> None: + """Set the snap state to a given value. + + Args: + state: a `SnapState` to reconcile the snap to. + + Raises: + SnapError if an error is encountered + """ + if self._state is not state: + self.ensure(state) + self._state = state + + @property + def revision(self) -> int: + """Returns the revision for a snap.""" + return self._revision + + @property + def channel(self) -> str: + """Returns the channel for a snap.""" + return self._channel + + @property + def confinement(self) -> str: + """Returns the confinement for a snap.""" + return self._confinement + + @property + def apps(self) -> List: + """Returns (if any) the installed apps of the snap.""" + self._update_snap_apps() + return self._apps + + @property + def services(self) -> Dict: + """Returns (if any) the installed services of the snap.""" + self._update_snap_apps() + services = {} + for app in self._apps: + if "daemon" in app: + services[app["name"]] = SnapService(**app).as_dict() + + return services + + @property + def held(self) -> bool: + """Report whether the snap has a hold.""" + info = self._snap("info") + return "hold:" in info + + +class _UnixSocketConnection(http.client.HTTPConnection): + """Implementation of HTTPConnection that connects to a named Unix socket.""" + + def __init__(self, host, timeout=None, socket_path=None): + if timeout is None: + super().__init__(host) + else: + super().__init__(host, timeout=timeout) + self.socket_path = socket_path + + def connect(self): + """Override connect to use Unix socket (instead of TCP socket).""" + if not hasattr(socket, "AF_UNIX"): + raise NotImplementedError("Unix sockets not supported on {}".format(sys.platform)) + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(self.socket_path) + if self.timeout is not None: + self.sock.settimeout(self.timeout) + + +class _UnixSocketHandler(urllib.request.AbstractHTTPHandler): + """Implementation of HTTPHandler that uses a named Unix socket.""" + + def __init__(self, socket_path: str): + super().__init__() + self.socket_path = socket_path + + def http_open(self, req) -> http.client.HTTPResponse: + """Override http_open to use a Unix socket connection (instead of TCP).""" + return self.do_open(_UnixSocketConnection, req, socket_path=self.socket_path) + + +class SnapClient: + """Snapd API client to talk to HTTP over UNIX sockets. + + In order to avoid shelling out and/or involving sudo in calling the snapd API, + use a wrapper based on the Pebble Client, trimmed down to only the utility methods + needed for talking to snapd. 
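+
+ A minimal usage sketch (illustrative; assumes snapd is listening on its
+ default socket):
+
+ client = SnapClient()
+ installed = client.get_installed_snaps()
+ names = [s["name"] for s in installed]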
+ """ + + def __init__( + self, + socket_path: str = "/run/snapd.socket", + opener: Optional[urllib.request.OpenerDirector] = None, + base_url: str = "http://localhost/v2/", + timeout: float = 5.0, + ): + """Initialize a client instance. + + Args: + socket_path: a path to the socket on the filesystem. Defaults to /run/snap/snapd.socket + opener: specifies an opener for unix socket, if unspecified a default is used + base_url: base url for making requests to the snap client. Defaults to + http://localhost/v2/ + timeout: timeout in seconds to use when making requests to the API. Default is 5.0s. + """ + if opener is None: + opener = self._get_default_opener(socket_path) + self.opener = opener + self.base_url = base_url + self.timeout = timeout + + @classmethod + def _get_default_opener(cls, socket_path): + """Build the default opener to use for requests (HTTP over Unix socket).""" + opener = urllib.request.OpenerDirector() + opener.add_handler(_UnixSocketHandler(socket_path)) + opener.add_handler(urllib.request.HTTPDefaultErrorHandler()) + opener.add_handler(urllib.request.HTTPRedirectHandler()) + opener.add_handler(urllib.request.HTTPErrorProcessor()) + return opener + + def _request( + self, + method: str, + path: str, + query: Dict = None, + body: Dict = None, + ) -> JSONType: + """Make a JSON request to the Snapd server with the given HTTP method and path. + + If query dict is provided, it is encoded and appended as a query string + to the URL. If body dict is provided, it is serialied as JSON and used + as the HTTP body (with Content-Type: "application/json"). The resulting + body is decoded from JSON. + """ + headers = {"Accept": "application/json"} + data = None + if body is not None: + data = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + + response = self._request_raw(method, path, query, headers, data) + return json.loads(response.read().decode())["result"] + + def _request_raw( + self, + method: str, + path: str, + query: Dict = None, + headers: Dict = None, + data: bytes = None, + ) -> http.client.HTTPResponse: + """Make a request to the Snapd server; return the raw HTTPResponse object.""" + url = self.base_url + path + if query: + url = url + "?" + urllib.parse.urlencode(query) + + if headers is None: + headers = {} + request = urllib.request.Request(url, method=method, data=data, headers=headers) + + try: + response = self.opener.open(request, timeout=self.timeout) + except urllib.error.HTTPError as e: + code = e.code + status = e.reason + message = "" + try: + body = json.loads(e.read().decode())["result"] + except (IOError, ValueError, KeyError) as e2: + # Will only happen on read error or if Pebble sends invalid JSON. 
+ body = {} + message = "{} - {}".format(type(e2).__name__, e2) + raise SnapAPIError(body, code, status, message) + except urllib.error.URLError as e: + raise SnapAPIError({}, 500, "Not found", e.reason) + return response + + def get_installed_snaps(self) -> Dict: + """Get information about currently installed snaps.""" + return self._request("GET", "snaps") + + def get_snap_information(self, name: str) -> Dict: + """Query the snap server for information about single snap.""" + return self._request("GET", "find", {"name": name})[0] + + def get_installed_snap_apps(self, name: str) -> List: + """Query the snap server for apps belonging to a named, currently installed snap.""" + return self._request("GET", "apps", {"names": name, "select": "service"}) + + +class SnapCache(Mapping): + """An abstraction to represent installed/available packages. + + When instantiated, `SnapCache` iterates through the list of installed + snaps using the `snapd` HTTP API, and a list of available snaps by reading + the filesystem to populate the cache. Information about available snaps is lazily-loaded + from the `snapd` API when requested. + """ + + def __init__(self): + if not self.snapd_installed: + raise SnapError("snapd is not installed or not in /usr/bin") from None + self._snap_client = SnapClient() + self._snap_map = {} + if self.snapd_installed: + self._load_available_snaps() + self._load_installed_snaps() + + def __contains__(self, key: str) -> bool: + """Check if a given snap is in the cache.""" + return key in self._snap_map + + def __len__(self) -> int: + """Report number of items in the snap cache.""" + return len(self._snap_map) + + def __iter__(self) -> Iterable["Snap"]: + """Provide iterator for the snap cache.""" + return iter(self._snap_map.values()) + + def __getitem__(self, snap_name: str) -> Snap: + """Return either the installed version or latest version for a given snap.""" + snap = self._snap_map.get(snap_name, None) + if snap is None: + # The snapd cache file may not have existed when _snap_map was + # populated. This is normal. + try: + self._snap_map[snap_name] = self._load_info(snap_name) + except SnapAPIError: + raise SnapNotFoundError("Snap '{}' not found!".format(snap_name)) + + return self._snap_map[snap_name] + + @property + def snapd_installed(self) -> bool: + """Check whether snapd has been installled on the system.""" + return os.path.isfile("/usr/bin/snap") + + def _load_available_snaps(self) -> None: + """Load the list of available snaps from disk. + + Leave them empty and lazily load later if asked for. + """ + if not os.path.isfile("/var/cache/snapd/names"): + # The snap catalog may not be populated yet; this is normal. + # snapd updates the cache infrequently and the cache file may not + # currently exist. + return + + with open("/var/cache/snapd/names", "r") as f: + for line in f: + if line.strip(): + self._snap_map[line.strip()] = None + + def _load_installed_snaps(self) -> None: + """Load the installed snaps into the dict.""" + installed = self._snap_client.get_installed_snaps() + + for i in installed: + snap = Snap( + name=i["name"], + state=SnapState.Latest, + channel=i["channel"], + revision=int(i["revision"]), + confinement=i["confinement"], + apps=i.get("apps", None), + ) + self._snap_map[snap.name] = snap + + def _load_info(self, name) -> Snap: + """Load info for snaps which are not installed if requested. 
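+
+ The lookup goes through snapd's `find` endpoint, so it may reach out to the
+ store for snaps that are not installed locally.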
+
+ Args:
+ name: a string representing the name of the snap
+ """
+ info = self._snap_client.get_snap_information(name)
+
+ return Snap(
+ name=info["name"],
+ state=SnapState.Available,
+ channel=info["channel"],
+ revision=int(info["revision"]),
+ confinement=info["confinement"],
+ apps=None,
+ )
+
+
+@_cache_init
+def add(
+ snap_names: Union[str, List[str]],
+ state: Union[str, SnapState] = SnapState.Latest,
+ channel: Optional[str] = "",
+ classic: Optional[bool] = False,
+ cohort: Optional[str] = "",
+ revision: Optional[int] = None,
+) -> Union[Snap, List[Snap]]:
+ """Add a snap to the system.
+
+ Args:
+ snap_names: the name or names of the snaps to install
+ state: a string or `SnapState` representation of the desired state, one of
+ [`Present` or `Latest`]
+ channel: an (Optional) channel as a string. Defaults to 'latest'
+ classic: an (Optional) boolean specifying whether it should be added with classic
+ confinement. Default `False`
+ cohort: an (Optional) string specifying the snap cohort to use
+ revision: an (Optional) integer specifying the snap revision to use
+
+ Raises:
+ SnapError if some snaps failed to install or were not found.
+ """
+ if not channel and not revision:
+ channel = "latest"
+
+ snap_names = [snap_names] if type(snap_names) is str else snap_names
+ if not snap_names:
+ raise TypeError("Expected at least one snap to add, received zero!")
+
+ if type(state) is str:
+ state = SnapState(state)
+
+ return _wrap_snap_operations(snap_names, state, channel, classic, cohort, revision)
+
+
+@_cache_init
+def remove(snap_names: Union[str, List[str]]) -> Union[Snap, List[Snap]]:
+ """Remove specified snap(s) from the system.
+
+ Args:
+ snap_names: the name or names of the snaps to remove
+
+ Raises:
+ SnapError if some snaps failed to be removed.
+ """
+ snap_names = [snap_names] if type(snap_names) is str else snap_names
+ if not snap_names:
+ raise TypeError("Expected at least one snap to remove, received zero!")
+
+ return _wrap_snap_operations(snap_names, SnapState.Absent, "", False)
+
+
+@_cache_init
+def ensure(
+ snap_names: Union[str, List[str]],
+ state: str,
+ channel: Optional[str] = "",
+ classic: Optional[bool] = False,
+ cohort: Optional[str] = "",
+ revision: Optional[int] = None,
+) -> Union[Snap, List[Snap]]:
+ """Ensure specified snaps are in a given state on the system.
+
+ Args:
+ snap_names: the name(s) of the snaps to operate on
+ state: a string representation of the desired state, from `SnapState`
+ channel: an (Optional) channel as a string. Defaults to 'latest'
+ classic: an (Optional) boolean specifying whether it should be added with classic
+ confinement. Default `False`
+ cohort: an (Optional) string specifying the snap cohort to use
+ revision: an (Optional) integer specifying the snap revision to use
+
+ When both channel and revision are specified, the underlying snap install/refresh
+ command will determine the precedence (revision at the time of adding this)
+
+ Raises:
+ SnapError if the snap is not in the cache.
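+
+ A usage sketch (illustrative; "hello-world" stands in for any snap name):
+
+ ensure("hello-world", "latest", channel="stable") # install or refresh
+ ensure("hello-world", "absent") # remove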
+ """ + if not revision and not channel: + channel = "latest" + + if state in ("present", "latest") or revision: + return add(snap_names, SnapState(state), channel, classic, cohort, revision) + else: + return remove(snap_names) + + +def _wrap_snap_operations( + snap_names: List[str], + state: SnapState, + channel: str, + classic: bool, + cohort: Optional[str] = "", + revision: Optional[int] = None, +) -> Union[Snap, List[Snap]]: + """Wrap common operations for bare commands.""" + snaps = {"success": [], "failed": []} + + op = "remove" if state is SnapState.Absent else "install or refresh" + + for s in snap_names: + try: + snap = _Cache[s] + if state is SnapState.Absent: + snap.ensure(state=SnapState.Absent) + else: + snap.ensure( + state=state, classic=classic, channel=channel, cohort=cohort, revision=revision + ) + snaps["success"].append(snap) + except SnapError as e: + logger.warning("Failed to {} snap {}: {}!".format(op, s, e.message)) + snaps["failed"].append(s) + except SnapNotFoundError: + logger.warning("Snap '{}' not found in cache!".format(s)) + snaps["failed"].append(s) + + if len(snaps["failed"]): + raise SnapError( + "Failed to install or refresh snap(s): {}".format(", ".join(list(snaps["failed"]))) + ) + + return snaps["success"] if len(snaps["success"]) > 1 else snaps["success"][0] + + +def install_local( + filename: str, classic: Optional[bool] = False, dangerous: Optional[bool] = False +) -> Snap: + """Perform a snap operation. + + Args: + filename: the path to a local .snap file to install + classic: whether to use classic confinement + dangerous: whether --dangerous should be passed to install snaps without a signature + + Raises: + SnapError if there is a problem encountered + """ + _cmd = [ + "snap", + "install", + filename, + ] + if classic: + _cmd.append("--classic") + if dangerous: + _cmd.append("--dangerous") + try: + result = subprocess.check_output(_cmd, universal_newlines=True).splitlines()[-1] + snap_name, _ = result.split(" ", 1) + snap_name = ansi_filter.sub("", snap_name) + + c = SnapCache() + + try: + return c[snap_name] + except SnapAPIError as e: + logger.error( + "Could not find snap {} when querying Snapd socket: {}".format(snap_name, e.body) + ) + raise SnapError("Failed to find snap {} in Snap cache".format(snap_name)) + except CalledProcessError as e: + raise SnapError("Could not install snap {}: {}".format(filename, e.output)) + + +def _system_set(config_item: str, value: str) -> None: + """Set system snapd config values. + + Args: + config_item: name of snap system setting. E.g. 'refresh.hold' + value: value to assign + """ + _cmd = ["snap", "set", "system", "{}={}".format(config_item, value)] + try: + subprocess.check_call(_cmd, universal_newlines=True) + except CalledProcessError: + raise SnapError("Failed setting system config '{}' to '{}'".format(config_item, value)) + + +def hold_refresh(days: int = 90, forever: bool = False) -> bool: + """Set the system-wide snap refresh hold. + + Args: + days: number of days to hold system refreshes for. Maximum 90. Set to zero to remove hold. + forever: if True, will set a hold forever. 
+ """ + if not isinstance(forever, bool): + raise TypeError("forever must be a bool") + if not isinstance(days, int): + raise TypeError("days must be an int") + if forever: + _system_set("refresh.hold", "forever") + logger.info("Set system-wide snap refresh hold to: forever") + elif days == 0: + _system_set("refresh.hold", "") + logger.info("Removed system-wide snap refresh hold") + else: + # Currently the snap daemon can only hold for a maximum of 90 days + if not 1 <= days <= 90: + raise ValueError("days must be between 1 and 90") + # Add the number of days to current time + target_date = datetime.now(timezone.utc).astimezone() + timedelta(days=days) + # Format for the correct datetime format + hold_date = target_date.strftime("%Y-%m-%dT%H:%M:%S%z") + # Python dumps the offset in format '+0100', we need '+01:00' + hold_date = "{0}:{1}".format(hold_date[:-2], hold_date[-2:]) + # Actually set the hold date + _system_set("refresh.hold", hold_date) + logger.info("Set system-wide snap refresh hold to: %s", hold_date) \ No newline at end of file diff --git a/metadata.yaml b/metadata.yaml index fcf53f87d..b41d31ff6 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -38,6 +38,8 @@ requires: interface: loki_push_api limit: 1 optional: true + s3-credentials: + interface: s3 containers: mongod: resource: mongodb-image @@ -53,3 +55,4 @@ storage: mongodb: type: filesystem location: /var/lib/mongodb + \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index 5614f4bfd..9f6bbdf84 100755 --- a/src/charm.py +++ b/src/charm.py @@ -10,16 +10,19 @@ from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer from charms.mongodb.v0.helpers import ( + build_unit_status, generate_keyfile, generate_password, get_create_user_cmd, get_mongod_args, + process_pbm_error, ) from charms.mongodb.v0.mongodb import ( MongoDBConfiguration, MongoDBConnection, NotReadyError, ) +from charms.mongodb.v0.mongodb_backups import S3_RELATION, MongoDBBackups from charms.mongodb.v0.mongodb_provider import MongoDBProvider from charms.mongodb.v0.mongodb_tls import MongoDBTLS from charms.mongodb.v0.users import ( @@ -31,10 +34,17 @@ ) from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider from ops import JujuVersion -from ops.charm import ActionEvent, CharmBase, RelationDepartedEvent, StartEvent +from ops.charm import ( + ActionEvent, + CharmBase, + RelationDepartedEvent, + StartEvent, + UpdateStatusEvent, +) from ops.main import main from ops.model import ( ActiveStatus, + BlockedStatus, Container, Relation, RelationDataContent, @@ -42,7 +52,14 @@ Unit, WaitingStatus, ) -from ops.pebble import ExecError, Layer, PathError, ProtocolError +from ops.pebble import ( + ChangeError, + ExecError, + Layer, + PathError, + ProtocolError, + ServiceInfo, +) from pymongo.errors import PyMongoError from tenacity import before_log, retry, stop_after_attempt, wait_fixed @@ -66,7 +83,7 @@ def __init__(self, *args): self.framework.observe(self.on.mongod_pebble_ready, self._on_mongod_pebble_ready) self.framework.observe(self.on.start, self._on_start) - + self.framework.observe(self.on.update_status, self._on_update_status) self.framework.observe( self.on[Config.Relations.PEERS].relation_joined, self._relation_changes_handler ) @@ -91,6 +108,7 @@ def __init__(self, *args): self.client_relations = MongoDBProvider(self) self.tls = MongoDBTLS(self, Config.Relations.PEERS, Config.SUBSTRATE) + self.backups = MongoDBBackups(self, 
substrate=Config.SUBSTRATE) self.metrics_endpoint = MetricsEndpointProvider( self, refresh_event=self.on.start, jobs=Config.Monitoring.JOBS @@ -150,8 +168,28 @@ def monitor_config(self) -> MongoDBConfiguration: @property def backup_config(self) -> MongoDBConfiguration: """Generates a MongoDBConfiguration object for backup.""" - self._check_or_set_user_password(BackupUser) - return self._get_mongodb_config_for_user(BackupUser, BackupUser.get_hosts()) + return self._get_mongodb_config_for_user( + BackupUser, [self.get_hostname_for_unit(self.unit)] + ) + + @property + def _mongod_layer(self) -> Layer: + """Returns a Pebble configuration layer for mongod.""" + layer_config = { + "summary": "mongod layer", + "description": "Pebble config layer for replicated mongod", + "services": { + "mongod": { + "override": "replace", + "summary": "mongod", + "command": "mongod " + get_mongod_args(self.mongodb_config), + "startup": "enabled", + "user": Config.UNIX_USER, + "group": Config.UNIX_GROUP, + } + }, + } + return Layer(layer_config) # type: ignore @property def _monitor_layer(self) -> Layer: @@ -174,19 +212,20 @@ def _monitor_layer(self) -> Layer: return Layer(layer_config) # type: ignore @property - def _mongod_layer(self) -> Layer: - """Returns a Pebble configuration layer for mongod.""" + def _backup_layer(self) -> Layer: + """Returns a Pebble configuration layer for pbm.""" layer_config = { - "summary": "mongod layer", - "description": "Pebble config layer for replicated mongod", + "summary": "pbm layer", + "description": "Pebble config layer for pbm", "services": { - "mongod": { + Config.Backup.SERVICE_NAME: { "override": "replace", - "summary": "mongod", - "command": "mongod " + get_mongod_args(self.mongodb_config), + "summary": "pbm", + "command": "pbm-agent", "startup": "enabled", "user": Config.UNIX_USER, "group": Config.UNIX_GROUP, + "environment": {"PBM_MONGODB_URI": self.backup_config.uri}, } }, } @@ -216,11 +255,13 @@ def app_peer_data(self) -> RelationDataContent: return relation.data[self.app] @property - def _db_initialised(self) -> bool: + def db_initialised(self) -> bool: + """Check if MongoDB is initialised.""" return "db_initialised" in self.app_peer_data - @_db_initialised.setter - def _db_initialised(self, value): + @db_initialised.setter + def db_initialised(self, value): + """Set the db_initialised flag.""" if isinstance(value, bool): self.app_peer_data["db_initialised"] = str(value) else: @@ -273,6 +314,7 @@ def _on_mongod_pebble_ready(self, event) -> None: # when a network cuts and the pod restarts - reconnect to the exporter try: self._connect_mongodb_exporter() + self._connect_pbm_agent() except MissingSecretError as e: logger.error("Cannot connect mongodb exporter: %r", e) event.defer() @@ -313,7 +355,14 @@ def _on_start(self, event) -> None: event.defer() return - self._connect_mongodb_exporter() + try: + self._connect_mongodb_exporter() + except ChangeError as e: + logger.error( + "An exception occurred when starting mongodb exporter, error: %s.", str(e) + ) + self.unit.status = BlockedStatus("couldn't start mongodb exporter") + return self._initialise_replica_set(event) @@ -323,6 +372,7 @@ def _on_start(self, event) -> None: def _relation_changes_handler(self, event) -> None: """Handles different relation events and updates MongoDB replica set.""" self._connect_mongodb_exporter() + self._connect_pbm_agent() if type(event) is RelationDepartedEvent: if event.departing_unit.name == self.unit.name: @@ -335,7 +385,7 @@ def _relation_changes_handler(self, event) -> None: # 
This code runs on leader_elected event before mongod_pebble_ready
 self._generate_secrets()

- if not self._db_initialised:
+ if not self.db_initialised:
 return

 with MongoDBConnection(self.mongodb_config) as mongo:
@@ -385,6 +435,40 @@ def _on_stop(self, event) -> None:
 logger.debug(f"{self.unit.name} releasing on_stop")
 self.unit_peer_data["unit_departed"] = ""

+ def _on_update_status(self, event: UpdateStatusEvent):
+ # no need to report on replica set status until initialised
+ if not self.db_initialised:
+ return
+
+ # Cannot check more advanced MongoDB statuses if mongod hasn't started.
+ with MongoDBConnection(self.mongodb_config, "localhost", direct=True) as direct_mongo:
+ if not direct_mongo.is_ready:
+ self.unit.status = WaitingStatus("Waiting for MongoDB to start")
+ return
+
+ # leader should periodically handle configuring the replica set. Incidents such as network
+ # cuts can lead to new IP addresses and therefore will require a reconfigure. Especially
+ # in the case that the leader experiences a change in IP address, it will not receive a
+ # relation event.
+ if self.unit.is_leader():
+ self._relation_changes_handler(event)
+
+ # update the unit's status based on its replica set config and backup status. An error in
+ # the status of MongoDB takes precedence over pbm status.
+ mongodb_status = build_unit_status(
+ self.mongodb_config, self.get_hostname_for_unit(self.unit)
+ )
+ pbm_status = self.backups._get_pbm_status()
+ if (
+ not isinstance(mongodb_status, ActiveStatus)
+ or not self.model.get_relation(
+ S3_RELATION
+ ) # if the s3 relation doesn't exist, only report MongoDB status
+ or isinstance(pbm_status, ActiveStatus) # if pbm is ready, report the MongoDB status
+ ):
+ self.unit.status = mongodb_status
+ else:
+ self.unit.status = pbm_status
+
 # END: charm events

 # BEGIN: actions
@@ -438,6 +522,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
 APP_SCOPE, MongoDBUser.get_password_key_name_for_user(username), new_password
 )

+ if username == BackupUser.get_username():
+ self._connect_pbm_agent()
+
 if username == MonitorUser.get_username():
 self._connect_mongodb_exporter()
@@ -541,11 +628,12 @@ def _init_backup_user(self):
 with MongoDBConnection(self.mongodb_config) as mongo:
 # first we must create the necessary roles for the PBM tool
- logger.debug("creating the backup user roles...")
+ logger.info("creating the backup user roles...")
 mongo.create_role(
 role_name=BackupUser.get_mongodb_role(), privileges=BackupUser.get_privileges()
 )
- logger.debug("creating the backup user...")
+
+ self._check_or_set_user_password(BackupUser)
 mongo.create_user(self.backup_config)
 self._set_user_created(BackupUser)
@@ -632,7 +720,7 @@ def _update_app_relation_data(self, database_users: Set[str]) -> None:
 def _initialise_replica_set(self, event: StartEvent) -> None:
 """Initialise replica set and create users."""
- if self._db_initialised:
+ if self.db_initialised:
 # The replica set should be initialised only once. Check should be
 # external (e.g., check initialisation inside peer relation).
We # shouldn't rely on MongoDB response because the data directory @@ -664,7 +752,7 @@ def _initialise_replica_set(self, event: StartEvent) -> None: event.defer() return - self._db_initialised = True + self.db_initialised = True def _add_units_from_replica_set( self, event, mongo: MongoDBConnection, units_to_add: Set[str] @@ -838,6 +926,7 @@ def restart_mongod_service(self): container.replan() self._connect_mongodb_exporter() + self._connect_pbm_agent() def _push_keyfile_to_workload(self, container: Container) -> None: """Upload the keyFile to a workload container.""" @@ -933,15 +1022,58 @@ def _connect_mongodb_exporter(self) -> None: if not container.can_connect(): return + if not self.db_initialised: + return + # must wait for leader to set URI before connecting if not self.get_secret(APP_SCOPE, MonitorUser.get_password_key_name()): return + + current_service_config = ( + container.get_plan().to_dict().get("services", {}).get("mongodb_exporter", {}) + ) + new_service_config = self._monitor_layer.services.get("mongodb_exporter", {}) + + if current_service_config == new_service_config: + return + # Add initial Pebble config layer using the Pebble API # mongodb_exporter --mongodb.uri= + container.add_layer("mongodb_exporter", self._monitor_layer, combine=True) # Restart changed services and start startup-enabled services. container.replan() + def _connect_pbm_agent(self) -> None: + """Updates URI for pbm-agent.""" + container = self.unit.get_container(Config.CONTAINER_NAME) + + if not container.can_connect(): + return + + if not self.db_initialised: + return + + # must wait for leader to set URI before any attempts to update are made + if not self.get_secret("app", BackupUser.get_password_key_name()): + return + + current_service_config = ( + container.get_plan().to_dict().get("services", {}).get(Config.Backup.SERVICE_NAME, {}) + ) + new_service_config = self._backup_layer.services.get(Config.Backup.SERVICE_NAME, {}) + + if current_service_config == new_service_config: + return + + container.add_layer(Config.Backup.SERVICE_NAME, self._backup_layer, combine=True) + container.replan() + + def get_backup_service(self) -> ServiceInfo: + """Returns the backup service.""" + container = self.unit.get_container(Config.CONTAINER_NAME) + return container.get_service(Config.Backup.SERVICE_NAME) + def is_unit_in_replica_set(self) -> bool: """Check if the unit is in the replica set.""" with MongoDBConnection(self.mongodb_config) as mongo: @@ -954,6 +1086,46 @@ def is_unit_in_replica_set(self) -> bool: logger.error(f"{self.unit.name}.is_unit_in_replica_set PyMongoError={e}") return False + def run_pbm_command(self, cmd: List[str]) -> str: + """Executes a command in the workload container.""" + container = self.unit.get_container(Config.CONTAINER_NAME) + environment = {"PBM_MONGODB_URI": f"{self.backup_config.uri}"} + process = container.exec([Config.Backup.PBM_PATH] + cmd, environment=environment) + stdout, _ = process.wait_output() + return stdout + + def set_pbm_config_file(self) -> None: + """Sets the pbm config file.""" + container = self.unit.get_container(Config.CONTAINER_NAME) + container.push( + Config.Backup.PBM_CONFIG_FILE_PATH, + "# this file is to be left empty. Changes in this file will be ignored.\n", + make_dirs=True, + permissions=0o400, + ) + try: + self.run_pbm_command( + [ + "config", + "--file", + Config.Backup.PBM_CONFIG_FILE_PATH, + ] + ) + except ExecError as e: + logger.error(f"Failed to set pbm config file. 
{e}") + self.unit.status = BlockedStatus(process_pbm_error(e.stdout)) + return + + def start_backup_service(self) -> None: + """Starts the backup service.""" + container = self.unit.get_container(Config.CONTAINER_NAME) + container.start(Config.Backup.SERVICE_NAME) + + def restart_backup_service(self) -> None: + """Restarts the backup service.""" + container = self.unit.get_container(Config.CONTAINER_NAME) + container.restart(Config.Backup.SERVICE_NAME) + # END: helper functions # BEGIN: static methods diff --git a/src/config.py b/src/config.py index 33320da1a..4e61b76f4 100644 --- a/src/config.py +++ b/src/config.py @@ -34,6 +34,8 @@ class Backup: SERVICE_NAME = "pbm-agent" URI_PARAM_NAME = "pbm-uri" + PBM_PATH = "/usr/bin/pbm" + PBM_CONFIG_FILE_PATH = "/etc/pbm_config.yaml" class Monitoring: """Monitoring related config for MongoDB Charm.""" diff --git a/tests/integration/backup_tests/__init__.py b/tests/integration/backup_tests/__init__.py new file mode 100644 index 000000000..db3bfe1a6 --- /dev/null +++ b/tests/integration/backup_tests/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. diff --git a/tests/integration/backup_tests/helpers.py b/tests/integration/backup_tests/helpers.py new file mode 100644 index 000000000..b4cb47682 --- /dev/null +++ b/tests/integration/backup_tests/helpers.py @@ -0,0 +1,149 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +import os + +import ops +from pymongo import MongoClient +from pytest_operator.plugin import OpsTest +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed + +from ..ha_tests import helpers as ha_helpers + +S3_APP_NAME = "s3-integrator" +TIMEOUT = 10 * 60 + + +async def destroy_cluster(ops_test: OpsTest, cluster_name: str) -> None: + """Destroy the cluster and wait for its removal.""" + units = ops_test.model.applications[cluster_name].units + # best practice to scale down before removing the entire cluster. Wait for cluster to settle + # removing the next + for i in range(0, len(units[:-1])): + await units[i].remove() + await ops_test.model.block_until( + lambda: len(ops_test.model.applications[cluster_name].units) == len(units) - i - 1, + timeout=TIMEOUT, + ) + ops_test.model.wait_for_idle(apps=[cluster_name], status="active") + + # now that the cluster only has one unit left we can remove the application from Juju + await ops_test.model.applications[cluster_name].destroy() + + # verify there are no more units. + await ops_test.model.block_until( + lambda: cluster_name not in ops_test.model.applications, + timeout=TIMEOUT, + ) + + +async def create_and_verify_backup(ops_test: OpsTest) -> None: + """Creates and verifies that a backup was successfully created.""" + db_unit = await get_leader_unit(ops_test) + prev_backups = await count_logical_backups(db_unit) + action = await db_unit.run_action(action_name="create-backup") + backup = await action.wait() + assert backup.status == "completed", "Backup not started." + + # verify that backup was made on the bucket + try: + for attempt in Retrying(stop=stop_after_attempt(4), wait=wait_fixed(5)): + with attempt: + backups = await count_logical_backups(db_unit) + assert backups == prev_backups + 1, "Backup not created." + except RetryError: + assert backups == prev_backups + 1, "Backup not created." 
+
+
+async def get_leader_unit(ops_test: OpsTest, db_app_name=None) -> ops.model.Unit:
+ """Returns the leader unit of the database charm."""
+ db_app_name = db_app_name or await app_name(ops_test)
+ for unit in ops_test.model.applications[db_app_name].units:
+ if await unit.is_leader_from_status():
+ return unit
+
+
+async def app_name(ops_test: OpsTest) -> str:
+ """Returns the name of the cluster running MongoDB.
+
+ This is important since not all deployments of the MongoDB charm have the application name
+ "mongodb".
+
+ Note: if multiple clusters are running MongoDB this will return the first one found.
+ """
+ status = await ops_test.model.get_status()
+ for app in ops_test.model.applications:
+ # note that format of the charm field is not exactly "mongodb" but instead takes the form
+ # of `local:focal/mongodb-6`
+ if "mongodb" in status["applications"][app]["charm"]:
+ return app
+
+ return None
+
+
+async def count_logical_backups(db_unit: ops.model.Unit) -> int:
+ """Count the number of logical backups."""
+ action = await db_unit.run_action(action_name="list-backups")
+ list_result = await action.wait()
+ list_result = list_result.results["backups"]
+ list_result = list_result.split("\n")
+ backups = 0
+ for res in list_result:
+ backups += 1 if "logical" in res else 0
+
+ return backups
+
+
+async def count_failed_backups(db_unit: ops.model.Unit) -> int:
+ """Count the number of failed backups."""
+ action = await db_unit.run_action(action_name="list-backups")
+ list_result = await action.wait()
+ list_result = list_result.results["backups"]
+ list_result = list_result.split("\n")
+ failed_backups = 0
+ for res in list_result:
+ failed_backups += 1 if "failed" in res else 0
+
+ return failed_backups
+
+
+async def set_credentials(ops_test: OpsTest, cloud: str) -> None:
+ """Sets the s3 credentials for the provided cloud, valid options are AWS or GCP."""
+ # set access key and secret keys
+ access_key = os.environ.get(f"{cloud}_ACCESS_KEY", False)
+ secret_key = os.environ.get(f"{cloud}_SECRET_KEY", False)
+ assert access_key and secret_key, f"{cloud} access key and secret key not provided."
+
+ s3_integrator_unit = ops_test.model.applications[S3_APP_NAME].units[0]
+ parameters = {"access-key": access_key, "secret-key": secret_key}
+ action = await s3_integrator_unit.run_action(action_name="sync-s3-credentials", **parameters)
+ await action.wait()
+
+
+def is_relation_joined(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) -> bool:
+ """Check if a relation is joined.
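+
+ Useful as a `block_until` predicate while waiting for a new relation to appear.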
+ + Args: + ops_test: The ops test object passed into every test case + endpoint_one: The first endpoint of the relation + endpoint_two: The second endpoint of the relation + """ + for rel in ops_test.model.relations: + endpoints = [endpoint.name for endpoint in rel.endpoints] + if endpoint_one in endpoints and endpoint_two in endpoints: + return True + return False + + +async def insert_unwanted_data(ops_test: OpsTest) -> None: + """Inserts the data into the MongoDB cluster via primary replica.""" + app = await app_name(ops_test) + ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units] + primary = (await ha_helpers.replica_set_primary(ip_addresses, ops_test)).public_address + password = await ha_helpers.get_password(ops_test, app) + client = MongoClient(ha_helpers.unit_uri(primary, password, app), directConnection=True) + db = client["new-db"] + test_collection = db["test_collection"] + test_collection.insert_one({"unwanted_data": "bad data 1"}) + test_collection.insert_one({"unwanted_data": "bad data 2"}) + test_collection.insert_one({"unwanted_data": "bad data 3"}) + client.close() diff --git a/tests/integration/backup_tests/test_backups.py b/tests/integration/backup_tests/test_backups.py new file mode 100644 index 000000000..8a4bceb41 --- /dev/null +++ b/tests/integration/backup_tests/test_backups.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +import asyncio +import secrets +import string +import time +from pathlib import Path + +import pytest +import yaml +from pytest_operator.plugin import OpsTest +from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed + +from ..ha_tests import helpers as ha_helpers +from . import helpers + +S3_APP_NAME = "s3-integrator" +TIMEOUT = 15 * 60 +ENDPOINT = "s3-credentials" +NEW_CLUSTER = "new-mongodb" +DATABASE_APP_NAME = "mongodb-k8s" +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) + + +@pytest.fixture() +async def continuous_writes_to_db(ops_test: OpsTest): + """Continuously writes to DB for the duration of the test.""" + await ha_helpers.start_continous_writes(ops_test, 1) + yield + await ha_helpers.stop_continous_writes(ops_test) + await ha_helpers.clear_db_writes(ops_test) + + +@pytest.fixture() +async def add_writes_to_db(ops_test: OpsTest): + """Adds writes to DB before test starts and clears writes at the end of the test.""" + await ha_helpers.start_continous_writes(ops_test, 1) + time.sleep(20) + await ha_helpers.stop_continous_writes(ops_test) + yield + await ha_helpers.clear_db_writes(ops_test) + + +@pytest.mark.abort_on_fail +async def test_build_and_deploy(ops_test: OpsTest) -> None: + """Build and deploy one unit of MongoDB.""" + # it is possible for users to provide their own cluster for testing. Hence check if there + # is a pre-existing cluster. 
+ db_app_name = await ha_helpers.get_application_name(ops_test, DATABASE_APP_NAME)
+ if db_app_name:
+ return
+
+ async with ops_test.fast_forward():
+ my_charm = await ops_test.build_charm(".")
+ resources = {"mongodb-image": METADATA["resources"]["mongodb-image"]["upstream-source"]}
+ await ops_test.model.deploy(my_charm, num_units=3, resources=resources, series="jammy")
+ await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=2000)
+
+ # deploy the s3 integrator charm
+ await ops_test.model.deploy(S3_APP_NAME, channel="edge")
+
+ await ops_test.model.wait_for_idle()
+
+
+@pytest.mark.abort_on_fail
+async def test_blocked_incorrect_creds(ops_test: OpsTest) -> None:
+ """Verifies that the charm goes into blocked status when s3 creds are incorrect."""
+ db_app_name = await helpers.app_name(ops_test)
+
+ # set incorrect s3 credentials
+ s3_integrator_unit = ops_test.model.applications[S3_APP_NAME].units[0]
+ parameters = {"access-key": "user", "secret-key": "doesnt-exist"}
+ action = await s3_integrator_unit.run_action(action_name="sync-s3-credentials", **parameters)
+ await action.wait()
+
+ # relate after s3 becomes active, and wait for the relation to join
+ await ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active")
+ await ops_test.model.add_relation(S3_APP_NAME, db_app_name)
+ await ops_test.model.block_until(
+ lambda: helpers.is_relation_joined(ops_test, ENDPOINT, ENDPOINT) is True,
+ timeout=TIMEOUT,
+ )
+
+ # verify that Charmed MongoDB is blocked and reports incorrect credentials
+ await asyncio.gather(
+ ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active"),
+ ops_test.model.wait_for_idle(apps=[db_app_name], status="blocked", idle_period=20),
+ )
+ db_unit = ops_test.model.applications[db_app_name].units[0]
+
+ assert db_unit.workload_status_message == "s3 credentials are incorrect."
+
+
+@pytest.mark.abort_on_fail
+async def test_blocked_incorrect_conf(ops_test: OpsTest) -> None:
+ """Verifies that the charm goes into blocked status when s3 config options are incorrect."""
+ db_app_name = await helpers.app_name(ops_test)
+
+ # set correct AWS credentials for s3 storage but incorrect configs
+ await helpers.set_credentials(ops_test, cloud="AWS")
+
+ # wait for both applications to be idle with the correct statuses
+ await asyncio.gather(
+ ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active"),
+ ops_test.model.wait_for_idle(apps=[db_app_name], status="blocked", idle_period=20),
+ )
+ db_unit = ops_test.model.applications[db_app_name].units[0]
+ assert db_unit.workload_status_message == "s3 configurations are incompatible."
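These two tests pin down the exact blocked messages the charm must surface. A plausible
sketch of the pbm-output-to-status mapping they imply is below; the real logic lives in
`lib/charms/mongodb/v0/mongodb_backups.py` (whose body is not shown in this hunk), and the
waiting-state message is an assumption:

```python
from ops.model import ActiveStatus, BlockedStatus, WaitingStatus


def status_from_pbm(output: str):
    """Map raw `pbm status` output to a unit status (illustrative sketch)."""
    if "status code: 403" in output:  # credentials rejected by the S3 endpoint
        return BlockedStatus("s3 credentials are incorrect.")
    if "status code: 404" in output:  # bucket/endpoint configuration is wrong
        return BlockedStatus("s3 configurations are incompatible.")
    if "Resync" in output:  # pbm is resyncing its configuration
        return WaitingStatus("waiting to sync s3 configurations.")  # assumed wording
    return ActiveStatus("")
```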
+ + +@pytest.mark.abort_on_fail +async def test_ready_correct_conf(ops_test: OpsTest) -> None: + """Verifies charm goes into active status when s3 config and creds options are correct.""" + db_app_name = await helpers.app_name(ops_test) + choices = string.ascii_letters + string.digits + unique_path = "".join([secrets.choice(choices) for _ in range(4)]) + configuration_parameters = { + "bucket": "data-charms-testing", + "path": f"mongodb-vm/test-{unique_path}", + "endpoint": "https://s3.amazonaws.com", + "region": "us-east-1", + } + + # apply new configuration options + await ops_test.model.applications[S3_APP_NAME].set_config(configuration_parameters) + + # after applying correct config options and creds the applications should both be active + await ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active", timeout=TIMEOUT) + await ops_test.model.wait_for_idle( + apps=[db_app_name], status="active", timeout=TIMEOUT, idle_period=60 + ) + + +@pytest.mark.skip("Not implemented yet") +@pytest.mark.abort_on_fail +async def test_create_and_list_backups(ops_test: OpsTest) -> None: + db_unit = await helpers.get_leader_unit(ops_test) + + # verify backup list works + action = await db_unit.run_action(action_name="list-backups") + list_result = await action.wait() + backups = list_result.results["backups"] + assert backups, "backups not outputted" + + # verify backup is started + action = await db_unit.run_action(action_name="create-backup") + backup_result = await action.wait() + assert backup_result.results["backup-status"] == "backup started", "backup didn't start" + + # verify backup is present in the list of backups + # the action `create-backup` only confirms that the command was sent to the `pbm`. Creating a + # backup can take a lot of time so this function returns once the command was successfully + # sent to pbm. Therefore we should retry listing the backup several times + try: + for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3)): + with attempt: + backups = await helpers.count_logical_backups(db_unit) + assert backups == 1 + except RetryError: + assert backups == 1, "Backup not created." + + +@pytest.mark.skip("Not implemented yet") +@pytest.mark.abort_on_fail +async def test_multi_backup(ops_test: OpsTest, continuous_writes_to_db) -> None: + """With writes in the DB test creating a backup while another one is running. + + Note that before creating the second backup we change the bucket and change the s3 storage + from AWS to GCP. This test verifies that the first backup in AWS is made, the second backup + in GCP is made, and that before the second backup is made that pbm correctly resyncs. + """ + db_app_name = await helpers.app_name(ops_test) + db_unit = await helpers.get_leader_unit(ops_test) + + # create first backup once ready + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + action = await db_unit.run_action(action_name="create-backup") + first_backup = await action.wait() + assert first_backup.status == "completed", "First backup not started." 
+ + # while first backup is running change access key, secret keys, and bucket name + # for GCP + await helpers.set_credentials(ops_test, cloud="GCP") + + # change to GCP configs and wait for PBM to resync + configuration_parameters = { + "bucket": "data-charms-testing", + "endpoint": "https://storage.googleapis.com", + "region": "", + } + await ops_test.model.applications[S3_APP_NAME].set_config(configuration_parameters) + + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + # create a backup as soon as possible. might not be immediately possible since only one backup + # can happen at a time. + try: + for attempt in Retrying(stop=stop_after_delay(40), wait=wait_fixed(5)): + with attempt: + action = await db_unit.run_action(action_name="create-backup") + second_backup = await action.wait() + assert second_backup.status == "completed" + except RetryError: + assert second_backup.status == "completed", "Second backup not started." + + # the action `create-backup` only confirms that the command was sent to the `pbm`. Creating a + # backup can take a lot of time so this function returns once the command was successfully + # sent to pbm. Therefore before checking, wait for Charmed MongoDB to finish creating the + # backup + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + # verify that backups was made in GCP bucket + try: + for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(5)): + with attempt: + backups = await helpers.count_logical_backups(db_unit) + assert backups == 1, "Backup not created in bucket on GCP." + except RetryError: + assert backups == 1, "Backup not created in first bucket on GCP." + + # set AWS credentials, set configs for s3 storage, and wait to resync + await helpers.set_credentials(ops_test, cloud="AWS") + configuration_parameters = { + "bucket": "data-charms-testing", + "region": "us-east-1", + "endpoint": "https://s3.amazonaws.com", + } + await ops_test.model.applications[S3_APP_NAME].set_config(configuration_parameters) + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + # verify that backups was made on the AWS bucket + try: + for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(5)): + with attempt: + backups = await helpers.count_logical_backups(db_unit) + assert backups == 2, "Backup not created in bucket on AWS." + except RetryError: + assert backups == 2, "Backup not created in bucket on AWS." + + +@pytest.mark.skip("Not implemented yet") +@pytest.mark.abort_on_fail +async def test_restore(ops_test: OpsTest, add_writes_to_db) -> None: + """Simple backup tests that verifies that writes are correctly restored.""" + # count total writes + number_writes = await ha_helpers.count_writes(ops_test) + assert number_writes > 0, "no writes to backup" + + # create a backup in the AWS bucket + db_app_name = await helpers.app_name(ops_test) + db_unit = await helpers.get_leader_unit(ops_test) + prev_backups = await helpers.count_logical_backups(db_unit) + action = await db_unit.run_action(action_name="create-backup") + first_backup = await action.wait() + assert first_backup.status == "completed", "First backup not started." 
+ + # verify that backup was made on the bucket + try: + for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(5)): + with attempt: + backups = await helpers.count_logical_backups(db_unit) + assert backups == prev_backups + 1, "Backup not created." + except RetryError: + assert backups == prev_backups + 1, "Backup not created." + + # add writes to be cleared after restoring the backup. Note these are written to the same + # collection that was backed up. + await helpers.insert_unwanted_data(ops_test) + new_number_of_writes = await ha_helpers.count_writes(ops_test) + assert new_number_of_writes > number_writes, "No writes to be cleared after restoring." + + # find most recent backup id and restore + action = await db_unit.run_action(action_name="list-backups") + list_result = await action.wait() + list_result = list_result.results["backups"] + most_recent_backup = list_result.split("\n")[-1] + backup_id = most_recent_backup.split()[0] + action = await db_unit.run_action(action_name="restore", **{"backup-id": backup_id}) + restore = await action.wait() + assert restore.results["restore-status"] == "restore started", "restore not successful" + + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + # verify all writes are present + try: + for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(20)): + with attempt: + number_writes_restored = await ha_helpers.count_writes(ops_test) + assert number_writes == number_writes_restored, "writes not correctly restored" + except RetryError: + assert number_writes == number_writes_restored, "writes not correctly restored" + + +@pytest.mark.skip("Not implemented yet") +@pytest.mark.parametrize("cloud_provider", ["AWS", "GCP"]) +async def test_restore_new_cluster(ops_test: OpsTest, add_writes_to_db, cloud_provider): + # configure test for the cloud provider + db_app_name = await helpers.app_name(ops_test) + await helpers.set_credentials(ops_test, cloud=cloud_provider) + if cloud_provider == "AWS": + configuration_parameters = { + "bucket": "data-charms-testing", + "region": "us-east-1", + "endpoint": "https://s3.amazonaws.com", + } + else: + configuration_parameters = { + "bucket": "data-charms-testing", + "endpoint": "https://storage.googleapis.com", + "region": "", + } + + await ops_test.model.applications[S3_APP_NAME].set_config(configuration_parameters) + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[S3_APP_NAME], status="active"), + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + # create a backup + writes_in_old_cluster = await ha_helpers.count_writes(ops_test, db_app_name) + assert writes_in_old_cluster > 0, "old cluster has no writes." + await helpers.create_and_verify_backup(ops_test) + + # save old password, since after restoring we will need this password to authenticate. 
+ old_password = await ha_helpers.get_password(ops_test, db_app_name) + + # deploy a new cluster with a different name + db_charm = await ops_test.build_charm(".") + await ops_test.model.deploy(db_charm, num_units=3, application_name=NEW_CLUSTER) + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[NEW_CLUSTER], status="active", idle_period=20), + ) + + db_unit = await helpers.get_leader_unit(ops_test, db_app_name=NEW_CLUSTER) + action = await db_unit.run_action("set-password", **{"password": old_password}) + action = await action.wait() + assert action.status == "completed" + + # relate to s3 - s3 has the necessary configurations + await ops_test.model.add_relation(S3_APP_NAME, NEW_CLUSTER) + await ops_test.model.block_until( + lambda: helpers.is_relation_joined(ops_test, ENDPOINT, ENDPOINT) is True, + timeout=TIMEOUT, + ) + + # wait for new cluster to sync + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[NEW_CLUSTER], status="active", idle_period=20), + ) + + # verify that the listed backups from the old cluster are not listed as failed. + assert ( + await helpers.count_failed_backups(db_unit) == 0 + ), "Backups from old cluster are listed as failed" + + # find most recent backup id and restore + action = await db_unit.run_action(action_name="list-backups") + list_result = await action.wait() + list_result = list_result.results["backups"] + most_recent_backup = list_result.split("\n")[-1] + backup_id = most_recent_backup.split()[0] + action = await db_unit.run_action(action_name="restore", **{"backup-id": backup_id}) + restore = await action.wait() + assert restore.results["restore-status"] == "restore started", "restore not successful" + + # verify all writes are present + try: + for attempt in Retrying(stop=stop_after_delay(4), wait=wait_fixed(20)): + with attempt: + writes_in_new_cluster = await ha_helpers.count_writes(ops_test, NEW_CLUSTER) + assert ( + writes_in_new_cluster == writes_in_old_cluster + ), "new cluster writes do not match old cluster writes after restore" + except RetryError: + assert ( + writes_in_new_cluster == writes_in_old_cluster + ), "new cluster writes do not match old cluster writes after restore" + + await helpers.destroy_cluster(ops_test, cluster_name=NEW_CLUSTER) + + +@pytest.mark.skip("Not implemented yet") +@pytest.mark.abort_on_fail +async def test_update_backup_password(ops_test: OpsTest) -> None: + """Verifies that after changing the backup password the pbm tool is updated and functional.""" + db_app_name = await helpers.app_name(ops_test) + db_unit = await helpers.get_leader_unit(ops_test) + + # wait for charm to be idle before setting password + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + parameters = {"username": "backup"} + action = await db_unit.run_action("set-password", **parameters) + action = await action.wait() + assert action.status == "completed", "failed to set backup password" + + # wait for charm to be idle after setting password + await asyncio.gather( + ops_test.model.wait_for_idle(apps=[db_app_name], status="active", idle_period=20), + ) + + # verify we still have connection to pbm via creating a backup + action = await db_unit.run_action(action_name="create-backup") + backup_result = await action.wait() + assert backup_result.results["backup-status"] == "backup started", "backup didn't start" diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 54d80861a..1fcbfa918 100644 --- a/tests/unit/test_charm.py +++ 
b/tests/unit/test_charm.py @@ -807,6 +807,7 @@ def test__connect_mongodb_exporter_success( """Tests the _connect_mongodb_exporter method has been called.""" container = self.harness.model.unit.get_container("mongod") self.harness.set_can_connect(container, True) + self.harness.charm.app_peer_data["db_initialised"] = "True" self.harness.charm.on.mongod_pebble_ready.emit(container) password = self.harness.charm.get_secret("app", "monitor-password") diff --git a/tests/unit/test_mongodb_backups.py b/tests/unit/test_mongodb_backups.py new file mode 100644 index 000000000..5f6d5f1b9 --- /dev/null +++ b/tests/unit/test_mongodb_backups.py @@ -0,0 +1,740 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +import unittest +from subprocess import CalledProcessError +from unittest import mock +from unittest.mock import patch + +import tenacity +from charms.mongodb.v0.mongodb_backups import ( + PBMBusyError, + ResyncError, + SetPBMConfigError, + stop_after_attempt, + wait_fixed, +) +from ops.model import ( + ActiveStatus, + BlockedStatus, + MaintenanceStatus, + ModelError, + WaitingStatus, +) +from ops.pebble import ExecError +from ops.testing import Harness + +from charm import MongoDBCharm + +from .helpers import patch_network_get + +RELATION_NAME = "s3-credentials" + + +class TestMongoBackups(unittest.TestCase): + @patch_network_get(private_address="1.1.1.1") + def setUp(self): + self.harness = Harness(MongoDBCharm) + self.harness.begin() + self.harness.add_relation("database-peers", "database-peers") + self.harness.set_leader(True) + self.charm = self.harness.charm + self.addCleanup(self.harness.cleanup) + + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBCharm.run_pbm_command") + def test_get_pbm_status_snap_not_present(self, pbm_command, service): + """Tests that when the snap is not present pbm is in blocked state.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + pbm_command.side_effect = ModelError("service pbm-agent not found") + self.assertTrue(isinstance(self.harness.charm.backups._get_pbm_status(), BlockedStatus)) + + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBCharm.run_pbm_command") + def test_get_pbm_status_resync(self, pbm_command, service): + """Tests that when pbm is resyncing that pbm is in waiting state.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + service.return_value = "pbm" + pbm_command.return_value = "Currently running:\n====\nResync op" + self.assertTrue(isinstance(self.harness.charm.backups._get_pbm_status(), WaitingStatus)) + + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBCharm.run_pbm_command") + def test_get_pbm_status_running(self, pbm_command, service): + """Tests that when pbm not running an op that pbm is in active state.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + service.return_value = "pbm" + pbm_command.return_value = b"Currently running:\n====\n(none)" + self.assertTrue(isinstance(self.harness.charm.backups._get_pbm_status(), ActiveStatus)) + + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBCharm.run_pbm_command") + def test_get_pbm_status_incorrect_cred(self, pbm_command, service): + """Tests that when pbm has incorrect credentials that pbm is in blocked state.""" + container = self.harness.model.unit.get_container("mongod") + 
+        self.harness.set_can_connect(container, True)
+        service.return_value = "pbm"
+        pbm_command.side_effect = ExecError(
+            command=["/usr/bin/pbm", "status"], exit_code=1, stdout="status code: 403", stderr=""
+        )
+        self.assertTrue(isinstance(self.harness.charm.backups._get_pbm_status(), BlockedStatus))
+
+    @patch("charm.MongoDBCharm.get_backup_service")
+    @patch("charm.MongoDBCharm.run_pbm_command")
+    def test_get_pbm_status_incorrect_conf(self, pbm_command, service):
+        """Tests that pbm is in blocked state when it has an incorrect config."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        service.return_value = "pbm"
+        pbm_command.side_effect = ExecError(
+            command=["/usr/bin/pbm", "status"], exit_code=1, stdout="status code: 404", stderr=""
+        )
+        self.assertTrue(isinstance(self.harness.charm.backups._get_pbm_status(), BlockedStatus))
+
+    @patch("charms.mongodb.v0.mongodb_backups.wait_fixed")
+    @patch("charms.mongodb.v0.mongodb_backups.stop_after_attempt")
+    @patch("ops.model.Container.start")
+    @patch("charm.MongoDBCharm.get_backup_service")
+    @patch("charm.MongoDBCharm.run_pbm_command")
+    def test_verify_resync_config_error(self, pbm_command, service, start, retry_stop, retry_wait):
+        """Tests that an error is raised when pbm cannot perform the resync command."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        service.return_value = "pbm"
+        pbm_command.side_effect = ExecError(
+            command=["pbm status"], exit_code=1, stdout="", stderr=""
+        )
+
+        retry_stop.return_value = stop_after_attempt(1)
+        retry_wait.return_value = wait_fixed(1)
+
+        with self.assertRaises(ExecError):
+            self.harness.charm.backups._resync_config_options()
+
+    @patch("charms.mongodb.v0.mongodb_backups.wait_fixed")
+    @patch("charms.mongodb.v0.mongodb_backups.stop_after_attempt")
+    @patch("ops.model.Container.start")
+    @patch("charm.MongoDBCharm.get_backup_service")
+    @patch("charm.MongoDBCharm.run_pbm_command")
+    def test_verify_resync_cred_error(self, pbm_command, service, start, retry_stop, retry_wait):
+        """Tests that an error is raised when pbm cannot resync due to bad credentials."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        service.return_value = "pbm"
+
+        retry_stop.return_value = stop_after_attempt(1)
+        retry_wait.return_value = wait_fixed(1)
+        pbm_command.side_effect = ExecError(
+            command=["pbm status"], exit_code=1, stdout="status code: 403", stderr=""
+        )
+
+        with self.assertRaises(ExecError):
+            self.harness.charm.backups._resync_config_options()
+
+    @patch("ops.model.Container.restart")
+    @patch("ops.model.Container.start")
+    @patch("charm.MongoDBCharm.get_backup_service")
+    @patch("charm.MongoDBCharm.run_pbm_command")
+    @patch("charm.MongoDBBackups._get_pbm_status")
+    def test_verify_resync_syncing(self, pbm_status, run_pbm_command, service, start, restart):
+        """Tests that a ResyncError is raised while pbm is still syncing."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        service.return_value = "pbm"
+
+        pbm_status.return_value = "Currently running:\n====\nResync op"
+        run_pbm_command.return_value = "Currently running:\n====\nResync op"
+
+        # disable retry: a retry condition that is always satisfied never re-runs the call
+        self.harness.charm.backups._wait_pbm_status.retry.retry = tenacity.retry_if_not_result(
+            lambda x: True
+        )
+
+        with self.assertRaises(ResyncError):
+            self.harness.charm.backups._resync_config_options()
+
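+    # NOTE (illustrative sketch, not part of the original suite): the retry-disabling
+    # trick above assumes `_wait_pbm_status` is wrapped with a tenacity `@retry`
+    # decorator, whose strategy is reachable at runtime through the wrapped function's
+    # `.retry` attribute. A hypothetical helper like this could centralize the trick:
+    def _disable_wait_pbm_status_retries(self):
+        """Make the tenacity retry condition always pass so unit tests stay fast."""
+        self.harness.charm.backups._wait_pbm_status.retry.retry = tenacity.retry_if_not_result(
+            lambda _: True
+        )
+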
+ @patch("ops.model.Container.start") + @patch("charms.mongodb.v0.mongodb_backups.wait_fixed") + @patch("charms.mongodb.v0.mongodb_backups.stop_after_attempt") + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBBackups._get_pbm_status") + def test_resync_config_options_failure( + self, pbm_status, service, retry_stop, retry_wait, start + ): + """Verifies _resync_config_options raises an error when a resync cannot be performed.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + + service.return_value = "pbm" + pbm_status.return_value = MaintenanceStatus() + + with self.assertRaises(PBMBusyError): + self.harness.charm.backups._resync_config_options() + + @patch("charms.mongodb.v0.mongodb_backups.wait_fixed") + @patch("charms.mongodb.v0.mongodb_backups.stop_after_attempt") + @patch("ops.model.Container.restart") + @patch("ops.model.Container.start") + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBBackups._get_pbm_status") + def test_resync_config_restart( + self, pbm_status, service, start, restart, retry_stop, retry_wait + ): + """Verifies _resync_config_options restarts that snap if alreaady resyncing.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + + service.return_value = "pbm" + + retry_stop.return_value = stop_after_attempt(1) + retry_stop.return_value = wait_fixed(1) + pbm_status.return_value = WaitingStatus() + + with self.assertRaises(PBMBusyError): + self.harness.charm.backups._resync_config_options() + + container.restart.assert_called() + + @patch("charm.MongoDBBackups._get_pbm_configs") + @patch("charm.MongoDBCharm.run_pbm_command") + def test_set_config_options(self, pbm_command, pbm_configs): + """Verifies _set_config_options failure raises SetPBMConfigError.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + pbm_command.side_effect = [ + None, + ExecError( + command=["/usr/bin/pbm config --set this_key=doesnt_exist"], + stdout="", + exit_code=42, + stderr="", + ), + ] + pbm_configs.return_value = {"this_key": "doesnt_exist"} + with self.assertRaises(SetPBMConfigError): + self.harness.charm.backups._set_config_options() + + def test_backup_without_rel(self): + """Verifies no backups are attempted without s3 relation.""" + action_event = mock.Mock() + action_event.params = {} + + self.harness.charm.backups._on_create_backup_action(action_event) + action_event.fail.assert_called() + + @patch("ops.framework.EventBase.defer") + def test_s3_credentials_no_db(self, defer): + """Verifies that when there is no DB that setting credentials is deferred.""" + del self.harness.charm.app_peer_data["db_initialised"] + + # triggering s3 event with correct fields + mock_s3_info = mock.Mock() + mock_s3_info.return_value = {"access-key": "noneya", "secret-key": "business"} + self.harness.charm.backups.s3_client.get_s3_connection_info = mock_s3_info + relation_id = self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.add_relation_unit(relation_id, "s3-integrator/0") + self.harness.update_relation_data( + relation_id, + "s3-integrator/0", + {"bucket": "hat"}, + ) + + defer.assert_called() + + @patch_network_get(private_address="1.1.1.1") + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBBackups._set_config_options") + def test_s3_credentials_set_pbm_failure(self, _set_config_options, service): + """Test charm goes into 
+    @patch_network_get(private_address="1.1.1.1")
+    @patch("charm.MongoDBCharm.get_backup_service")
+    @patch("charm.MongoDBBackups._set_config_options")
+    def test_s3_credentials_set_pbm_failure(self, _set_config_options, service):
+        """Test charm goes into blocked state when setting pbm configs fails."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        service.return_value = "pbm"
+
+        _set_config_options.side_effect = SetPBMConfigError
+        self.harness.charm.app_peer_data["db_initialised"] = "True"
+
+        # triggering s3 event with correct fields
+        mock_s3_info = mock.Mock()
+        mock_s3_info.return_value = {"access-key": "noneya", "secret-key": "business"}
+        self.harness.charm.backups.s3_client.get_s3_connection_info = mock_s3_info
+        relation_id = self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.add_relation_unit(relation_id, "s3-integrator/0")
+        self.harness.update_relation_data(
+            relation_id,
+            "s3-integrator/0",
+            {"bucket": "hat"},
+        )
+
+        self.assertTrue(isinstance(self.harness.charm.unit.status, BlockedStatus))
+
+    @patch_network_get(private_address="1.1.1.1")
+    @patch("charm.MongoDBBackups._set_config_options")
+    @patch("charm.MongoDBBackups._resync_config_options")
+    @patch("ops.framework.EventBase.defer")
+    @patch("charm.MongoDBCharm.get_backup_service")
+    @patch("charm.MongoDBBackups._get_pbm_status")
+    def test_s3_credentials_config_error(
+        self, pbm_status, service, defer, resync, _set_config_options
+    ):
+        """Test charm goes into blocked state when resyncing pbm configs fails."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        self.harness.charm.app_peer_data["db_initialised"] = "True"
+        service.return_value = "pbm"
+        pbm_status.return_value = ActiveStatus()
+        resync.side_effect = SetPBMConfigError
+
+        # triggering s3 event with correct fields
+        mock_s3_info = mock.Mock()
+        mock_s3_info.return_value = {"access-key": "noneya", "secret-key": "business"}
+        self.harness.charm.backups.s3_client.get_s3_connection_info = mock_s3_info
+        relation_id = self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.add_relation_unit(relation_id, "s3-integrator/0")
+        self.harness.update_relation_data(
+            relation_id,
+            "s3-integrator/0",
+            {"bucket": "hat"},
+        )
+        self.assertTrue(isinstance(self.harness.charm.unit.status, BlockedStatus))
+
+    @patch_network_get(private_address="1.1.1.1")
+    @patch("charm.MongoDBBackups._set_config_options")
+    @patch("charm.MongoDBBackups._resync_config_options")
+    @patch("ops.framework.EventBase.defer")
+    @patch("charm.MongoDBCharm.get_backup_service")
+    @patch("charm.MongoDBBackups._get_pbm_status")
+    def test_s3_credentials_syncing(self, pbm_status, service, defer, resync, _set_config_options):
+        """Test charm defers when more time is needed to sync pbm credentials."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        self.harness.charm.app_peer_data["db_initialised"] = "True"
+        service.return_value = "pbm"
+        resync.side_effect = ResyncError
+
+        # triggering s3 event with correct fields
+        mock_s3_info = mock.Mock()
+        mock_s3_info.return_value = {"access-key": "noneya", "secret-key": "business"}
+        self.harness.charm.backups.s3_client.get_s3_connection_info = mock_s3_info
+        relation_id = self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.add_relation_unit(relation_id, "s3-integrator/0")
+        self.harness.update_relation_data(
+            relation_id,
+            "s3-integrator/0",
+            {"bucket": "hat"},
+        )
+
+        defer.assert_called()
+        self.assertTrue(isinstance(self.harness.charm.unit.status, WaitingStatus))
+
@patch("charm.MongoDBBackups._resync_config_options") + @patch("ops.framework.EventBase.defer") + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBBackups._get_pbm_status") + def test_s3_credentials_pbm_busy( + self, pbm_status, service, defer, resync, _set_config_options + ): + """Test charm defers when more time is needed to sync pbm.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + self.harness.charm.app_peer_data["db_initialised"] = "True" + service.return_value = "pbm" + + resync.side_effect = PBMBusyError + + # triggering s3 event with correct fields + mock_s3_info = mock.Mock() + mock_s3_info.return_value = {"access-key": "noneya", "secret-key": "business"} + self.harness.charm.backups.s3_client.get_s3_connection_info = mock_s3_info + relation_id = self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.add_relation_unit(relation_id, "s3-integrator/0") + self.harness.update_relation_data( + relation_id, + "s3-integrator/0", + {"bucket": "hat"}, + ) + + defer.assert_called() + self.assertTrue(isinstance(self.harness.charm.unit.status, WaitingStatus)) + + @patch_network_get(private_address="1.1.1.1") + @patch("charm.MongoDBBackups._set_config_options") + @patch("charm.MongoDBBackups._resync_config_options") + @patch("ops.framework.EventBase.defer") + @patch("charm.MongoDBCharm.get_backup_service") + @patch("charm.MongoDBCharm.run_pbm_command") + def test_s3_credentials_pbm_error( + self, pbm_command, service, defer, resync, _set_config_options + ): + """Test charm defers when more time is needed to sync pbm.""" + container = self.harness.model.unit.get_container("mongod") + self.harness.set_can_connect(container, True) + service.return_value = "pbm" + self.harness.charm.app_peer_data["db_initialised"] = "True" + resync.side_effect = ExecError( + command=["/usr/bin/pbm status"], exit_code=1, stdout="status code: 403", stderr="" + ) + pbm_command.side_effect = ExecError( + command=["/usr/bin/pbm status"], exit_code=1, stdout="status code: 403", stderr="" + ) + + # triggering s3 event with correct fields + mock_s3_info = mock.Mock() + mock_s3_info.return_value = {"access-key": "noneya", "secret-key": "business"} + self.harness.charm.backups.s3_client.get_s3_connection_info = mock_s3_info + relation_id = self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.add_relation_unit(relation_id, "s3-integrator/0") + self.harness.update_relation_data( + relation_id, + "s3-integrator/0", + {"bucket": "hat"}, + ) + + defer.assert_not_called() + self.assertTrue(isinstance(self.harness.charm.unit.status, BlockedStatus)) + + @unittest.skip("Not implemented yet") + @patch("charm.MongoDBBackups._get_pbm_status") + @patch("charm.snap.SnapCache") + def test_backup_failed(self, snap, pbm_status, output): + """Verifies backup is fails if the pbm command failed.""" + mock_pbm_snap = mock.Mock() + mock_pbm_snap.present = True + snap.return_value = {"charmed-mongodb": mock_pbm_snap} + + action_event = mock.Mock() + action_event.params = {} + pbm_status.return_value = ActiveStatus("") + + output.side_effect = CalledProcessError(cmd="charmed-mongodb.pbm backup", returncode=42) + + self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.charm.backups._on_create_backup_action(action_event) + + action_event.fail.assert_called() + + @unittest.skip("Not implemented yet") + def test_backup_list_without_rel(self): + """Verifies no backup lists are attempted without s3 relation.""" 
+ action_event = mock.Mock() + action_event.params = {} + + self.harness.charm.backups._on_list_backups_action(action_event) + action_event.fail.assert_called() + + @unittest.skip("Not implemented yet") + @patch("charm.snap.SnapCache") + def test_backup_list_syncing(self, snap, output): + """Verifies backup list is deferred if more time is needed to resync.""" + mock_pbm_snap = mock.Mock() + mock_pbm_snap.present = True + snap.return_value = {"charmed-mongodb": mock_pbm_snap} + + action_event = mock.Mock() + action_event.params = {} + output.return_value = b"Currently running:\n====\nResync op" + + self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.charm.backups._on_list_backups_action(action_event) + + action_event.defer.assert_called() + + @unittest.skip("Not implemented yet") + @patch("charm.snap.SnapCache") + def test_backup_list_wrong_cred(self, snap, output): + """Verifies backup list fails with wrong credentials.""" + mock_pbm_snap = mock.Mock() + mock_pbm_snap.present = True + snap.return_value = {"charmed-mongodb": mock_pbm_snap} + + action_event = mock.Mock() + action_event.params = {} + output.side_effect = CalledProcessError( + cmd="charmed-mongodb.pbm status", returncode=403, output=b"status code: 403" + ) + + self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.charm.backups._on_list_backups_action(action_event) + action_event.fail.assert_called() + + @unittest.skip("Not implemented yet") + @patch("charm.MongoDBBackups._get_pbm_status") + @patch("charm.snap.SnapCache") + def test_backup_list_failed(self, snap, pbm_status, output): + """Verifies backup list fails if the pbm command fails.""" + mock_pbm_snap = mock.Mock() + mock_pbm_snap.present = True + snap.return_value = {"charmed-mongodb": mock_pbm_snap} + + action_event = mock.Mock() + action_event.params = {} + pbm_status.return_value = ActiveStatus("") + + output.side_effect = CalledProcessError(cmd="charmed-mongodb.pbm list", returncode=42) + + self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.charm.backups._on_list_backups_action(action_event) + + action_event.fail.assert_called() + + @unittest.skip("Not implemented yet") + def test_generate_backup_list_output(self, check_output): + """Tests correct formation of backup list output. + + Specifically the spacing of the backups, the header, the backup order, and the backup + contents. 
+ """ + # case 1: running backup is listed in error state + with open("tests/unit/data/pbm_status_duplicate_running.txt") as f: + output_contents = f.readlines() + output_contents = "".join(output_contents) + + check_output.return_value = output_contents.encode("utf-8") + formatted_output = self.harness.charm.backups._generate_backup_list_output() + formatted_output = formatted_output.split("\n") + header = formatted_output[0] + self.assertEqual(header, "backup-id | backup-type | backup-status") + divider = formatted_output[1] + self.assertEqual(divider, "-" * len(header)) + eariest_backup = formatted_output[2] + self.assertEqual( + eariest_backup, + "1900-02-14T13:59:14Z | physical | failed: internet not invented yet", + ) + failed_backup = formatted_output[3] + self.assertEqual(failed_backup, "2000-02-14T14:09:43Z | logical | finished") + inprogress_backup = formatted_output[4] + self.assertEqual(inprogress_backup, "2023-02-14T17:06:38Z | logical | in progress") + + # case 2: running backup is not listed in error state + with open("tests/unit/data/pbm_status.txt") as f: + output_contents = f.readlines() + output_contents = "".join(output_contents) + + check_output.return_value = output_contents.encode("utf-8") + formatted_output = self.harness.charm.backups._generate_backup_list_output() + formatted_output = formatted_output.split("\n") + header = formatted_output[0] + self.assertEqual(header, "backup-id | backup-type | backup-status") + divider = formatted_output[1] + self.assertEqual( + divider, "-" * len("backup-id | backup-type | backup-status") + ) + eariest_backup = formatted_output[2] + self.assertEqual( + eariest_backup, + "1900-02-14T13:59:14Z | physical | failed: internet not invented yet", + ) + failed_backup = formatted_output[3] + self.assertEqual(failed_backup, "2000-02-14T14:09:43Z | logical | finished") + inprogress_backup = formatted_output[4] + self.assertEqual(inprogress_backup, "2023-02-14T17:06:38Z | logical | in progress") + + @unittest.skip("Not implemented yet") + def test_restore_without_rel(self): + """Verifies no restores are attempted without s3 relation.""" + action_event = mock.Mock() + action_event.params = {"backup-id": "back-me-up"} + + self.harness.charm.backups._on_restore_action(action_event) + action_event.fail.assert_called() + + @unittest.skip("Not implemented yet") + @patch("charm.snap.SnapCache") + def test_restore_syncing(self, snap, output): + """Verifies restore is deferred if more time is needed to resync.""" + mock_pbm_snap = mock.Mock() + mock_pbm_snap.present = True + snap.return_value = {"charmed-mongodb": mock_pbm_snap} + + action_event = mock.Mock() + action_event.params = {"backup-id": "back-me-up"} + output.return_value = b"Currently running:\n====\nResync op" + + self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.charm.backups._on_restore_action(action_event) + + action_event.defer.assert_called() + + @unittest.skip("Not implemented yet") + @patch("charm.snap.SnapCache") + def test_restore_running_backup(self, snap, output): + """Verifies restore is fails if another backup is already running.""" + mock_pbm_snap = mock.Mock() + mock_pbm_snap.present = True + snap.return_value = {"charmed-mongodb": mock_pbm_snap} + + action_event = mock.Mock() + action_event.params = {"backup-id": "back-me-up"} + output.return_value = b"Currently running:\n====\nSnapshot backup" + + self.harness.add_relation(RELATION_NAME, "s3-integrator") + self.harness.charm.backups._on_restore_action(action_event) + + 
+        action_event.fail.assert_called()
+
+    @unittest.skip("Not implemented yet")
+    @patch("charm.snap.SnapCache")
+    def test_restore_wrong_cred(self, snap, output):
+        """Verifies restore fails if the credentials are incorrect."""
+        mock_pbm_snap = mock.Mock()
+        mock_pbm_snap.present = True
+        snap.return_value = {"charmed-mongodb": mock_pbm_snap}
+
+        action_event = mock.Mock()
+        action_event.params = {"backup-id": "back-me-up"}
+        output.side_effect = CalledProcessError(
+            cmd="charmed-mongodb.pbm status", returncode=403, output=b"status code: 403"
+        )
+
+        self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.charm.backups._on_restore_action(action_event)
+        action_event.fail.assert_called()
+
+    @unittest.skip("Not implemented yet")
+    @patch("charm.MongoDBBackups._get_pbm_status")
+    @patch("charm.snap.SnapCache")
+    def test_restore_failed(self, snap, pbm_status, output):
+        """Verifies restore fails if the pbm command fails."""
+        mock_pbm_snap = mock.Mock()
+        mock_pbm_snap.present = True
+        snap.return_value = {"charmed-mongodb": mock_pbm_snap}
+
+        action_event = mock.Mock()
+        action_event.params = {"backup-id": "back-me-up"}
+        pbm_status.return_value = ActiveStatus("")
+
+        output.side_effect = CalledProcessError(
+            cmd="charmed-mongodb.pbm backup", returncode=42, output=b"failed"
+        )
+
+        self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.charm.backups._on_restore_action(action_event)
+
+        action_event.fail.assert_called()
+
+    @unittest.skip("Not implemented yet")
+    def test_remap_replicaset_no_backup(self, check_output):
+        """Test verifies that no remapping is given if the backup_id doesn't exist."""
+        with open("tests/unit/data/pbm_status.txt") as f:
+            output_contents = f.readlines()
+            output_contents = "".join(output_contents)
+
+        check_output.return_value = output_contents.encode("utf-8")
+        remap = self.harness.charm.backups._remap_replicaset("this-id-doesnt-exist")
+        self.assertEqual(remap, "")
+
+    @unittest.skip("Not implemented yet")
+    def test_remap_replicaset_no_remap_necessary(self, check_output):
+        """Test verifies that no remapping is given if no remapping is necessary."""
+        with open("tests/unit/data/pbm_status_error_remap.txt") as f:
+            output_contents = f.readlines()
+            output_contents = "".join(output_contents)
+
+        check_output.return_value = output_contents.encode("utf-8")
+
+        # first case: the backup is not in the error state
+        remap = self.harness.charm.backups._remap_replicaset("2000-02-14T14:09:43Z")
+        self.assertEqual(remap, "")
+
+        # second case: the backup has an error not related to remapping
+        remap = self.harness.charm.backups._remap_replicaset("1900-02-14T13:59:14Z")
+        self.assertEqual(remap, "")
+
+        # third case: the backup has two errors, one related to remapping and another
+        # related to something else
+        remap = self.harness.charm.backups._remap_replicaset("2001-02-14T13:59:14Z")
+        self.assertEqual(remap, "")
+
+    @unittest.skip("Not implemented yet")
+    def test_remap_replicaset_remap_necessary(self, check_output):
+        """Test verifies that remapping is provided and correct when necessary."""
+        with open("tests/unit/data/pbm_status_error_remap.txt") as f:
+            output_contents = f.readlines()
+            output_contents = "".join(output_contents)
+
+        check_output.return_value = output_contents.encode("utf-8")
+        self.harness.charm.app.name = "current-app-name"
+
+        # the backup is in an error state that requires remapping
+        remap = self.harness.charm.backups._remap_replicaset("2002-02-14T13:59:14Z")
+        self.assertEqual(remap, "--replset-remapping current-app-name=old-cluster-name")
+
+    @unittest.skip("Not implemented yet")
+    @patch("charm.snap.SnapCache")
+    def test_get_pbm_status_backup(self, snap, output):
+        """Tests that pbm is in maintenance state while it is running a backup."""
+        mock_pbm_snap = mock.Mock()
+        mock_pbm_snap.present = True
+        snap.return_value = {"charmed-mongodb": mock_pbm_snap}
+        output.return_value = b"Currently running:\n====\nSnapshot backup"
+        self.assertTrue(
+            isinstance(self.harness.charm.backups._get_pbm_status(), MaintenanceStatus)
+        )
+
+    @unittest.skip("Not implemented yet")
+    def test_current_pbm_op(self):
+        """Test if _current_pbm_op can identify the operation pbm is running."""
+        action = self.harness.charm.backups._current_pbm_op(
+            "nothing\nCurrently running:\n====\nexpected action"
+        )
+        self.assertEqual(action, "expected action")
+
+        no_action = self.harness.charm.backups._current_pbm_op("pbm not started")
+        self.assertEqual(no_action, "")
+
+    @unittest.skip("Not implemented yet.")
+    @patch("charm.snap.SnapCache")
+    def test_backup_syncing(self, snap, output):
+        """Verifies backup is deferred if more time is needed to resync."""
+        mock_pbm_snap = mock.Mock()
+        mock_pbm_snap.present = True
+        snap.return_value = {"charmed-mongodb": mock_pbm_snap}
+
+        action_event = mock.Mock()
+        action_event.params = {}
+        output.return_value = b"Currently running:\n====\nResync op"
+
+        self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.charm.backups._on_create_backup_action(action_event)
+
+        action_event.defer.assert_called()
+
+    @unittest.skip("Not implemented yet.")
+    @patch("charm.snap.SnapCache")
+    def test_backup_running_backup(self, snap, output):
+        """Verifies backup fails if another backup is already running."""
+        mock_pbm_snap = mock.Mock()
+        mock_pbm_snap.present = True
+        snap.return_value = {"charmed-mongodb": mock_pbm_snap}
+
+        action_event = mock.Mock()
+        action_event.params = {}
+        output.return_value = b"Currently running:\n====\nSnapshot backup"
+
+        self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.charm.backups._on_create_backup_action(action_event)
+
+        action_event.fail.assert_called()
+
+    @unittest.skip("Not implemented yet")
+    @patch("charm.MongoDBCharm.run_pbm_command")
+    def test_backup_wrong_cred(self, output):
+        """Verifies backup fails if the credentials are incorrect."""
+        container = self.harness.model.unit.get_container("mongod")
+        self.harness.set_can_connect(container, True)
+        action_event = mock.Mock()
+        action_event.params = {}
+        output.side_effect = ExecError(
+            command=["/usr/bin/pbm config --set this_key=doesnt_exist"],
+            exit_code=403,
+            stdout="status code: 403",
+            stderr="",
+        )
+
+        self.harness.add_relation(RELATION_NAME, "s3-integrator")
+        self.harness.charm.backups._on_create_backup_action(action_event)
+        action_event.fail.assert_called()
diff --git a/tox.ini b/tox.ini
index 813cfaa9b..299189109 100644
--- a/tox.ini
+++ b/tox.ini
@@ -131,7 +131,19 @@ commands_pre =
     poetry run pip install juju==2.9.44.0
 commands =
     poetry run pytest -v --tb native --log-cli-level=INFO -s --durations=0 {posargs} {[vars]tests_path}/integration/test_teardown.py
-
+
+[testenv:backup-integration]
+description = Run backup integration tests
+pass_env =
+    {[testenv]pass_env}
+    CI
+    CI_PACKED_CHARMS
+commands_pre =
+    poetry install --with integration
+    poetry run pip install juju==2.9.42.1
+commands =
+    poetry run pytest -v --tb native --log-cli-level=INFO -s --durations=0 {posargs}
{[vars]tests_path}/integration/backup_tests/test_backups.py + [testenv:cleanup_chaos_mesh] description = Cleanup chaos mesh commands =