Skip to content

Commit d1897fd

Browse files
author
Felipe Rosa
committed
feat: Chain reader implemetation
1 parent db18905 commit d1897fd

File tree

4 files changed

+195
-33
lines changed

4 files changed

+195
-33
lines changed

hermes/Cargo.lock

Lines changed: 107 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/crates/cardano-chain-follower/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ license.workspace = true
1010

1111
[dependencies]
1212
pallas.workspace = true
13+
thiserror = "1.0.50"
14+
tokio = { version = "1.34.0", default-features = false, features = ["macros", "rt", "net", "rt-multi-thread"] }
15+
tracing = "0.1.40"
1316

1417
[dev-dependencies]
1518
hex = "0.4.3"
16-
tokio = { version = "1.34.0", default-features = false, features = ["macros", "rt", "net", "rt-multi-thread"] }
19+
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

hermes/crates/cardano-chain-follower/examples/read_block_range.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
2626
let mut total_txs = 0;
2727
for data in data_vec {
2828
let block = data.decode()?;
29-
total_txs = block.tx_count();
29+
total_txs += block.tx_count();
3030
}
3131

3232
println!("Total transactions: {total_txs}");

hermes/crates/cardano-chain-follower/src/lib.rs

Lines changed: 83 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,34 @@
1010
pub use pallas::network::miniprotocols::Point;
1111
use pallas::{
1212
ledger::traverse::MultiEraBlock,
13-
network::miniprotocols::{MAINNET_MAGIC, PREVIEW_MAGIC, PRE_PRODUCTION_MAGIC, TESTNET_MAGIC},
13+
network::{
14+
facades::PeerClient,
15+
miniprotocols::{MAINNET_MAGIC, PREVIEW_MAGIC, PRE_PRODUCTION_MAGIC, TESTNET_MAGIC},
16+
},
1417
};
18+
use thiserror::Error;
1519

1620
/// Default [`Follower`] block buffer size.
1721
const DEFAULT_CHAIN_UPDATE_BUFFER_SIZE: usize = 32;
1822
/// Default [`Follower`] max await retries.
1923
const DEFAULT_MAX_AWAIT_RETRIES: u32 = 3;
2024

2125
/// Crate error type.
22-
///
23-
/// We are using a boxed error here until we have some implementation of the
24-
/// the crate's API.
25-
///
26-
/// In the future this will probably be something as:
27-
///
28-
/// ```ignore
29-
/// use thiserror::Error;
30-
///
31-
/// #[derive(Debug, Error)]
32-
/// pub enum Error {
33-
/// ...
34-
/// }
35-
/// ```
36-
pub type Error = Box<dyn std::error::Error>;
26+
#[derive(Debug, Error)]
27+
pub enum Error {
28+
/// Data encoding/decoding error.
29+
#[error("Codec error: {0:?}")]
30+
Codec(pallas::ledger::traverse::Error),
31+
/// Client connection error.
32+
#[error("Client error: {0:?}")]
33+
Client(pallas::network::facades::Error),
34+
/// Blockfetch protocol error.
35+
#[error("Blockfetch error: {0:?}")]
36+
Blockfetch(pallas::network::miniprotocols::blockfetch::ClientError),
37+
/// Chainsync protocol error.
38+
#[error("Chainsync error: {0:?}")]
39+
Chainsync(pallas::network::miniprotocols::chainsync::ClientError),
40+
}
3741

3842
/// Crate result type.
3943
pub type Result<T> = std::result::Result<T, Error>;
@@ -63,7 +67,7 @@ impl MultiEraBlockData {
6367
/// Returns Err if the block's era couldn't be decided or if the encoded data is
6468
/// invalid.
6569
pub fn decode(&self) -> Result<MultiEraBlock> {
66-
let block = MultiEraBlock::decode(&self.0).map_err(Box::new)?;
70+
let block = MultiEraBlock::decode(&self.0).map_err(Error::Codec)?;
6771

6872
Ok(block)
6973
}
@@ -93,7 +97,10 @@ impl From<Network> for u64 {
9397
}
9498

9599
/// Cardano chain Reader.
96-
pub struct Reader {}
100+
pub struct Reader {
101+
/// Connection used by the reader to read blocks.
102+
client: PeerClient,
103+
}
97104

98105
impl Reader {
99106
/// Connects the Reader to a producer using the node-to-node protocol.
@@ -106,8 +113,12 @@ impl Reader {
106113
/// # Errors
107114
///
108115
/// Returns Err if the connection could not be established.
109-
pub async fn connect(_address: &str, _network: Network) -> Result<Self> {
110-
todo!()
116+
pub async fn connect(address: &str, network: Network) -> Result<Self> {
117+
let client = PeerClient::connect(address, network.into())
118+
.await
119+
.map_err(Error::Client)?;
120+
121+
Ok(Self { client })
111122
}
112123

113124
/// Reads a single block from the chain.
@@ -119,9 +130,20 @@ impl Reader {
119130
/// # Errors
120131
///
121132
/// Returns Err if the block was not found or if some communication error ocurred.
122-
pub async fn read_block<P>(&mut self, _at: P) -> Result<MultiEraBlockData>
123-
where P: Into<PointOrTip> {
124-
todo!()
133+
pub async fn read_block<P>(&mut self, at: P) -> Result<MultiEraBlockData>
134+
where
135+
P: Into<PointOrTip>,
136+
{
137+
let point = self.resolve_point_or_tip(at.into()).await?;
138+
139+
let block_data = self
140+
.client
141+
.blockfetch()
142+
.fetch_single(point)
143+
.await
144+
.map_err(Error::Blockfetch)?;
145+
146+
Ok(MultiEraBlockData(block_data))
125147
}
126148

127149
/// Reads a range of blocks from the chain.
@@ -136,10 +158,39 @@ impl Reader {
136158
/// Returns Err if the block range was not found or if some communication error
137159
/// ocurred.
138160
pub async fn read_block_range<P>(
139-
&mut self, _from: Point, _to: P,
161+
&mut self, from: Point, to: P,
140162
) -> Result<Vec<MultiEraBlockData>>
141-
where P: Into<PointOrTip> {
142-
todo!()
163+
where
164+
P: Into<PointOrTip>,
165+
{
166+
let to_point = self.resolve_point_or_tip(to.into()).await?;
167+
168+
let data_vec = self
169+
.client
170+
.blockfetch()
171+
.fetch_range((from, to_point))
172+
.await
173+
.map_err(Error::Blockfetch)?
174+
.into_iter()
175+
.map(MultiEraBlockData)
176+
.collect();
177+
178+
Ok(data_vec)
179+
}
180+
181+
#[inline]
182+
async fn resolve_point_or_tip(&mut self, point_or_tip: PointOrTip) -> Result<Point> {
183+
match point_or_tip {
184+
PointOrTip::Point(point) => Ok(point),
185+
PointOrTip::Tip => {
186+
// Find the chain tip's point
187+
self.client
188+
.chainsync()
189+
.intersect_tip()
190+
.await
191+
.map_err(Error::Chainsync)
192+
},
193+
}
143194
}
144195
}
145196

@@ -203,7 +254,9 @@ impl FollowerConfigBuilder {
203254
/// * `from`: Sync starting point.
204255
#[must_use]
205256
pub fn follow_from<P>(mut self, from: P) -> Self
206-
where P: Into<PointOrTip> {
257+
where
258+
P: Into<PointOrTip>,
259+
{
207260
self.follow_from = from.into();
208261
self
209262
}
@@ -261,7 +314,9 @@ impl Follower {
261314
///
262315
/// Returns Err if something went wrong while communicating with the producer.
263316
pub async fn set_read_pointer<P>(&mut self, _at: P) -> Result<Option<Point>>
264-
where P: Into<PointOrTip> {
317+
where
318+
P: Into<PointOrTip>,
319+
{
265320
todo!()
266321
}
267322

0 commit comments

Comments
 (0)