Refactor metric writing (#20)
* Refactor metric writing for evm

* Fix bug in evm collector: record and process block height only if it is recorded

* Refactor bitcoin collector to use metrics processor

* Refactor cardano collector to use metrics processor

* Refactor conflux to use metrics processor

* Refactor dogecoin to use metrics processor

* Refactor filecoin to use metrics processor

* Refactor Solana to use metrics processor

* Refactor Starkware to use metrics processor

namikmesic authored Sep 28, 2022
1 parent 2ae798f commit 8f68d4a
Showing 10 changed files with 219 additions and 82 deletions.
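Each collector below now writes through a shared results object imported from metrics_processor instead of mutating a metrics dict passed into probe(). The metrics_processor module itself is not among the diffs rendered on this page, so the following sketch of its interface is an assumption inferred from the register and record_* calls in the collectors, not the committed implementation:

# Hypothetical sketch of metrics_processor.results, inferred from the calls
# made by the refactored collectors. Names, storage layout and the module-level
# singleton are assumptions, not the code from this commit.
from collections import defaultdict


class _MetricsResults:

    def __init__(self):
        self.labels = {}                   # url -> label values registered by the collector
        self.records = defaultdict(dict)   # url -> {metric name: recorded value}

    def register(self, url, labels_values):
        self.labels[url] = labels_values

    def _record(self, url, metric, value):
        self.records[url][metric] = value

    def record_health(self, url, healthy):
        self._record(url, 'brpc_health', healthy)

    def record_latency(self, url, latency_ms):
        self._record(url, 'brpc_latency', latency_ms)

    def record_block_height(self, url, height):
        # Emitting only values that were actually recorded is what the
        # "record and process block height only if it is recorded" fix refers to.
        self._record(url, 'brpc_block_height', height)

    # record_total_difficulty, record_difficulty, record_gas_price,
    # record_max_priority_fee, record_client_version, record_head_count and
    # record_disconnects follow the same one-line pattern.


results = _MetricsResults()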
18 changes: 10 additions & 8 deletions src/collectors/bitcoin.py
@@ -3,6 +3,7 @@
from helpers import check_protocol, strip_url, generate_labels_from_metadata
from time import perf_counter
from settings import logger
from metrics_processor import results


class bitcoin_collector():
@@ -15,20 +16,21 @@ def __init__(self, rpc_metadata):
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
exit(1)

async def _probe(self, metrics):
async def _probe(self) -> results:
results.register(self.url, self.labels_values)
try:
async with BitcoinRPC(self.url, "admin", "admin") as rpc:
start = perf_counter()
chain_info = await rpc.getblockchaininfo()
latency = (perf_counter() - start) * 1000

metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_latency'].add_metric(self.labels_values, latency)
metrics['brpc_block_height'].add_metric(self.labels_values, chain_info['headers'])
metrics['brpc_total_difficulty'].add_metric(self.labels_values, chain_info['difficulty'])
results.record_health(self.url, True)
results.record_latency(self.url, latency)
results.record_block_height(self.url, chain_info['headers'])
results.record_total_difficulty(self.url, chain_info['difficulty'])
except Exception as exc:
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)

def probe(self, metrics):
asyncio.run(self._probe(metrics))
def probe(self):
asyncio.run(self._probe())
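With the metrics dict gone from the signature, a caller only constructs the collector and calls probe(); recorded values end up in the shared results object. A minimal usage sketch, assuming an rpc_metadata dict with a url key plus whatever label fields generate_labels_from_metadata expects:

# Assumed caller, not part of this commit; the rpc_metadata shape beyond 'url'
# is a guess, and the read-back uses the storage layout assumed in the sketch above.
from collectors.bitcoin import bitcoin_collector
from metrics_processor import results

rpc_metadata = {'url': 'https://bitcoin-rpc.example.com', 'chain_id': 'bitcoin'}
collector = bitcoin_collector(rpc_metadata)
collector.probe()  # records health, latency, block height and difficulty into results
print(results.records[rpc_metadata['url']])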
13 changes: 8 additions & 5 deletions src/collectors/cardano.py
@@ -2,6 +2,7 @@
import json
from collectors.ws import websocket_collector
from helpers import check_protocol, strip_url, generate_labels_from_metadata
from metrics_processor import results


class cardano_collector():
@@ -30,14 +31,16 @@ def _get_block_height(self):
except KeyError as err:
logger.error("Failed to fetch block height for {}, error: {}".format(strip_url(self.url), err))

def probe(self, metrics):
def probe(self) -> results:
results.register(self.url, self.labels_values)
try:
alive = self.ws_collector.get_liveliness()
if alive:
metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_latency'].add_metric(self.labels_values, self.ws_collector.get_latency())
metrics['brpc_block_height'].add_metric(self.labels_values, self._get_block_height())
results.record_health(self.url, True)
results.record_latency(self.url, self.ws_collector.get_latency())
results.record_block_height(self.url, self._get_block_height())
else:
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except Exception as exc:
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
results.record_health(self.url, False)
28 changes: 14 additions & 14 deletions src/collectors/conflux.py
@@ -3,6 +3,7 @@
import asyncio
from helpers import strip_url, check_protocol, generate_labels_from_metadata
from collectors.ws import websocket_collector
from metrics_processor import results


class conflux_collector():
@@ -28,31 +29,30 @@ def __init__(self, rpc_metadata):
logger.error("Please provide wss/ws endpoint for {}".format(strip_url(self.url)))
exit(1)

def probe(self, metrics):
def probe(self) -> results:
results.register(self.url, self.labels_values)
try:
if self.client.isConnected():
metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_head_count'].add_metric(self.labels_values, self.ws_collector.message_counter)
metrics['brpc_disconnects'].add_metric(self.labels_values, self.ws_collector.disconnects_counter)
metrics['brpc_latency'].add_metric(self.labels_values, self.ws_collector.get_latency())
metrics['brpc_block_height'].add_metric(self.labels_values, self.client.cfx.epoch_number)

results.record_health(self.url, True)
results.record_head_count(self.url, self.ws_collector.message_counter)
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
results.record_latency(self.url, self.ws_collector.get_latency())
results.record_block_height(self.url, self.client.cfx.epoch_number)
try:
difficulty = self.client.cfx.get_block_by_hash(self.client.cfx.get_best_block_hash())['difficulty']
metrics['brpc_difficulty'].add_metric(self.labels_values, difficulty)
results.record_difficulty(self.url, difficulty)
except TypeError:
logger.error(
"RPC Endpoint sent faulty response type when querying for difficulty. This is most likely issue with RPC endpoint."
)

metrics['brpc_gas_price'].add_metric(self.labels_values, self.client.cfx.gas_price)
metrics['brpc_client_version'].add_metric(self.labels_values,
value={"client_version": self.client.clientVersion})
results.record_gas_price(self.url, self.client.cfx.gas_price)
results.record_client_version(self.url, self.client.clientVersion)
else:
logger.info("Client is not connected to {}".format(strip_url(self.url)))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except asyncio.exceptions.TimeoutError:
logger.info("Client timed out for {}".format(strip_url(self.url)))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except Exception as exc:
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
results.record_health(self.url, False)
19 changes: 10 additions & 9 deletions src/collectors/dogecoin.py
@@ -2,7 +2,7 @@
from settings import logger
from time import perf_counter
import requests

from metrics_processor import results

class doge_collector():

@@ -14,24 +14,25 @@ def __init__(self, rpc_metadata):
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
exit(1)

def probe(self, metrics):
def probe(self) -> results:
results.register(self.url, self.labels_values)
try:
payload = {'version': '1.1', 'method': "getinfo", 'id': 1}
start = perf_counter()
response = requests.post(self.url, json=payload).json()
latency = (perf_counter() - start) * 1000

if response:
metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_latency'].add_metric(self.labels_values, latency)
metrics['brpc_block_height'].add_metric(self.labels_values, response['result']['blocks'])
metrics['brpc_total_difficulty'].add_metric(self.labels_values, response['result']['difficulty'])
results.record_health(self.url, True)
results.record_latency(self.url, latency)
results.record_block_height(self.url, response['result']['blocks'])
results.record_total_difficulty(self.url, response['result']['difficulty'])
else:
logger.error("Bad response from client {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except requests.RequestException as exc:
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except Exception as exc:
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
38 changes: 18 additions & 20 deletions src/collectors/evm.py
@@ -4,6 +4,7 @@
from helpers import strip_url, check_protocol, generate_labels_from_metadata
from collectors.ws import websocket_collector
from websockets.exceptions import WebSocketException
from metrics_processor import results


class evm_collector():
@@ -12,9 +13,7 @@ def __init__(self, rpc_metadata):
self.url = rpc_metadata['url']
if check_protocol(rpc_metadata['url'], "wss") or check_protocol(rpc_metadata['url'], 'ws'):
self.client = Web3(Web3.WebsocketProvider(self.url, websocket_timeout=cfg.response_timeout))

self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)

self.ws_collector = websocket_collector(self.url,
sub_payload={
"method": "eth_subscribe",
@@ -24,35 +23,34 @@
})
self.ws_collector.setDaemon(True)
self.ws_collector.start()

else:
logger.error("Please provide wss/ws endpoint for {}".format(strip_url(self.url)))
exit(1)

def probe(self, metrics):
def probe(self) -> results:
results.register(self.url, self.labels_values)
try:
if self.client.isConnected():
metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_head_count'].add_metric(self.labels_values, self.ws_collector.message_counter)
metrics['brpc_disconnects'].add_metric(self.labels_values, self.ws_collector.disconnects_counter)
metrics['brpc_latency'].add_metric(self.labels_values, self.ws_collector.get_latency())
metrics['brpc_block_height'].add_metric(self.labels_values, self.client.eth.block_number)
metrics['brpc_total_difficulty'].add_metric(self.labels_values,
self.client.eth.get_block('latest')['totalDifficulty'])
metrics['brpc_difficulty'].add_metric(self.labels_values,
self.client.eth.get_block('latest')['difficulty'])
metrics['brpc_gas_price'].add_metric(self.labels_values, self.client.eth.gas_price)
metrics['brpc_max_priority_fee'].add_metric(self.labels_values, self.client.eth.max_priority_fee)
metrics['brpc_client_version'].add_metric(self.labels_values,
value={"client_version": self.client.clientVersion})
results.record_health(self.url, True)
results.record_head_count(self.url, self.ws_collector.message_counter)
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
results.record_latency(self.url, self.ws_collector.get_latency())
results.record_block_height(self.url, self.client.eth.block_number)
results.record_total_difficulty(self.url, self.client.eth.get_block('latest')['totalDifficulty'])
results.record_difficulty(self.url, self.client.eth.get_block('latest')['difficulty'])
results.record_gas_price(self.url, self.client.eth.gas_price)
results.record_max_priority_fee(self.url, self.client.eth.max_priority_fee)
results.record_client_version(self.url, self.client.clientVersion)
else:
logger.info("Client is not connected to {}".format(strip_url(self.url)))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except asyncio.exceptions.TimeoutError as exc:
logger.info("Client timed out for {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except WebSocketException as exc:
logger.info("Websocket client exception {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except Exception as exc:
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
16 changes: 9 additions & 7 deletions src/collectors/filecoin.py
@@ -2,6 +2,7 @@
from settings import logger
from time import perf_counter
import requests
from metrics_processor import results


class filecoin_collector():
@@ -14,24 +15,25 @@ def __init__(self, rpc_metadata):
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
exit(1)

def probe(self, metrics):
def probe(self) -> results:
results.register(self.url, self.labels_values)
try:
payload = {'jsonrpc': '2.0', 'method': "Filecoin.ChainHead", 'id': 1}
start = perf_counter()
response = requests.post(self.url, json=payload)
latency = (perf_counter() - start) * 1000

if response.ok:
metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_latency'].add_metric(self.labels_values, latency)
metrics['brpc_block_height'].add_metric(self.labels_values, response.json()['result']['Height'])
results.record_health(self.url, True)
results.record_latency(self.url, latency)
results.record_block_height(self.url, response.json()['result']['Height'])
else:
logger.error("Bad response from client while fetching Filecoin.ChainHead method for {}: {}".format(
strip_url(self.url), response))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except requests.RequestException as exc:
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except Exception as e:
logger.error("Health check failed for {}: {}".format(strip_url(self.url), e))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
16 changes: 9 additions & 7 deletions src/collectors/solana.py
@@ -3,7 +3,7 @@
from helpers import strip_url, url_join, check_protocol, generate_labels_from_metadata
from collectors.ws import websocket_collector
import requests

from metrics_processor import results

class solana_collector():

@@ -48,14 +48,16 @@ def is_connected(self) -> bool:
return False
return response.ok

def probe(self, metrics):
def probe(self) -> results:
results.register(self.url, self.labels_values)
try:
if self.is_connected():
metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_head_count'].add_metric(self.labels_values, self.ws_collector.message_counter)
metrics['brpc_disconnects'].add_metric(self.labels_values, self.ws_collector.disconnects_counter)
metrics['brpc_block_height'].add_metric(self.labels_values, self.client.get_block_height()['result'])
results.record_health(self.url, True)
results.record_head_count(self.url, self.ws_collector.message_counter)
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
results.record_block_height(self.url, self.client.get_block_height()['result'])
else:
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except Exception as exc:
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
results.record_health(self.url, False)
17 changes: 9 additions & 8 deletions src/collectors/starkware.py
@@ -2,7 +2,7 @@
from helpers import strip_url, check_protocol, generate_labels_from_metadata
from time import perf_counter
import requests

from metrics_processor import results

class starkware_collector():

@@ -14,7 +14,8 @@ def __init__(self, rpc_metadata):
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
exit(1)

def probe(self, metrics):
def probe(self) -> results:
results.register(self.url, self.labels_values)
try:
payload = {"method": "starknet_blockNumber", "jsonrpc": "2.0", "id": 1}
start = perf_counter()
@@ -23,15 +24,15 @@ def probe(self, metrics):
latency = (perf_counter() - start) * 1000

if response:
metrics['brpc_health'].add_metric(self.labels_values, True)
metrics['brpc_latency'].add_metric(self.labels_values, latency)
metrics['brpc_block_height'].add_metric(self.labels_values, response['result'])
results.record_health(self.url, True)
results.record_latency(self.url, latency)
results.record_block_height(self.url, response['result'])
else:
logger.error("Bad response from client {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except requests.RequestException as exc:
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
except Exception as exc:
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
metrics['brpc_health'].add_metric(self.labels_values, False)
results.record_health(self.url, False)
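On the consumer side, which is also not rendered in this diff, something has to turn the recorded values back into Prometheus metric families using the label values each collector registered. A hedged sketch of what that step could look like, reusing the storage layout assumed earlier and handling numeric gauges only (an info-style metric such as brpc_client_version would need an InfoMetricFamily instead):

# Hypothetical consumer of results; not the committed metrics_processor code.
from prometheus_client.core import GaugeMetricFamily


def build_metric_families(results, label_names):
    families = {}
    for url, recorded in results.records.items():
        labels_values = results.labels[url]
        for metric_name, value in recorded.items():
            if not isinstance(value, (int, float)):
                continue  # e.g. brpc_client_version; needs InfoMetricFamily
            if metric_name not in families:
                families[metric_name] = GaugeMetricFamily(metric_name, metric_name, labels=label_names)
            # Only metrics a collector actually recorded are emitted, so a
            # failed probe never produces a stale block height sample.
            families[metric_name].add_metric(labels_values, value)
    return list(families.values())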