Skip to content

Commit

Permalink
bytes transfet separated by capability and category (#8568)
Browse files Browse the repository at this point in the history
Co-authored-by: Mark Holt <mark@distributed.vision>
  • Loading branch information
dvovk and mh0lt authored Oct 27, 2023
1 parent f36d090 commit 9adf31b
Show file tree
Hide file tree
Showing 10 changed files with 476 additions and 80 deletions.
63 changes: 43 additions & 20 deletions cl/sentinel/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,7 @@ func (s *SentinelServer) PublishGossip(_ context.Context, msg *sentinelrpc.Gossi
// Snappify payload before sending it to gossip
compressedData := utils.CompressSnappy(msg.Data)

_, found := s.peerStatistics[msg.GetPeer().Pid]

if found {
s.peerStatistics[msg.GetPeer().Pid].BytesOut += uint64(len(compressedData))
} else {
s.peerStatistics[msg.GetPeer().Pid] = &diagnostics.PeerStatistics{
BytesIn: 0,
BytesOut: uint64(len(compressedData)),
}
}
s.trackPeerStatistics(msg.GetPeer().Pid, false, msg.Type.String(), "unknown", len(compressedData))

var subscription *sentinel.GossipSubscription

Expand Down Expand Up @@ -326,16 +317,8 @@ func (s *SentinelServer) handleGossipPacket(pkt *pubsub.Message) error {
return err
}

_, found := s.peerStatistics[string(textPid)]

if found {
s.peerStatistics[string(textPid)].BytesIn += uint64(len(data))
} else {
s.peerStatistics[string(textPid)] = &diagnostics.PeerStatistics{
BytesIn: uint64(len(data)),
BytesOut: 0,
}
}
msgType, msgCap := parseTopic(pkt.GetTopic())
s.trackPeerStatistics(string(textPid), true, msgType, msgCap, len(data))

// Check to which gossip it belongs to.
if strings.Contains(*pkt.Topic, string(sentinel.BeaconBlockTopic)) {
Expand Down Expand Up @@ -366,3 +349,43 @@ func (s *SentinelServer) GetPeersStatistics() map[string]*diagnostics.PeerStatis

return stats
}

func (s *SentinelServer) trackPeerStatistics(peerID string, inbound bool, msgType string, msgCap string, bytes int) {
if s.peerStatistics == nil {
s.peerStatistics = make(map[string]*diagnostics.PeerStatistics)
}

if _, exists := s.peerStatistics[peerID]; !exists {
s.peerStatistics[peerID] = &diagnostics.PeerStatistics{
CapBytesIn: make(map[string]uint64),
CapBytesOut: make(map[string]uint64),
TypeBytesIn: make(map[string]uint64),
TypeBytesOut: make(map[string]uint64),
}
}

stats := s.peerStatistics[peerID]

if inbound {
stats.BytesIn += uint64(bytes)
stats.CapBytesIn[msgCap] += uint64(bytes)
stats.TypeBytesIn[msgType] += uint64(bytes)
} else {
stats.BytesOut += uint64(bytes)
stats.CapBytesOut[msgCap] += uint64(bytes)
stats.TypeBytesOut[msgType] += uint64(bytes)
}
}

func parseTopic(input string) (string, string) {
parts := strings.Split(input, "/")

if len(parts) < 4 {
return "unknown", "unknown"
}

capability := parts[1]
topick := parts[3]

return capability, topick
}
26 changes: 19 additions & 7 deletions cmd/sentry/sentry/sentry_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
Expand Down Expand Up @@ -345,13 +346,14 @@ func handShake(
func runPeer(
ctx context.Context,
peerID [64]byte,
protocol uint,
cap p2p.Cap,
rw p2p.MsgReadWriter,
peerInfo *PeerInfo,
send func(msgId proto_sentry.MessageId, peerID [64]byte, b []byte),
hasSubscribers func(msgId proto_sentry.MessageId) bool,
logger log.Logger,
) *p2p.PeerError {
protocol := cap.Version
printTime := time.Now().Add(time.Minute)
peerPrinted := false
defer func() {
Expand Down Expand Up @@ -384,8 +386,6 @@ func runPeer(
return p2p.NewPeerError(p2p.PeerErrorMessageReceive, p2p.DiscNetworkError, err, "sentry.runPeer: ReadMsg error")
}

peerInfo.peer.BytesTransfered += int(msg.Size)

if msg.Size > eth.ProtocolMaxMsgSize {
msg.Discard()
return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscSubprotocolError, nil, fmt.Sprintf("sentry.runPeer: message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize))
Expand Down Expand Up @@ -536,6 +536,11 @@ func runPeer(
default:
logger.Error(fmt.Sprintf("[p2p] Unknown message code: %d, peerID=%x", msg.Code, peerID))
}

msgType := eth.ToProto[protocol][msg.Code]
msgCap := cap.String()
peerInfo.peer.CountBytesTransfered(msgType.String(), msgCap, uint64(msg.Size), true)

msg.Discard()
peerInfo.ClearDeadlines(time.Now(), givePermit)
}
Expand Down Expand Up @@ -631,10 +636,12 @@ func NewGrpcServer(ctx context.Context, dialCandidates func() enode.Iterator, re
return p2p.NewPeerError(p2p.PeerErrorFirstMessageSend, p2p.DiscNetworkError, getBlockHeadersErr, "p2p.Protocol.Run getBlockHeaders failure")
}

cap := p2p.Cap{Name: eth.ProtocolName, Version: protocol}

err = runPeer(
ctx,
peerID,
protocol,
cap,
rw,
peerInfo,
ss.send,
Expand Down Expand Up @@ -733,6 +740,11 @@ func (ss *GrpcServer) removePeer(peerID [64]byte, reason *p2p.PeerError) {

func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
peerInfo.Async(func() {

cap := p2p.Cap{Name: eth.ProtocolName, Version: peerInfo.protocol}
msgType := eth.ToProto[cap.Version][msgcode]
peerInfo.peer.CountBytesTransfered(msgType.String(), cap.String(), uint64(len(data)), false)

err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
if err != nil {
peerInfo.Remove(p2p.NewPeerError(p2p.PeerErrorMessageSend, p2p.DiscNetworkError, err, fmt.Sprintf("%s writePeer msgcode=%d", logPrefix, msgcode)))
Expand Down Expand Up @@ -1041,12 +1053,12 @@ func (ss *GrpcServer) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry.
return &reply, nil
}

func (ss *GrpcServer) DiagnosticsPeersData() []*p2p.PeerInfo {
func (ss *GrpcServer) DiagnosticsPeersData() map[string]*diagnostics.PeerStatistics {
if ss.P2pServer == nil {
return []*p2p.PeerInfo{}
return map[string]*diagnostics.PeerStatistics{}
}

peers := ss.P2pServer.PeersInfo()
peers := ss.P2pServer.DiagnosticsPeersInfo()
return peers
}

Expand Down
97 changes: 56 additions & 41 deletions diagnostics/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ import (
)

type PeerNetworkInfo struct {
LocalAddress string `json:"localAddress"` // Local endpoint of the TCP data connection
RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection
Inbound bool `json:"inbound"`
Trusted bool `json:"trusted"`
Static bool `json:"static"`
LocalAddress string `json:"localAddress"` // Local endpoint of the TCP data connection
RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection
Inbound bool `json:"inbound"`
Trusted bool `json:"trusted"`
Static bool `json:"static"`
BytesIn uint64 `json:"bytesIn"`
BytesOut uint64 `json:"bytesOut"`
CapBytesIn map[string]uint64 `json:"capBytesIn"`
CapBytesOut map[string]uint64 `json:"capBytesOut"`
TypeBytesIn map[string]uint64 `json:"typeBytesIn"`
TypeBytesOut map[string]uint64 `json:"typeBytesOut"`
}

type PeerResponse struct {
Expand All @@ -28,8 +34,6 @@ type PeerResponse struct {
Caps []string `json:"caps"` // Protocols advertised by this peer
Network PeerNetworkInfo `json:"network"`
Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields
BytesIn int `json:"bytesIn"` // Number of bytes received from the peer
BytesOut int `json:"bytesOut"` // Number of bytes sent to the peer
}

func SetupPeersAccess(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) {
Expand Down Expand Up @@ -66,20 +70,24 @@ func sentinelPeers(node *node.ErigonNode) ([]*PeerResponse, error) {

for key, value := range statisticsArray {
peer := PeerResponse{
ENR: "", //TODO: find a way how to get missing data
Enode: "",
ID: key,
Name: "",
BytesIn: int(value.BytesIn),
BytesOut: int(value.BytesOut),
Type: "Sentinel",
Caps: []string{},
ENR: "", //TODO: find a way how to get missing data
Enode: "",
ID: key,
Name: "",
Type: "Sentinel",
Caps: []string{},
Network: PeerNetworkInfo{
LocalAddress: "",
RemoteAddress: "",
Inbound: false,
Trusted: false,
Static: false,
BytesIn: value.BytesIn,
BytesOut: value.BytesOut,
CapBytesIn: value.CapBytesIn,
CapBytesOut: value.CapBytesOut,
TypeBytesIn: value.TypeBytesIn,
TypeBytesOut: value.TypeBytesOut,
},
Protocols: nil,
}
Expand All @@ -95,41 +103,48 @@ func sentinelPeers(node *node.ErigonNode) ([]*PeerResponse, error) {

func sentryPeers(node *node.ErigonNode) ([]*PeerResponse, error) {

reply := node.Backend().DiagnosticsPeersData()
statisticsArray := node.Backend().DiagnosticsPeersData()

peers := make([]*PeerResponse, 0, len(reply))

for _, rpcPeer := range reply {
var bin = 0
var bout = 0

if rpcPeer.Network.Inbound {
bin = rpcPeer.BytesTransfered
} else {
bout = rpcPeer.BytesTransfered
}
peers := make([]*PeerResponse, 0, len(statisticsArray))

for key, value := range statisticsArray {
peer := PeerResponse{
ENR: rpcPeer.ENR,
Enode: rpcPeer.Enode,
ID: rpcPeer.ID,
Name: rpcPeer.Name,
BytesIn: bin,
BytesOut: bout,
Type: "Sentry",
Caps: rpcPeer.Caps,
ENR: "", //TODO: find a way how to get missing data
Enode: "",
ID: key,
Name: "",
Type: "Sentry",
Caps: []string{},
Network: PeerNetworkInfo{
LocalAddress: rpcPeer.Network.LocalAddress,
RemoteAddress: rpcPeer.Network.RemoteAddress,
Inbound: rpcPeer.Network.Inbound,
Trusted: rpcPeer.Network.Trusted,
Static: rpcPeer.Network.Static,
LocalAddress: "",
RemoteAddress: "",
Inbound: false,
Trusted: false,
Static: false,
BytesIn: value.BytesIn,
BytesOut: value.BytesOut,
CapBytesIn: value.CapBytesIn,
CapBytesOut: value.CapBytesOut,
TypeBytesIn: value.TypeBytesIn,
TypeBytesOut: value.TypeBytesOut,
},
Protocols: nil,
}

peers = append(peers, &peer)
}

return peers, nil
return filterPeersWithoutBytesIn(peers), nil
}

func filterPeersWithoutBytesIn(peers []*PeerResponse) []*PeerResponse {
filteredPeers := make([]*PeerResponse, 0, len(peers))

for _, peer := range peers {
if peer.Network.BytesIn > 0 {
filteredPeers = append(filteredPeers, peer)
}
}

return filteredPeers
}
8 changes: 6 additions & 2 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type PeerStatisticsGetter interface {
}

type PeerStatistics struct {
BytesIn uint64
BytesOut uint64
BytesIn uint64
BytesOut uint64
CapBytesIn map[string]uint64
CapBytesOut map[string]uint64
TypeBytesIn map[string]uint64
TypeBytesOut map[string]uint64
}
23 changes: 23 additions & 0 deletions erigon-lib/diagnostics/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2021 Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package diagnostics

import "reflect"

func (p PeerStatistics) Type() Type {
return Type(reflect.TypeOf(p))
}
Loading

0 comments on commit 9adf31b

Please sign in to comment.