Skip to content

Commit

Permalink
Merge branch 'main' into 118-refactor-broadcast_alerts-and-broadcast_…
Browse files Browse the repository at this point in the history
…status_report-into-a-generic-broadcastfunction
  • Loading branch information
mourginakis committed Nov 27, 2023
2 parents d1b9375 + fca6c55 commit b38762a
Show file tree
Hide file tree
Showing 15 changed files with 343 additions and 42 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ DB_NAME=example_db_name
DB_USERNAME=example_db_username
DB_PASSWORD=example_db_password
DB_PORT=25060

# Watchdog
NODE_MONITOR_URL=http://url.of.your.node.monitor.instance/
EMAIL_ADMINS_LIST=mail1@mail.com,mail2@mail.com
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Implemented connection pooling.
- Changed database schema.
- Made class more friendly to database schema changes.
- Fixed an issue where messages longer than 4096 characters could not be sent through Telegram.
- Added a feature to automatically record all node provider info into the database.
- Added a Watchdog script to notify developers as soon as Node Monitor is down.


## [1.0.0-alpha.1] - 2023-10-20
Expand Down
12 changes: 12 additions & 0 deletions data/np_t0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"node_providers":[
{
"display_name":"1G",
"principal_id":"7k7b7-4pzhf-aivy6-y654t-uqyup-2auiz-ew2cm-4qkl4-nsl4v-bul5k-5qe"
},
{
"display_name":"43rd Big Idea Films",
"principal_id":"sqhxa-h6ili-qkwup-ohzwn-yofnm-vvnp5-kxdhg-saabw-rvua3-xp325-zqe"
}
]
}
12 changes: 12 additions & 0 deletions data/np_t1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"node_providers":[
{
"display_name":"Node Provider A",
"principal_id":"test-dummy-principal-1"
},
{
"display_name":"Node Provider B",
"principal_id":"test-dummy-principal-2"
}
]
}
19 changes: 13 additions & 6 deletions node_monitor/bot_telegram.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
import requests
import textwrap
from typing import List


class TelegramBot:
def __init__(self, telegram_token: str) -> None:
self.telegram_token = telegram_token


def send_message(
self, chat_id: str, message: str
) -> None | requests.exceptions.HTTPError:
"""Send a message to a single Telegram chat."""
max_message_length = 4096
message_parts = textwrap.wrap(message, width=max_message_length)

try:
request = requests.get(
f"https://api.telegram.org/bot{self.telegram_token}"
f"/sendMessage?chat_id={chat_id}&text={message}"
)
request.raise_for_status()
for part in message_parts:
payload = {"chat_id": chat_id, "text": part}
response = requests.post(
f"https://api.telegram.org/bot{self.telegram_token}/sendMessage",
data=payload
)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
# print(f"Got an error: {e}")
return e
return None

Expand All @@ -33,3 +39,4 @@ def send_messages(
if this_err is not None:
err = this_err
return err

21 changes: 20 additions & 1 deletion node_monitor/ic_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
## Prelim
assert pydantic.__version__.startswith("2.")
nodes_endpoint = "https://ic-api.internetcomputer.org/api/v3/nodes"
node_provider_endpoint = "https://ic-api.internetcomputer.org/api/v3/node-providers"



Expand Down Expand Up @@ -37,6 +38,12 @@ class Node(BaseModel):
class Nodes(BaseModel):
nodes: List[Node]

# One node provider record as returned by the ic-api
# `node-providers` endpoint (see `node_provider_endpoint` above).
class NodeProvider(BaseModel):
    principal_id: Principal  # provider's principal id, e.g. "7k7b7-...-5qe"
    display_name: str        # human-readable provider name, e.g. "1G"

# Top-level wrapper matching the API / fixture-file payload shape:
# {"node_providers": [...]}
class NodeProviders(BaseModel):
    node_providers: List[NodeProvider]


##############################################
Expand All @@ -54,6 +61,16 @@ def get_nodes_from_file(file_path: str) -> Nodes:
j = json.load(f)
return Nodes(**j)

def get_node_providers() -> NodeProviders:
    """Fetch the current list of node providers from the ic-api.

    Returns:
        NodeProviders: validated payload from the `node-providers`
        endpoint.

    Raises:
        requests.exceptions.RequestException: on network failure or
            timeout.
        pydantic.ValidationError: if the response JSON does not match
            the NodeProviders schema (e.g. an API error body).
    """
    # A finite timeout prevents the daily scheduled sync job from
    # hanging forever if the ic-api is unreachable (`requests` applies
    # no timeout by default).
    response = requests.get(node_provider_endpoint, timeout=60)
    return NodeProviders(**response.json())

def get_node_providers_from_file(file_path: str) -> NodeProviders:
    """Load node providers from a local JSON file with the same shape
    as the ic-api `node-providers` payload (e.g. saved with curl)."""
    with open(file_path) as source:
        return NodeProviders(**json.load(source))



Expand All @@ -63,4 +80,6 @@ def get_nodes_from_file(file_path: str) -> Nodes:
if __name__ == "__main__":
from devtools import debug
debug(get_nodes())
debug(get_nodes_from_file("tests/t0.json"))
debug(get_nodes_from_file("data/t0.json"))
debug(get_node_providers())
debug(get_node_providers_from_file("data/np_t0.json"))
3 changes: 3 additions & 0 deletions node_monitor/load_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
DB_NAME = os.environ.get('DB_NAME', '')
DB_PORT = os.environ.get('DB_PORT', '')
FEEDBACK_FORM_URL = os.environ.get('FEEDBACK_FORM_URL', '')
NODE_MONITOR_URL = os.environ.get('NODE_MONITOR_URL', '')
EMAIL_ADMINS_LIST = os.environ.get('EMAIL_ADMINS_LIST', '').split(',')



## Pre-flight check
Expand Down
39 changes: 35 additions & 4 deletions node_monitor/node_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
import schedule
import logging

import node_monitor.ic_api as ic_api
from node_monitor.bot_email import EmailBot
from node_monitor.bot_slack import SlackBot
from node_monitor.bot_telegram import TelegramBot
from node_monitor.node_provider_db import NodeProviderDB
from node_monitor.node_monitor_helpers.get_compromised_nodes import \
get_compromised_nodes
import node_monitor.node_monitor_helpers.messages as messages
import node_monitor.ic_api as ic_api

Seconds = int
Principal = str
Expand Down Expand Up @@ -63,7 +63,9 @@ def __init__(
self.actionables: Dict[Principal, List[ic_api.Node]] = {}
self.jobs = [
schedule.every().day.at("15:00", "UTC").do(
self.broadcast_status_report)
self.broadcast_status_report),
schedule.every().day.at("15:00", "UTC").do(
self.update_node_provider_lookup_if_new)
]


Expand All @@ -76,8 +78,8 @@ def _resync(self, override_data: ic_api.Nodes | None = None) -> None:
live fetching Nodes from the ic-api. Useful for testing.
"""
logging.info("Resyncing node states from ic-api...")
data = override_data if override_data else ic_api.get_nodes()
self.snapshots.append(data)
nodes_api = override_data if override_data else ic_api.get_nodes()
self.snapshots.append(nodes_api)
self.last_update = time.time()


Expand Down Expand Up @@ -156,6 +158,35 @@ def broadcast_status_report(self) -> None:
logging.info(f"Broadcasting status report {node_provider_id}...")
subject, message = messages.nodes_status_message(nodes, node_labels)
broadcaster(node_provider_id, subject, message)


def update_node_provider_lookup_if_new(
        self,
        override_data: ic_api.NodeProviders | None = None) -> None:
    """Sync new node providers from the ic-api into the database.

    Compares the node providers currently reported by the ic-api
    against the node_provider_lookup table and inserts any providers
    the database does not know about yet. Existing rows are never
    modified or removed.

    Args:
        override_data: If provided, this arg will be used instead of
            live fetching Node Providers from the ic-api. Useful for
            testing.
    """
    source = override_data if override_data else ic_api.get_node_providers()

    api_lookup = {provider.principal_id: provider.display_name
                  for provider in source.node_providers}
    known = self.node_provider_db.get_node_providers_as_dict()

    # Keep only principals present in the API but absent from the DB.
    additions = {principal: name
                 for principal, name in api_lookup.items()
                 if principal not in known}

    if additions:
        self.node_provider_db.insert_node_providers(additions)



def step(self) -> None:
Expand Down
86 changes: 75 additions & 11 deletions node_monitor/node_provider_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,18 @@ class NodeProviderDB:
'node_label': 'text'
}

# table: node_provider_lookup
create_table_node_provider_lookup = """
CREATE TABLE IF NOT EXISTS node_provider_lookup (
node_provider_id TEXT PRIMARY KEY,
node_provider_name TEXT
);
"""
schema_table_node_provider_lookup = {
'node_provider_id': 'text',
'node_provider_name': 'text'
}


## Methods
def __init__(self, host: str, db: str, port: str,
Expand All @@ -116,22 +128,44 @@ def __init__(self, host: str, db: str, port: str,

def _execute(self, sql: str,
params: Tuple[Any, ...]) -> List[Dict[str, Any]]:
"""Execute a SQL statement with a connection from the pool.
An empty tuple should be passed if no parameters are needed.
All transactions are committed.
Returns a list of dicts instead of the default list of tuples.
Ex. [{'column_name': value, ...}, ...]
"""Execute a SQL statement with a connection from the pool. All
transactions are committed.
Parameters
----------
sql : str
The SQL statement to execute.
params : Tuple[Any, ...]
The parameters to pass to the SQL statement. An empty tuple should
be passed if no parameters are needed.
Returns
-------
List[Dict[str, Any]]
A list of dicts representing the rows returned by the SQL statement.
If the SQL statement does not return any results, an empty list is
returned. This allows us to call `SELECT`, `INSERT`, `UPDATE`, and
`DELETE` statements with the same function.
Ex. [{'column_name': value, ...}, ...].
"""
# Note: this method can also be used for read-only queries, because
# conn.commit() adds insignificant overhead for read-only queries.
# Note: we convert 'result' from type List[RealDictCursor] to List[dict]
# Notes:
# 1. conn.commit() is necessary for queries that write to the database,
# and adds insignificant overhead for read-only queries. This allows
# us to use the same method for both read and write queries.
# 2. We convert `result` from type List[RealDictCursor] to List[dict].
# 3. Only `SELECT` statements return results, so we must check if the
# query returned results before we call cur.fetchall(), otherwise
# we get an error. We do this by checking if cur.description is None.
conn = self.pool.getconn()
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(sql, params)
result = cur.fetchall()
if cur.description is not None:
result = [dict(r) for r in cur.fetchall()]
else:
result = []
conn.commit()
self.pool.putconn(conn)
return [dict(r) for r in result]
return result


def _execute1(self, sql: str, params: Tuple[Any, ...]) -> List[Tuple[Any, ...]]:
Expand All @@ -148,7 +182,7 @@ def _execute1(self, sql: str, params: Tuple[Any, ...]) -> List[Tuple[Any, ...]]:
conn.commit()
self.pool.putconn(conn)
return result


def _get_schema(self, table_name: str) -> Dict[str, str]:
"""Returns the schema for a table.
Expand Down Expand Up @@ -212,6 +246,36 @@ def get_node_labels_as_dict(self) -> Dict[Principal, str]:
rows = self._execute("SELECT * FROM node_label_lookup", ())
lookupd = {row['node_id']: row['node_label'] for row in rows}
return lookupd


def get_node_providers_as_dict(self) -> Dict[Principal, str]:
    """Return the node_provider_lookup table as a mapping of
    node_provider_id -> node_provider_name. One to one relationship."""
    records = self._execute("SELECT * FROM node_provider_lookup", ())
    return {record['node_provider_id']: record['node_provider_name']
            for record in records}


def insert_node_providers(self, node_providers: Dict[Principal, str]) -> None:
    """Inserts node providers into node_provider_lookup, one row each.

    Args:
        node_providers: Mapping of node_provider_id (principal) to
            node_provider_name.

    Note: this is a plain INSERT, so re-inserting an existing
    node_provider_id (the table's primary key) will raise a database
    error; callers are expected to pass only new providers.
    """
    query = """
        INSERT INTO node_provider_lookup (
            node_provider_id,
            node_provider_name
        ) VALUES (%s, %s)
    """
    # Iterate items() directly rather than keys() plus a per-key lookup.
    for principal, display_name in node_providers.items():
        self._execute(query, (principal, display_name))


def delete_node_provider(self, node_provider_id: Principal) -> None:
    """Deletes a record in node_provider_lookup by node_provider_id.

    Args:
        node_provider_id: Primary key (principal) of the row to delete.
            Deleting an id that is not present matches zero rows and is
            a no-op.
    """
    query = """
        DELETE FROM node_provider_lookup
        WHERE node_provider_id = %s
    """
    self._execute(query, (node_provider_id,))


def close(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def pytest_collection_modifyitems(config, items):
"two_nodes_down": ic_api.get_nodes_from_file("data/t2.json"),
"one_node_change_subnet": ic_api.get_nodes_from_file("data/t3.json"),
"one_node_removed": ic_api.get_nodes_from_file("data/t4.json"),
"node_provider_control": ic_api.get_node_providers_from_file("data/np_t0.json"),
"new_node_providers": ic_api.get_node_providers_from_file("data/np_t1.json"),
}

# Do we want this implemented?
Expand Down
18 changes: 11 additions & 7 deletions tests/test_bot_telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
import node_monitor.load_config as c
from node_monitor.bot_telegram import TelegramBot

@patch("requests.get")
def test_send_message(mock_get):
@patch("requests.post")
def test_send_message(mock_post):
telegram_bot = TelegramBot(c.TOKEN_TELEGRAM)
chat_id = "1234567890"
chat_id = "1234567890"
message = "Test message"
mock_response = mock_get.return_value
payload = {
"chat_id": chat_id,
"text": message
}
mock_response = mock_post.return_value
mock_response.raise_for_status.return_value = None

telegram_bot.send_message(chat_id, message)

mock_get.assert_called_once_with(
f"https://api.telegram.org/bot{telegram_bot.telegram_token}/sendMessage?chat_id={chat_id}&text={message}"
mock_post.assert_called_once_with(
f"https://api.telegram.org/bot{telegram_bot.telegram_token}/sendMessage",
data=payload
)
mock_response.raise_for_status.assert_called_once()

Expand All @@ -30,4 +35,3 @@ def test_send_live_message():
err = telegram_bot.send_message(chat_id, message)
if err is not None:
raise err

8 changes: 8 additions & 0 deletions tests/test_ic_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,11 @@ def test_get_nodes():
def test_get_nodes_from_file():
nodes = ic_api.get_nodes_from_file("data/t0.json")
assert len(nodes.nodes) > 0

def test_get_node_providers():
    # Live integration test: hits the real ic-api over the network and
    # checks that a non-empty provider list parses successfully.
    node_providers = ic_api.get_node_providers()
    assert len(node_providers.node_providers) > 0

def test_get_node_providers_from_file():
    # Offline test: parses the checked-in fixture data/np_t0.json.
    node_providers = ic_api.get_node_providers_from_file("data/np_t0.json")
    assert len(node_providers.node_providers) > 0
Loading

0 comments on commit b38762a

Please sign in to comment.