Skip to content

Commit 2773af1

Browse files
authored
refactor: Split Sync P2P fetching code into BlockFetcher (#3140)
1 parent f52e073 commit 2773af1

File tree

3 files changed

+110
-65
lines changed

3 files changed

+110
-65
lines changed

p2p/p2p.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
145145

146146
// todo: reconsider initialising synchroniser here because if node is a feedernode we shouldn't not create an instance of it.
147147

148-
synchroniser := p2pSync.New(bc, p2phost, snNetwork, log)
148+
blockFetcher := p2pSync.NewBlockFetcher(bc, p2phost, snNetwork, log)
149+
synchroniser := p2pSync.New(bc, log, &blockFetcher)
149150
s := &Service{
150151
synchroniser: synchroniser,
151152
log: log,

p2p/sync/sync.go

Lines changed: 107 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,23 @@ import (
3232
)
3333

3434
type Service struct {
35-
host host.Host
36-
network *utils.Network
37-
client *Client // todo: merge all the functionality of Client with p2p SyncService
38-
3935
blockchain *blockchain.Blockchain
40-
listener junoSync.EventListener
4136
log utils.SimpleLogger
4237

43-
blockCh chan BlockBody
38+
blockFetcher *BlockFetcher
39+
blockCh chan BlockBody
4440
}
4541

46-
func New(bc *blockchain.Blockchain, h host.Host, n *utils.Network, log utils.SimpleLogger) *Service {
42+
func New(
43+
bc *blockchain.Blockchain,
44+
log utils.SimpleLogger,
45+
blockFetcher *BlockFetcher,
46+
) *Service {
4747
return &Service{
48-
host: h,
49-
network: n,
50-
blockchain: bc,
51-
log: log,
52-
listener: &junoSync.SelectiveListener{},
53-
blockCh: make(chan BlockBody),
48+
blockchain: bc,
49+
log: log,
50+
blockFetcher: blockFetcher,
51+
blockCh: make(chan BlockBody),
5452
}
5553
}
5654

@@ -63,8 +61,6 @@ func (s *Service) Run(ctx context.Context) {
6361
defer cancel()
6462
defer close(s.blockCh)
6563

66-
s.client = NewClient(s.randomPeerStream, s.network, s.log)
67-
6864
for i := 0; ; i++ {
6965
if err := ctx.Err(); err != nil {
7066
break
@@ -83,7 +79,7 @@ func (s *Service) Run(ctx context.Context) {
8379

8480
// todo change iteration to fetch several objects uint64(min(blockBehind, maxBlocks))
8581
blockNumber := uint64(nextHeight)
86-
if err = s.processBlock(iterCtx, blockNumber); err != nil {
82+
if err = s.blockFetcher.ProcessBlock(iterCtx, blockNumber, s.blockCh); err != nil {
8783
s.logError("Failed to process block", fmt.Errorf("blockNumber: %d, err: %w", blockNumber, err))
8884
cancelIteration()
8985
continue
@@ -92,6 +88,10 @@ func (s *Service) Run(ctx context.Context) {
9288
}
9389
}
9490

91+
func (s *Service) WithListener(l junoSync.EventListener) {
92+
s.blockFetcher.WithListener(l)
93+
}
94+
9595
func (s *Service) getNextHeight() (int, error) {
9696
curHeight, err := s.blockchain.Height()
9797
if err == nil {
@@ -102,7 +102,50 @@ func (s *Service) getNextHeight() (int, error) {
102102
return 0, err
103103
}
104104

105-
func (s *Service) processBlock(ctx context.Context, blockNumber uint64) error {
105+
func (s *Service) logError(msg string, err error) {
106+
if !errors.Is(err, context.Canceled) {
107+
var log utils.SimpleLogger
108+
if v, ok := s.log.(*utils.ZapLogger); ok {
109+
enhancedLogger := v.SugaredLogger.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar()
110+
log = &utils.ZapLogger{SugaredLogger: enhancedLogger}
111+
} else {
112+
log = s.log
113+
}
114+
115+
log.Errorw(msg, "err", err)
116+
} else {
117+
s.log.Debugw("Sync context canceled")
118+
}
119+
}
120+
121+
type BlockFetcher struct {
122+
network *utils.Network
123+
client *Client // todo: merge all the functionality of Client with p2p SyncService
124+
blockchain *blockchain.Blockchain
125+
listener junoSync.EventListener
126+
log utils.SimpleLogger
127+
}
128+
129+
func NewBlockFetcher(
130+
bc *blockchain.Blockchain,
131+
h host.Host,
132+
n *utils.Network,
133+
log utils.SimpleLogger,
134+
) BlockFetcher {
135+
return BlockFetcher{
136+
network: n,
137+
blockchain: bc,
138+
log: log,
139+
listener: &junoSync.SelectiveListener{},
140+
client: NewClient(randomPeerStream(h, log), n, log),
141+
}
142+
}
143+
144+
func (s *BlockFetcher) ProcessBlock(
145+
ctx context.Context,
146+
blockNumber uint64,
147+
outputs chan<- BlockBody,
148+
) error {
106149
headersAndSigsCh, err := s.genHeadersAndSigs(ctx, blockNumber)
107150
if err != nil {
108151
return fmt.Errorf("failed to get block headers parts: %w", err)
@@ -130,7 +173,7 @@ func (s *Service) processBlock(ctx context.Context, blockNumber uint64) error {
130173

131174
pipeline.Bridge(
132175
ctx,
133-
s.blockCh,
176+
outputs,
134177
s.processSpecBlockParts(
135178
ctx,
136179
blockNumber,
@@ -151,22 +194,6 @@ func specBlockPartsFunc[T specBlockHeaderAndSigs | specTxWithReceipts | specEven
151194
return specBlockParts(i)
152195
}
153196

154-
func (s *Service) logError(msg string, err error) {
155-
if !errors.Is(err, context.Canceled) {
156-
var log utils.SimpleLogger
157-
if v, ok := s.log.(*utils.ZapLogger); ok {
158-
enhancedLogger := v.SugaredLogger.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar()
159-
log = &utils.ZapLogger{SugaredLogger: enhancedLogger}
160-
} else {
161-
log = s.log
162-
}
163-
164-
log.Errorw(msg, "err", err)
165-
} else {
166-
s.log.Debugw("Sync context canceled")
167-
}
168-
}
169-
170197
// BlockBody is used to mange all the different parts of the blocks require to store the block in the blockchain.Store()
171198
type BlockBody struct {
172199
Block *core.Block
@@ -177,7 +204,7 @@ type BlockBody struct {
177204
}
178205

179206
//nolint:gocyclo
180-
func (s *Service) processSpecBlockParts(
207+
func (s *BlockFetcher) processSpecBlockParts(
181208
ctx context.Context, startingBlockNum uint64, specBlockPartsCh <-chan specBlockParts,
182209
) <-chan <-chan BlockBody {
183210
orderedBlockBodiesCh := make(chan (<-chan BlockBody))
@@ -270,7 +297,7 @@ func (s *Service) processSpecBlockParts(
270297
}
271298

272299
//nolint:gocyclo,funlen
273-
func (s *Service) adaptAndSanityCheckBlock(
300+
func (s *BlockFetcher) adaptAndSanityCheckBlock(
274301
ctx context.Context,
275302
header *header.SignedBlockHeader,
276303
contractDiffs []*state.ContractDiff,
@@ -431,7 +458,10 @@ func (s specBlockHeaderAndSigs) blockNumber() uint64 {
431458
return s.header.Number
432459
}
433460

434-
func (s *Service) genHeadersAndSigs(ctx context.Context, blockNumber uint64) (<-chan specBlockHeaderAndSigs, error) {
461+
func (s *BlockFetcher) genHeadersAndSigs(
462+
ctx context.Context,
463+
blockNumber uint64,
464+
) (<-chan specBlockHeaderAndSigs, error) {
435465
it := s.createIteratorForBlock(blockNumber)
436466
headersIt, err := s.client.RequestBlockHeaders(ctx, &header.BlockHeadersRequest{Iteration: it})
437467
if err != nil {
@@ -475,7 +505,10 @@ func (s specClasses) blockNumber() uint64 {
475505
return s.number
476506
}
477507

478-
func (s *Service) genClasses(ctx context.Context, blockNumber uint64) (<-chan specClasses, error) {
508+
func (s *BlockFetcher) genClasses(
509+
ctx context.Context,
510+
blockNumber uint64,
511+
) (<-chan specClasses, error) {
479512
it := s.createIteratorForBlock(blockNumber)
480513
classesIt, err := s.client.RequestClasses(ctx, &syncclass.ClassesRequest{Iteration: it})
481514
if err != nil {
@@ -521,7 +554,10 @@ func (s specContractDiffs) blockNumber() uint64 {
521554
return s.number
522555
}
523556

524-
func (s *Service) genStateDiffs(ctx context.Context, blockNumber uint64) (<-chan specContractDiffs, error) {
557+
func (s *BlockFetcher) genStateDiffs(
558+
ctx context.Context,
559+
blockNumber uint64,
560+
) (<-chan specContractDiffs, error) {
525561
it := s.createIteratorForBlock(blockNumber)
526562
stateDiffsIt, err := s.client.RequestStateDiffs(ctx, &state.StateDiffsRequest{Iteration: it})
527563
if err != nil {
@@ -569,7 +605,10 @@ func (s specEvents) blockNumber() uint64 {
569605
return s.number
570606
}
571607

572-
func (s *Service) genEvents(ctx context.Context, blockNumber uint64) (<-chan specEvents, error) {
608+
func (s *BlockFetcher) genEvents(
609+
ctx context.Context,
610+
blockNumber uint64,
611+
) (<-chan specEvents, error) {
573612
it := s.createIteratorForBlock(blockNumber)
574613
eventsIt, err := s.client.RequestEvents(ctx, &event.EventsRequest{Iteration: it})
575614
if err != nil {
@@ -616,7 +655,10 @@ func (s specTxWithReceipts) blockNumber() uint64 {
616655
return s.number
617656
}
618657

619-
func (s *Service) genTransactions(ctx context.Context, blockNumber uint64) (<-chan specTxWithReceipts, error) {
658+
func (s *BlockFetcher) genTransactions(
659+
ctx context.Context,
660+
blockNumber uint64,
661+
) (<-chan specTxWithReceipts, error) {
620662
it := s.createIteratorForBlock(blockNumber)
621663
txsIt, err := s.client.RequestTransactions(ctx, &synctransaction.TransactionsRequest{Iteration: it})
622664
if err != nil {
@@ -662,47 +704,49 @@ func (s *Service) genTransactions(ctx context.Context, blockNumber uint64) (<-ch
662704
return txsCh, nil
663705
}
664706

665-
func (s *Service) randomPeer() peer.ID {
666-
store := s.host.Peerstore()
707+
func randomPeer(host host.Host, log utils.SimpleLogger) peer.ID {
708+
store := host.Peerstore()
667709
// todo do not request same block from all peers
668710
peers := utils.Filter(store.Peers(), func(peerID peer.ID) bool {
669-
return peerID != s.host.ID()
711+
return peerID != host.ID()
670712
})
671713
if len(peers) == 0 {
672714
return ""
673715
}
674716

675717
p := peers[rand.Intn(len(peers))] //nolint:gosec
676718

677-
s.log.Debugw("Number of peers", "len", len(peers))
678-
s.log.Debugw("Random chosen peer's info", "peerInfo", store.PeerInfo(p))
719+
log.Debugw("Number of peers", "len", len(peers))
720+
log.Debugw("Random chosen peer's info", "peerInfo", store.PeerInfo(p))
679721

680722
return p
681723
}
682724

683725
var errNoPeers = errors.New("no peers available")
684726

685-
func (s *Service) randomPeerStream(ctx context.Context, pids ...protocol.ID) (network.Stream, error) {
686-
randPeer := s.randomPeer()
687-
if randPeer == "" {
688-
return nil, errNoPeers
689-
}
690-
stream, err := s.host.NewStream(ctx, randPeer, pids...)
691-
if err != nil {
692-
s.log.Debugw("Error creating stream", "peer", randPeer, "err", err)
693-
s.removePeer(randPeer)
694-
return nil, err
727+
func randomPeerStream(host host.Host, log utils.SimpleLogger) NewStreamFunc {
728+
return func(ctx context.Context, pids ...protocol.ID) (network.Stream, error) {
729+
randPeer := randomPeer(host, log)
730+
if randPeer == "" {
731+
return nil, errNoPeers
732+
}
733+
stream, err := host.NewStream(ctx, randPeer, pids...)
734+
if err != nil {
735+
log.Debugw("Error creating stream", "peer", randPeer, "err", err)
736+
removePeer(host, log, randPeer)
737+
return nil, err
738+
}
739+
return stream, err
695740
}
696-
return stream, err
697741
}
698742

699-
func (s *Service) removePeer(id peer.ID) {
700-
s.log.Debugw("Removing peer", "peerID", id)
701-
s.host.Peerstore().RemovePeer(id)
702-
s.host.Peerstore().ClearAddrs(id)
743+
func removePeer(host host.Host, log utils.SimpleLogger, id peer.ID) {
744+
log.Debugw("Removing peer", "peerID", id)
745+
host.Peerstore().RemovePeer(id)
746+
host.Peerstore().ClearAddrs(id)
703747
}
704748

705-
func (s *Service) createIteratorForBlock(blockNumber uint64) *synccommon.Iteration {
749+
func (s *BlockFetcher) createIteratorForBlock(blockNumber uint64) *synccommon.Iteration {
706750
return &synccommon.Iteration{
707751
Start: &synccommon.Iteration_BlockNumber{BlockNumber: blockNumber},
708752
Direction: synccommon.Iteration_Forward,
@@ -711,6 +755,6 @@ func (s *Service) createIteratorForBlock(blockNumber uint64) *synccommon.Iterati
711755
}
712756
}
713757

714-
func (s *Service) WithListener(l junoSync.EventListener) {
758+
func (s *BlockFetcher) WithListener(l junoSync.EventListener) {
715759
s.listener = l
716760
}

utils/pipeline/pipeline.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
6262
return out
6363
}
6464

65-
func Bridge[T any](ctx context.Context, out chan T, chanCh <-chan <-chan T) {
65+
func Bridge[T any](ctx context.Context, out chan<- T, chanCh <-chan <-chan T) {
6666
if chanCh == nil {
6767
return
6868
}

0 commit comments

Comments
 (0)