Skip to content

Commit

Permalink
cmd/relay: enabled new libp2p relay metrics (#3040)
Browse files Browse the repository at this point in the history
libp2p introduced set of new metrics for relay:
https://github.com/libp2p/go-libp2p/blob/master/p2p/protocol/circuitv2/relay/metrics.go

This work integrates these metrics into our instrumentation flow.

Note: metrics reported by libp2p will be prefixed: `libp2p_relaysvc` whereas metrics reported by charon code are prefixed: `relay_p2p`. This distinction seems useful for clear separation and avoiding clashes. Therefore, `metrics.md` remains intact. This only enriches libp2p metrics with charon cluster labels.

category: feature
ticket: #2544
  • Loading branch information
pinebit authored Apr 16, 2024
1 parent 6bebc88 commit 62a9492
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 17 deletions.
27 changes: 19 additions & 8 deletions cmd/relay/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,46 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/peerinfo"
"github.com/obolnetwork/charon/app/promauto"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/p2p"
)

// startP2P returns a started libp2p host or an error.
func startP2P(ctx context.Context, config Config, key *k1.PrivateKey, reporter metrics.Reporter) (host.Host, error) {
func startP2P(ctx context.Context, config Config, key *k1.PrivateKey, reporter metrics.Reporter) (host.Host, *prometheus.Registry, error) {
if len(config.P2PConfig.TCPAddrs) == 0 {
return nil, errors.New("p2p TCP addresses required")
return nil, nil, errors.New("p2p TCP addresses required")
}

if config.LibP2PLogLevel != "" {
if err := libp2plog.SetLogLevel("relay", config.LibP2PLogLevel); err != nil {
return nil, errors.Wrap(err, "set relay log level")
return nil, nil, errors.Wrap(err, "set relay log level")
}
if err := libp2plog.SetLogLevel("rcmgr", config.LibP2PLogLevel); err != nil {
return nil, errors.Wrap(err, "set rcmgr log level")
return nil, nil, errors.Wrap(err, "set rcmgr log level")
}
}

tcpNode, err := p2p.NewTCPNode(ctx, config.P2PConfig, key, p2p.NewOpenGater(), config.FilterPrivAddrs,
libp2p.ResourceManager(new(network.NullResourceManager)), libp2p.BandwidthReporter(reporter))
if err != nil {
return nil, errors.Wrap(err, "new tcp node")
return nil, nil, errors.Wrap(err, "new tcp node")
}

p2p.RegisterConnectionLogger(ctx, tcpNode, nil)

labels := map[string]string{"relay_peer": p2p.PeerName(tcpNode.ID())}
log.SetLokiLabels(labels)
promRegistry, err := promauto.NewRegistry(labels)
if err != nil {
return nil, nil, errors.Wrap(err, "create prometheus registry")
}

relayResources := relay.DefaultResources()
relayResources.Limit.Data = 32 * (1 << 20) // 32MB
relayResources.Limit.Duration = time.Hour
Expand All @@ -56,9 +65,11 @@ func startP2P(ctx context.Context, config Config, key *k1.PrivateKey, reporter m
relayResources.MaxReservations = config.MaxConns
relayResources.MaxCircuits = config.MaxResPerPeer

relayService, err := relay.New(tcpNode, relay.WithResources(relayResources))
// This enables relay metrics: https://github.com/libp2p/go-libp2p/blob/master/p2p/protocol/circuitv2/relay/metrics.go
mt := relay.NewMetricsTracer(relay.WithRegisterer(promRegistry))
relayService, err := relay.New(tcpNode, relay.WithResources(relayResources), relay.WithMetricsTracer(mt))
if err != nil {
return nil, errors.Wrap(err, "new relay service")
return nil, nil, errors.Wrap(err, "new relay service")
}

go func() {
Expand All @@ -67,7 +78,7 @@ func startP2P(ctx context.Context, config Config, key *k1.PrivateKey, reporter m
_ = relayService.Close()
}()

return tcpNode, nil
return tcpNode, promRegistry, nil
}

const unknownCluster = "unknown"
Expand Down
10 changes: 1 addition & 9 deletions cmd/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/promauto"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/eth2util/enr"
Expand Down Expand Up @@ -72,20 +71,13 @@ func Run(ctx context.Context, config Config) error {
bwTuples := make(chan bwTuple)
counter := newBandwidthCounter(ctx, bwTuples)

tcpNode, err := startP2P(ctx, config, key, counter)
tcpNode, promRegistry, err := startP2P(ctx, config, key, counter)
if err != nil {
return err
}

go monitorConnections(ctx, tcpNode, bwTuples)

labels := map[string]string{"relay_peer": p2p.PeerName(tcpNode.ID())}
log.SetLokiLabels(labels)
promRegistry, err := promauto.NewRegistry(labels)
if err != nil {
return err
}

// Start serving HTTP: ENR and monitoring.
serverErr := make(chan error, 3) // Buffer for 3 servers.
go func() {
Expand Down
42 changes: 42 additions & 0 deletions cmd/relay/relay_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -224,3 +225,44 @@ func testServeAddrs(t *testing.T, p2pConfig p2p.Config, path string, asserter fu

require.NoError(t, eg.Wait())
}

func TestRelayMetricsExported(t *testing.T) {
temp := t.TempDir()

config := Config{
DataDir: temp,
LogConfig: log.DefaultConfig(),
P2PConfig: p2p.Config{TCPAddrs: []string{testutil.AvailableAddr(t).String()}},
HTTPAddr: testutil.AvailableAddr(t).String(),
MonitoringAddr: testutil.AvailableAddr(t).String(),
}

_, err := p2p.NewSavedPrivKey(temp)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())

go func() {
err := Run(ctx, config)
testutil.SkipIfBindErr(t, err)
assert.NoError(t, err)
}()

fetchMetrics := func() string {
resp, err := http.Get(fmt.Sprintf("http://%s/metrics", config.MonitoringAddr))
if err == nil {
body, err := io.ReadAll(resp.Body)
if err == nil {
return string(body)
}
}

return ""
}

require.Eventually(t, func() bool {
return strings.Contains(fetchMetrics(), "libp2p_relaysvc_")
}, 10*time.Second, time.Second, "waiting for relay service to start")

cancel()
}

0 comments on commit 62a9492

Please sign in to comment.