@@ -18,18 +18,20 @@ import (
18
18
"syscall"
19
19
"time"
20
20
21
+ "github.com/ledgerwatch/log/v3"
22
+ "google.golang.org/grpc"
23
+ "google.golang.org/grpc/health"
24
+ "google.golang.org/grpc/health/grpc_health_v1"
25
+ "google.golang.org/protobuf/types/known/emptypb"
26
+
21
27
libcommon "github.com/ledgerwatch/erigon-lib/common"
22
28
"github.com/ledgerwatch/erigon-lib/common/datadir"
23
29
"github.com/ledgerwatch/erigon-lib/common/dir"
30
+ "github.com/ledgerwatch/erigon-lib/direct"
24
31
"github.com/ledgerwatch/erigon-lib/gointerfaces"
25
32
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
26
33
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
27
34
proto_types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
28
- "github.com/ledgerwatch/log/v3"
29
- "google.golang.org/grpc"
30
- "google.golang.org/grpc/health"
31
- "google.golang.org/grpc/health/grpc_health_v1"
32
- "google.golang.org/protobuf/types/known/emptypb"
33
35
34
36
"github.com/ledgerwatch/erigon/cmd/utils"
35
37
"github.com/ledgerwatch/erigon/common/debug"
@@ -414,7 +416,7 @@ func runPeer(
414
416
}
415
417
send (eth.ToProto [protocol ][msg.Code ], peerID , b )
416
418
case eth .GetNodeDataMsg :
417
- if protocol >= eth .ETH67 {
419
+ if protocol >= direct .ETH67 {
418
420
msg .Discard ()
419
421
return fmt .Errorf ("unexpected GetNodeDataMsg from %s in eth/%d" , peerID , protocol )
420
422
}
@@ -558,75 +560,67 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re
558
560
logger : logger ,
559
561
}
560
562
561
- protocols := [] uint { protocol }
562
- if protocol == eth . ETH67 {
563
- protocols = append ( protocols , eth . ETH66 )
563
+ var disc enode. Iterator
564
+ if dialCandidates != nil {
565
+ disc = dialCandidates ( )
564
566
}
565
-
566
- for _ , p := range protocols {
567
- protocol := p
568
- var disc enode.Iterator
569
- if dialCandidates != nil {
570
- disc = dialCandidates ()
571
- }
572
- ss .Protocols = append (ss .Protocols , p2p.Protocol {
573
- Name : eth .ProtocolName ,
574
- Version : protocol ,
575
- Length : 17 ,
576
- DialCandidates : disc ,
577
- Run : func (peer * p2p.Peer , rw p2p.MsgReadWriter ) error {
578
- peerID := peer .Pubkey ()
579
- printablePeerID := hex .EncodeToString (peerID [:])[:20 ]
580
- if ss .getPeer (peerID ) != nil {
581
- logger .Trace ("[p2p] peer already has connection" , "peerId" , printablePeerID )
582
- return nil
583
- }
584
- logger .Trace ("[p2p] start with peer" , "peerId" , printablePeerID )
585
-
586
- peerInfo := NewPeerInfo (peer , rw )
587
- peerInfo .protocol = protocol
588
- defer peerInfo .Close ()
589
-
590
- defer ss .GoodPeers .Delete (peerID )
591
- err := handShake (ctx , ss .GetStatus (), peerID , rw , protocol , protocol , func (bestHash libcommon.Hash ) error {
592
- ss .GoodPeers .Store (peerID , peerInfo )
593
- ss .sendNewPeerToClients (gointerfaces .ConvertHashToH512 (peerID ))
594
- return ss .startSync (ctx , bestHash , peerID )
595
- })
596
- if err != nil {
597
- if errors .Is (err , NetworkIdMissmatchErr ) || errors .Is (err , io .EOF ) || errors .Is (err , p2p .ErrShuttingDown ) {
598
- logger .Trace ("[p2p] Handshake failure" , "peer" , printablePeerID , "err" , err )
599
- } else {
600
- logger .Debug ("[p2p] Handshake failure" , "peer" , printablePeerID , "err" , err )
601
- }
602
- return fmt .Errorf ("[p2p]handshake to peer %s: %w" , printablePeerID , err )
603
- }
604
- logger .Trace ("[p2p] Received status message OK" , "peerId" , printablePeerID , "name" , peer .Name ())
605
-
606
- err = runPeer (
607
- ctx ,
608
- peerID ,
609
- protocol ,
610
- rw ,
611
- peerInfo ,
612
- ss .send ,
613
- ss .hasSubscribers ,
614
- logger ,
615
- ) // runPeer never returns a nil error
616
- logger .Trace ("[p2p] error while running peer" , "peerId" , printablePeerID , "err" , err )
617
- ss .sendGonePeerToClients (gointerfaces .ConvertHashToH512 (peerID ))
567
+ ss .Protocols = append (ss .Protocols , p2p.Protocol {
568
+ Name : eth .ProtocolName ,
569
+ Version : protocol ,
570
+ Length : 17 ,
571
+ DialCandidates : disc ,
572
+ Run : func (peer * p2p.Peer , rw p2p.MsgReadWriter ) error {
573
+ peerID := peer .Pubkey ()
574
+ printablePeerID := hex .EncodeToString (peerID [:])[:20 ]
575
+ if ss .getPeer (peerID ) != nil {
576
+ logger .Trace ("[p2p] peer already has connection" , "peerId" , printablePeerID )
618
577
return nil
619
- },
620
- NodeInfo : func () interface {} {
621
- return readNodeInfo ()
622
- },
623
- PeerInfo : func (peerID [64 ]byte ) interface {} {
624
- // TODO: remember handshake reply per peer ID and return eth-related Status info (see ethPeerInfo in geth)
625
- return nil
626
- },
627
- //Attributes: []enr.Entry{eth.CurrentENREntry(chainConfig, genesisHash, headHeight)},
628
- })
629
- }
578
+ }
579
+ logger .Trace ("[p2p] start with peer" , "peerId" , printablePeerID )
580
+
581
+ peerInfo := NewPeerInfo (peer , rw )
582
+ peerInfo .protocol = protocol
583
+ defer peerInfo .Close ()
584
+
585
+ defer ss .GoodPeers .Delete (peerID )
586
+ err := handShake (ctx , ss .GetStatus (), peerID , rw , protocol , protocol , func (bestHash libcommon.Hash ) error {
587
+ ss .GoodPeers .Store (peerID , peerInfo )
588
+ ss .sendNewPeerToClients (gointerfaces .ConvertHashToH512 (peerID ))
589
+ return ss .startSync (ctx , bestHash , peerID )
590
+ })
591
+ if err != nil {
592
+ if errors .Is (err , NetworkIdMissmatchErr ) || errors .Is (err , io .EOF ) || errors .Is (err , p2p .ErrShuttingDown ) {
593
+ logger .Trace ("[p2p] Handshake failure" , "peer" , printablePeerID , "err" , err )
594
+ } else {
595
+ logger .Debug ("[p2p] Handshake failure" , "peer" , printablePeerID , "err" , err )
596
+ }
597
+ return fmt .Errorf ("[p2p]handshake to peer %s: %w" , printablePeerID , err )
598
+ }
599
+ logger .Trace ("[p2p] Received status message OK" , "peerId" , printablePeerID , "name" , peer .Name ())
600
+
601
+ err = runPeer (
602
+ ctx ,
603
+ peerID ,
604
+ protocol ,
605
+ rw ,
606
+ peerInfo ,
607
+ ss .send ,
608
+ ss .hasSubscribers ,
609
+ logger ,
610
+ ) // runPeer never returns a nil error
611
+ logger .Trace ("[p2p] error while running peer" , "peerId" , printablePeerID , "err" , err )
612
+ ss .sendGonePeerToClients (gointerfaces .ConvertHashToH512 (peerID ))
613
+ return nil
614
+ },
615
+ NodeInfo : func () interface {} {
616
+ return readNodeInfo ()
617
+ },
618
+ PeerInfo : func (peerID [64 ]byte ) interface {} {
619
+ // TODO: remember handshake reply per peer ID and return eth-related Status info (see ethPeerInfo in geth)
620
+ return nil
621
+ },
622
+ //Attributes: []enr.Entry{eth.CurrentENREntry(chainConfig, genesisHash, headHeight)},
623
+ })
630
624
631
625
return ss
632
626
}
@@ -934,11 +928,11 @@ func (ss *GrpcServer) SendMessageToAll(ctx context.Context, req *proto_sentry.Ou
934
928
func (ss * GrpcServer ) HandShake (context.Context , * emptypb.Empty ) (* proto_sentry.HandShakeReply , error ) {
935
929
reply := & proto_sentry.HandShakeReply {}
936
930
switch ss .Protocols [0 ].Version {
937
- case eth .ETH66 :
931
+ case direct .ETH66 :
938
932
reply .Protocol = proto_sentry .Protocol_ETH66
939
- case eth .ETH67 :
933
+ case direct .ETH67 :
940
934
reply .Protocol = proto_sentry .Protocol_ETH67
941
- case eth .ETH68 :
935
+ case direct .ETH68 :
942
936
reply .Protocol = proto_sentry .Protocol_ETH68
943
937
}
944
938
return reply , nil
0 commit comments