Skip to content

Commit

Permalink
feat: add session block height into DB
Browse files Browse the repository at this point in the history
  • Loading branch information
elsirion committed Apr 8, 2024
1 parent e8c615d commit 4c277d5
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 5 deletions.
79 changes: 78 additions & 1 deletion schema/v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,87 @@ CREATE TABLE IF NOT EXISTS ln_contracts (
CREATE INDEX IF NOT EXISTS ln_contract_federation_contract ON ln_contracts(federation_id, contract_id);
CREATE INDEX IF NOT EXISTS ln_contract_federation ON ln_contracts (federation_id);
CREATE INDEX IF NOT EXISTS ln_contract_hashes ON ln_contracts (payment_hash);
CREATE INDEX IF NOT EXISTS ln_contract_gateways ON ln_contracts(gateway_id);

CREATE TABLE IF NOT EXISTS block_times (
block_height INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS block_times_time ON block_times(timestamp);

CREATE TABLE IF NOT EXISTS block_height_votes (
federation_id BLOB NOT NULL REFERENCES federations(federation_id),
session_index INTEGER NOT NULL,
item_index INTEGER NOT NULL,
proposer INTEGER NOT NULL,
height_vote INTEGER NOT NULL REFERENCES block_times(block_height),
PRIMARY KEY (federation_id, session_index, item_index),
FOREIGN KEY (federation_id, session_index) REFERENCES sessions(federation_id, session_index)
);
CREATE INDEX IF NOT EXISTS block_height_vote_federation_sessions ON block_height_votes(federation_id, session_index);
CREATE INDEX IF NOT EXISTS block_height_vote_heights ON block_height_votes(height_vote);

CREATE VIEW IF NOT EXISTS session_times AS WITH session_votes AS (
SELECT
s.session_index,
federation_id
FROM
sessions s
WHERE
federation_id = x'120879c1233789679a4ed9b47ba557f8da3d4577b4e0b3f61fa5afd3137b824a'
), SortedVotes AS (
SELECT
sv.session_index,
height_vote,
ROW_NUMBER() OVER (PARTITION BY sv.session_index ORDER BY height_vote) AS rn,
COUNT(bhv.height_vote) OVER (PARTITION BY sv.session_index) AS total_votes
FROM
session_votes sv
LEFT JOIN
block_height_votes bhv ON sv.session_index = bhv.session_index
AND sv.federation_id = bhv.federation_id
), session_max_height AS (
SELECT
session_index,
MAX(height_vote) AS max_height_vote -- Include max to handle NULLs in averaging
FROM
SortedVotes
WHERE
total_votes > 0
GROUP BY
session_index
), session_time AS (
SELECT
sv.session_index,
(
SELECT
bt.timestamp
FROM
block_times bt
WHERE
mh.max_height_vote IS NOT NULL
AND bt.block_height = mh.max_height_vote
) AS timestamp
FROM
session_votes sv
LEFT JOIN
session_max_height mh ON sv.session_index = mh.session_index
), grouped_sessions AS (
SELECT
*,
SUM(CASE WHEN timestamp IS NOT NULL THEN 1 ELSE 0 END) OVER (ORDER BY session_index) AS time_group
FROM
session_time
), propagated_times AS (
SELECT
session_index,
FIRST_VALUE(timestamp) OVER (PARTITION BY time_group ORDER BY session_index) AS estimated_session_timestamp
FROM
grouped_sessions
)
SELECT
session_index,
estimated_session_timestamp
FROM
propagated_times
ORDER BY
session_index;
54 changes: 50 additions & 4 deletions src/federation/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use std::time::{Duration, SystemTime};
use anyhow::ensure;
use fedimint_core::api::{DynGlobalApi, InviteCode};
use fedimint_core::config::{ClientConfig, FederationId};
use fedimint_core::core::DynModuleConsensusItem;
use fedimint_core::encoding::Encodable;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::session_outcome::SessionOutcome;
use fedimint_core::task::TaskGroup;
use fedimint_core::util::retry;
use fedimint_core::Amount;
use fedimint_core::{Amount, PeerId};
use fedimint_ln_common::contracts::{Contract, IdentifiableContract};
use fedimint_ln_common::{LightningInput, LightningOutput, LightningOutputV0};
use fedimint_mint_common::{MintInput, MintOutput};
use fedimint_wallet_common::{WalletInput, WalletOutput};
use fedimint_wallet_common::{WalletConsensusItem, WalletInput, WalletOutput};
use futures::StreamExt;
use hex::ToHex;
use serde::Serialize;
Expand Down Expand Up @@ -135,7 +136,7 @@ impl FederationObserver {
}

async fn fetch_block_times(self) {
const SLEEP_SECS: u64 = 10;
const SLEEP_SECS: u64 = 60;
loop {
if let Err(e) = self.fetch_block_times_inner().await {
warn!("Error while fetching block times: {e:?}");
Expand Down Expand Up @@ -295,8 +296,20 @@ impl FederationObserver {
)
.await?;
}
ConsensusItem::Module(module_ci) => {
Self::process_ci(
dbtx,
federation_id,
&config,
session_index,
item_idx as u64,
item.peer,
module_ci,
)
.await?;
}
_ => {
// FIXME: process module CIs
// Ignore unknown CIs
}
}
}
Expand Down Expand Up @@ -465,6 +478,39 @@ impl FederationObserver {
Ok(())
}

async fn process_ci(
dbtx: &mut Transaction<'_, Any>,
federation_id: FederationId,
config: &ClientConfig,
session_index: u64,
item_index: u64,
peer_id: PeerId,
ci: DynModuleConsensusItem,
) -> sqlx::Result<()> {
let kind = instance_to_kind(config, ci.module_instance_id());

if kind != "wallet" {
return Ok(());
}

let wallet_ci = ci
.as_any()
.downcast_ref::<WalletConsensusItem>()
.expect("config says this should be a wallet CI");
if let WalletConsensusItem::BlockCount(height_vote) = wallet_ci {
query("INSERT INTO block_height_votes VALUES ($1, $2, $3, $4, $5)")
.bind(federation_id.consensus_encode_to_vec())
.bind(session_index as i64)
.bind(item_index as i64)
.bind(peer_id.to_usize() as i64)
.bind(*height_vote as i64)
.execute(dbtx.as_mut())
.await?;
}

Ok(())
}

pub async fn get_federation_assets(
&self,
federation_id: FederationId,
Expand Down

0 comments on commit 4c277d5

Please sign in to comment.