77// if it's compiled with this flag.
88#![ deny( missing_docs) ]
99
10+ use std:: sync:: Arc ;
11+
1012pub use pallas:: network:: miniprotocols:: Point ;
1113use pallas:: {
1214 ledger:: traverse:: MultiEraBlock ,
1315 network:: {
1416 facades:: PeerClient ,
15- miniprotocols:: { MAINNET_MAGIC , PREVIEW_MAGIC , PRE_PRODUCTION_MAGIC , TESTNET_MAGIC } ,
17+ miniprotocols:: {
18+ chainsync, MAINNET_MAGIC , PREVIEW_MAGIC , PRE_PRODUCTION_MAGIC , TESTNET_MAGIC ,
19+ } ,
1620 } ,
1721} ;
1822use thiserror:: Error ;
23+ use tokio:: {
24+ sync:: { mpsc, Mutex } ,
25+ task:: JoinHandle ,
26+ } ;
1927
2028/// Default [`Follower`] block buffer size.
2129const DEFAULT_CHAIN_UPDATE_BUFFER_SIZE : usize = 32 ;
@@ -36,7 +44,10 @@ pub enum Error {
3644 Blockfetch ( pallas:: network:: miniprotocols:: blockfetch:: ClientError ) ,
3745 /// Chainsync protocol error.
3846 #[ error( "Chainsync error: {0:?}" ) ]
39- Chainsync ( pallas:: network:: miniprotocols:: chainsync:: ClientError ) ,
47+ Chainsync ( chainsync:: ClientError ) ,
48+ /// Follower start point was not found.
49+ #[ error( "Follower start point was not found" ) ]
50+ FollowerStartPointNotFound ,
4051}
4152
4253/// Crate result type.
@@ -131,9 +142,7 @@ impl Reader {
131142 ///
132143 /// Returns Err if the block was not found or if some communication error ocurred.
133144 pub async fn read_block < P > ( & mut self , at : P ) -> Result < MultiEraBlockData >
134- where
135- P : Into < PointOrTip > ,
136- {
145+ where P : Into < PointOrTip > {
137146 let point = self . resolve_point_or_tip ( at. into ( ) ) . await ?;
138147
139148 let block_data = self
@@ -160,9 +169,7 @@ impl Reader {
160169 pub async fn read_block_range < P > (
161170 & mut self , from : Point , to : P ,
162171 ) -> Result < Vec < MultiEraBlockData > >
163- where
164- P : Into < PointOrTip > ,
165- {
172+ where P : Into < PointOrTip > {
166173 let to_point = self . resolve_point_or_tip ( to. into ( ) ) . await ?;
167174
168175 let data_vec = self
@@ -178,6 +185,7 @@ impl Reader {
178185 Ok ( data_vec)
179186 }
180187
188+ /// Resolves [`PointOrTip`] to a point finding the tip point if needed.
181189 #[ inline]
182190 async fn resolve_point_or_tip ( & mut self , point_or_tip : PointOrTip ) -> Result < Point > {
183191 match point_or_tip {
@@ -254,9 +262,7 @@ impl FollowerConfigBuilder {
254262 /// * `from`: Sync starting point.
255263 #[ must_use]
256264 pub fn follow_from < P > ( mut self , from : P ) -> Self
257- where
258- P : Into < PointOrTip > ,
259- {
265+ where P : Into < PointOrTip > {
260266 self . follow_from = from. into ( ) ;
261267 self
262268 }
@@ -283,7 +289,14 @@ pub struct FollowerConfig {
283289}
284290
285291/// Cardano chain follower.
286- pub struct Follower { }
292+ pub struct Follower {
293+ /// Client shared by the follower and its task.
294+ client : Arc < Mutex < PeerClient > > ,
295+ /// Chain update receiver.
296+ chain_update_rx : mpsc:: Receiver < Result < ChainUpdate > > ,
297+ /// Task thread join handle.
298+ task_join_handle : Option < JoinHandle < ( ) > > ,
299+ }
287300
288301impl Follower {
289302 /// Connects the follower to a producer using the node-to-node protocol.
@@ -297,14 +310,43 @@ impl Follower {
297310 /// # Errors
298311 ///
299312 /// Returns Err if the connection could not be established.
300- pub async fn connect (
301- _address : & str , _network : Network , _config : FollowerConfig ,
302- ) -> Result < Self > {
303- todo ! ( )
313+ pub async fn connect ( address : & str , network : Network , config : FollowerConfig ) -> Result < Self > {
314+ let client = Arc :: new ( Mutex :: new (
315+ PeerClient :: connect ( address, network. into ( ) )
316+ . await
317+ . map_err ( Error :: Client ) ?,
318+ ) ) ;
319+
320+ let ( chain_update_tx, chain_update_rx) = mpsc:: channel ( config. chain_update_buffer_size ) ;
321+
322+ let mut this = Self {
323+ client : client. clone ( ) ,
324+ chain_update_rx,
325+ task_join_handle : None ,
326+ } ;
327+
328+ let start_point = this
329+ . set_read_pointer ( config. follow_from )
330+ . await ?
331+ . ok_or ( Error :: FollowerStartPointNotFound ) ?;
332+ tracing:: debug!(
333+ slot = start_point. slot_or_default( ) ,
334+ "Follower read pointer set to starting point"
335+ ) ;
336+
337+ let task_join_handle = tokio:: spawn ( follow_task:: run (
338+ client,
339+ chain_update_tx,
340+ config. max_await_retries ,
341+ ) ) ;
342+ this. task_join_handle = Some ( task_join_handle) ;
343+
344+ Ok ( this)
304345 }
305346
306- /// Set the follower's chain read-pointer. Returns None if the point was
307- /// not found on the chain.
347+ /// Set the follower's chain read-pointer.
348+ ///
349+ /// Returns None if the point was not found on the chain.
308350 ///
309351 /// # Arguments
310352 ///
@@ -313,20 +355,194 @@ impl Follower {
313355 /// # Errors
314356 ///
315357 /// Returns Err if something went wrong while communicating with the producer.
316- pub async fn set_read_pointer < P > ( & mut self , _at : P ) -> Result < Option < Point > >
317- where
318- P : Into < PointOrTip > ,
319- {
320- todo ! ( )
358+ pub async fn set_read_pointer < P > ( & mut self , at : P ) -> Result < Option < Point > >
359+ where P : Into < PointOrTip > {
360+ let mut client = self . client . lock ( ) . await ;
361+
362+ match Into :: < PointOrTip > :: into ( at) {
363+ PointOrTip :: Point ( Point :: Origin ) => {
364+ let point = client
365+ . chainsync ( )
366+ . intersect_origin ( )
367+ . await
368+ . map_err ( Error :: Chainsync ) ?;
369+
370+ Ok ( Some ( point) )
371+ } ,
372+ PointOrTip :: Point ( p @ Point :: Specific ( ..) ) => {
373+ client
374+ . chainsync ( )
375+ . find_intersect ( vec ! [ p] )
376+ . await
377+ . map ( |( point, _) | point)
378+ . map_err ( Error :: Chainsync )
379+ } ,
380+ PointOrTip :: Tip => {
381+ let point = client
382+ . chainsync ( )
383+ . intersect_tip ( )
384+ . await
385+ . map_err ( Error :: Chainsync ) ?;
386+
387+ Ok ( Some ( point) )
388+ } ,
389+ }
321390 }
322391
323392 /// Receive the next chain update from the producer.
324393 ///
325394 /// # Errors
326395 ///
327396 /// Returns Err if any producer communication errors occurred.
397+ #[ allow( clippy:: missing_panics_doc) ]
328398 pub async fn next ( & mut self ) -> Result < ChainUpdate > {
329- todo ! ( )
399+ // This will not panic
400+ #[ allow( clippy:: expect_used) ]
401+ self . chain_update_rx
402+ . recv ( )
403+ . await
404+ . expect ( "Follow task should be running" )
405+ }
406+
407+ /// Closes the follower connection and stops its background task.
408+ ///
409+ /// # Errors
410+ ///
411+ /// Returns Err if some error occurred in the background task.
412+ pub async fn close ( mut self ) -> std:: result:: Result < ( ) , tokio:: task:: JoinError > {
413+ self . chain_update_rx . close ( ) ;
414+
415+ if let Some ( join_handle) = self . task_join_handle {
416+ join_handle. await
417+ } else {
418+ Ok ( ( ) )
419+ }
420+ }
421+ }
422+
423+ /// Contains functions related to the Follower's background task.
424+ mod follow_task {
425+ use std:: sync:: Arc ;
426+
427+ use pallas:: {
428+ ledger:: traverse:: MultiEraHeader ,
429+ network:: {
430+ facades:: PeerClient ,
431+ miniprotocols:: { chainsync, Point } ,
432+ } ,
433+ } ;
434+ use tokio:: sync:: { mpsc, oneshot, Mutex } ;
435+
436+ use crate :: { ChainUpdate , Error , MultiEraBlockData } ;
437+
438+ /// Runs a [`Follower`](super::Follower) background task.
439+ ///
440+ /// The task runs until the chain update channel is closed (e.g. when the follower is
441+ /// dropped or the close fn is called).
442+ ///
443+ /// It keeps asking the connected node new chain updates. Every update and
444+ /// communication errors are sent through the channel to the follower.
445+ ///
446+ /// Backpressure is achieved with the channel's limited size.
447+ pub async fn run (
448+ client : Arc < Mutex < PeerClient > > , chain_update_tx : mpsc:: Sender < crate :: Result < ChainUpdate > > ,
449+ max_retries_count : u32 ,
450+ ) {
451+ ' main: loop {
452+ let try_count = 0 ;
453+
454+ ' tries: loop {
455+ assert ! ( try_count <= max_retries_count, "Node misbehavior" ) ;
456+
457+ let ( cancel_tx, _cancel_rx) = oneshot:: channel :: < ( ) > ( ) ;
458+
459+ tokio:: select! {
460+ ( ) = chain_update_tx. closed( ) => {
461+ break ' main;
462+ }
463+
464+ res = get_next_response( client. clone( ) , cancel_tx) => match res {
465+ Err ( err) => {
466+ if chain_update_tx. send( Err ( err) ) . await . is_err( ) {
467+ break ' main;
468+ }
469+ } ,
470+ Ok ( next_response) => {
471+ if let Some ( chain_update) = next_response {
472+ if chain_update_tx. send( Ok ( chain_update) ) . await . is_err( ) {
473+ break ' tries;
474+ }
475+ }
476+ }
477+ }
478+ } ;
479+ }
480+ }
481+
482+ tracing:: debug!( "Follower background task shutdown" ) ;
483+ }
484+
485+ /// Waits for the next update from the node the client is connected to.
486+ ///
487+ /// Can be cancelled by closing the `cancel_tx` receiver end (explicitly or by
488+ /// dropping it).
489+ async fn get_next_response (
490+ client : Arc < Mutex < PeerClient > > , mut cancel_tx : oneshot:: Sender < ( ) > ,
491+ ) -> crate :: Result < Option < ChainUpdate > > {
492+ let res = {
493+ let mut client_lock = client. lock ( ) . await ;
494+
495+ if client_lock. chainsync ( ) . has_agency ( ) {
496+ tokio:: select! {
497+ ( ) = cancel_tx. closed( ) => { return Ok ( None ) ; }
498+ res = client_lock. chainsync( ) . request_next( ) => { res }
499+ }
500+ } else {
501+ tokio:: select! {
502+ ( ) = cancel_tx. closed( ) => { return Ok ( None ) ; }
503+ res = client_lock. chainsync( ) . recv_while_must_reply( ) => { res }
504+ }
505+ }
506+ . map_err ( Error :: Chainsync ) ?
507+ } ;
508+
509+ match res {
510+ chainsync:: NextResponse :: RollForward ( header, _tip) => {
511+ let decoded_header = MultiEraHeader :: decode (
512+ header. variant ,
513+ header. byron_prefix . map ( |p| p. 0 ) ,
514+ & header. cbor ,
515+ )
516+ . map_err ( Error :: Codec ) ?;
517+
518+ let mut client_lock = client. lock ( ) . await ;
519+
520+ let req_fut = client_lock. blockfetch ( ) . fetch_single ( Point :: Specific (
521+ decoded_header. slot ( ) ,
522+ decoded_header. hash ( ) . to_vec ( ) ,
523+ ) ) ;
524+
525+ let block_data = tokio:: select! {
526+ ( ) = cancel_tx. closed( ) => { return Ok ( None ) ; }
527+ res = req_fut => { res. map_err( Error :: Blockfetch ) ? }
528+ } ;
529+
530+ Ok ( Some ( ChainUpdate :: Block ( MultiEraBlockData ( block_data) ) ) )
531+ } ,
532+ chainsync:: NextResponse :: RollBackward ( point, _tip) => {
533+ let mut client_lock = client. lock ( ) . await ;
534+
535+ let req_fut = client_lock. blockfetch ( ) . fetch_single ( point) ;
536+
537+ let block_data = tokio:: select! {
538+ ( ) = cancel_tx. closed( ) => { return Ok ( None ) ; }
539+ res = req_fut => { res. map_err( Error :: Blockfetch ) ? }
540+ } ;
541+
542+ Ok ( Some ( ChainUpdate :: Rollback ( MultiEraBlockData ( block_data) ) ) )
543+ } ,
544+ chainsync:: NextResponse :: Await => Ok ( None ) ,
545+ }
330546 }
331547}
332548
0 commit comments