From 96472f276ed21f921484c937446523087d250fe4 Mon Sep 17 00:00:00 2001
From: Avi-Robusta <97387909+Avi-Robusta@users.noreply.github.com>
Date: Wed, 24 Jul 2024 10:28:44 +0300
Subject: [PATCH] [MAIN-1863] - avg utilization of mem/cpu (#1502)

* added avg_cpu and memory for the past hour for nodes

* db changes

* made queries overridable, some accounts might need to configure this

* updated queries

---------

Co-authored-by: arik
---
 .../core/sinks/robusta/dal/supabase_dal.py    |  8 ++--
 ...ecker.py => prometheus_discovery_utils.py} | 37 +++++++++++++++----
 .../core/sinks/robusta/robusta_sink.py        | 11 +++---
 3 files changed, 41 insertions(+), 15 deletions(-)
 rename src/robusta/core/sinks/robusta/{prometheus_health_checker.py => prometheus_discovery_utils.py} (71%)

diff --git a/src/robusta/core/sinks/robusta/dal/supabase_dal.py b/src/robusta/core/sinks/robusta/dal/supabase_dal.py
index 654e63504..0d2be61d3 100644
--- a/src/robusta/core/sinks/robusta/dal/supabase_dal.py
+++ b/src/robusta/core/sinks/robusta/dal/supabase_dal.py
@@ -41,7 +41,7 @@
 JOBS_TABLE = "Jobs"
 HELM_RELEASES_TABLE = "HelmReleases"
 NAMESPACES_TABLE = "Namespaces"
-UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count"
+UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count_v2"
 SCANS_RESULT_TABLE = "ScansResults"
 SCANS_META_TABLE = "ScansMeta"
 RESOURCE_EVENTS = "ResourceEvents"
@@ -556,12 +556,14 @@ def publish_namespaces(self, namespaces: List[NamespaceInfo]):
             logging.error(f"Failed to persist namespaces {namespaces} error: {e}")
             raise
 
-    def publish_cluster_nodes(self, node_count: int, pod_count: int):
+    def publish_cluster_nodes(self, node_count: int, pod_count: int, avg_cpu: Optional[float] = None, avg_mem: Optional[float] = None):
         data = {
             "_account_id": self.account_id,
             "_cluster_id": self.cluster,
             "_node_count": node_count,
-            "_pod_count": pod_count,
+            "_cpu_utilization": avg_cpu,
+            "_memory_utilization": avg_mem,
+            "_pod_count": pod_count
         }
         try:
             self.client.rpc(UPDATE_CLUSTER_NODE_COUNT, data).execute()
diff --git a/src/robusta/core/sinks/robusta/prometheus_health_checker.py b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py
similarity index 71%
rename from src/robusta/core/sinks/robusta/prometheus_health_checker.py
rename to src/robusta/core/sinks/robusta/prometheus_discovery_utils.py
index d11102932..1d3c9930d 100644
--- a/src/robusta/core/sinks/robusta/prometheus_health_checker.py
+++ b/src/robusta/core/sinks/robusta/prometheus_discovery_utils.py
@@ -1,17 +1,16 @@
 import logging
+import os
 import threading
 import time
+from typing import Optional
 
-from prometrix import PrometheusNotFound, VictoriaMetricsNotFound, PrometheusFlagsConnectionError
+from prometrix import PrometheusFlagsConnectionError, PrometheusNotFound, VictoriaMetricsNotFound
 from pydantic import BaseModel
 
-from robusta.core.exceptions import (
-    AlertsManagerNotFound,
-    NoAlertManagerUrlFound,
-    NoPrometheusUrlFound,
-)
+from robusta.core.exceptions import AlertsManagerNotFound, NoAlertManagerUrlFound, NoPrometheusUrlFound
 from robusta.core.model.base_params import PrometheusParams
 from robusta.core.model.env_vars import PROMETHEUS_ERROR_LOG_PERIOD_SEC
+from robusta.core.playbooks.prometheus_enrichment_utils import run_prometheus_query
 from robusta.integrations.prometheus.utils import get_prometheus_connect, get_prometheus_flags
 from robusta.utils.silence_utils import AlertManagerParams, get_alertmanager_silences_connection
 
@@ -22,7 +21,7 @@ class PrometheusHealthStatus(BaseModel):
     alertmanager: bool = True
 
 
-class PrometheusHealthChecker:
+class PrometheusDiscoveryUtils:
     def __init__(self, discovery_period_sec: int, global_config: dict):
         self.status: PrometheusHealthStatus = PrometheusHealthStatus()
         self.__discovery_period_sec = discovery_period_sec
@@ -39,6 +38,30 @@ def __init__(self, discovery_period_sec: int, global_config: dict):
     def get_status(self) -> PrometheusHealthStatus:
         return self.status
 
+    def get_cluster_avg_cpu(self) -> Optional[float]:
+        cpu_query = os.getenv("OVERRIDE_CLUSTER_CPU_AVG_QUERY",
+                              f'100 * sum(rate(node_cpu_seconds_total{{mode!="idle"}}[1h])) / sum(machine_cpu_cores{{}})')
+        return self._get_query_prometheus_value(query=cpu_query)
+
+    def get_cluster_avg_memory(self) -> Optional[float]:
+        memory_query = os.getenv("OVERRIDE_CLUSTER_MEM_AVG_QUERY",
+                                 f'100 * (1 - sum(avg_over_time(node_memory_MemAvailable_bytes{{}}[1h])) / sum(machine_memory_bytes{{}}))')
+        return self._get_query_prometheus_value(query=memory_query)
+
+    def _get_query_prometheus_value(self, query: str) -> Optional[float]:
+        try:
+            global_config = self.__global_config
+            prometheus_params = PrometheusParams(**global_config)
+            query_result = run_prometheus_query(prometheus_params=prometheus_params, query=query)
+            if query_result.result_type == "error" or query_result.vector_result is None:
+                logging.error(f"PrometheusDiscoveryUtils failed to get prometheus results.")
+                return
+            value = query_result.vector_result[0].value.value
+            return float('%.2f' % float(value))
+        except:
+            logging.exception(f"PrometheusDiscoveryUtils failed to get prometheus results.")
+            return
+
     def __run_checks(self):
         while True:
             try:
diff --git a/src/robusta/core/sinks/robusta/robusta_sink.py b/src/robusta/core/sinks/robusta/robusta_sink.py
index 8a9a9dcbf..1d4ff4f1f 100644
--- a/src/robusta/core/sinks/robusta/robusta_sink.py
+++ b/src/robusta/core/sinks/robusta/robusta_sink.py
@@ -32,7 +32,7 @@
 from robusta.core.reporting.base import Finding
 from robusta.core.reporting.consts import ScanState, ScanType
 from robusta.core.sinks.robusta.discovery_metrics import DiscoveryMetrics
-from robusta.core.sinks.robusta.prometheus_health_checker import PrometheusHealthChecker
+from robusta.core.sinks.robusta.prometheus_discovery_utils import PrometheusDiscoveryUtils
 from robusta.core.sinks.robusta.robusta_sink_params import RobustaSinkConfigWrapper, RobustaToken
 from robusta.core.sinks.robusta.rrm.rrm import RRM
 from robusta.core.sinks.sink_base import SinkBase
@@ -51,7 +51,6 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
         self.token = sink_config.robusta_sink.token
         self.ttl_hours = sink_config.robusta_sink.ttl_hours
         self.persist_events = sink_config.robusta_sink.persist_events
-
         robusta_token = RobustaToken(**json.loads(base64.b64decode(self.token)))
         if self.account_id != robusta_token.account_id:
             logging.error(
@@ -79,7 +78,7 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
         self.__discovery_period_sec = DISCOVERY_PERIOD_SEC
 
         global_config = self.get_global_config()
-        self.__prometheus_health_checker = PrometheusHealthChecker(
+        self.__prometheus_discovery_util = PrometheusDiscoveryUtils(
             discovery_period_sec=self.__discovery_period_sec, global_config=global_config
         )
         self.__rrm_checker = RRM(dal=self.dal, cluster=self.cluster_name, account_id=self.account_id)
@@ -495,7 +494,7 @@ def __publish_new_helm_releases(self, active_helm_releases: List[HelmRelease]):
 
     def __update_cluster_status(self):
         self.last_send_time = time.time()
-        prometheus_health_checker_status = self.__prometheus_health_checker.get_status()
+        prometheus_health_checker_status = self.__prometheus_discovery_util.get_status()
         activity_stats = ActivityStats(
             relayConnection=False,
             alertManagerConnection=prometheus_health_checker_status.alertmanager,
@@ -526,7 +525,9 @@ def __update_cluster_status(self):
             )
 
             self.dal.publish_cluster_status(cluster_status)
-            self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count)
+            avg_cpu = self.__prometheus_discovery_util.get_cluster_avg_cpu()
+            avg_mem = self.__prometheus_discovery_util.get_cluster_avg_memory()
+            self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count, avg_cpu, avg_mem)
         except Exception:
             logging.exception(
                 f"Failed to run periodic update cluster status for {self.sink_name}",
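
Note on the overridable queries: get_cluster_avg_cpu() and get_cluster_avg_memory() resolve their PromQL through os.getenv at call time, so accounts that need different utilization queries can set OVERRIDE_CLUSTER_CPU_AVG_QUERY / OVERRIDE_CLUSTER_MEM_AVG_QUERY on the runner process. The sketch below only illustrates that resolution order; the replacement query is an example (it assumes kube-state-metrics' kube_node_status_capacity is available) and is not part of the patch.

    import os

    # Illustrative override (example only): use kube-state-metrics node capacity
    # as the CPU denominator instead of machine_cpu_cores.
    os.environ["OVERRIDE_CLUSTER_CPU_AVG_QUERY"] = (
        '100 * sum(rate(node_cpu_seconds_total{mode!="idle"}[1h]))'
        ' / sum(kube_node_status_capacity{resource="cpu"})'
    )

    # Mirrors the fallback added in this patch: the env var wins when set,
    # otherwise the built-in query is used.
    default_cpu_query = (
        '100 * sum(rate(node_cpu_seconds_total{mode!="idle"}[1h])) / sum(machine_cpu_cores{})'
    )
    cpu_query = os.getenv("OVERRIDE_CLUSTER_CPU_AVG_QUERY", default_cpu_query)
    print(cpu_query)  # the query that would be sent to Prometheus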