-
Notifications
You must be signed in to change notification settings - Fork 254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added prometheus alert RRM #938
Changes from 49 commits
17f27b3
17dc4ff
6149da7
d82dfdb
438f34a
10cb2c3
6ff3de0
68ccc67
7b37675
4ba43b6
d28d7bd
e56c2b5
d8d445f
47976b7
7d53b7b
a602ceb
3e17354
4f13655
891d9c1
2296ad7
aed984e
1a75387
943f024
a80e9ab
a302cd1
b8005be
91febe6
1a093a5
e64bf03
861c10e
6831b45
2318427
e228d7c
5507f78
693c65f
d5591c0
9f25d84
5b1b6d2
5bf829a
d2a13c8
f4afa2d
d9a4f08
206620d
e08375c
c1031cd
c3c6c1e
01171c0
0b15e04
7143c40
08c4978
780e93a
11d1f1e
45c4c9b
1656c35
36e597c
7a78bde
b7c7b1d
978416b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,8 @@ spec: | |
value: {{ .Release.Name | quote }} | ||
- name: PROMETHEUS_ENABLED | ||
value: {{ .Values.enablePrometheusStack | quote}} | ||
- name: MANAGED_PROMETHEUS_ALERTS_ENABLED | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename to MANAGED_CONFIGURATION_ENABLED |
||
value: {{ .Values.enableManagedPrometheusAlerts | quote}} | ||
- name: SEND_ADDITIONAL_TELEMETRY | ||
value: {{ .Values.runner.sendAdditionalTelemetry | quote }} | ||
- name: LOG_LEVEL | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,6 +74,7 @@ lightActions: | |
|
||
# install prometheus, alert-manager, and grafana along with Robusta? | ||
enablePrometheusStack: false | ||
enableManagedPrometheusAlerts: false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be renamed to |
||
enableServiceMonitors: true | ||
monitorHelmReleases: true | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
from datetime import datetime | ||
|
||
from pydantic.main import BaseModel, List, Optional | ||
|
||
|
||
|
@@ -18,6 +20,7 @@ class ActivityStats(BaseModel): | |
alertManagerConnection: bool | ||
prometheusConnection: bool | ||
prometheusRetentionTime: str | ||
managedPrometheusAlerts: bool | ||
|
||
|
||
class ClusterStatus(BaseModel): | ||
|
@@ -29,3 +32,11 @@ class ClusterStatus(BaseModel): | |
ttl_hours: int | ||
stats: ClusterStats | ||
activity_stats: ActivityStats | ||
|
||
|
||
class Account(BaseModel): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this class is not in use, is it? |
||
id: str | ||
account_type: int | ||
is_test_account: bool | ||
name: Optional[str] | ||
creation_date: datetime |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,16 @@ | ||
import json | ||
import logging | ||
import time | ||
from typing import Any, Dict, List | ||
|
||
from datetime import datetime | ||
from typing import Any, Dict, List, Optional | ||
import requests | ||
from postgrest.base_request_builder import BaseFilterRequestBuilder | ||
from postgrest.utils import sanitize_param | ||
from postgrest.types import ReturnMethod | ||
from supabase import create_client | ||
from supabase.lib.client_options import ClientOptions | ||
|
||
from robusta.core.model.cluster_status import ClusterStatus | ||
from robusta.core.model.cluster_status import ClusterStatus, Account | ||
from robusta.core.model.env_vars import SUPABASE_LOGIN_RATE_LIMIT_SEC, SUPABASE_TIMEOUT_SECONDS | ||
from robusta.core.model.helm_release import HelmRelease | ||
from robusta.core.model.jobs import JobInfo | ||
|
@@ -19,8 +21,10 @@ | |
from robusta.core.reporting.base import Finding | ||
from robusta.core.reporting.blocks import EventsBlock, EventsRef, ScanReportBlock, ScanReportRow | ||
from robusta.core.reporting.consts import EnrichmentAnnotation | ||
from robusta.core.sinks.robusta import RobustaSinkParams | ||
from robusta.core.sinks.robusta.dal.model_conversion import ModelConversion | ||
from robusta.core.sinks.robusta.rrm.account_resource_fetcher import AccountResourceFetcher | ||
from robusta.core.sinks.robusta.rrm.types import AccountResource, ResourceKind, \ | ||
AccountResourceStatusType, AccountResourceStatusInfo | ||
|
||
SERVICES_TABLE = "Services" | ||
NODES_TABLE = "Nodes" | ||
|
@@ -33,19 +37,23 @@ | |
UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count" | ||
SCANS_RESULT_TABLE = "ScansResults" | ||
RESOURCE_EVENTS = "ResourceEvents" | ||
ACCOUNT_RESOURCE_TABLE = "AccountResource" | ||
ACCOUNT_RESOURCE_STATUS_TABLE = "AccountResourceStatus" | ||
ACCOUNTS_TABLE = "Accounts" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not used |
||
|
||
|
||
class SupabaseDal: | ||
class SupabaseDal(AccountResourceFetcher): | ||
def __init__( | ||
self, | ||
url: str, | ||
key: str, | ||
account_id: str, | ||
email: str, | ||
password: str, | ||
sink_params: RobustaSinkParams, | ||
cluster_name: str, | ||
signing_key: str, | ||
self, | ||
url: str, | ||
key: str, | ||
account_id: str, | ||
email: str, | ||
password: str, | ||
sink_name: str, | ||
persist_events: bool, | ||
cluster_name: str, | ||
signing_key: str, | ||
): | ||
self.url = url | ||
self.key = key | ||
|
@@ -58,7 +66,8 @@ def __init__( | |
self.sign_in_time = 0 | ||
self.sign_in() | ||
self.client.auth.on_auth_state_change(self.__update_token_patch) | ||
self.sink_params = sink_params | ||
self.sink_name = sink_name | ||
self.persist_events = persist_events | ||
self.signing_key = signing_key | ||
|
||
def __to_db_scanResult(self, scanResult: ScanReportRow) -> Dict[Any, Any]: | ||
|
@@ -111,7 +120,7 @@ def persist_finding(self, finding: Finding): | |
evidence = ModelConversion.to_evidence_json( | ||
account_id=self.account_id, | ||
cluster_id=self.cluster, | ||
sink_name=self.sink_params.name, | ||
sink_name=self.sink_name, | ||
signing_key=self.signing_key, | ||
finding_id=finding.id, | ||
enrichment=enrichment, | ||
|
@@ -172,15 +181,8 @@ def get_active_services(self) -> List[ServiceInfo]: | |
res = ( | ||
self.client.table(SERVICES_TABLE) | ||
.select( | ||
"name", | ||
"type", | ||
"namespace", | ||
"classification", | ||
"config", | ||
"ready_pods", | ||
"total_pods", | ||
"is_helm_release", | ||
) | ||
"name", "type", "namespace", "classification", "config", "ready_pods", "total_pods", | ||
"is_helm_release") | ||
.filter("account_id", "eq", self.account_id) | ||
.filter("cluster", "eq", self.cluster) | ||
.filter("deleted", "eq", False) | ||
|
@@ -280,6 +282,14 @@ def publish_nodes(self, nodes: List[NodeInfo]): | |
self.handle_supabase_error() | ||
raise | ||
|
||
@staticmethod | ||
def custom_filter_request_builder(frq: BaseFilterRequestBuilder, operator: str, | ||
criteria: str) -> BaseFilterRequestBuilder: | ||
key, val = sanitize_param(operator), f"{criteria}" | ||
frq.params = frq.params.set(key, val) | ||
|
||
return frq | ||
|
||
def get_active_jobs(self) -> List[JobInfo]: | ||
try: | ||
res = ( | ||
|
@@ -495,13 +505,93 @@ def persist_events_block(self, block: EventsBlock): | |
def persist_platform_blocks(self, enrichment: Enrichment, finding_id): | ||
blocks = enrichment.blocks | ||
for i, block in enumerate(blocks): | ||
if isinstance(block, EventsBlock) and self.sink_params.persist_events and block.events: | ||
if isinstance(block, EventsBlock) and self.persist_events and block.events: | ||
self.persist_events_block(block) | ||
event = block.events[0] | ||
blocks[i] = EventsRef(name=event.name, namespace=event.namespace, kind=event.kind.lower()) | ||
if isinstance(block, ScanReportBlock): | ||
self.persist_scan(block) | ||
|
||
def get_account_resources( | ||
self, resource_kind: ResourceKind, updated_at: Optional[datetime] | ||
) -> List[AccountResource]: | ||
try: | ||
query_builder = ( | ||
self.client.table(ACCOUNT_RESOURCE_TABLE) | ||
.select("entity_id", "resource_kind", "clusters_target_set", "resource_state", "deleted", "enabled", | ||
"updated_at") | ||
.filter("resource_kind", "eq", resource_kind) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for equals, use |
||
.filter("account_id", "eq", self.account_id) | ||
) | ||
if updated_at: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will be much easier to understand if this is called There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
query_builder.gt("updated_at", updated_at.isoformat()) | ||
else: | ||
# in the initial db fetch don't include the deleted records. | ||
# in the subsequent db fetch allow even the deleted records so that they can be removed from the cluster | ||
query_builder.filter("deleted", "eq", False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use |
||
query_builder.filter("enabled", "eq", True) | ||
|
||
query_builder = SupabaseDal.custom_filter_request_builder( | ||
query_builder, | ||
operator="or", | ||
criteria=f'(clusters_target_set.cs.["*"], clusters_target_set.cs.["{self.cluster}"])', | ||
) | ||
query_builder = query_builder.order(column="updated_at", desc=False) | ||
|
||
res = query_builder.execute() | ||
except Exception as e: | ||
msg = f"Failed to get existing account resources (supabase) error: {e}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use exc_info=true, to get the real stack trace |
||
logging.error(msg) | ||
self.handle_supabase_error() | ||
raise Exception(msg) | ||
|
||
account_resources: List[AccountResource] = [] | ||
for data in res.data: | ||
resource_state = data["resource_state"] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why you initialize this fields differently than all others? |
||
resource = AccountResource( | ||
entity_id=data["entity_id"], | ||
resource_kind=data["resource_kind"], | ||
clusters_target_set=data["clusters_target_set"], | ||
resource_state=resource_state, | ||
deleted=data["deleted"], | ||
enabled=data["enabled"], | ||
updated_at=data["updated_at"], | ||
) | ||
|
||
account_resources.append(resource) | ||
|
||
return account_resources | ||
|
||
def __to_db_account_resource_status(self, | ||
status_type: Optional[AccountResourceStatusType], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
info: Optional[AccountResourceStatusInfo]) -> Dict[Any, Any]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if |
||
|
||
data = { | ||
"account_id": self.account_id, | ||
"cluster_id": self.cluster, | ||
"status": status_type, | ||
"info": info.dict() if info else None, | ||
"updated_at": "now()", | ||
"latest_revision": "now()"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
||
if status_type != AccountResourceStatusType.error: | ||
data["synced_revision"] = "now()" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here. It's not |
||
|
||
return data | ||
|
||
def set_account_resource_status( | ||
self, status_type: Optional[AccountResourceStatusType], | ||
info: Optional[AccountResourceStatusInfo] | ||
): | ||
try: | ||
data = self.__to_db_account_resource_status(status_type=status_type, info=info) | ||
|
||
self.client.table(ACCOUNT_RESOURCE_STATUS_TABLE).upsert(data).execute() | ||
except Exception as e: | ||
logging.error(f"Failed to persist resource events error: {e}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use exc_info, and a correct error message |
||
self.handle_supabase_error() | ||
raise | ||
|
||
def __rpc_patch(self, func_name: str, params: dict) -> Dict[str, Any]: | ||
""" | ||
Supabase client is async. Sync impl of rpc call | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this related to helm releases?