From 4c277d5f088ac3a2991e5d9b789876d30720c6f8 Mon Sep 17 00:00:00 2001 From: elsirion Date: Mon, 8 Apr 2024 18:02:56 +0200 Subject: [PATCH] feat: add session block height into DB --- schema/v0.sql | 79 +++++++++++++++++++++++++++++++++++++- src/federation/observer.rs | 54 ++++++++++++++++++++++++-- 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/schema/v0.sql b/schema/v0.sql index 9e9cf04..714cb9c 100644 --- a/schema/v0.sql +++ b/schema/v0.sql @@ -68,10 +68,87 @@ CREATE TABLE IF NOT EXISTS ln_contracts ( CREATE INDEX IF NOT EXISTS ln_contract_federation_contract ON ln_contracts(federation_id, contract_id); CREATE INDEX IF NOT EXISTS ln_contract_federation ON ln_contracts (federation_id); CREATE INDEX IF NOT EXISTS ln_contract_hashes ON ln_contracts (payment_hash); -CREATE INDEX IF NOT EXISTS ln_contract_gateways ON ln_contracts(gateway_id); CREATE TABLE IF NOT EXISTS block_times ( block_height INTEGER PRIMARY KEY, timestamp INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS block_times_time ON block_times(timestamp); + +CREATE TABLE IF NOT EXISTS block_height_votes ( + federation_id BLOB NOT NULL REFERENCES federations(federation_id), + session_index INTEGER NOT NULL, + item_index INTEGER NOT NULL, + proposer INTEGER NOT NULL, + height_vote INTEGER NOT NULL REFERENCES block_times(block_height), + PRIMARY KEY (federation_id, session_index, item_index), + FOREIGN KEY (federation_id, session_index) REFERENCES sessions(federation_id, session_index) +); +CREATE INDEX IF NOT EXISTS block_height_vote_federation_sessions ON block_height_votes(federation_id, session_index); +CREATE INDEX IF NOT EXISTS block_height_vote_heights ON block_height_votes(height_vote); + +CREATE VIEW IF NOT EXISTS session_times AS WITH session_votes AS ( + SELECT + s.session_index, + federation_id + FROM + sessions s + WHERE + federation_id = x'120879c1233789679a4ed9b47ba557f8da3d4577b4e0b3f61fa5afd3137b824a' +), SortedVotes AS ( + SELECT + sv.session_index, + height_vote, + ROW_NUMBER() OVER (PARTITION BY sv.session_index ORDER BY height_vote) AS rn, + COUNT(bhv.height_vote) OVER (PARTITION BY sv.session_index) AS total_votes + FROM + session_votes sv + LEFT JOIN + block_height_votes bhv ON sv.session_index = bhv.session_index + AND sv.federation_id = bhv.federation_id +), session_max_height AS ( + SELECT + session_index, + MAX(height_vote) AS max_height_vote -- Include max to handle NULLs in averaging + FROM + SortedVotes + WHERE + total_votes > 0 + GROUP BY + session_index +), session_time AS ( + SELECT + sv.session_index, + ( + SELECT + bt.timestamp + FROM + block_times bt + WHERE + mh.max_height_vote IS NOT NULL + AND bt.block_height = mh.max_height_vote + ) AS timestamp + FROM + session_votes sv + LEFT JOIN + session_max_height mh ON sv.session_index = mh.session_index +), grouped_sessions AS ( + SELECT + *, + SUM(CASE WHEN timestamp IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY session_index) AS time_group + FROM + session_time +), propagated_times AS ( + SELECT + session_index, + FIRST_VALUE(timestamp) OVER (PARTITION BY time_group ORDER BY session_index) AS estimated_session_timestamp + FROM + grouped_sessions +) +SELECT + session_index, + estimated_session_timestamp +FROM + propagated_times +ORDER BY + session_index; diff --git a/src/federation/observer.rs b/src/federation/observer.rs index eedc5db..8c4241e 100644 --- a/src/federation/observer.rs +++ b/src/federation/observer.rs @@ -3,16 +3,17 @@ use std::time::{Duration, SystemTime}; use anyhow::ensure; use fedimint_core::api::{DynGlobalApi, InviteCode}; use fedimint_core::config::{ClientConfig, FederationId}; +use fedimint_core::core::DynModuleConsensusItem; use fedimint_core::encoding::Encodable; use fedimint_core::epoch::ConsensusItem; use fedimint_core::session_outcome::SessionOutcome; use fedimint_core::task::TaskGroup; use fedimint_core::util::retry; -use fedimint_core::Amount; +use fedimint_core::{Amount, PeerId}; use fedimint_ln_common::contracts::{Contract, IdentifiableContract}; use fedimint_ln_common::{LightningInput, LightningOutput, LightningOutputV0}; use fedimint_mint_common::{MintInput, MintOutput}; -use fedimint_wallet_common::{WalletInput, WalletOutput}; +use fedimint_wallet_common::{WalletConsensusItem, WalletInput, WalletOutput}; use futures::StreamExt; use hex::ToHex; use serde::Serialize; @@ -135,7 +136,7 @@ impl FederationObserver { } async fn fetch_block_times(self) { - const SLEEP_SECS: u64 = 10; + const SLEEP_SECS: u64 = 60; loop { if let Err(e) = self.fetch_block_times_inner().await { warn!("Error while fetching block times: {e:?}"); @@ -295,8 +296,20 @@ impl FederationObserver { ) .await?; } + ConsensusItem::Module(module_ci) => { + Self::process_ci( + dbtx, + federation_id, + &config, + session_index, + item_idx as u64, + item.peer, + module_ci, + ) + .await?; + } _ => { - // FIXME: process module CIs + // Ignore unknown CIs } } } @@ -465,6 +478,39 @@ impl FederationObserver { Ok(()) } + async fn process_ci( + dbtx: &mut Transaction<'_, Any>, + federation_id: FederationId, + config: &ClientConfig, + session_index: u64, + item_index: u64, + peer_id: PeerId, + ci: DynModuleConsensusItem, + ) -> sqlx::Result<()> { + let kind = instance_to_kind(config, ci.module_instance_id()); + + if kind != "wallet" { + return Ok(()); + } + + let wallet_ci = ci + .as_any() + .downcast_ref::() + .expect("config says this should be a wallet CI"); + if let WalletConsensusItem::BlockCount(height_vote) = wallet_ci { + query("INSERT INTO block_height_votes VALUES ($1, $2, $3, $4, $5)") + .bind(federation_id.consensus_encode_to_vec()) + .bind(session_index as i64) + .bind(item_index as i64) + .bind(peer_id.to_usize() as i64) + .bind(*height_vote as i64) + .execute(dbtx.as_mut()) + .await?; + } + + Ok(()) + } + pub async fn get_federation_assets( &self, federation_id: FederationId,