Skip to content

Commit

Permalink
[MAIN-1863] - avg utilization of mem/cpu (#1502)
Browse files Browse the repository at this point in the history
* added avg_cpu and memory for the past hour for nodes

* db changes

* made queries overridable; some accounts might need to configure these

* updated queries

---------

Co-authored-by: arik <alon.arik@gmail.com>
  • Loading branch information
Avi-Robusta and arikalon1 authored Jul 24, 2024
1 parent 7e8804d commit 96472f2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
8 changes: 5 additions & 3 deletions src/robusta/core/sinks/robusta/dal/supabase_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
JOBS_TABLE = "Jobs"
HELM_RELEASES_TABLE = "HelmReleases"
NAMESPACES_TABLE = "Namespaces"
UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count"
UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count_v2"
SCANS_RESULT_TABLE = "ScansResults"
SCANS_META_TABLE = "ScansMeta"
RESOURCE_EVENTS = "ResourceEvents"
Expand Down Expand Up @@ -556,12 +556,14 @@ def publish_namespaces(self, namespaces: List[NamespaceInfo]):
logging.error(f"Failed to persist namespaces {namespaces} error: {e}")
raise

def publish_cluster_nodes(self, node_count: int, pod_count: int):
def publish_cluster_nodes(self, node_count: int, pod_count: int, avg_cpu: Optional[float] = None, avg_mem: Optional[float] = None):
data = {
"_account_id": self.account_id,
"_cluster_id": self.cluster,
"_node_count": node_count,
"_pod_count": pod_count,
"_cpu_utilization": avg_cpu,
"_memory_utilization": avg_mem,
"_pod_count": pod_count
}
try:
self.client.rpc(UPDATE_CLUSTER_NODE_COUNT, data).execute()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import logging
import os
import threading
import time
from typing import Optional

from prometrix import PrometheusNotFound, VictoriaMetricsNotFound, PrometheusFlagsConnectionError
from prometrix import PrometheusFlagsConnectionError, PrometheusNotFound, VictoriaMetricsNotFound
from pydantic import BaseModel

from robusta.core.exceptions import (
AlertsManagerNotFound,
NoAlertManagerUrlFound,
NoPrometheusUrlFound,
)
from robusta.core.exceptions import AlertsManagerNotFound, NoAlertManagerUrlFound, NoPrometheusUrlFound
from robusta.core.model.base_params import PrometheusParams
from robusta.core.model.env_vars import PROMETHEUS_ERROR_LOG_PERIOD_SEC
from robusta.core.playbooks.prometheus_enrichment_utils import run_prometheus_query
from robusta.integrations.prometheus.utils import get_prometheus_connect, get_prometheus_flags
from robusta.utils.silence_utils import AlertManagerParams, get_alertmanager_silences_connection

Expand All @@ -22,7 +21,7 @@ class PrometheusHealthStatus(BaseModel):
alertmanager: bool = True


class PrometheusHealthChecker:
class PrometheusDiscoveryUtils:
def __init__(self, discovery_period_sec: int, global_config: dict):
self.status: PrometheusHealthStatus = PrometheusHealthStatus()
self.__discovery_period_sec = discovery_period_sec
Expand All @@ -39,6 +38,30 @@ def __init__(self, discovery_period_sec: int, global_config: dict):
def get_status(self) -> PrometheusHealthStatus:
return self.status

def get_cluster_avg_cpu(self) -> Optional[float]:
cpu_query = os.getenv("OVERRIDE_CLUSTER_CPU_AVG_QUERY",
f'100 * sum(rate(node_cpu_seconds_total{{mode!="idle"}}[1h])) / sum(machine_cpu_cores{{}})')
return self._get_query_prometheus_value(query=cpu_query)

def get_cluster_avg_memory(self) -> Optional[float]:
memory_query = os.getenv("OVERRIDE_CLUSTER_MEM_AVG_QUERY",
f'100 * (1 - sum(avg_over_time(node_memory_MemAvailable_bytes{{}}[1h])) / sum(machine_memory_bytes{{}}))')
return self._get_query_prometheus_value(query=memory_query)

def _get_query_prometheus_value(self, query: str) -> Optional[float]:
try:
global_config = self.__global_config
prometheus_params = PrometheusParams(**global_config)
query_result = run_prometheus_query(prometheus_params=prometheus_params, query=query)
if query_result.result_type == "error" or query_result.vector_result is None:
logging.error(f"PrometheusDiscoveryUtils failed to get prometheus results.")
return
value = query_result.vector_result[0].value.value
return float('%.2f' % float(value))
except:
logging.exception(f"PrometheusDiscoveryUtils failed to get prometheus results.")
return

def __run_checks(self):
while True:
try:
Expand Down
11 changes: 6 additions & 5 deletions src/robusta/core/sinks/robusta/robusta_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from robusta.core.reporting.base import Finding
from robusta.core.reporting.consts import ScanState, ScanType
from robusta.core.sinks.robusta.discovery_metrics import DiscoveryMetrics
from robusta.core.sinks.robusta.prometheus_health_checker import PrometheusHealthChecker
from robusta.core.sinks.robusta.prometheus_discovery_utils import PrometheusDiscoveryUtils
from robusta.core.sinks.robusta.robusta_sink_params import RobustaSinkConfigWrapper, RobustaToken
from robusta.core.sinks.robusta.rrm.rrm import RRM
from robusta.core.sinks.sink_base import SinkBase
Expand All @@ -51,7 +51,6 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
self.token = sink_config.robusta_sink.token
self.ttl_hours = sink_config.robusta_sink.ttl_hours
self.persist_events = sink_config.robusta_sink.persist_events

robusta_token = RobustaToken(**json.loads(base64.b64decode(self.token)))
if self.account_id != robusta_token.account_id:
logging.error(
Expand Down Expand Up @@ -79,7 +78,7 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
self.__discovery_period_sec = DISCOVERY_PERIOD_SEC

global_config = self.get_global_config()
self.__prometheus_health_checker = PrometheusHealthChecker(
self.__prometheus_discovery_util = PrometheusDiscoveryUtils(
discovery_period_sec=self.__discovery_period_sec, global_config=global_config
)
self.__rrm_checker = RRM(dal=self.dal, cluster=self.cluster_name, account_id=self.account_id)
Expand Down Expand Up @@ -495,7 +494,7 @@ def __publish_new_helm_releases(self, active_helm_releases: List[HelmRelease]):

def __update_cluster_status(self):
self.last_send_time = time.time()
prometheus_health_checker_status = self.__prometheus_health_checker.get_status()
prometheus_health_checker_status = self.__prometheus_discovery_util.get_status()
activity_stats = ActivityStats(
relayConnection=False,
alertManagerConnection=prometheus_health_checker_status.alertmanager,
Expand Down Expand Up @@ -526,7 +525,9 @@ def __update_cluster_status(self):
)

self.dal.publish_cluster_status(cluster_status)
self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count)
avg_cpu = self.__prometheus_discovery_util.get_cluster_avg_cpu()
avg_mem = self.__prometheus_discovery_util.get_cluster_avg_memory()
self.dal.publish_cluster_nodes(cluster_stats.nodes, self.__pods_running_count, avg_cpu, avg_mem)
except Exception:
logging.exception(
f"Failed to run periodic update cluster status for {self.sink_name}",
Expand Down

0 comments on commit 96472f2

Please sign in to comment.