diff --git a/services/voting-node/voting_node/committee.py b/services/voting-node/voting_node/committee.py deleted file mode 100644 index 048a2b8389..0000000000 --- a/services/voting-node/voting_node/committee.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Committee of members that will tally the votes.""" -from pydantic import BaseModel - - -class WalletKeys(BaseModel): - """The keys to an external wallet. - - `seckey` is the secret key for the wallet. - `pubkey` is the public key for the wallet. - `hex_encoded` is used to generate the genesis block. - """ - - seckey: str - pubkey: str - hex_encoded: str - - -class Keypair(BaseModel): - """A pair of key.""" - - seckey: str - """ Secret key.""" - - pubkey: str - """ Public key.""" - - -class CommunicationKeys(Keypair): - """Committee member communication keys.""" - - -class MemberKeys(Keypair): - """Committee member keys.""" - - -class CommitteeMember(BaseModel): - """Committee member.""" - - index: int - """Zero-based index for this committee member.""" - communication_keys: CommunicationKeys - """Member communication keypair.""" - member_keys: MemberKeys - """Committee member keypair.""" - - -class ElectionKey(BaseModel): - """The election key is used to sign every vote. - - This key can be rebuilt with the committee member keys. - """ - - pubkey: str diff --git a/services/voting-node/voting_node/db.py b/services/voting-node/voting_node/db.py index 80273f7d9a..3f0e77120b 100644 --- a/services/voting-node/voting_node/db.py +++ b/services/voting-node/voting_node/db.py @@ -8,12 +8,15 @@ from pydantic import BaseModel from .envvar import SECRET_SECRET -from .models import Event, HostInfo, LeaderHostInfo, Proposal, Snapshot, VotePlan +from .models import Contribution, Event, HostInfo, LeaderHostInfo, Proposal, Snapshot, VotePlan, Voter from .utils import LEADER_REGEX, decrypt_secret, encrypt_secret, get_hostname class EventDb(BaseModel): - """Convenient abstraction to call the EventDB within the voting node service.""" + """Convenient abstraction to call the EventDB within the voting node service. + + Each method that queries the EventDB opens and closes its own connection. + """ connection: Connection | None = None db_url: str @@ -29,15 +32,7 @@ async def connect(self): if conn is None: raise Exception("failed to connect to the database") self.connection = conn - - def conn(self) -> Connection: - """Return the connection to the DB. - - Raise exception if connection is None. - """ - if self.connection is None: - raise Exception("no connection to EventDB found") - return self.connection + return conn async def close(self): """Close a connection to the DB.""" @@ -47,6 +42,7 @@ async def close(self): async def fetch_upcoming_event(self) -> Event: """Look in EventDB for the next event that will start.""" # first, check if there is an event that has not finished + conn = await self.connect() now = datetime.datetime.utcnow() query = """ SELECT @@ -58,7 +54,8 @@ async def fetch_upcoming_event(self) -> Event: ORDER BY start_time ASC LIMIT 1""" - result = await self.conn().fetchrow(query, now) + result = await conn.fetchrow(query, now) + await conn.close() if result is None: raise Exception("failed to fetch event from DB") @@ -69,9 +66,12 @@ async def fetch_upcoming_event(self) -> Event: async def fetch_leader_host_info(self, event_row_id: int) -> HostInfo: """Return HostInfo for leaders, sorted by hostname.""" + conn = await self.connect() conds = "hostname = $1 AND event = $2" query = f"SELECT * FROM voting_node WHERE {conds}" - result = await self.conn().fetchrow(query, get_hostname(), event_row_id) + result = await conn.fetchrow(query, get_hostname(), event_row_id) + await conn.close() + match result: case None: raise Exception("failed to fetch leader node info from DB") @@ -91,6 +91,7 @@ async def fetch_leader_host_info(self, event_row_id: int) -> HostInfo: async def insert_leader_host_info(self, host_info: HostInfo): """Insert the hostname row into the voting_node table.""" + conn = await self.connect() fields = "hostname, event, seckey, pubkey, netkey" values = "$1, $2, $3, $4, $5" query = f"INSERT INTO voting_node({fields}) VALUES({values}) RETURNING *" @@ -100,7 +101,7 @@ async def insert_leader_host_info(self, host_info: HostInfo): enc_sk = encrypt_secret(h.seckey, encrypt_pass) enc_nk = encrypt_secret(h.netkey, encrypt_pass) - result = await self.conn().execute( + result = await conn.execute( query, h.hostname, h.event, @@ -108,6 +109,8 @@ async def insert_leader_host_info(self, host_info: HostInfo): h.pubkey, enc_nk, ) + await conn.close() + if result is None: raise Exception(f"failed to insert '{h.hostname}' info to DB") logger.debug(f"{h.hostname} info added: {result}") @@ -118,12 +121,15 @@ async def fetch_sorted_leaders_host_info(self, event_row_id: int) -> list[Leader Fetch host information for leader nodes. Raises exceptions if the DB fails to return a list of records, or if the list is empty. """ + conn = await self.connect() query = f""" SELECT (hostname, pubkey) FROM voting_node WHERE hostname ~ '{LEADER_REGEX}' AND event = $1 ORDER BY hostname ASC""" - result = await self.conn().fetch(query, event_row_id) + result = await conn.fetch(query, event_row_id) + await conn.close() + match result: case None: raise Exception("DB error fetching leaders host info") @@ -142,8 +148,11 @@ def extract_leader_info(leader): async def fetch_proposals(self) -> list[Proposal]: """Return a list of proposals .""" + conn = await self.connect() query = "SELECT * FROM proposal ORDER BY id ASC" - result = await self.conn().fetch(query) + result = await conn.fetch(query) + await conn.close() + if result is None: raise Exception("proposals DB error") logger.debug(f"proposals retrieved from DB: {len(result)}") @@ -158,8 +167,11 @@ async def fetch_proposals(self) -> list[Proposal]: async def check_if_snapshot_is_final(self, event_id: int) -> bool: """Query if the snapshot is finalized.""" + conn = await self.connect() query = "SELECT final FROM snapshot WHERE event = $1" - result = await self.conn().fetchrow(query, event_id) + result = await conn.fetchrow(query, event_id) + await conn.close() + match result: case None: raise Exception("snapshot DB error") @@ -174,11 +186,14 @@ async def check_if_snapshot_is_final(self, event_id: int) -> bool: async def fetch_snapshot(self, event_id: int) -> Snapshot: """Fetch the snapshot row for the event_id.""" + conn = await self.connect() # fetch the voters columns = "(row_id, event, as_at, last_updated, dbsync_snapshot_data" columns += ", drep_data, catalyst_snapshot_data, final)" query = f"SELECT {columns} FROM snapshot WHERE event = $1" - result = await self.conn().fetchrow(query, event_id) + result = await conn.fetchrow(query, event_id) + await conn.close() + if result is None: raise Exception("snapshot DB error") logger.debug("snapshot retrieved from DB") @@ -190,32 +205,66 @@ async def fetch_snapshot(self, event_id: int) -> Snapshot: logger.debug("snapshot retrieved from DB") return snapshot + async def fetch_voters(self, event_id: int) -> list[Voter]: + """Fetch the voters registered for the event_id.""" + conn = await self.connect() + query = """ + SELECT * FROM voter WHERE snapshot_id IN (SELECT row_id FROM snapshot WHERE event = $1) + """ + result = await conn.fetch(query, event_id) + await conn.close() + + match result: + case None: + raise Exception("DB error fetching voters") + case [*voters]: + logger.debug(f"voters retrieved from DB: {len(voters)}") + return [Voter(**dict(r)) for r in voters] + + async def fetch_contributions(self, event_id: int) -> list[Contribution]: + """Fetch the contributions registered for the event_id.""" + conn = await self.connect() + query = """ + SELECT * FROM contribution WHERE snapshot_id IN (SELECT row_id FROM snapshot WHERE event = $1) + """ + result = await conn.fetch(query, event_id) + await conn.close() + + match result: + case None: + raise Exception("DB error fetching contributions") + case [*contributions]: + logger.debug(f"contributions retrieved from DB: {len(contributions)}") + return [Contribution(**dict(r)) for r in contributions] + async def fetch_voteplans(self, event_id: int) -> list[VotePlan]: """Fetch the voteplans for the event_id.""" + conn = await self.connect() query = "SELECT * FROM voteplan WHERE objective_id IN (SELECT row_id FROM objective WHERE event = $1) ORDER BY id ASC" - result = await self.conn().fetch(query, event_id) - if result is None: - raise Exception("voteplan DB error") - logger.debug(f"voteplans retrieved from DB: {len(result)}") + result = await conn.fetch(query, event_id) + await conn.close() + match result: case None: raise Exception("DB error fetching voteplans") - case []: - raise Exception("no voteplans found in DB") case [*voteplans]: logger.debug(f"voteplans retrieved from DB: {len(voteplans)}") return [VotePlan(**dict(r)) for r in voteplans] async def insert_block0_info(self, event_row_id: int, block0_bytes: bytes, block0_hash: str): """Update the event with the given block0 bytes and hash.""" + conn = await self.connect() columns = "block0 = $1, block0_hash = $2" condition = "row_id = $3" returning = "name" query = f"UPDATE event SET {columns} WHERE {condition} RETURNING {returning}" try: - result = await self.conn().execute(query, block0_bytes, block0_hash, event_row_id) + result = await conn.execute(query, block0_bytes, block0_hash, event_row_id) + if result is None: raise Exception("failed to insert block0 info from DB") logger.debug(f"block0 info added to event: {result}") except Exception as e: raise Exception(f"inserting block0 info went wrong: {e}") from e + finally: + await conn.close() diff --git a/services/voting-node/voting_node/importer.py b/services/voting-node/voting_node/importer.py index 6b298e34fb..224c2e6f67 100644 --- a/services/voting-node/voting_node/importer.py +++ b/services/voting-node/voting_node/importer.py @@ -102,9 +102,11 @@ async def snapshot_import(self, event_id: int): snapshot_tool_output_dir=snapshot_tool_out_dir, ) else: - raise Exception("SSH_SNAPSHOT_TOOL_PATH, SSH_SNAPSHOT_TOOL_OUTPUT_DIR, " - "SSH_SNAPSHOT_TOOL_OUTPUT_DIR and SSH_SNAPSHOT_TOOL_DESTINATION " - "are all required when SNAPSHOT_TOOL_SSH is set") + raise Exception( + "SSH_SNAPSHOT_TOOL_PATH, SSH_SNAPSHOT_TOOL_OUTPUT_DIR, " + "SSH_SNAPSHOT_TOOL_OUTPUT_DIR and SSH_SNAPSHOT_TOOL_DESTINATION " + "are all required when SNAPSHOT_TOOL_SSH is set" + ) else: ssh_config = None diff --git a/services/voting-node/voting_node/jcli.py b/services/voting-node/voting_node/jcli.py index be25e22298..bdba7c35f6 100644 --- a/services/voting-node/voting_node/jcli.py +++ b/services/voting-node/voting_node/jcli.py @@ -2,7 +2,7 @@ import asyncio from pathlib import Path -from .committee import ElectionKey +from .models.committee import ElectionKey class JCli: diff --git a/services/voting-node/voting_node/models.py b/services/voting-node/voting_node/models/__init__.py similarity index 82% rename from services/voting-node/voting_node/models.py rename to services/voting-node/voting_node/models/__init__.py index 18bf13b07a..ac87f7d743 100644 --- a/services/voting-node/voting_node/models.py +++ b/services/voting-node/voting_node/models/__init__.py @@ -7,10 +7,6 @@ import yaml from aiofile import async_open -from loguru import logger -from pydantic import BaseModel - -from .committee import CommitteeMember, CommunicationKeys, ElectionKey, MemberKeys ### Base types @@ -143,60 +139,6 @@ class LeaderHostInfo: consensus_leader_id: str -class Committee(BaseModel): - """The tallying committee. - - `event_id` the number of committee members. - `size` the number of committee members. - `threshold` the minimum number of members needed to tally. - `committee_pk` the encrypted private key of the Committee address. - `committee_id` the hex-encoded public key of the Committee address. - `crs` the encrypted Common Reference String shared in the creation of every set of committee member keys. - `members` list of containing the communication and member secrets of each member of the commitee. - `election_key` public key used to sign every vote in the event. This key is created from the committee member public keys. - """ - - row_id: int | None = None - """`row_id` the unique key for this committee in the DB.""" - event_id: int - size: int - threshold: int - crs: str - committee_pk: str - committee_id: str - members: list[CommitteeMember] | None = None - election_key: ElectionKey - - def as_yaml(self) -> str: - """Return the content as YAML.""" - return yaml.safe_dump(self.dict()) - - @classmethod - async def read_file(cls, file: Path) -> Self: - """Read and return the yaml_type from the file path.""" - afp = await async_open(file, "r") - yaml_str = await afp.read() - await afp.close() - yaml_dict = yaml.safe_load(yaml_str) - try: - members_list = yaml_dict["members"] - - def committee_member(member: dict) -> CommitteeMember: - comm_keys = [print(keys) for keys in member["communication_keys"]] - comm_keys = [CommunicationKeys(**keys) for keys in member["communication_keys"]] - logger.debug(f"comm_keys: {comm_keys}") - member["communication_keys"] = comm_keys - member_keys = [MemberKeys(**keys) for keys in member["member_keys"]] - member["member_keys"] = member_keys - return CommitteeMember(**member) - - yaml_dict["members"] = [committee_member(member) for member in members_list] - committee = cls(**yaml_dict) - return committee - except Exception as e: - raise Exception(f"invalid committee in {file}: {e}") - - @dataclass class Block0: """Represents the path to 'block0.bin' and its hash.""" @@ -307,6 +249,14 @@ def get_snapshot_start(self) -> datetime: raise Exception("event has no snapshot start time") return self.snapshot_start + def has_snapshot_started(self) -> bool: + """Return True when current time is equal or greater to the voting start time. + + This method raises exception if the timestamp is None. + """ + snapshot_start = self.get_snapshot_start() + return datetime.utcnow() >= snapshot_start + def get_voting_start(self) -> datetime: """Get the timestamp for when the event voting starts. @@ -430,6 +380,30 @@ class Voter: voting_power: int +@dataclass +class Contribution: + """Individual contributions from the stake public key to the voting key.""" + + row_id: str + # Stake Public key for the voter. + stake_public_key: str + # The ID of the snapshot this record belongs to + snapshot_id: str + + # The voting key. If None, it is the raw staked ADA. + voting_key: str | None + # The weight that this key gets from the total. + voting_weight: int | None + # The index from 0 of the keys in the delegation array. + voting_key_idx: int | None + # The amount of ADA contributed to this voting key from the stake address. + value: int + # The group that this contribution goes to. + voting_group: str + # Currently unused. + reward_address: str | None + + @dataclass class VotePlan: """A vote plan for this event.""" diff --git a/services/voting-node/voting_node/models/committee.py b/services/voting-node/voting_node/models/committee.py new file mode 100644 index 0000000000..7540bb8a89 --- /dev/null +++ b/services/voting-node/voting_node/models/committee.py @@ -0,0 +1,109 @@ +"""Committee of members that will tally the votes.""" +import yaml +from aiofile import async_open +from loguru import logger +from pathlib import Path +from pydantic import BaseModel +from typing import Self + + +class WalletKeys(BaseModel): + """The keys to an external wallet.""" + + seckey: str + """`seckey` is the secret key for the wallet.""" + pubkey: str + """`pubkey` is the public key for the wallet.""" + hex_encoded: str + """`hex_encoded` is used to generate the genesis block.""" + + +class Keypair(BaseModel): + """A pair of key.""" + + seckey: str + """ Secret key.""" + + pubkey: str + """ Public key.""" + + +class CommunicationKeys(Keypair): + """Committee member communication keys.""" + + +class MemberKeys(Keypair): + """Committee member keys.""" + + +class CommitteeMember(BaseModel): + """Committee member.""" + + index: int + """Zero-based index for this committee member.""" + communication_keys: CommunicationKeys + """Member communication keypair.""" + member_keys: MemberKeys + """Committee member keypair.""" + + +class ElectionKey(BaseModel): + """The election key is used to sign every vote. + + This key can be rebuilt with the committee member keys. + """ + + pubkey: str + """ Public key.""" + + +class Committee(BaseModel): + """The tallying committee.""" + + row_id: int | None = None + """`row_id` the unique key for this committee in the DB.""" + event_id: int + """`event_id` the number of committee members.""" + size: int + """`size` the number of committee members.""" + threshold: int + """`threshold` the minimum number of members needed to tally.""" + crs: str + """`crs` the encrypted Common Reference String shared in the creation of every set of committee member keys.""" + committee_pk: str + """`committee_pk` the encrypted private key of the Committee address.""" + committee_id: str + """`committee_id` the hex-encoded public key of the Committee address.""" + members: list[CommitteeMember] | None = None + """`members` list of containing the communication and member secrets of each member of the commitee.""" + election_key: ElectionKey + """`election_key` public key used to sign every vote in the event. This key is created from the committee member public keys.""" + + def as_yaml(self) -> str: + """Return the content as YAML.""" + return yaml.safe_dump(self.dict()) + + @classmethod + async def read_file(cls, file: Path) -> Self: + """Read and return the yaml_type from the file path.""" + afp = await async_open(file, "r") + yaml_str = await afp.read() + await afp.close() + yaml_dict = yaml.safe_load(yaml_str) + try: + members_list = yaml_dict["members"] + + def committee_member(member: dict) -> CommitteeMember: + comm_keys = [print(keys) for keys in member["communication_keys"]] + comm_keys = [CommunicationKeys(**keys) for keys in member["communication_keys"]] + logger.debug(f"comm_keys: {comm_keys}") + member["communication_keys"] = comm_keys + member_keys = [MemberKeys(**keys) for keys in member["member_keys"]] + member["member_keys"] = member_keys + return CommitteeMember(**member) + + yaml_dict["members"] = [committee_member(member) for member in members_list] + committee = cls(**yaml_dict) + return committee + except Exception as e: + raise Exception(f"invalid committee in {file}: {e}") diff --git a/services/voting-node/voting_node/models/token.py b/services/voting-node/voting_node/models/token.py new file mode 100644 index 0000000000..332489761c --- /dev/null +++ b/services/voting-node/voting_node/models/token.py @@ -0,0 +1,5 @@ +class TokenId: + """The token id for a voting group.""" + + policy_hash: bytes + token_name: bytes diff --git a/services/voting-node/voting_node/node.py b/services/voting-node/voting_node/node.py index fc92638bf4..25aa6a06cd 100644 --- a/services/voting-node/voting_node/node.py +++ b/services/voting-node/voting_node/node.py @@ -12,7 +12,6 @@ from .models import ( Block0, - Committee, Event, FundsForToken, Genesis, @@ -24,6 +23,7 @@ Proposal, VotePlanCertificate, ) +from .models.committee import Committee class BaseNode(BaseModel): @@ -143,8 +143,8 @@ def has_snapshot_started(self) -> bool: This is the time when the snapshot is considered to be stable. """ - snapshot_start = self.get_snapshot_start() - return datetime.utcnow() > snapshot_start + event = self.get_event() + return event.has_snapshot_started() def has_voting_started(self) -> bool: """Get the timestamp for when the event voting starts. diff --git a/services/voting-node/voting_node/service.py b/services/voting-node/voting_node/service.py index 299f25e175..a70a6c0a4b 100644 --- a/services/voting-node/voting_node/service.py +++ b/services/voting-node/voting_node/service.py @@ -106,3 +106,5 @@ def get_schedule(self): return tasks.LeaderSchedule(self.settings) case ("follower", _): return tasks.FollowerSchedule(self.settings) + case _: + return None diff --git a/services/voting-node/voting_node/storage.py b/services/voting-node/voting_node/storage.py index 0ee7b1a059..8108165be0 100644 --- a/services/voting-node/voting_node/storage.py +++ b/services/voting-node/voting_node/storage.py @@ -5,9 +5,8 @@ from loguru import logger from pydantic import BaseModel -from .committee import ElectionKey from .envvar import SECRET_SECRET -from .models import Committee +from .models.committee import Committee, ElectionKey from .utils import decrypt_secret, encrypt_secret @@ -67,6 +66,7 @@ async def get_committee(self, event_id: int) -> Committee: threshold=record["threshold"], crs=crs, committee_id=record["committee_id"], + committee_pk=record["committee_pk"], election_key=ElectionKey(pubkey=record["election_key"]), ) # fetch committee members diff --git a/services/voting-node/voting_node/tasks.py b/services/voting-node/voting_node/tasks.py index 755fa6740e..a6f4a0df7a 100644 --- a/services/voting-node/voting_node/tasks.py +++ b/services/voting-node/voting_node/tasks.py @@ -41,52 +41,55 @@ SCHEDULE_RESET_MSG = "schedule was reset" LEADER_NODE_SCHEDULE: Final = [ - "connect_db", - "fetch_upcoming_event", - "wait_for_start_time", - "fetch_host_keys", - "fetch_leaders", - "set_node_secret", - "set_node_topology_key", - "set_node_config", - "get_block0", - "wait_for_voting", - "voting", - "wait_for_tally", - "tally", - "cleanup", + "node_fetch_event", + "node_wait_for_start_time", + "node_fetch_host_keys", + "node_fetch_leaders", + "node_set_secret", + "node_set_topology_key", + "node_set_config", + "block0_fetch", + "node_wait_for_voting", + "event_voting_period", + "node_wait_for_tally", + "event_tally_period", + "node_cleanup", ] +"""List of scheduled tasks for peer leader nodes.""" LEADER0_NODE_SCHEDULE: Final = [ - "connect_db", - "fetch_upcoming_event", - "wait_for_start_time", - "fetch_host_keys", - "fetch_leaders", - "set_node_secret", - "set_node_topology_key", - "set_node_config", - "import_snapshot_data", - "collect_snapshot_data", - "setup_tally_committee", - "setup_block0", - "publish_block0", - "wait_for_voting", - "voting", - "wait_for_tally", - "tally", - "cleanup", + "node_fetch_event", + "node_wait_for_start_time", + "node_fetch_host_keys", + "node_fetch_leaders", + "node_set_secret", + "node_set_topology_key", + "node_set_config", + "event_snapshot_period", + "node_snapshot_data", + "block0_tally_committee", + "block0_voting_tokens", + "block0_wallet_registrations", + "block0_token_distributions", + "block0_genesis_build", + "block0_publish", + "node_wait_for_voting", + "event_voting_period", + "node_wait_for_tally", + "event_tally_period", + "node_cleanup", ] +"""List of scheduled tasks for leader0 nodes.""" FOLLOWER_NODE_SCHEDULE: Final = [ - "connect_db", - "fetch_upcoming_event", - "wait_for_start_time", - "fetch_host_keys", - "fetch_leaders", - "set_node_secret", - "set_node_topology_key", - "set_node_config", - "cleanup", + "node_fetch_event", + "node_wait_for_start_time", + "node_fetch_host_keys", + "node_fetch_leaders", + "node_set_secret", + "node_set_topology_key", + "node_set_config", + "node_cleanup", ] +"""List of scheduled tasks for follower nodes.""" class ScheduleRunner: @@ -183,14 +186,14 @@ def jorm(self) -> Jormungandr: """Return the wrapper to the 'jormungandr' shell command.""" return Jormungandr(self.settings.jorm_path_str) - async def connect_db(self): - """Async connection to the DB.""" - await self.db.connect() + async def node_fetch_event(self): + """Fetch the event from the DB. - async def fetch_upcoming_event(self): - """Fetch the upcoming event from the DB.""" - # This all starts by getting the event row that has the nearest - # `voting_start`. We query the DB to get the row, and store it. + This all starts by getting the event row that has the nearest + `voting_start`, or an event that has stared and not ended. + + We query the DB to get the row, and store it in the node. + """ try: event = await self.db.fetch_upcoming_event() logger.debug("upcoming event retrieved from DB") @@ -198,7 +201,7 @@ async def fetch_upcoming_event(self): except Exception as e: self.reset_schedule(f"{e}") - async def wait_for_start_time(self): + async def node_wait_for_start_time(self): """Wait for the event start time.""" # check if the event has started, otherwise, resets the schedule # raises exception if the event has no start time defined @@ -208,7 +211,7 @@ async def wait_for_start_time(self): logger.debug("event has not started") self.reset_schedule(f"event will start on {self.node.get_start_time()}") - async def fetch_host_keys(self): + async def node_fetch_host_keys(self): """Fetch or create voting node secret key information, by hostname, for the current event. Reset the schedule if no current event is defined. @@ -259,7 +262,7 @@ async def fetch_host_keys(self): except Exception as e: self.reset_schedule(f"{e}") - async def fetch_leaders(self): + async def node_fetch_leaders(self): """Fetch from the DB host info for other leaders.""" # gets the event, raises exception if none is found. event = self.node.get_event() @@ -271,7 +274,7 @@ async def fetch_leaders(self): except Exception as e: self.reset_schedule(f"{e}") - async def set_node_secret(self): + async def node_set_secret(self): """Set the seckey from the host info and saves it to the node storage node_secret.yaml.""" # get the node secret from HostInfo, if it's not found, reset match self.node.host_info: @@ -296,7 +299,7 @@ async def set_node_secret(self): case _: self.reset_schedule("no node host info was found") - async def set_node_topology_key(self): + async def node_set_topology_key(self): """Set the node network topology key.""" match self.node.host_info: case HostInfo(netkey=netkey): @@ -309,7 +312,7 @@ async def set_node_topology_key(self): case _: self.reset_schedule("host info was not found for this node") - async def set_node_config(self): + async def node_set_config(self): """Set the node configuration.""" # check that we have the info we need, otherwise, we reset if self.node.topology_key is None: @@ -365,14 +368,11 @@ async def set_node_config(self): logger.debug("node config saved") self.node.config = node_config_yaml - async def cleanup(self): + async def node_cleanup(self): """Execute cleanup chores to stop the voting node service. - * Close the DB connection. * ... """ - # close the DB connection - await self.db.close() class LeaderSchedule(NodeTaskSchedule): @@ -383,7 +383,7 @@ class LeaderSchedule(NodeTaskSchedule): # Leader Node tasks tasks: list[str] = LEADER_NODE_SCHEDULE - async def get_block0(self): + async def block0_fetch(self): """Get block0 information from the node event. Raises exception if the node has no leaders, or no event, or no event start time defined. @@ -407,7 +407,7 @@ async def get_block0(self): self.node.block0_path = block0_path logger.debug(f"block0 found in voting event: {self.node.block0}") - async def wait_for_voting(self): + async def node_wait_for_voting(self): """Wait for the event voting time.""" # get the voting start timestamp # raises an exception otherwise @@ -418,7 +418,7 @@ async def wait_for_voting(self): logger.debug("voting has started") - async def voting(self): + async def event_voting_period(self): """Execute jormungandr node for voting.""" logger.debug(f"NODE: {self.node.secret}") if self.node.secret is None: @@ -429,7 +429,7 @@ async def voting(self): self.reset_schedule("event has no block0.bin") await self.jorm().start_leader(self.node.secret.path, self.node.config.path, self.node.block0_path) - async def wait_for_tally(self): + async def node_wait_for_tally(self): """Wait for vote tally to begin.""" # get the voting end timestamp # raises an exception otherwise @@ -440,7 +440,7 @@ async def wait_for_tally(self): logger.debug("voting has ended, tallying has begun") - async def tally(self): + async def event_tally_period(self): """Execute the vote tally.""" @@ -452,29 +452,11 @@ class Leader0Schedule(LeaderSchedule): # Leader0 Node tasks tasks: list[str] = LEADER0_NODE_SCHEDULE - async def fetch_upcoming_event(self): - """Override common method to fetch the upcoming event from the DB. + async def event_snapshot_period(self): + """Timespan for continuous snapshot import from DBSync and IdeaScale. - 'leader0' nodes that don't find an upcoming event, create one with - default values. + A snapshot is taken at intervals of `SNAPSHOT_INTERVAL_SECONDS` (default 1800 seconds). """ - try: - event = await self.db.fetch_upcoming_event() - logger.debug("current event retrieved from DB") - self.node.event = event - except Exception: - # run helper to add a default event to the DB - from .helpers import add_default_event - - logger.debug("event not found from DB, attempting to create") - await add_default_event(db_url=self.settings.db_url) - logger.info("event added to DB") - event = await self.db.fetch_upcoming_event() - logger.debug("current event retrieved from DB") - self.node.event = event - - async def import_snapshot_data(self): - """Collect the snapshot data from EventDB.""" event = self.node.get_event() registration_time = event.get_registration_snapshot_time() snapshot_start = event.get_snapshot_start() @@ -482,12 +464,12 @@ async def import_snapshot_data(self): logger.info(f"Execute snapshot runner for event in row {event.row_id}.") await runner.take_snapshots(event.row_id) - async def collect_snapshot_data(self): + async def node_snapshot_data(self): """Collect the snapshot data from EventDB.""" # gets the event, raises exception if none is found. event = self.node.get_event() - snapshot_start = self.node.get_snapshot_start() - if not self.node.has_snapshot_started(): + snapshot_start = event.get_snapshot_start() + if not event.has_snapshot_started(): raise Exception(f"snapshot will be stable on {snapshot_start} UTC") # check for this field before getting the data @@ -523,7 +505,7 @@ async def collect_snapshot_data(self): logger.warning("no proposals were found") # raise Exception(f"failed to fetch proposals from DB: {e}") from e - async def setup_tally_committee(self): + async def block0_tally_committee(self): """Fetch or create tally committee data. 1. Fetch the committee from the node secret storage @@ -533,9 +515,11 @@ async def setup_tally_committee(self): then a random 32-byte hex token is generated and used. """ event = self.node.get_event() + conn = await self.db.connect() + secret_store = SecretDBStorage(conn=conn) # TODO: fetch tally committee data from secret storage try: - committee = await SecretDBStorage(conn=self.db.conn()).get_committee(event.row_id) + committee = await secret_store.get_committee(event.row_id) logger.debug("fetched committee from storage") self.node.committee = committee except Exception as e: @@ -557,11 +541,25 @@ async def setup_tally_committee(self): ) logger.debug(f"created committee: {committee.as_yaml()}") self.node.committee = committee - await SecretDBStorage(conn=self.db.conn()).save_committee(event_id=event.row_id, committee=committee) + await secret_store.save_committee(event_id=event.row_id, committee=committee) logger.debug("saved committee to storage") + finally: + await conn.close() + + async def block0_voting_tokens(self): + """Build voting tokens for each voting group in the current event.""" + logger.error("block0_voting_tokens is not implemented!") + + async def block0_wallet_registrations(self): + """Build initial fund fragments for registered wallets.""" + logger.error("block0_wallet_registrations is not implemented!") + + async def block0_token_distributions(self): + """Build initial token distribution fragments for registered contributions.""" + logger.error("block0_token_distributions is not implemented!") - async def setup_block0(self): - """Check DB event for block0 information. + async def block0_genesis_build(self): + """Build genesis block. Create and add it to the DB when the needed information is found. Reset the schedule otherwise. @@ -625,7 +623,7 @@ async def setup_block0(self): self.node.block0_path = block0_path logger.debug(f"block0 created and saved: {self.node.block0.hash}") - async def publish_block0(self): + async def block0_publish(self): """Publish block0 to the current event in EventDB.""" event = self.node.get_event() match self.node.block0: diff --git a/services/voting-node/voting_node/utils.py b/services/voting-node/voting_node/utils.py index 89bf5dea31..5306ab4112 100644 --- a/services/voting-node/voting_node/utils.py +++ b/services/voting-node/voting_node/utils.py @@ -14,9 +14,9 @@ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from loguru import logger -from .committee import CommitteeMember, CommunicationKeys, MemberKeys, WalletKeys from .jcli import JCli -from .models import Committee, Event, Genesis, LeaderHostInfo, NodeConfig +from .models import Event, Genesis, LeaderHostInfo, NodeConfig +from .models.committee import Committee, CommitteeMember, CommunicationKeys, MemberKeys, WalletKeys from .templates import ( GENESIS_YAML, NODE_CONFIG_FOLLOWER,