From 0c0086a55cd1cf1e2c229ab4b3a0169cd8b5cbe7 Mon Sep 17 00:00:00 2001 From: Daniela Plascencia Date: Tue, 13 Feb 2024 15:46:49 +0100 Subject: [PATCH] fix: expose metrics port using kubernetes_service_patch lib (#151) (#152) * fix: expose metrics port using kubernetes_service_patch lib This commit ensures the metrics port is exposed in the Kubernetes Service for the training-operator using the kubernetes_service_patch lib. This makes the metrics endpoint reachable from an external Prometheus scraper. This commit also changes the unit tests slightly to adapt to the added service patcher. Part of canonical/bundle-kubeflow#564 * tests: add an assertion for checking unit is available The test_prometheus_grafana_integration test case was querying Prometheus and checking that the request returned successfully and that the application name and model were listed correctly. To make this test case more accurate, we can add an assertion that also checks that the unit is available; this way we avoid issues like the one described in canonical/bundle-kubeflow#564. Part of canonical/bundle-kubeflow#564 --- .../v1/kubernetes_service_patch.py | 341 ++++++++++++++++++ src/charm.py | 8 + tests/integration/test_charm.py | 6 + tests/unit/test_charm.py | 8 + 4 files changed, 363 insertions(+) create mode 100644 lib/charms/observability_libs/v1/kubernetes_service_patch.py diff --git a/lib/charms/observability_libs/v1/kubernetes_service_patch.py b/lib/charms/observability_libs/v1/kubernetes_service_patch.py new file mode 100644 index 0000000..2cce729 --- /dev/null +++ b/lib/charms/observability_libs/v1/kubernetes_service_patch.py @@ -0,0 +1,341 @@ +# Copyright 2021 Canonical Ltd. +# See LICENSE file for licensing details. + +"""# KubernetesServicePatch Library. + +This library is designed to enable developers to more simply patch the Kubernetes Service created +by Juju during the deployment of a sidecar charm. 
When sidecar charms are deployed, Juju creates a +service named after the application in the namespace (named after the Juju model). This service by +default contains a "placeholder" port, which is 65535/TCP. + +When modifying the default set of resources managed by Juju, one must consider the lifecycle of the +charm. In this case, any modifications to the default service (created during deployment), will be +overwritten during a charm upgrade. + +When initialised, this library binds a handler to the parent charm's `install`, `upgrade_charm` +and `update_status` events which applies the patch to the cluster. This should ensure that the +service ports are correct throughout the charm's life. + +The constructor simply takes a reference to the parent charm, and a list of +[`lightkube`](https://github.com/gtsystem/lightkube) ServicePorts that each define a port for the +service. For information regarding the `lightkube` `ServicePort` model, please visit the +`lightkube` [docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#serviceport). + +Optionally, a name of the service (in case service name needs to be patched as well), labels, +selectors, and annotations can be provided as keyword arguments. + +## Getting Started + +To get started using the library, you just need to fetch the library using `charmcraft`. **Note +that you also need to add `lightkube` and `lightkube-models` to your charm's `requirements.txt`.** + +```shell +cd some-charm +charmcraft fetch-lib charms.observability_libs.v1.kubernetes_service_patch +cat << EOF >> requirements.txt +lightkube +lightkube-models +EOF +``` + +Then, to initialise the library: + +For `ClusterIP` services: + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... 
+ port = ServicePort(443, name=f"{self.app.name}") + self.service_patcher = KubernetesServicePatch(self, [port]) + # ... +``` + +For `LoadBalancer`/`NodePort` services: + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(443, name=f"{self.app.name}", targetPort=443, nodePort=30666) + self.service_patcher = KubernetesServicePatch( + self, [port], service_type="LoadBalancer" + ) + # ... +``` + +Port protocols can also be specified. Valid protocols are `"TCP"`, `"UDP"`, and `"SCTP"`. + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + tcp = ServicePort(443, name=f"{self.app.name}-tcp", protocol="TCP") + udp = ServicePort(443, name=f"{self.app.name}-udp", protocol="UDP") + sctp = ServicePort(443, name=f"{self.app.name}-sctp", protocol="SCTP") + self.service_patcher = KubernetesServicePatch(self, [tcp, udp, sctp]) + # ... +``` + +Bind to custom events by providing the `refresh_event` argument: +For example, you may have a configurable port in your charm and want to re-apply the +service patch every time the charm config is changed. + +```python +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(int(self.config["charm-config-port"]), name=f"{self.app.name}") + self.service_patcher = KubernetesServicePatch( + self, + [port], + refresh_event=self.on.config_changed + ) + # ... 
+``` + +Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library +does not try to make any API calls, or open any files during testing that are unlikely to be +present, and could break your tests. The easiest way to do this is during your test `setUp`: + +```python +# ... + +@patch("charm.KubernetesServicePatch", lambda x, y: None) +def setUp(self, *unused): + self.harness = Harness(SomeCharm) + # ... +``` +""" + +import logging +from types import MethodType +from typing import List, Literal, Optional, Union + +from lightkube import ApiError, Client # pyright: ignore +from lightkube.core import exceptions +from lightkube.models.core_v1 import ServicePort, ServiceSpec +from lightkube.models.meta_v1 import ObjectMeta +from lightkube.resources.core_v1 import Service +from lightkube.types import PatchType +from ops.charm import CharmBase +from ops.framework import BoundEvent, Object + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "0042f86d0a874435adef581806cddbbb" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 9 + +ServiceType = Literal["ClusterIP", "LoadBalancer"] + + +class KubernetesServicePatch(Object): + """A utility for patching the Kubernetes service set up by Juju.""" + + def __init__( + self, + charm: CharmBase, + ports: List[ServicePort], + service_name: Optional[str] = None, + service_type: ServiceType = "ClusterIP", + additional_labels: Optional[dict] = None, + additional_selectors: Optional[dict] = None, + additional_annotations: Optional[dict] = None, + *, + refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None, + ): + """Constructor for KubernetesServicePatch. + + Args: + charm: the charm that is instantiating the library. 
+ ports: a list of ServicePorts + service_name: allows setting custom name to the patched service. If none given, + application name will be used. + service_type: desired type of K8s service. Default value is in line with ServiceSpec's + default value. + additional_labels: Labels to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_selectors: Selectors to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_annotations: Annotations to be added to the kubernetes service. + refresh_event: an optional bound event or list of bound events which + will be observed to re-apply the patch (e.g. on port change). + The `install` and `upgrade-charm` events would be observed regardless. + """ + super().__init__(charm, "kubernetes-service-patch") + self.charm = charm + self.service_name = service_name if service_name else self._app + self.service = self._service_object( + ports, + service_name, + service_type, + additional_labels, + additional_selectors, + additional_annotations, + ) + + # Make mypy type checking happy that self._patch is a method + assert isinstance(self._patch, MethodType) + # Ensure this patch is applied during the 'install' and 'upgrade-charm' events + self.framework.observe(charm.on.install, self._patch) + self.framework.observe(charm.on.upgrade_charm, self._patch) + self.framework.observe(charm.on.update_status, self._patch) + + # apply user defined events + if refresh_event: + if not isinstance(refresh_event, list): + refresh_event = [refresh_event] + + for evt in refresh_event: + self.framework.observe(evt, self._patch) + + def _service_object( + self, + ports: List[ServicePort], + service_name: Optional[str] = None, + service_type: ServiceType = "ClusterIP", + additional_labels: Optional[dict] = None, + additional_selectors: Optional[dict] = None, + additional_annotations: Optional[dict] = None, + ) -> Service: + 
"""Creates a valid Service representation. + + Args: + ports: a list of ServicePorts + service_name: allows setting custom name to the patched service. If none given, + application name will be used. + service_type: desired type of K8s service. Default value is in line with ServiceSpec's + default value. + additional_labels: Labels to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_selectors: Selectors to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_annotations: Annotations to be added to the kubernetes service. + + Returns: + Service: A valid representation of a Kubernetes Service with the correct ports. + """ + if not service_name: + service_name = self._app + labels = {"app.kubernetes.io/name": self._app} + if additional_labels: + labels.update(additional_labels) + selector = {"app.kubernetes.io/name": self._app} + if additional_selectors: + selector.update(additional_selectors) + return Service( + apiVersion="v1", + kind="Service", + metadata=ObjectMeta( + namespace=self._namespace, + name=service_name, + labels=labels, + annotations=additional_annotations, # type: ignore[arg-type] + ), + spec=ServiceSpec( + selector=selector, + ports=ports, + type=service_type, + ), + ) + + def _patch(self, _) -> None: + """Patch the Kubernetes service created by Juju to map the correct port. + + Raises: + PatchFailed: if patching fails due to lack of permissions, or otherwise. 
+ """ + try: + client = Client() # pyright: ignore + except exceptions.ConfigError as e: + logger.warning("Error creating k8s client: %s", e) + return + + try: + if self._is_patched(client): + return + if self.service_name != self._app: + self._delete_and_create_service(client) + client.patch(Service, self.service_name, self.service, patch_type=PatchType.MERGE) + except ApiError as e: + if e.status.code == 403: + logger.error("Kubernetes service patch failed: `juju trust` this application.") + else: + logger.error("Kubernetes service patch failed: %s", str(e)) + else: + logger.info("Kubernetes service '%s' patched successfully", self._app) + + def _delete_and_create_service(self, client: Client): + service = client.get(Service, self._app, namespace=self._namespace) + service.metadata.name = self.service_name # type: ignore[attr-defined] + service.metadata.resourceVersion = service.metadata.uid = None # type: ignore[attr-defined] # noqa: E501 + client.delete(Service, self._app, namespace=self._namespace) + client.create(service) + + def is_patched(self) -> bool: + """Reports if the service patch has been applied. + + Returns: + bool: A boolean indicating if the service patch has been applied. 
+ """ + client = Client() # pyright: ignore + return self._is_patched(client) + + def _is_patched(self, client: Client) -> bool: + # Get the relevant service from the cluster + try: + service = client.get(Service, name=self.service_name, namespace=self._namespace) + except ApiError as e: + if e.status.code == 404 and self.service_name != self._app: + return False + logger.error("Kubernetes service get failed: %s", str(e)) + raise + + # Construct a list of expected ports, should the patch be applied + expected_ports = [(p.port, p.targetPort) for p in self.service.spec.ports] # type: ignore[attr-defined] + # Construct a list in the same manner, using the fetched service + fetched_ports = [ + (p.port, p.targetPort) for p in service.spec.ports # type: ignore[attr-defined] + ] # noqa: E501 + return expected_ports == fetched_ports + + @property + def _app(self) -> str: + """Name of the current Juju application. + + Returns: + str: A string containing the name of the current Juju application. + """ + return self.charm.app.name + + @property + def _namespace(self) -> str: + """The Kubernetes namespace we're running in. + + Returns: + str: A string containing the name of the current Kubernetes namespace. 
+ """ + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: + return f.read().strip() diff --git a/src/charm.py b/src/charm.py index 8a40643..463ef3a 100755 --- a/src/charm.py +++ b/src/charm.py @@ -9,9 +9,11 @@ from charmed_kubeflow_chisme.kubernetes import KubernetesResourceHandler from charmed_kubeflow_chisme.lightkube.batch import delete_many from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider from lightkube import ApiError from lightkube.generic_resource import load_in_cluster_generic_resources +from lightkube.models.core_v1 import ServicePort from ops.charm import CharmBase from ops.main import main from ops.model import ActiveStatus, MaintenanceStatus, WaitingStatus @@ -36,6 +38,12 @@ def __init__(self, *args): super().__init__(*args) self.logger = logging.getLogger(__name__) + metrics_port = ServicePort(int(METRICS_PORT), name="metrics-port") + self.service_patcher = KubernetesServicePatch( + self, + [metrics_port], + service_name=f"{self.model.app.name}", + ) self.prometheus_provider = MetricsEndpointProvider( charm=self, diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 4fde947..af0b12e 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -191,6 +191,12 @@ async def test_prometheus_grafana_integration(ops_test: OpsTest): logger.info(f"Response status is {response_status}") assert response_status == "success" + # Assert the unit is available by checking the query result + # The data is presented as a list [1707357912.349, '1'], where the + # first value is a timestamp and the second value is the state of the unit + # 1 means available, 0 means unavailable + assert response["data"]["result"][0]["value"][1] == "1" + response_metric = response["data"]["result"][0]["metric"] 
assert response_metric["juju_application"] == APP_NAME assert response_metric["juju_model"] == ops_test.model_name diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 4f28b69..69db39a 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -53,6 +53,7 @@ class TestCharm: @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_not_leader( self, _: MagicMock, # k8s_resource_handler @@ -66,6 +67,7 @@ def test_not_leader( @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_no_relation( self, _: MagicMock, # k8s_resource_handler @@ -89,6 +91,7 @@ def test_no_relation( @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_pebble_layer( self, _: MagicMock, # k8s_resource_handler @@ -111,6 +114,7 @@ def test_pebble_layer( @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_apply_k8s_resources_success( self, k8s_resource_handler: MagicMock, @@ -124,6 +128,7 @@ def test_apply_k8s_resources_success( k8s_resource_handler.apply.assert_called() assert isinstance(harness.charm.model.unit.status, MaintenanceStatus) + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.ApiError", _FakeApiError) @@ -146,6 +151,7 @@ def test_blocked_on_appierror_on_k8s_resource_handler( ), ) + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) 
@patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.ApiError", _FakeApiError) @@ -171,6 +177,7 @@ def test_blocked_on_appierror_on_crd_resource_handler( ), ) + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.delete_many") @@ -187,6 +194,7 @@ def test_on_remove_success( crd_resource_handler.assert_has_calls([call.render_manifests()]) delete_many.assert_called() + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.delete_many")