Skip to content

Commit

Permalink
Merge pull request #119 from aviate-labs/118-refactor-broadcast_alert…
Browse files Browse the repository at this point in the history
…s-and-broadcast_status_report-into-a-generic-broadcastfunction

118 refactor broadcast alerts and broadcast status report into a generic broadcastfunction
  • Loading branch information
mourginakis authored Nov 27, 2023
2 parents fca6c55 + b38762a commit 2099ef0
Showing 1 changed file with 29 additions and 48 deletions.
77 changes: 29 additions & 48 deletions node_monitor/node_monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import time
from collections import deque
from typing import Deque, List, Dict, Optional
from typing import Deque, List, Dict, Optional, Callable
from toolz import groupby # type: ignore
import schedule
import logging
Expand Down Expand Up @@ -100,83 +100,64 @@ def _analyze(self) -> None:
self.actionables = {k: v for k, v
in self.compromised_nodes_by_provider.items()
if k in subscriber_ids}


def broadcast_alerts(self) -> None:
"""Broadcast relevant alerts to the appropriate channels. Retrieves
subscribers, node_labels, and email_recipients from the database."""

def _make_broadcaster(self) -> Callable[[str, str, str], None]:
"""A closure that returns a broadcast function with a local cache.
Allows the returned function to be run in a loop without
querying the database.
"""
subscribers = self.node_provider_db.get_subscribers_as_dict()
node_labels = self.node_provider_db.get_node_labels_as_dict()
email_recipients = self.node_provider_db.get_emails_as_dict()
slack_channels = self.node_provider_db.get_slack_channels_as_dict()
telegram_chats = self.node_provider_db.get_telegram_chats_as_dict()
for node_provider_id, nodes in self.actionables.items():

def broadcaster(node_provider_id: str,
subject: str, message: str) -> None:
"""Broadcasts a generic message to a subscriber through their
selected communication channel(s)."""
preferences = subscribers[node_provider_id]
subject, message = messages.nodes_compromised_message(nodes, node_labels)
# - - - - - - - - - - - - - - - - -
if preferences['notify_email'] == True:
recipients = email_recipients[node_provider_id]
logging.info(f"Sending alert email message to {recipients}...")
self.email_bot.send_emails(recipients, subject, message)
if preferences['notify_slack'] == True:
if preferences['notify_slack'] == True:
if self.slack_bot is not None:
channels = slack_channels[node_provider_id]
logging.info(f"Sending alert slack messages to {channels}...")
err1 = self.slack_bot.send_messages(channels, message)
if err1 is not None:
logging.error(f"SlackBot.send_message() failed with error: {err1}")
if preferences['notify_telegram'] == True:
if self.telegram_bot is not None:
chats = telegram_chats[node_provider_id]
logging.info(f"Sending alert telegram messages to {chats}...")
err2 = self.telegram_bot.send_messages(chats, message)
if err2 is not None:
logging.error(f"TelegramBot.send_message() failed with error: {err2}")
# - - - - - - - - - - - - - - - - -
return None

return broadcaster


def broadcast_alerts(self) -> None:
"""Broadcast relevant alerts to the appropriate channels."""
broadcaster = self._make_broadcaster()
node_labels = self.node_provider_db.get_node_labels_as_dict()
for node_provider_id, nodes in self.actionables.items():
logging.info(f"Broadcasting alert message to {node_provider_id}...")
subject, message = messages.nodes_compromised_message(nodes, node_labels)
broadcaster(node_provider_id, subject, message)


def broadcast_status_report(self) -> None:
"""Broadcasts a Node Status Report to all Node Providers.
Retrieves subscribers, node_labels, and email_recipients from the
database. Filters out Node Providers that are not subscribed to
status reports.
"""
"""Broadcasts a Node Status Report to all Node Providers."""
broadcaster = self._make_broadcaster()
subscribers = self.node_provider_db.get_subscribers_as_dict()
node_labels = self.node_provider_db.get_node_labels_as_dict()
email_recipients = self.node_provider_db.get_emails_as_dict()
slack_channels = self.node_provider_db.get_slack_channels_as_dict()
telegram_chats = self.node_provider_db.get_telegram_chats_as_dict()
latest_snapshot_nodes = self.snapshots[-1].nodes
all_nodes_by_provider: Dict[Principal, List[ic_api.Node]] = \
groupby(lambda node: node.node_provider_id, latest_snapshot_nodes)
reportable_nodes = {k: v for k, v
in all_nodes_by_provider.items()
if k in subscribers.keys()}
# - - - - - - - - - - - - - - - - -
for node_provider_id, nodes in reportable_nodes.items():
logging.info(f"Broadcasting status report {node_provider_id}...")
preferences = subscribers[node_provider_id]
subject, message = messages.nodes_status_message(nodes, node_labels)
# - - - - - - - - - - - - - - - - -
if preferences['notify_email'] == True:
recipients = email_recipients[node_provider_id]
logging.info(f"Sending status report email to {recipients}...")
self.email_bot.send_emails(recipients, subject, message)
if preferences['notify_slack'] == True:
if self.slack_bot is not None:
channels = slack_channels[node_provider_id]
logging.info(f"Sending status report slack message to {channels}...")
err1 = self.slack_bot.send_messages(channels, message)
if err1 is not None:
logging.error(f"SlackBot.send_message() failed with error: {err1}")
if preferences['notify_telegram'] == True:
if self.telegram_bot is not None:
chats = telegram_chats[node_provider_id]
logging.info(f"Sending status report telegram messages to {chats}...")
err2 = self.telegram_bot.send_messages(chats, message)
if err2 is not None:
logging.error(f"TelegramBot.send_message() failed with error: {err2}")
# - - - - - - - - - - - - - - - - -
broadcaster(node_provider_id, subject, message)


def update_node_provider_lookup_if_new(
Expand Down

0 comments on commit 2099ef0

Please sign in to comment.