Skip to content

Commit cff2c64

Browse files
authored
Merge pull request #1513 from txpipe/feat/stake-distribution
add pallas stake snapshots integration
2 parents 17afb2c + bd8cbcd commit cff2c64

File tree

3 files changed

+198
-22
lines changed

3 files changed

+198
-22
lines changed

Cargo.lock

+5-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-common/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-common"
3-
version = "0.3.6"
3+
version = "0.3.7"
44
description = "Common types, interfaces, and utilities for Mithril nodes."
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-common/src/chain_observer/pallas_observer.rs

+192-16
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
use anyhow::{anyhow, Context};
22
use async_trait::async_trait;
3+
use bech32::{self, ToBase32, Variant};
34
use pallas_addresses::Address;
45
use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
56
use pallas_network::{
67
facades::NodeClient,
78
miniprotocols::localstate::{
89
queries_v16::{
9-
self, Addr, Addrs, PostAlonsoTransactionOutput, TransactionOutput, UTxOByAddress,
10+
self, Addr, Addrs, PostAlonsoTransactionOutput, StakeSnapshot, Stakes,
11+
TransactionOutput, UTxOByAddress,
1012
},
1113
Client,
1214
},
1315
};
16+
1417
use pallas_primitives::ToCanonicalJson;
15-
use std::path::{Path, PathBuf};
18+
use std::{
19+
collections::BTreeSet,
20+
path::{Path, PathBuf},
21+
};
1622

1723
use crate::{
1824
chain_observer::{interface::*, ChainAddress, TxDatum},
@@ -175,6 +181,64 @@ impl PallasChainObserver {
175181
Ok(utxo)
176182
}
177183

184+
/// Fetches the current stake distribution using the provided `statequery` client.
185+
async fn do_stake_snapshots_state_query(
186+
&self,
187+
statequery: &mut Client,
188+
) -> StdResult<StakeSnapshot> {
189+
statequery
190+
.acquire(None)
191+
.await
192+
.map_err(|err| anyhow!(err))
193+
.with_context(|| "PallasChainObserver failed to acquire statequery")?;
194+
195+
let era = queries_v16::get_current_era(statequery)
196+
.await
197+
.map_err(|err| anyhow!(err))
198+
.with_context(|| "PallasChainObserver failed to get current era")?;
199+
200+
let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
201+
.await
202+
.map_err(|err| anyhow!(err))
203+
.with_context(|| "PallasChainObserver failed to get stake snapshot")?;
204+
205+
Ok(state_snapshot)
206+
}
207+
208+
/// Returns the stake pool hash from the given bytestring.
209+
fn get_stake_pool_hash(&self, key: &Bytes) -> Result<String, ChainObserverError> {
210+
let pool_hash = bech32::encode("pool", key.to_base32(), Variant::Bech32)
211+
.map_err(|err| anyhow!(err))
212+
.with_context(|| "PallasChainObserver failed to encode stake pool hash")?;
213+
214+
Ok(pool_hash)
215+
}
216+
217+
/// Fetches the current stake distribution using the provided `statequery` client.
218+
async fn get_stake_distribution(
219+
&self,
220+
client: &mut NodeClient,
221+
) -> Result<Option<StakeDistribution>, ChainObserverError> {
222+
let statequery = client.statequery();
223+
224+
let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;
225+
226+
let mut stake_distribution = StakeDistribution::new();
227+
228+
let have_stakes_in_two_epochs = |stakes: &Stakes| stakes.snapshot_mark_pool > 0;
229+
for (key, stakes) in stake_snapshot
230+
.snapshots
231+
.stake_snapshots
232+
.iter()
233+
.filter(|(_, stakes)| have_stakes_in_two_epochs(stakes))
234+
{
235+
let pool_hash = self.get_stake_pool_hash(key)?;
236+
stake_distribution.insert(pool_hash, stakes.snapshot_mark_pool);
237+
}
238+
239+
Ok(Some(stake_distribution))
240+
}
241+
178242
/// Processes a state query with the `NodeClient`, releasing the state query.
179243
async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
180244
let statequery = client.statequery();
@@ -245,8 +309,15 @@ impl ChainObserver for PallasChainObserver {
245309
async fn get_current_stake_distribution(
246310
&self,
247311
) -> Result<Option<StakeDistribution>, ChainObserverError> {
248-
let fallback = self.get_fallback();
249-
fallback.get_current_stake_distribution().await
312+
let mut client = self.get_client().await?;
313+
314+
let stake_distribution = self.get_stake_distribution(&mut client).await?;
315+
316+
self.post_process_statequery(&mut client).await?;
317+
318+
client.abort().await;
319+
320+
Ok(stake_distribution)
250321
}
251322

252323
async fn get_current_kes_period(
@@ -264,7 +335,12 @@ mod tests {
264335

265336
use pallas_codec::utils::{AnyCbor, AnyUInt, KeyValuePairs, TagWrap};
266337
use pallas_crypto::hash::Hash;
267-
use pallas_network::miniprotocols::localstate::{queries_v16::Value, ClientQueryRequest};
338+
use pallas_network::miniprotocols::localstate::{
339+
queries_v16::{
340+
BlockQuery, HardForkQuery, LedgerQuery, Request, Snapshots, StakeSnapshot, Value,
341+
},
342+
ClientQueryRequest,
343+
};
268344
use tokio::net::UnixListener;
269345

270346
use super::*;
@@ -302,6 +378,64 @@ mod tests {
302378
UTxOByAddress { utxo }
303379
}
304380

381+
fn get_fake_stake_snapshot() -> StakeSnapshot {
382+
let stake_snapshots = KeyValuePairs::from(vec![
383+
(
384+
Bytes::from(
385+
hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
386+
.unwrap(),
387+
),
388+
Stakes {
389+
snapshot_mark_pool: 300000000001,
390+
snapshot_set_pool: 300000000002,
391+
snapshot_go_pool: 300000000000,
392+
},
393+
),
394+
(
395+
Bytes::from(
396+
hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
397+
.unwrap(),
398+
),
399+
Stakes {
400+
snapshot_mark_pool: 600000000001,
401+
snapshot_set_pool: 600000000002,
402+
snapshot_go_pool: 600000000000,
403+
},
404+
),
405+
(
406+
Bytes::from(
407+
hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
408+
.unwrap(),
409+
),
410+
Stakes {
411+
snapshot_mark_pool: 1200000000001,
412+
snapshot_set_pool: 1200000000002,
413+
snapshot_go_pool: 1200000000000,
414+
},
415+
),
416+
(
417+
Bytes::from(
418+
hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
419+
.unwrap(),
420+
),
421+
Stakes {
422+
snapshot_mark_pool: 0,
423+
snapshot_set_pool: 1300000000002,
424+
snapshot_go_pool: 0,
425+
},
426+
),
427+
]);
428+
429+
StakeSnapshot {
430+
snapshots: Snapshots {
431+
stake_snapshots,
432+
snapshot_stake_mark_total: 2100000000003,
433+
snapshot_stake_set_total: 2100000000006,
434+
snapshot_stake_go_total: 2100000000000,
435+
},
436+
}
437+
}
438+
305439
/// pallas responses mock server.
306440
async fn mock_server(server: &mut pallas_network::facades::NodeServer) -> AnyCbor {
307441
let query: queries_v16::Request =
@@ -311,27 +445,34 @@ mod tests {
311445
};
312446

313447
match query {
314-
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::HardForkQuery(
315-
queries_v16::HardForkQuery::GetCurrentEra,
316-
)) => AnyCbor::from_encode(4),
317-
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::BlockQuery(
318-
_,
319-
queries_v16::BlockQuery::GetEpochNo,
320-
)) => AnyCbor::from_encode([8]),
321-
queries_v16::Request::LedgerQuery(queries_v16::LedgerQuery::BlockQuery(
322-
_,
323-
queries_v16::BlockQuery::GetUTxOByAddress(_),
324-
)) => AnyCbor::from_encode(get_fake_utxo_by_address()),
448+
Request::LedgerQuery(LedgerQuery::HardForkQuery(HardForkQuery::GetCurrentEra)) => {
449+
AnyCbor::from_encode(4)
450+
}
451+
Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetEpochNo)) => {
452+
AnyCbor::from_encode([8])
453+
}
454+
Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetUTxOByAddress(_))) => {
455+
AnyCbor::from_encode(get_fake_utxo_by_address())
456+
}
457+
Request::LedgerQuery(LedgerQuery::BlockQuery(_, BlockQuery::GetStakeSnapshots(_))) => {
458+
AnyCbor::from_encode(get_fake_stake_snapshot())
459+
}
325460
_ => panic!("unexpected query from client: {query:?}"),
326461
}
327462
}
328463

329464
/// Creates a new work directory in the system's temporary folder.
330465
fn create_temp_dir(folder_name: &str) -> PathBuf {
466+
#[cfg(not(target_os = "macos"))]
331467
let temp_dir = std::env::temp_dir()
332468
.join("mithril_test")
333469
.join("pallas_chain_observer_test")
334470
.join(folder_name);
471+
472+
// macOS-domain addresses are variable-length filesystem pathnames of at most 104 characters.
473+
#[cfg(target_os = "macos")]
474+
let temp_dir: PathBuf = std::env::temp_dir().join(folder_name);
475+
335476
if temp_dir.exists() {
336477
fs::remove_dir_all(&temp_dir).expect("Previous work dir removal failed");
337478
}
@@ -403,4 +544,39 @@ mod tests {
403544
let datums = client_res.expect("Client failed");
404545
assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
405546
}
547+
548+
#[tokio::test]
549+
async fn get_current_stake_distribution_with_fallback() {
550+
let socket_path =
551+
create_temp_dir("get_current_stake_distribution_with_fallback").join("node.socket");
552+
let server = setup_server(socket_path.clone()).await;
553+
let client = tokio::spawn(async move {
554+
let fallback = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
555+
let observer = super::PallasChainObserver::new(
556+
socket_path.as_path(),
557+
CardanoNetwork::TestNet(10),
558+
fallback,
559+
);
560+
observer.get_current_stake_distribution().await.unwrap()
561+
});
562+
563+
let (_, client_res) = tokio::join!(server, client);
564+
let computed_stake_distribution = client_res.unwrap().unwrap();
565+
566+
let mut expected_stake_distribution = StakeDistribution::new();
567+
expected_stake_distribution.insert(
568+
"pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
569+
300000000001,
570+
);
571+
expected_stake_distribution.insert(
572+
"pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
573+
600000000001,
574+
);
575+
expected_stake_distribution.insert(
576+
"pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
577+
1200000000001,
578+
);
579+
580+
assert_eq!(expected_stake_distribution, computed_stake_distribution);
581+
}
406582
}

0 commit comments

Comments
 (0)