Skip to content

Commit

Permalink
Merge branch 'main' into fix/docs
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenj authored Aug 10, 2023
2 parents 96f0baa + 0292919 commit 1588e53
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 240 deletions.
53 changes: 0 additions & 53 deletions services/voting-node/voting_node/committee.py

This file was deleted.

99 changes: 74 additions & 25 deletions services/voting-node/voting_node/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
from pydantic import BaseModel

from .envvar import SECRET_SECRET
from .models import Event, HostInfo, LeaderHostInfo, Proposal, Snapshot, VotePlan
from .models import Contribution, Event, HostInfo, LeaderHostInfo, Proposal, Snapshot, VotePlan, Voter
from .utils import LEADER_REGEX, decrypt_secret, encrypt_secret, get_hostname


class EventDb(BaseModel):
"""Convenient abstraction to call the EventDB within the voting node service."""
"""Convenient abstraction to call the EventDB within the voting node service.
Each method that queries the EventDB opens and closes its own connection.
"""

connection: Connection | None = None
db_url: str
Expand All @@ -29,15 +32,7 @@ async def connect(self):
if conn is None:
raise Exception("failed to connect to the database")
self.connection = conn

def conn(self) -> Connection:
"""Return the connection to the DB.
Raise exception if connection is None.
"""
if self.connection is None:
raise Exception("no connection to EventDB found")
return self.connection
return conn

async def close(self):
"""Close a connection to the DB."""
Expand All @@ -47,6 +42,7 @@ async def close(self):
async def fetch_upcoming_event(self) -> Event:
"""Look in EventDB for the next event that will start."""
# first, check if there is an event that has not finished
conn = await self.connect()
now = datetime.datetime.utcnow()
query = """
SELECT
Expand All @@ -58,7 +54,8 @@ async def fetch_upcoming_event(self) -> Event:
ORDER BY
start_time ASC
LIMIT 1"""
result = await self.conn().fetchrow(query, now)
result = await conn.fetchrow(query, now)
await conn.close()

if result is None:
raise Exception("failed to fetch event from DB")
Expand All @@ -69,9 +66,12 @@ async def fetch_upcoming_event(self) -> Event:

async def fetch_leader_host_info(self, event_row_id: int) -> HostInfo:
"""Return HostInfo for leaders, sorted by hostname."""
conn = await self.connect()
conds = "hostname = $1 AND event = $2"
query = f"SELECT * FROM voting_node WHERE {conds}"
result = await self.conn().fetchrow(query, get_hostname(), event_row_id)
result = await conn.fetchrow(query, get_hostname(), event_row_id)
await conn.close()

match result:
case None:
raise Exception("failed to fetch leader node info from DB")
Expand All @@ -91,6 +91,7 @@ async def fetch_leader_host_info(self, event_row_id: int) -> HostInfo:

async def insert_leader_host_info(self, host_info: HostInfo):
"""Insert the hostname row into the voting_node table."""
conn = await self.connect()
fields = "hostname, event, seckey, pubkey, netkey"
values = "$1, $2, $3, $4, $5"
query = f"INSERT INTO voting_node({fields}) VALUES({values}) RETURNING *"
Expand All @@ -100,14 +101,16 @@ async def insert_leader_host_info(self, host_info: HostInfo):

enc_sk = encrypt_secret(h.seckey, encrypt_pass)
enc_nk = encrypt_secret(h.netkey, encrypt_pass)
result = await self.conn().execute(
result = await conn.execute(
query,
h.hostname,
h.event,
enc_sk,
h.pubkey,
enc_nk,
)
await conn.close()

if result is None:
raise Exception(f"failed to insert '{h.hostname}' info to DB")
logger.debug(f"{h.hostname} info added: {result}")
Expand All @@ -118,12 +121,15 @@ async def fetch_sorted_leaders_host_info(self, event_row_id: int) -> list[Leader
Fetch host information for leader nodes.
Raises exceptions if the DB fails to return a list of records, or if the list is empty.
"""
conn = await self.connect()
query = f"""
SELECT (hostname, pubkey)
FROM voting_node
WHERE hostname ~ '{LEADER_REGEX}' AND event = $1
ORDER BY hostname ASC"""
result = await self.conn().fetch(query, event_row_id)
result = await conn.fetch(query, event_row_id)
await conn.close()

match result:
case None:
raise Exception("DB error fetching leaders host info")
Expand All @@ -142,8 +148,11 @@ def extract_leader_info(leader):

async def fetch_proposals(self) -> list[Proposal]:
"""Return a list of proposals ."""
conn = await self.connect()
query = "SELECT * FROM proposal ORDER BY id ASC"
result = await self.conn().fetch(query)
result = await conn.fetch(query)
await conn.close()

if result is None:
raise Exception("proposals DB error")
logger.debug(f"proposals retrieved from DB: {len(result)}")
Expand All @@ -158,8 +167,11 @@ async def fetch_proposals(self) -> list[Proposal]:

async def check_if_snapshot_is_final(self, event_id: int) -> bool:
"""Query if the snapshot is finalized."""
conn = await self.connect()
query = "SELECT final FROM snapshot WHERE event = $1"
result = await self.conn().fetchrow(query, event_id)
result = await conn.fetchrow(query, event_id)
await conn.close()

match result:
case None:
raise Exception("snapshot DB error")
Expand All @@ -174,11 +186,14 @@ async def check_if_snapshot_is_final(self, event_id: int) -> bool:

async def fetch_snapshot(self, event_id: int) -> Snapshot:
"""Fetch the snapshot row for the event_id."""
conn = await self.connect()
# fetch the voters
columns = "(row_id, event, as_at, last_updated, dbsync_snapshot_data"
columns += ", drep_data, catalyst_snapshot_data, final)"
query = f"SELECT {columns} FROM snapshot WHERE event = $1"
result = await self.conn().fetchrow(query, event_id)
result = await conn.fetchrow(query, event_id)
await conn.close()

if result is None:
raise Exception("snapshot DB error")
logger.debug("snapshot retrieved from DB")
Expand All @@ -190,32 +205,66 @@ async def fetch_snapshot(self, event_id: int) -> Snapshot:
logger.debug("snapshot retrieved from DB")
return snapshot

async def fetch_voters(self, event_id: int) -> list[Voter]:
"""Fetch the voters registered for the event_id."""
conn = await self.connect()
query = """
SELECT * FROM voter WHERE snapshot_id IN (SELECT row_id FROM snapshot WHERE event = $1)
"""
result = await conn.fetch(query, event_id)
await conn.close()

match result:
case None:
raise Exception("DB error fetching voters")
case [*voters]:
logger.debug(f"voters retrieved from DB: {len(voters)}")
return [Voter(**dict(r)) for r in voters]

async def fetch_contributions(self, event_id: int) -> list[Contribution]:
"""Fetch the contributions registered for the event_id."""
conn = await self.connect()
query = """
SELECT * FROM contribution WHERE snapshot_id IN (SELECT row_id FROM snapshot WHERE event = $1)
"""
result = await conn.fetch(query, event_id)
await conn.close()

match result:
case None:
raise Exception("DB error fetching contributions")
case [*contributions]:
logger.debug(f"contributions retrieved from DB: {len(contributions)}")
return [Contribution(**dict(r)) for r in contributions]

async def fetch_voteplans(self, event_id: int) -> list[VotePlan]:
"""Fetch the voteplans for the event_id."""
conn = await self.connect()
query = "SELECT * FROM voteplan WHERE objective_id IN (SELECT row_id FROM objective WHERE event = $1) ORDER BY id ASC"
result = await self.conn().fetch(query, event_id)
if result is None:
raise Exception("voteplan DB error")
logger.debug(f"voteplans retrieved from DB: {len(result)}")
result = await conn.fetch(query, event_id)
await conn.close()

match result:
case None:
raise Exception("DB error fetching voteplans")
case []:
raise Exception("no voteplans found in DB")
case [*voteplans]:
logger.debug(f"voteplans retrieved from DB: {len(voteplans)}")
return [VotePlan(**dict(r)) for r in voteplans]

async def insert_block0_info(self, event_row_id: int, block0_bytes: bytes, block0_hash: str):
"""Update the event with the given block0 bytes and hash."""
conn = await self.connect()
columns = "block0 = $1, block0_hash = $2"
condition = "row_id = $3"
returning = "name"
query = f"UPDATE event SET {columns} WHERE {condition} RETURNING {returning}"
try:
result = await self.conn().execute(query, block0_bytes, block0_hash, event_row_id)
result = await conn.execute(query, block0_bytes, block0_hash, event_row_id)

if result is None:
raise Exception("failed to insert block0 info from DB")
logger.debug(f"block0 info added to event: {result}")
except Exception as e:
raise Exception(f"inserting block0 info went wrong: {e}") from e
finally:
await conn.close()
8 changes: 5 additions & 3 deletions services/voting-node/voting_node/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ async def snapshot_import(self, event_id: int):
snapshot_tool_output_dir=snapshot_tool_out_dir,
)
else:
raise Exception("SSH_SNAPSHOT_TOOL_PATH, SSH_SNAPSHOT_TOOL_OUTPUT_DIR, "
"SSH_SNAPSHOT_TOOL_OUTPUT_DIR and SSH_SNAPSHOT_TOOL_DESTINATION "
"are all required when SNAPSHOT_TOOL_SSH is set")
raise Exception(
"SSH_SNAPSHOT_TOOL_PATH, SSH_SNAPSHOT_TOOL_OUTPUT_DIR, "
"SSH_SNAPSHOT_TOOL_OUTPUT_DIR and SSH_SNAPSHOT_TOOL_DESTINATION "
"are all required when SNAPSHOT_TOOL_SSH is set"
)
else:
ssh_config = None

Expand Down
2 changes: 1 addition & 1 deletion services/voting-node/voting_node/jcli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import asyncio
from pathlib import Path

from .committee import ElectionKey
from .models.committee import ElectionKey


class JCli:
Expand Down
Loading

0 comments on commit 1588e53

Please sign in to comment.