From 40028cca61d44b65aab384912ca067327431d4af Mon Sep 17 00:00:00 2001 From: "F. Eugene Aumson" Date: Thu, 1 Aug 2024 14:25:46 +0000 Subject: [PATCH] metrics for signature-aggregator fixes #384 --- main/main.go | 10 +- signature-aggregator/aggregator/aggregator.go | 11 ++ signature-aggregator/api/api.go | 23 +++- signature-aggregator/config/config.go | 2 + signature-aggregator/main/main.go | 18 ++- signature-aggregator/metrics/metrics.go | 110 ++++++++++++++++++ tests/e2e_test.go | 2 +- tests/signature_aggregator_api.go | 60 ++++++++++ tests/utils/utils.go | 3 +- 9 files changed, 232 insertions(+), 7 deletions(-) create mode 100644 signature-aggregator/metrics/metrics.go diff --git a/main/main.go b/main/main.go index 78038857..df845c29 100644 --- a/main/main.go +++ b/main/main.go @@ -27,6 +27,7 @@ import ( "github.com/ava-labs/awm-relayer/peers" "github.com/ava-labs/awm-relayer/relayer" "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator" + sigAggMetrics "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/ethclient" @@ -206,7 +207,14 @@ func main() { panic(err) } - signatureAggregator := aggregator.NewSignatureAggregator(network, logger, messageCreator) + signatureAggregator := aggregator.NewSignatureAggregator( + network, + logger, + sigAggMetrics.NewSignatureAggregatorMetrics( + prometheus.DefaultRegisterer, + ), + messageCreator, + ) applicationRelayers, minHeights, err := createApplicationRelayers( context.Background(), diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go index c1aa4ff4..ef7fa01b 100644 --- a/signature-aggregator/aggregator/aggregator.go +++ b/signature-aggregator/aggregator/aggregator.go @@ -23,6 +23,7 @@ import ( "github.com/ava-labs/avalanchego/utils/set" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/peers" + "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" "github.com/ava-labs/awm-relayer/utils" coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message" msg "github.com/ava-labs/subnet-evm/plugin/evm/message" @@ -55,17 +56,20 @@ type SignatureAggregator struct { messageCreator message.Creator currentRequestID atomic.Uint32 mu sync.RWMutex + metrics *metrics.SignatureAggregatorMetrics } func NewSignatureAggregator( network *peers.AppRequestNetwork, logger logging.Logger, + metrics *metrics.SignatureAggregatorMetrics, messageCreator message.Creator, ) *SignatureAggregator { sa := SignatureAggregator{ network: network, subnetIDsByBlockchainID: map[ids.ID]ids.ID{}, logger: logger, + metrics: metrics, messageCreator: messageCreator, currentRequestID: atomic.Uint32{}, } @@ -101,6 +105,7 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest( zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Error(err), ) + s.metrics.ValidatorFailures.Inc() return nil, err } if !utils.CheckStakeWeightPercentageExceedsThreshold( @@ -114,6 +119,7 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest( zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight), zap.Uint64("quorumPercentage", quorumPercentage), ) + s.metrics.ValidatorFailures.Inc() return nil, errNotEnoughConnectedStake } @@ -217,6 +223,7 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest( zap.Error(err), ) responsesExpected-- + s.metrics.ValidatorFailures.Inc() } } @@ -242,6 +249,8 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest( quorumPercentage, ) if err != nil { + // don't increase node failures metric here, because we did + // it in handleResponse return nil, err } if relevant { @@ -337,6 +346,7 @@ func (s *SignatureAggregator) handleResponse( // This is still a relevant response, since we are no longer expecting a response from that node. if response.Op() == message.AppErrorOp { s.logger.Debug("Request timed out") + s.metrics.ValidatorFailures.Inc() return nil, true, nil } @@ -360,6 +370,7 @@ func (s *SignatureAggregator) handleResponse( zap.String("warpMessageID", unsignedMessage.ID().String()), zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), ) + s.metrics.ValidatorFailures.Inc() return nil, true, nil } diff --git a/signature-aggregator/api/api.go b/signature-aggregator/api/api.go index c12f00d3..b0b8ab05 100644 --- a/signature-aggregator/api/api.go +++ b/signature-aggregator/api/api.go @@ -8,10 +8,12 @@ import ( "encoding/json" "net/http" "strings" + "time" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator" + "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" "go.uber.org/zap" @@ -41,9 +43,17 @@ type AggregateSignaturesResponse struct { func HandleAggregateSignaturesByRawMsgRequest( logger logging.Logger, + metrics *metrics.SignatureAggregatorMetrics, signatureAggregator *aggregator.SignatureAggregator, ) { - http.Handle(RawMessageAPIPath, signatureAggregationAPIHandler(logger, signatureAggregator)) + http.Handle( + RawMessageAPIPath, + signatureAggregationAPIHandler( + logger, + metrics, + signatureAggregator, + ), + ) } func writeJsonError( @@ -67,8 +77,14 @@ func writeJsonError( } } -func signatureAggregationAPIHandler(logger logging.Logger, aggregator *aggregator.SignatureAggregator) http.Handler { +func signatureAggregationAPIHandler( + logger logging.Logger, + metrics *metrics.SignatureAggregatorMetrics, + aggregator *aggregator.SignatureAggregator, +) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + metrics.AggregateSignaturesRequestCount.Inc() + startTime := time.Now() var req AggregateSignaturesByRawMsgRequest err := json.NewDecoder(r.Body).Decode(&req) if err != nil { @@ -152,5 +168,8 @@ func signatureAggregationAPIHandler(logger logging.Logger, aggregator *aggregato if err != nil { logger.Error("Error writing response", zap.Error(err)) } + metrics.AggregateSignaturesLatencyMS.Set( + float64(time.Since(startTime) / time.Millisecond), + ) }) } diff --git a/signature-aggregator/config/config.go b/signature-aggregator/config/config.go index 3948b075..1386e8cb 100644 --- a/signature-aggregator/config/config.go +++ b/signature-aggregator/config/config.go @@ -28,6 +28,8 @@ type Config struct { PChainAPI *baseCfg.APIConfig `mapstructure:"p-chain-api" json:"p-chain-api"` InfoAPI *baseCfg.APIConfig `mapstructure:"info-api" json:"info-api"` APIPort uint16 `mapstructure:"api-port" json:"api-port"` + + MetricsPort uint16 `mapstructure:"metrics-port" json:"metrics-port"` } func DisplayUsageText() { diff --git a/signature-aggregator/main/main.go b/signature-aggregator/main/main.go index e942eac5..19489342 100644 --- a/signature-aggregator/main/main.go +++ b/signature-aggregator/main/main.go @@ -17,6 +17,7 @@ import ( "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator" "github.com/ava-labs/awm-relayer/signature-aggregator/api" "github.com/ava-labs/awm-relayer/signature-aggregator/config" + "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -105,9 +106,22 @@ func main() { logger.Fatal("Failed to create message creator", zap.Error(err)) panic(err) } - signatureAggregator := aggregator.NewSignatureAggregator(network, logger, messageCreator) - api.HandleAggregateSignaturesByRawMsgRequest(logger, signatureAggregator) + registry := metrics.Initialize(cfg.MetricsPort) + metrics_ := metrics.NewSignatureAggregatorMetrics(registry) + + signatureAggregator := aggregator.NewSignatureAggregator( + network, + logger, + metrics_, + messageCreator, + ) + + api.HandleAggregateSignaturesByRawMsgRequest( + logger, + metrics_, + signatureAggregator, + ) err = http.ListenAndServe(fmt.Sprintf(":%d", cfg.APIPort), nil) if errors.Is(err, http.ErrServerClosed) { diff --git a/signature-aggregator/metrics/metrics.go b/signature-aggregator/metrics/metrics.go new file mode 100644 index 00000000..97da174e --- /dev/null +++ b/signature-aggregator/metrics/metrics.go @@ -0,0 +1,110 @@ +// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package metrics + +import ( + "errors" + "fmt" + "log" + "net/http" + + "github.com/ava-labs/avalanchego/api/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + ErrFailedToCreateSignatureAggregatorMetrics = errors.New( + "failed to create signature aggregator metrics", + ) +) + +var Opts = struct { + AggregateSignaturesLatencyMS prometheus.GaugeOpts + AggregateSignaturesRequestCount prometheus.CounterOpts + ValidatorFailures prometheus.CounterOpts +}{ + AggregateSignaturesLatencyMS: prometheus.GaugeOpts{ + Name: "agg_sigs_latency_ms", + Help: "Latency of requests for aggregate signatures", + }, + AggregateSignaturesRequestCount: prometheus.CounterOpts{ + Name: "agg_sigs_req_count", + Help: "Number of requests for aggregate signatures", + }, + ValidatorFailures: prometheus.CounterOpts{ + Name: "validator_failures", + Help: "Number of failed requests to validator nodes", + }, +} + +type SignatureAggregatorMetrics struct { + AggregateSignaturesLatencyMS prometheus.Gauge + AggregateSignaturesRequestCount prometheus.Counter + ValidatorFailures prometheus.Counter + + // TODO: consider other failures to monitor. Issue #384 requires + // "network failures", but we probably don't handle those directly. + // Surely there are some error types specific to this layer that we can + // count. + + // TODO: consider how the relayer keeps separate counts of aggregations + // by AppRequest vs by Warp API and whether we should have such counts. +} + +func NewSignatureAggregatorMetrics( + registerer prometheus.Registerer, +) *SignatureAggregatorMetrics { + m := SignatureAggregatorMetrics{ + AggregateSignaturesLatencyMS: prometheus.NewGauge( + Opts.AggregateSignaturesLatencyMS, + ), + AggregateSignaturesRequestCount: prometheus.NewCounter( + Opts.AggregateSignaturesRequestCount, + ), + ValidatorFailures: prometheus.NewCounter( + Opts.ValidatorFailures, + ), + } + + registerer.MustRegister(m.AggregateSignaturesLatencyMS) + registerer.MustRegister(m.AggregateSignaturesRequestCount) + registerer.MustRegister(m.ValidatorFailures) + + return &m +} + +func (m *SignatureAggregatorMetrics) HandleMetricsRequest( + gatherer metrics.MultiGatherer, +) { + http.Handle( + "/metrics", + promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}), + ) +} + +func Initialize(port uint16) *prometheus.Registry { + gatherer := metrics.NewPrefixGatherer() + registry := prometheus.NewRegistry() + err := gatherer.Register("signature-aggregator", registry) + if err != nil { + panic( + fmt.Errorf( + "failed to register metrics gatherer: %w", + err, + ), + ) + } + + http.Handle( + "/metrics", + promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}), + ) + + go func() { + log.Fatalln(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) + }() + + return registry +} diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 7de98215..c6c3ace1 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -153,7 +153,7 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { ginkgo.It("Warp API", func() { WarpAPIRelay(localNetworkInstance) }) - ginkgo.It("Signature Aggregator", func() { + ginkgo.FIt("Signature Aggregator", func() { SignatureAggregatorAPI(localNetworkInstance) }) }) diff --git a/tests/signature_aggregator_api.go b/tests/signature_aggregator_api.go index 205e3d1d..eee927f5 100644 --- a/tests/signature_aggregator_api.go +++ b/tests/signature_aggregator_api.go @@ -4,6 +4,7 @@ package tests import ( + "bufio" "bytes" "context" "encoding/hex" @@ -11,10 +12,13 @@ import ( "fmt" "io" "net/http" + "strconv" + "strings" "time" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/signature-aggregator/api" + "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" testUtils "github.com/ava-labs/awm-relayer/tests/utils" "github.com/ava-labs/teleporter/tests/interfaces" "github.com/ava-labs/teleporter/tests/utils" @@ -95,4 +99,60 @@ func SignatureAggregatorAPI(network interfaces.LocalNetwork) { Expect(err).Should(BeNil()) Expect(signedMessage.ID()).Should(Equal(warpMessage.ID())) } + + // Check metrics + metricsSample := sampleMetrics(signatureAggregatorConfig.MetricsPort) + Expect( + metricsSample[metrics.Opts.AggregateSignaturesRequestCount.Name], + ).Should(BeNumerically("==", 1)) + Expect( + metricsSample[metrics.Opts.AggregateSignaturesLatencyMS.Name], + ).Should(BeNumerically(">", 0)) + Expect( + metricsSample[metrics.Opts.ValidatorFailures.Name], + ).Should(BeNumerically("<", 11)) +} + +// returns a map of metric names to metric samples +func sampleMetrics(port uint16) map[string]uint64 { + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/metrics", port), + ) + Expect(err).Should(BeNil()) + + body, err := io.ReadAll(resp.Body) + Expect(err).Should(BeNil()) + defer resp.Body.Close() + + var samples = make(map[string]uint64) + scanner := bufio.NewScanner(strings.NewReader(string(body))) + for scanner.Scan() { + line := scanner.Text() + for _, metricName := range []string{ + metrics.Opts.AggregateSignaturesLatencyMS.Name, + metrics.Opts.AggregateSignaturesRequestCount.Name, + metrics.Opts.ValidatorFailures.Name, + } { + if strings.HasPrefix( + line, + "U__signature_2d_aggregator_"+metricName, + ) { + log.Debug("Found metric line", "line", line) + parts := strings.Fields(line) + + // Fetch the metric count from the last field of the line + value, err := strconv.ParseUint(parts[len(parts)-1], 10, 64) + if err != nil { + log.Warn("failed to parse value from metric line") + continue + } + log.Debug("parsed metric", "name", metricName, "value", value) + + samples[metricName] = value + } else { + log.Debug("Ignoring non-metric line", "line", line) + } + } + } + return samples } diff --git a/tests/utils/utils.go b/tests/utils/utils.go index 0ee43121..44d31678 100644 --- a/tests/utils/utils.go +++ b/tests/utils/utils.go @@ -294,7 +294,8 @@ func CreateDefaultSignatureAggregatorConfig( InfoAPI: &config.APIConfig{ BaseURL: sourceSubnetsInfo[0].NodeURIs[0], }, - APIPort: 8080, + APIPort: 8080, + MetricsPort: 8081, } }