-
Notifications
You must be signed in to change notification settings - Fork 254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added prometheus alert RRM #938
Conversation
…ment' into feature/Robusta-Resources-Management
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey, nice work, but some important things need to be completed.
- You need to update only the resources that changed. Use
timestamp
(updated_at
) to find it - You need to keep a local table mapping
entity_id
to the real resource location. To avoid loading all CRD objects
try: | ||
account_resources = self.dal.get_account_resources() | ||
prometheus_rules = [] | ||
for resource in account_resources: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here on each iteration, you update all alerts whether they have changed or not. You have to do it only on the first iteration when Robusta starts.
After that, you need to update only alerts that have been changed. For that, you have to get only records with larger timestamp
stored on the previous iteration
@@ -537,3 +541,43 @@ def publish_cluster_nodes(self, node_count: int): | |||
self.handle_supabase_error() | |||
|
|||
logging.info(f"cluster nodes: {UPDATE_CLUSTER_NODE_COUNT} => {data}") | |||
|
|||
def get_account_resources(self) -> List[AccountResource]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest adding parameters to the function like resource_kind
, and from_timestamp
. This will make this function reusable for other resources + will allow to fetch only recently updated changes
|
||
resource_state: Optional[ResourceState] = None | ||
if data["resource_kind"] == ResourceKind.PrometheusAlert: | ||
if "*" in data["clusters_target_set"] or self.cluster in data["clusters_target_set"]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The clusters_target_set
be a part of the query. You don't have to fetch records for all clusters
|
||
def create_prometheus_alerts(self, active_alert_rules: List[PrometheusAlertRule]): | ||
try: | ||
local_k8_crd_object = self.__list_crd_objects() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here you need to keep a table with mapping entity_id
to alert
(from rules.alert
) and probably to a CRD object name (if we want to support multiple CRDs). Then when there is a change in particular entity_id
detected you can quickly find which alert has to be updated
# Conflicts: # src/robusta/core/sinks/robusta/dal/supabase_dal.py
fixed bugs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well done!
See my notes for slightly improving your code writing.
==> Only one fix is mandatory for my opinion: init_resources
error handling
name=name, | ||
grace_period_seconds=60 | ||
) | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it throws what does it mean? What is a recovery strategy? Pay attention, it also terminates the whole rrm __thread_loop
!!. If you have other resource types in RRM loop it will terminate them too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I have done here is added a check. If the API call fails, the initialization method will make additional attempts to list and delete the custom objects. If these attempts also fail, we will remove the failed resources from the __resource_managers
list, rendering them inert.
rule = resource.resource_state.get('rule') | ||
if "labels" not in rule: | ||
rule["labels"] = {} | ||
rule["labels"]["entity_id"] = resource.entity_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try to make this stuff shorter (here and in further places). You just want to assign the entity_id
label. Examples:
rule = {**rule, {"labels":{"entityId": entity_id}}}
rule.labels = {**rule.labels, {"entityId": entity_id} }'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
def __append_rule(self, slot: int, resource: AccountResource): | ||
name = PrometheusAlertResourceManager.__crd_name(slot) | ||
|
||
if "rule" not in resource.resource_state: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why if?
See next comment
except Exception as e: | ||
# If the crd file does not exist, a new crd file will be created. | ||
# Only raise an error if it's not a 404 exception. | ||
if not (isinstance(e, exceptions.ApiException) and e.status == 404): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would anyway continue to next slot, not only for 404
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another question, can get_namespaced_custom_object
return None
without throwing? What can be the reason for that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though get_namespaced_custom_object
could be None
, I believe an exception will always be thrown in the case of a non-existent CRD file. Nonetheless, I have added a check for None
.
) | ||
self.__cdr_slots_len[slot] += 1 | ||
except Exception as e: | ||
raise f"An error occured while replacing PrometheusRules CRD alert => entity: {resource}. Error: {e}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
occurRed:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks!:)
) | ||
|
||
index = index_of(crd_obj["spec"]["groups"][0]["rules"], | ||
lambda r: r["labels"]["entity_id"] == resource.entity_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it may fail if one of the rules doesn't have a label
for some reason. I know it is a corner case. It can happen only if someone modifies a CRD. I think later or sooner we will need to handle it as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
return self._resource_kind | ||
|
||
# create resources | ||
def create_resource(self, resource: AccountResource) -> Union[ResourceEntry, None]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it using docstring a more conventional way of documenting in python?
something like
def get_resource_kind(self) -> ResourceKind:
""" create resources """
....
....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…ment' into feature/Robusta-Resources-Management
…th managed_prometheus_alerts_enabled
# Conflicts: # src/robusta/core/sinks/robusta/dal/supabase_dal.py
Migrated to the newer supabase library
Added support for alert sync status to the db Added managedPrometheusAlerts value to the db
# Conflicts: # .gitignore # src/robusta/core/sinks/robusta/dal/supabase_dal.py # src/robusta/core/sinks/robusta/robusta_sink.py # src/robusta/utils/common.py
# Conflicts: # src/robusta/core/model/env_vars.py
…encing the alerts creation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work
Left some comments
I think I'm missing some logic for prometheus rules helm chart.
If the use chooses to use the saas configuration, and is using Robusta bundled Prometheus, will he have a duplication of the rules? (once installed via the helm chart, and once via saas)
Also, in this case will he still have the recording rules?
@@ -44,6 +44,20 @@ rules: | |||
- watch | |||
- patch | |||
|
|||
{{- if .Values.monitorHelmReleases }} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this related to helm releases?
@@ -29,3 +32,11 @@ class ClusterStatus(BaseModel): | |||
ttl_hours: int | |||
stats: ClusterStats | |||
activity_stats: ActivityStats | |||
|
|||
|
|||
class Account(BaseModel): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this class is not in use, is it?
src/robusta/utils/common.py
Outdated
@@ -37,3 +37,10 @@ def duplicate_without_fields(obj: HikaruBase, omitted_fields: List[str]): | |||
pass # in case the field doesn't exist on this object | |||
|
|||
return duplication | |||
|
|||
|
|||
def index_of(array: list, predicate): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not used
@@ -33,19 +37,23 @@ | |||
UPDATE_CLUSTER_NODE_COUNT = "update_cluster_node_count" | |||
SCANS_RESULT_TABLE = "ScansResults" | |||
RESOURCE_EVENTS = "ResourceEvents" | |||
ACCOUNT_RESOURCE_TABLE = "AccountResource" | |||
ACCOUNT_RESOURCE_STATUS_TABLE = "AccountResourceStatus" | |||
ACCOUNTS_TABLE = "Accounts" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used
self.client.table(ACCOUNT_RESOURCE_TABLE) | ||
.select("entity_id", "resource_kind", "clusters_target_set", "resource_state", "deleted", "enabled", | ||
"updated_at") | ||
.filter("resource_kind", "eq", resource_kind) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for equals, use .eq
and not .filter("xx", "eq", ...)
pass | ||
|
||
|
||
class PrometheusAlertAnnotations(BaseModel): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the types of annotations
and labels
on the DB?
Aren't these just dictionaries?
Why do we have a typed classes, with a subset of the fields?
@staticmethod | ||
def from_supabase_dict(data: dict, entity_id: str): | ||
labels = data.get("labels", {}) | ||
labels["entity_id"] = entity_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we save the entity id to the alert labels?
What is the use of it?
) | ||
|
||
@staticmethod | ||
def from_local_k8_cluster_dict(data: dict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not in use
helm/robusta/values.yaml
Outdated
@@ -74,6 +74,7 @@ lightActions: | |||
|
|||
# install prometheus, alert-manager, and grafana along with Robusta? | |||
enablePrometheusStack: false | |||
enableManagedPrometheusAlerts: false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be renamed to enabledManagedConfiguration
A single value to all types of managed configurations
helm/robusta/templates/runner.yaml
Outdated
@@ -52,6 +52,8 @@ spec: | |||
value: {{ .Release.Name | quote }} | |||
- name: PROMETHEUS_ENABLED | |||
value: {{ .Values.enablePrometheusStack | quote}} | |||
- name: MANAGED_PROMETHEUS_ALERTS_ENABLED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to MANAGED_CONFIGURATION_ENABLED
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good
Left few comments
|
||
account_resources_map[resource.resource_kind].append(resource) | ||
|
||
return dict(account_resources_map) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it already of type Dict?
Why do we need to wrap it with dict
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
def __to_db_account_resource_status( | ||
self, | ||
status_type: AccountResourceStatusType, | ||
last_updated_at: datetime, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think would call is latest_revision
I think it's not an update date.
It's a revision, which has a date value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if resource_kind: | ||
query_builder.eq("resource_kind", resource_kind) | ||
|
||
if updated_at: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be much easier to understand if this is called latest_revision
or something similar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
from robusta.core.sinks.robusta.rrm.types import ResourceKind, AccountResourceStatusType, AccountResourceStatusInfo | ||
|
||
|
||
class BaseResourceManager: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this class is redundant
I think last_updated_at
, cluster
and dal
should all be part of the RRM
class
(there's only one of those, and not one per resource_kind
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
AccountResource, ResourceKind, AccountResourceStatusType, AccountResourceStatusInfo | ||
|
||
|
||
class PrometheusAlertResourceManager(BaseResourceManager): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would call this PrometheusAlertResourceHandler
I would make a base class, BaseResourceHandler
, with one method:
def handle_resources(resources: List[AccountResource]) -> str
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if in_cluster: | ||
self.__alerts_config_supabase_cache[res.entity_id] = rule | ||
|
||
self.prepare_syncing_rules() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are no account_resources
maybe we can skip this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
self.__delete_crd_file(name=existing_cr_name) | ||
|
||
except Exception as e: | ||
if isinstance(e, ApiException): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think weee should return an error
here, or None
So the calling function, can handle errors from multiple resource types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
logging.error("RRM Alerts config threw an error while executing `start_syncing_alerts`", | ||
exc_info=True) | ||
|
||
def __thread_start(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would name this run_rrm
or some logic name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
def __periodic_loop(self): | ||
account_resources = self.dal \ | ||
.get_account_resources(updated_at=self.__last_updated_at) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should also be under the same try/except
If it fails, also update the resource status
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
account_resources = self.dal \ | ||
.get_account_resources(updated_at=self.__last_updated_at) | ||
|
||
for resource_kind, resources in account_resources.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would try to make this a little more generic:
errors: List[str] = []
for resource_kind, resources in account_resources.items():
handler = resource_handlers.get(resource_kind)
if handler:
error = handler.handle_resources(resources)
if error:
errors.append(error)
else:
logger.warning(f"Could not handle resources from kind {resource_kind}")
# then report sync status based on the collected errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
# Conflicts: # src/robusta/core/sinks/robusta/robusta_sink.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work!
No description provided.