diff --git a/.changeset/kind-parents-jump.md b/.changeset/kind-parents-jump.md new file mode 100644 index 00000000000..e633f1af1fe --- /dev/null +++ b/.changeset/kind-parents-jump.md @@ -0,0 +1,11 @@ +--- +"chainlink": patch +--- + +Add two new metrics for monitoring LLO transmitter health #added + +`llo_mercurytransmitter_concurrent_transmit_gauge` +Gauge that measures the number of transmit threads currently waiting on a remote transmit call. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max. + +`llo_mercurytransmitter_concurrent_delete_gauge` +Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max. diff --git a/core/cmd/app.go b/core/cmd/app.go index 53c96980de4..ad944f0d0a6 100644 --- a/core/cmd/app.go +++ b/core/cmd/app.go @@ -10,6 +10,7 @@ import ( "slices" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" "github.com/smartcontractkit/chainlink/v2/core/build" @@ -85,6 +86,7 @@ func NewApp(s *Shell) *cli.App { } s.Logger = lggr + s.Registerer = prometheus.DefaultRegisterer // use the global DefaultRegisterer, should be safe since we only ever run one instance of the app per shell s.CloseLogger = closeFn s.Config = cfg diff --git a/core/cmd/shell.go b/core/cmd/shell.go index e4f4c5bd6e3..1edd53c1efc 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -25,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/jmoiron/sqlx" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -62,7 +63,7 @@ import ( var ( initGlobalsOnce sync.Once - prometheus *ginprom.Prometheus + ginPrometheus *ginprom.Prometheus grpcOpts loop.GRPCOpts ) @@ -71,7 +72,7 @@ func initGlobals(cfgProm config.Prometheus, 
cfgTracing config.Tracing, cfgTeleme var err error initGlobalsOnce.Do(func() { err = func() error { - prometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken())) + ginPrometheus = ginprom.New(ginprom.Namespace("service"), ginprom.Token(cfgProm.AuthToken())) grpcOpts = loop.NewGRPCOpts(nil) // default prometheus.Registerer otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { @@ -139,6 +140,7 @@ type Shell struct { Renderer Config chainlink.GeneralConfig // initialized in Before Logger logger.Logger // initialized in Before + Registerer prometheus.Registerer // initialized in Before CloseLogger func() error // called in After AppFactory AppFactory KeyStoreAuthenticator TerminalKeyStoreAuthenticator @@ -178,14 +180,14 @@ func (s *Shell) configExitErr(validateFn func() error) cli.ExitCoder { // AppFactory implements the NewApplication method. type AppFactory interface { - NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (chainlink.Application, error) + NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (chainlink.Application, error) } // ChainlinkAppFactory is used to create a new Application. type ChainlinkAppFactory struct{} // NewApplication returns a new instance of the node with the given config. 
-func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (app chainlink.Application, err error) { +func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (app chainlink.Application, err error) { err = migrate.SetMigrationENVVars(cfg) if err != nil { return nil, err @@ -237,6 +239,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G // create the relayer-chain interoperators from application configuration relayerFactory := chainlink.RelayerFactory{ Logger: appLggr, + Registerer: appRegisterer, LoopRegistry: loopRegistry, GRPCOpts: grpcOpts, MercuryPool: mercuryPool, @@ -425,7 +428,7 @@ func (n ChainlinkRunner) Run(ctx context.Context, app chainlink.Application) err return errors.New("You must specify at least one port to listen on") } - handler, err := web.NewRouter(app, prometheus) + handler, err := web.NewRouter(app, ginPrometheus) if err != nil { return errors.Wrap(err, "failed to create web router") } diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 6261d23ef82..bead4ba5afd 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -382,7 +382,7 @@ func (s *Shell) runNode(c *cli.Context) error { // From now on, DB locks and DB connection will be released on every return. // Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed. 
- app, err := s.AppFactory.NewApplication(rootCtx, s.Config, s.Logger, ldb.DB(), s.KeyStoreAuthenticator) + app, err := s.AppFactory.NewApplication(rootCtx, s.Config, s.Logger, s.Registerer, ldb.DB(), s.KeyStoreAuthenticator) if err != nil { return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) } @@ -629,7 +629,7 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) { } defer lggr.ErrorIfFn(db.Close, "Error closing db") - app, err := s.AppFactory.NewApplication(ctx, s.Config, lggr, db, s.KeyStoreAuthenticator) + app, err := s.AppFactory.NewApplication(ctx, s.Config, lggr, s.Registerer, db, s.KeyStoreAuthenticator) if err != nil { return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) } @@ -1275,7 +1275,7 @@ func (s *Shell) RemoveBlocks(c *cli.Context) error { // From now on, DB locks and DB connection will be released on every return. // Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed. - app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, ldb.DB(), s.KeyStoreAuthenticator) + app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, s.Registerer, ldb.DB(), s.KeyStoreAuthenticator) if err != nil { return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) } diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 29515df7034..554b11b5aa8 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -29,6 +29,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/manyminds/api2go/jsonapi" "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -406,6 +407,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn Logger: lggr, LoopRegistry: loopRegistry, GRPCOpts: loop.GRPCOpts{}, + Registerer: prometheus.NewRegistry(), // Don't 
use global registry here since otherwise multiple apps can create name conflicts. Could also potentially give a mock registry to test prometheus. MercuryPool: mercuryPool, CapabilitiesRegistry: capabilitiesRegistry, HTTPClient: c, diff --git a/core/internal/cltest/mocks.go b/core/internal/cltest/mocks.go index b8bb4657056..17c79f00831 100644 --- a/core/internal/cltest/mocks.go +++ b/core/internal/cltest/mocks.go @@ -11,6 +11,7 @@ import ( "time" "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -87,7 +88,7 @@ type InstanceAppFactoryWithKeystoreMock struct { } // NewApplication creates a new application with specified config and calls the authenticate function of the keystore -func (f InstanceAppFactoryWithKeystoreMock) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, lggr logger.Logger, db *sqlx.DB, ks cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { +func (f InstanceAppFactoryWithKeystoreMock) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, lggr logger.Logger, registerer prometheus.Registerer, db *sqlx.DB, ks cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { keyStore := f.App.GetKeyStore() err := ks.Authenticate(ctx, keyStore, cfg.Password()) if err != nil { @@ -102,7 +103,7 @@ type InstanceAppFactory struct { } // NewApplication creates a new application with specified config -func (f InstanceAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { +func (f InstanceAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { return f.App, nil } @@ -110,7 +111,7 @@ type seededAppFactory struct { Application 
chainlink.Application } -func (s seededAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { +func (s seededAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { return noopStopApplication{s.Application}, nil } diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index cec7e5bb48c..32b64d402b1 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -7,6 +7,7 @@ import ( "net/http" "github.com/pelletier/go-toml/v2" + "github.com/prometheus/client_golang/prometheus" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -37,6 +38,7 @@ type RelayerFactory struct { logger.Logger *plugins.LoopRegistry loop.GRPCOpts + Registerer prometheus.Registerer MercuryPool wsrpc.Pool CapabilitiesRegistry coretypes.CapabilitiesRegistry HTTPClient *http.Client @@ -81,6 +83,7 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m relayerOpts := evmrelay.RelayerOpts{ DS: ccOpts.DS, + Registerer: r.Registerer, CSAETHKeystore: config.CSAETHKeystore, MercuryPool: r.MercuryPool, MercuryConfig: config.MercuryConfig, diff --git a/core/services/llo/bm/dummy_transmitter.go b/core/services/llo/bm/dummy_transmitter.go index b7fa2bd9e15..f62635a7953 100644 --- a/core/services/llo/bm/dummy_transmitter.go +++ b/core/services/llo/bm/dummy_transmitter.go @@ -23,9 +23,11 @@ import ( // A dummy transmitter useful for benchmarking and testing var ( - transmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "llo_transmit_success_count", - Help: "Running count of successful transmits", + promTransmitSuccessCount = promauto.NewCounter(prometheus.CounterOpts{ + 
Namespace: "llo", + Subsystem: "dummytransmitter", + Name: "transmit_success_count", + Help: "Running count of successful transmits", }) ) @@ -101,7 +103,7 @@ func (t *transmitter) Transmit( lggr.Debugw(fmt.Sprintf("Failed to decode report with type %s", report.Info.ReportFormat), "err", err) } } - transmitSuccessCount.Inc() + promTransmitSuccessCount.Inc() lggr.Infow("Transmit (dummy)", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs) return nil } diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index 0585dec49dc..481fd0b790a 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -24,14 +24,18 @@ import ( var ( promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_stream_missing_count", - Help: "Number of times we tried to observe a stream, but it was missing", + Namespace: "llo", + Subsystem: "datasource", + Name: "stream_missing_count", + Help: "Number of times we tried to observe a stream, but it was missing", }, []string{"streamID"}, ) promObservationErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_stream_observation_error_count", - Help: "Number of times we tried to observe a stream, but it failed with an error", + Namespace: "llo", + Subsystem: "datasource", + Name: "stream_observation_error_count", + Help: "Number of times we tried to observe a stream, but it failed with an error", }, []string{"streamID"}, ) diff --git a/core/services/llo/mercurytransmitter/queue.go b/core/services/llo/mercurytransmitter/queue.go index eae9a0b9d0c..6610010a469 100644 --- a/core/services/llo/mercurytransmitter/queue.go +++ b/core/services/llo/mercurytransmitter/queue.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strconv" "sync" "time" @@ -22,9 +23,11 @@ type asyncDeleter interface { var _ services.Service = (*transmitQueue)(nil) -var transmitQueueLoad = 
promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "llo_transmit_queue_load", - Help: "Current count of items in the transmit queue", +var promTransmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_queue_load", + Help: "Current count of items in the transmit queue", }, []string{"donID", "serverURL", "capacity"}, ) @@ -75,7 +78,7 @@ func NewTransmitQueue(lggr logger.Logger, serverURL string, maxlen int, asyncDel maxlen, false, nil, - transmitQueueLoad.WithLabelValues(fmt.Sprintf("%d", asyncDeleter.DonID()), serverURL, fmt.Sprintf("%d", maxlen)), + promTransmitQueueLoad.WithLabelValues(strconv.FormatUint(uint64(asyncDeleter.DonID()), 10), serverURL, strconv.FormatInt(int64(maxlen), 10)), } } diff --git a/core/services/llo/mercurytransmitter/server.go b/core/services/llo/mercurytransmitter/server.go index 22472349c25..308ff6a73a4 100644 --- a/core/services/llo/mercurytransmitter/server.go +++ b/core/services/llo/mercurytransmitter/server.go @@ -3,7 +3,9 @@ package mercurytransmitter import ( "context" "fmt" + "strconv" "sync" + "sync/atomic" "time" "github.com/jpillora/backoff" @@ -28,27 +30,35 @@ import ( ) var ( - transmitQueueDeleteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_mercury_transmit_queue_delete_error_count", - Help: "Running count of DB errors when trying to delete an item from the queue DB", + promTransmitQueueDeleteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_queue_delete_error_count", + Help: "Running count of DB errors when trying to delete an item from the queue DB", }, []string{"donID", "serverURL"}, ) - transmitQueueInsertErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_mercury_transmit_queue_insert_error_count", - Help: "Running count of DB errors when trying to insert an item into the queue DB", + 
promTransmitQueueInsertErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_queue_insert_error_count", + Help: "Running count of DB errors when trying to insert an item into the queue DB", }, []string{"donID", "serverURL"}, ) - transmitQueuePushErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_mercury_transmit_queue_push_error_count", - Help: "Running count of DB errors when trying to push an item onto the queue", + promTransmitQueuePushErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_queue_push_error_count", + Help: "Running count of DB errors when trying to push an item onto the queue", }, []string{"donID", "serverURL"}, ) - transmitServerErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_mercury_transmit_server_error_count", - Help: "Number of errored transmissions that failed due to an error returned by the mercury server", + promTransmitServerErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_server_error_count", + Help: "Number of errored transmissions that failed due to an error returned by the mercury server", }, []string{"donID", "serverURL", "code"}, ) @@ -83,6 +93,9 @@ type server struct { transmitQueueDeleteErrorCount prometheus.Counter transmitQueueInsertErrorCount prometheus.Counter transmitQueuePushErrorCount prometheus.Counter + + transmitThreadBusyCount atomic.Int32 + deleteThreadBusyCount atomic.Int32 } type QueueConfig interface { @@ -100,7 +113,7 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client codecLggr = corelogger.NullLogger } - return &server{ + s := &server{ logger.Sugared(lggr), verboseLogging, cfg.TransmitTimeout().Duration(), @@ -111,13 +124,17 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, 
client serverURL, evm.NewReportCodecPremiumLegacy(codecLggr), llo.JSONReportCodec{}, - transmitSuccessCount.WithLabelValues(donIDStr, serverURL), - transmitDuplicateCount.WithLabelValues(donIDStr, serverURL), - transmitConnectionErrorCount.WithLabelValues(donIDStr, serverURL), - transmitQueueDeleteErrorCount.WithLabelValues(donIDStr, serverURL), - transmitQueueInsertErrorCount.WithLabelValues(donIDStr, serverURL), - transmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL), + promTransmitSuccessCount.WithLabelValues(donIDStr, serverURL), + promTransmitDuplicateCount.WithLabelValues(donIDStr, serverURL), + promTransmitConnectionErrorCount.WithLabelValues(donIDStr, serverURL), + promTransmitQueueDeleteErrorCount.WithLabelValues(donIDStr, serverURL), + promTransmitQueueInsertErrorCount.WithLabelValues(donIDStr, serverURL), + promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL), + atomic.Int32{}, + atomic.Int32{}, } + + return s } func (s *server) HealthReport() map[string]error { @@ -144,6 +161,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup select { case hash := <-s.deleteQueue: + s.deleteThreadBusyCount.Add(1) /* once per dequeued hash, not once per retry, so it pairs with the single Add(-1) on success or stop */ for { if err := s.pm.orm.Delete(ctx, [][32]byte{hash}); err != nil { s.lggr.Errorw("Failed to delete transmission record", "err", err, "transmissionHash", hash) s.transmitQueueDeleteErrorCount.Inc() @@ -152,6 +170,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup // Wait a backoff duration before trying to delete again continue case <-stopCh: + s.deleteThreadBusyCount.Add(-1) // abort and return immediately on stop even if items remain in queue return } @@ -160,6 +179,7 @@ func (s *server) runDeleteQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup } // success b.Reset() + s.deleteThreadBusyCount.Add(-1) case <-stopCh: // abort and return immediately on stop even if items remain in queue return @@ -179,61 +199,69 @@ func (s *server) runQueueLoop(stopCh
services.StopChan, wg *sync.WaitGroup, donI } ctx, cancel := stopCh.NewCtx() defer cancel() - for { - t := s.q.BlockingPop() - if t == nil { - // queue was closed - return - } - req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) { - ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout)) - defer cancelFn() - return s.transmit(ctx, t) - }(ctx) - if ctx.Err() != nil { - // only canceled on transmitter close so we can exit - return - } else if err != nil { - s.transmitConnectionErrorCount.Inc() - s.lggr.Errorw("Transmit report failed", "err", err, "req.Payload", req.Payload, "req.ReportFormat", req.ReportFormat, "transmission", t) - if ok := s.q.Push(t); !ok { - s.lggr.Error("Failed to push report to transmit queue; queue is closed") - return + cont := true + for cont { + cont = func() bool { + t := s.q.BlockingPop() + if t == nil { + // queue was closed + return false } - // Wait a backoff duration before pulling the most recent transmission - // the heap - select { - case <-time.After(b.Duration()): - continue - case <-stopCh: - return + + s.transmitThreadBusyCount.Add(1) + defer s.transmitThreadBusyCount.Add(-1) + + req, res, err := func(ctx context.Context) (*pb.TransmitRequest, *pb.TransmitResponse, error) { + ctx, cancelFn := context.WithTimeout(ctx, utils.WithJitter(s.transmitTimeout)) + defer cancelFn() + return s.transmit(ctx, t) + }(ctx) + if ctx.Err() != nil { + // only canceled on transmitter close so we can exit + return false + } else if err != nil { + s.transmitConnectionErrorCount.Inc() + s.lggr.Errorw("Transmit report failed", "err", err, "req.Payload", req.Payload, "req.ReportFormat", req.ReportFormat, "transmission", t) + if ok := s.q.Push(t); !ok { + s.lggr.Error("Failed to push report to transmit queue; queue is closed") + return false + } + // Wait a backoff duration before pulling the most recent transmission + // from the heap + select { + case <-time.After(b.Duration()): + return
true + case <-stopCh: + return false + } } - } - b.Reset() - if res.Error == "" { - s.transmitSuccessCount.Inc() - s.lggr.Debugw("Transmit report success", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) - } else { - // We don't need to retry here because the mercury server - // has confirmed it received the report. We only need to retry - // on networking/unknown errors - switch res.Code { - case DuplicateReport: + b.Reset() + if res.Error == "" { s.transmitSuccessCount.Inc() - s.transmitDuplicateCount.Inc() - s.lggr.Debugw("Transmit report success; duplicate report", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) - default: - transmitServerErrorCount.WithLabelValues(donIDStr, s.url, fmt.Sprintf("%d", res.Code)).Inc() - s.lggr.Errorw("Transmit report failed; mercury server returned error", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "response", res, "transmission", t, "err", res.Error, "code", res.Code) + s.lggr.Debugw("Transmit report success", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) + } else { + // We don't need to retry here because the mercury server + // has confirmed it received the report. 
We only need to retry + // on networking/unknown errors + switch res.Code { + case DuplicateReport: + s.transmitSuccessCount.Inc() + s.transmitDuplicateCount.Inc() + s.lggr.Debugw("Transmit report success; duplicate report", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "transmission", t, "response", res) + default: + promTransmitServerErrorCount.WithLabelValues(donIDStr, s.url, strconv.FormatInt(int64(res.Code), 10)).Inc() + s.lggr.Errorw("Transmit report failed; mercury server returned error", "req.ReportFormat", req.ReportFormat, "req.Payload", req.Payload, "response", res, "transmission", t, "err", res.Error, "code", res.Code) + } } - } - select { - case s.deleteQueue <- t.Hash(): - default: - s.lggr.Criticalw("Delete queue is full", "transmission", t, "transmissionHash", fmt.Sprintf("%x", t.Hash())) - } + select { + case s.deleteQueue <- t.Hash(): + default: + s.lggr.Criticalw("Delete queue is full", "transmission", t, "transmissionHash", fmt.Sprintf("%x", t.Hash())) + } + return true + }() } } diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go index 024a98174c6..8e60bf938a5 100644 --- a/core/services/llo/mercurytransmitter/transmitter.go +++ b/core/services/llo/mercurytransmitter/transmitter.go @@ -33,21 +33,27 @@ const ( ) var ( - transmitSuccessCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_mercury_transmit_success_count", - Help: "Number of successful transmissions (duplicates are counted as success)", + promTransmitSuccessCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_success_count", + Help: "Number of successful transmissions (duplicates are counted as success)", }, []string{"donID", "serverURL"}, ) - transmitDuplicateCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_mercury_transmit_duplicate_count", - Help: "Number of transmissions where the server 
told us it was a duplicate", + promTransmitDuplicateCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_duplicate_count", + Help: "Number of transmissions where the server told us it was a duplicate", }, []string{"donID", "serverURL"}, ) - transmitConnectionErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "llo_mercury_transmit_connection_error_count", - Help: "Number of errored transmissions that failed due to problem with the connection", + promTransmitConnectionErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "transmit_connection_error_count", + Help: "Number of errored transmissions that failed due to problem with the connection", }, []string{"donID", "serverURL"}, ) @@ -107,8 +113,10 @@ type transmitter struct { verboseLogging bool cfg Config - orm ORM - servers map[string]*server + orm ORM + servers map[string]*server + registerer prometheus.Registerer + collectors []prometheus.Collector donID uint32 fromAccount string @@ -119,6 +127,7 @@ type transmitter struct { type Opts struct { Lggr logger.Logger + Registerer prometheus.Registerer VerboseLogging bool Cfg Config Clients map[string]wsrpc.Client @@ -145,6 +154,8 @@ func newTransmitter(opts Opts) *transmitter { opts.Cfg, opts.ORM, servers, + opts.Registerer, + nil, opts.DonID, fmt.Sprintf("%x", opts.FromAccount), make(services.StopChan), @@ -183,6 +194,31 @@ func (mt *transmitter) Start(ctx context.Context) (err error) { go s.runDeleteQueueLoop(mt.stopCh, mt.wg) go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr) } + mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "concurrent_transmit_gauge", + Help: "Gauge that measures the number of transmit threads currently waiting on a remote transmit call. 
You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", + ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentTransmits": strconv.FormatInt(int64(nThreads), 10)}, + }, func() float64 { + return float64(s.transmitThreadBusyCount.Load()) + })) + mt.collectors = append(mt.collectors, prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "llo", + Subsystem: "mercurytransmitter", + Name: "concurrent_delete_gauge", + Help: "Gauge that measures the number of delete threads currently waiting on a delete call to the DB. You may wish to alert if this exceeds some number for a given period of time, or if it ever reaches its max.", + ConstLabels: prometheus.Labels{"donID": donIDStr, "serverURL": s.url, "maxConcurrentDeletes": strconv.FormatInt(int64(nThreads), 10)}, + }, func() float64 { + return float64(s.deleteThreadBusyCount.Load()) + })) } + for _, c := range mt.collectors { // register once, after every server has added its gauges; doing this inside the per-server loop re-registers earlier collectors and fails with AlreadyRegisteredError + if err := mt.registerer.Register(c); err != nil { + return err + } + } if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil { return err @@ -214,7 +250,12 @@ func (mt *transmitter) Close() error { closers = append(closers, s.pm) closers = append(closers, s.c) } - return services.CloseAll(closers...) + err := services.CloseAll(closers...)
+ // Unregister all the gauge funcs + for _, c := range mt.collectors { + mt.registerer.Unregister(c) + } + return err }) } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 8008fc4fd9e..0be78caf249 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" pkgerrors "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/maps" "github.com/smartcontractkit/libocr/gethwrappers2/ocr2aggregator" @@ -143,6 +144,7 @@ type Relayer struct { ds sqlutil.DataSource chain legacyevm.Chain lggr logger.SugaredLogger + registerer prometheus.Registerer ks CSAETHKeystore mercuryPool wsrpc.Pool codec commontypes.Codec @@ -169,7 +171,8 @@ type MercuryConfig interface { } type RelayerOpts struct { - DS sqlutil.DataSource + DS sqlutil.DataSource + Registerer prometheus.Registerer CSAETHKeystore MercuryPool wsrpc.Pool RetirementReportCache llo.RetirementReportCache @@ -214,6 +217,7 @@ func NewRelayer(ctx context.Context, lggr logger.Logger, chain legacyevm.Chain, ds: opts.DS, chain: chain, lggr: sugared, + registerer: opts.Registerer, ks: opts.CSAETHKeystore, mercuryPool: opts.MercuryPool, cdcFactory: cdcFactory, @@ -563,6 +567,7 @@ func (r *Relayer) NewLLOProvider(ctx context.Context, rargs commontypes.RelayArg VerboseLogging: r.mercuryCfg.VerboseLogging(), MercuryTransmitterOpts: mercurytransmitter.Opts{ Lggr: r.lggr, + Registerer: r.registerer, VerboseLogging: r.mercuryCfg.VerboseLogging(), Cfg: r.mercuryCfg.Transmitter(), Clients: clients,