Skip to content

Commit

Permalink
metrics for signature-aggregator
Browse files Browse the repository at this point in the history
fixes #384
  • Loading branch information
feuGeneA committed Aug 1, 2024
1 parent 9519013 commit 40028cc
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 7 deletions.
10 changes: 9 additions & 1 deletion main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/relayer"
"github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
sigAggMetrics "github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/ethclient"
Expand Down Expand Up @@ -206,7 +207,14 @@ func main() {
panic(err)
}

signatureAggregator := aggregator.NewSignatureAggregator(network, logger, messageCreator)
signatureAggregator := aggregator.NewSignatureAggregator(
network,
logger,
sigAggMetrics.NewSignatureAggregatorMetrics(
prometheus.DefaultRegisterer,
),
messageCreator,
)

applicationRelayers, minHeights, err := createApplicationRelayers(
context.Background(),
Expand Down
11 changes: 11 additions & 0 deletions signature-aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/ava-labs/awm-relayer/utils"
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
Expand Down Expand Up @@ -55,17 +56,20 @@ type SignatureAggregator struct {
messageCreator message.Creator
currentRequestID atomic.Uint32
mu sync.RWMutex
metrics *metrics.SignatureAggregatorMetrics
}

func NewSignatureAggregator(
network *peers.AppRequestNetwork,
logger logging.Logger,
metrics *metrics.SignatureAggregatorMetrics,
messageCreator message.Creator,
) *SignatureAggregator {
sa := SignatureAggregator{
network: network,
subnetIDsByBlockchainID: map[ids.ID]ids.ID{},
logger: logger,
metrics: metrics,
messageCreator: messageCreator,
currentRequestID: atomic.Uint32{},
}
Expand Down Expand Up @@ -101,6 +105,7 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest(
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Error(err),
)
s.metrics.ValidatorFailures.Inc()
return nil, err
}
if !utils.CheckStakeWeightPercentageExceedsThreshold(
Expand All @@ -114,6 +119,7 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest(
zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
zap.Uint64("quorumPercentage", quorumPercentage),
)
s.metrics.ValidatorFailures.Inc()
return nil, errNotEnoughConnectedStake
}

Expand Down Expand Up @@ -217,6 +223,7 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest(
zap.Error(err),
)
responsesExpected--
s.metrics.ValidatorFailures.Inc()
}
}

Expand All @@ -242,6 +249,8 @@ func (s *SignatureAggregator) AggregateSignaturesAppRequest(
quorumPercentage,
)
if err != nil {
// don't increase node failures metric here, because we did
// it in handleResponse
return nil, err
}
if relevant {
Expand Down Expand Up @@ -337,6 +346,7 @@ func (s *SignatureAggregator) handleResponse(
// This is still a relevant response, since we are no longer expecting a response from that node.
if response.Op() == message.AppErrorOp {
s.logger.Debug("Request timed out")
s.metrics.ValidatorFailures.Inc()
return nil, true, nil
}

Expand All @@ -360,6 +370,7 @@ func (s *SignatureAggregator) handleResponse(
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()),
)
s.metrics.ValidatorFailures.Inc()
return nil, true, nil
}

Expand Down
23 changes: 21 additions & 2 deletions signature-aggregator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"encoding/json"
"net/http"
"strings"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
"github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/ava-labs/awm-relayer/types"
"github.com/ava-labs/awm-relayer/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -41,9 +43,17 @@ type AggregateSignaturesResponse struct {

func HandleAggregateSignaturesByRawMsgRequest(
logger logging.Logger,
metrics *metrics.SignatureAggregatorMetrics,
signatureAggregator *aggregator.SignatureAggregator,
) {
http.Handle(RawMessageAPIPath, signatureAggregationAPIHandler(logger, signatureAggregator))
http.Handle(
RawMessageAPIPath,
signatureAggregationAPIHandler(
logger,
metrics,
signatureAggregator,
),
)
}

func writeJsonError(
Expand All @@ -67,8 +77,14 @@ func writeJsonError(
}
}

func signatureAggregationAPIHandler(logger logging.Logger, aggregator *aggregator.SignatureAggregator) http.Handler {
func signatureAggregationAPIHandler(
logger logging.Logger,
metrics *metrics.SignatureAggregatorMetrics,
aggregator *aggregator.SignatureAggregator,
) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
metrics.AggregateSignaturesRequestCount.Inc()
startTime := time.Now()
var req AggregateSignaturesByRawMsgRequest
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
Expand Down Expand Up @@ -152,5 +168,8 @@ func signatureAggregationAPIHandler(logger logging.Logger, aggregator *aggregato
if err != nil {
logger.Error("Error writing response", zap.Error(err))
}
metrics.AggregateSignaturesLatencyMS.Set(
float64(time.Since(startTime) / time.Millisecond),
)
})
}
2 changes: 2 additions & 0 deletions signature-aggregator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Config struct {
PChainAPI *baseCfg.APIConfig `mapstructure:"p-chain-api" json:"p-chain-api"`
InfoAPI *baseCfg.APIConfig `mapstructure:"info-api" json:"info-api"`
APIPort uint16 `mapstructure:"api-port" json:"api-port"`

MetricsPort uint16 `mapstructure:"metrics-port" json:"metrics-port"`
}

func DisplayUsageText() {
Expand Down
18 changes: 16 additions & 2 deletions signature-aggregator/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ava-labs/awm-relayer/signature-aggregator/aggregator"
"github.com/ava-labs/awm-relayer/signature-aggregator/api"
"github.com/ava-labs/awm-relayer/signature-aggregator/config"
"github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -105,9 +106,22 @@ func main() {
logger.Fatal("Failed to create message creator", zap.Error(err))
panic(err)
}
signatureAggregator := aggregator.NewSignatureAggregator(network, logger, messageCreator)

api.HandleAggregateSignaturesByRawMsgRequest(logger, signatureAggregator)
registry := metrics.Initialize(cfg.MetricsPort)
metrics_ := metrics.NewSignatureAggregatorMetrics(registry)

signatureAggregator := aggregator.NewSignatureAggregator(
network,
logger,
metrics_,
messageCreator,
)

api.HandleAggregateSignaturesByRawMsgRequest(
logger,
metrics_,
signatureAggregator,
)

err = http.ListenAndServe(fmt.Sprintf(":%d", cfg.APIPort), nil)
if errors.Is(err, http.ErrServerClosed) {
Expand Down
110 changes: 110 additions & 0 deletions signature-aggregator/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package metrics

import (
"errors"
"fmt"
"log"
"net/http"

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// ErrFailedToCreateSignatureAggregatorMetrics is a sentinel error describing
// a failure to construct the signature aggregator metrics. Compare against it
// with errors.Is.
var ErrFailedToCreateSignatureAggregatorMetrics = errors.New("failed to create signature aggregator metrics")

// Opts holds the Prometheus option sets (metric name and help text) for every
// metric exposed by the signature aggregator. NewSignatureAggregatorMetrics
// reads these values when it constructs the collectors.
var Opts = struct {
AggregateSignaturesLatencyMS prometheus.GaugeOpts
AggregateSignaturesRequestCount prometheus.CounterOpts
ValidatorFailures prometheus.CounterOpts
}{
// Options for the gauge backing
// SignatureAggregatorMetrics.AggregateSignaturesLatencyMS.
AggregateSignaturesLatencyMS: prometheus.GaugeOpts{
Name: "agg_sigs_latency_ms",
Help: "Latency of requests for aggregate signatures",
},
// Options for the counter backing
// SignatureAggregatorMetrics.AggregateSignaturesRequestCount.
AggregateSignaturesRequestCount: prometheus.CounterOpts{
Name: "agg_sigs_req_count",
Help: "Number of requests for aggregate signatures",
},
// Options for the counter backing
// SignatureAggregatorMetrics.ValidatorFailures.
ValidatorFailures: prometheus.CounterOpts{
Name: "validator_failures",
Help: "Number of failed requests to validator nodes",
},
}

// SignatureAggregatorMetrics bundles the Prometheus collectors used by the
// signature aggregator. Instances are created and registered by
// NewSignatureAggregatorMetrics.
type SignatureAggregatorMetrics struct {
// AggregateSignaturesLatencyMS is a gauge; being a gauge, it holds only the
// most recently set value rather than a distribution.
AggregateSignaturesLatencyMS prometheus.Gauge
// AggregateSignaturesRequestCount counts requests for aggregate signatures.
AggregateSignaturesRequestCount prometheus.Counter
// ValidatorFailures counts failed requests to validator nodes.
ValidatorFailures prometheus.Counter

// TODO: consider other failures to monitor. Issue #384 requires
// "network failures", but we probably don't handle those directly.
// Surely there are some error types specific to this layer that we can
// count.

// TODO: consider how the relayer keeps separate counts of aggregations
// by AppRequest vs by Warp API and whether we should have such counts.
}

// NewSignatureAggregatorMetrics creates one collector per entry in Opts and
// registers each of them with registerer, in declaration order.
//
// Registration uses MustRegister, so a failed registration (for example a
// duplicate metric name on the same registerer) panics instead of returning
// an error.
func NewSignatureAggregatorMetrics(
	registerer prometheus.Registerer,
) *SignatureAggregatorMetrics {
	latencyGauge := prometheus.NewGauge(Opts.AggregateSignaturesLatencyMS)
	requestCounter := prometheus.NewCounter(Opts.AggregateSignaturesRequestCount)
	failureCounter := prometheus.NewCounter(Opts.ValidatorFailures)

	// Register one collector per call so the sequence of MustRegister
	// invocations is the same regardless of the Registerer implementation.
	for _, collector := range []prometheus.Collector{
		latencyGauge,
		requestCounter,
		failureCounter,
	} {
		registerer.MustRegister(collector)
	}

	return &SignatureAggregatorMetrics{
		AggregateSignaturesLatencyMS:    latencyGauge,
		AggregateSignaturesRequestCount: requestCounter,
		ValidatorFailures:               failureCounter,
	}
}

// HandleMetricsRequest exposes gatherer at the "/metrics" path of the
// process-wide default HTTP mux, using promhttp with default handler options.
//
// The receiver is not consulted; only gatherer determines what is served.
// http.Handle panics when a pattern is registered twice, so "/metrics" must
// not already be registered on the default mux when this is called.
func (m *SignatureAggregatorMetrics) HandleMetricsRequest(
	gatherer metrics.MultiGatherer,
) {
	metricsHandler := promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})
	http.Handle("/metrics", metricsHandler)
}

// Initialize sets up metrics serving for the signature aggregator and returns
// the registry that collectors should be registered with.
//
// It creates a fresh Prometheus registry, attaches it to a prefix gatherer
// under the "signature-aggregator" prefix, and starts a background HTTP
// server on the given port that serves the gatherer at "/metrics".
//
// The metrics endpoint is mounted on its own ServeMux rather than on
// http.DefaultServeMux. Any other server in the process that is started with
// a nil handler serves the default mux; sharing it would expose "/metrics" on
// that server's port and expose that server's routes on the metrics port. It
// would also panic if "/metrics" were registered on the default mux a second
// time.
//
// Panics if the registry cannot be attached to the gatherer. If the metrics
// server stops for any reason (e.g. the port is already in use), the process
// is terminated via log.Fatalln.
func Initialize(port uint16) *prometheus.Registry {
	gatherer := metrics.NewPrefixGatherer()
	registry := prometheus.NewRegistry()

	// NOTE(review): the prefix contains a hyphen, which is not part of the
	// classic Prometheus metric-name character set. Confirm how the prefix
	// gatherer combines it with metric names and that scrapers accept it.
	if err := gatherer.Register("signature-aggregator", registry); err != nil {
		panic(
			fmt.Errorf(
				"failed to register metrics gatherer: %w",
				err,
			),
		)
	}

	// Dedicated mux: keeps the metrics listener isolated from whatever is
	// registered on the process-wide default mux.
	mux := http.NewServeMux()
	mux.Handle(
		"/metrics",
		promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}),
	)

	// ListenAndServe only returns on failure; treat that as fatal so a
	// misconfigured metrics port is noticed immediately.
	go func() {
		log.Fatalln(http.ListenAndServe(fmt.Sprintf(":%d", port), mux))
	}()

	return registry
}
2 changes: 1 addition & 1 deletion tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() {
ginkgo.It("Warp API", func() {
WarpAPIRelay(localNetworkInstance)
})
ginkgo.It("Signature Aggregator", func() {
ginkgo.FIt("Signature Aggregator", func() {
SignatureAggregatorAPI(localNetworkInstance)
})
})
Loading

0 comments on commit 40028cc

Please sign in to comment.