CCIP-2840 Add basic monitoring for telemetry client (#14661)
* Add basic monitoring for telemetry client

* Add changeset

* Fix changeset

* Add tests

* Fix tests

* Fix sonar findings

* Fixes after review

* Swap Mutex for RWMutex

* Address review feedback
emate authored Oct 8, 2024
1 parent 1b41e69 commit 9641ea2
Showing 5 changed files with 84 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/giant-pillows-sort.md
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Add Prometheus metrics exposing the health of the telemetry client
33 changes: 33 additions & 0 deletions core/services/synchronization/metrics.go
@@ -0,0 +1,33 @@
package synchronization

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	TelemetryClientConnectionStatus = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "telemetry_client_connection_status",
		Help: "Status of the connection to the telemetry ingress server (1 = connected, 0 = disconnected)",
	}, []string{"endpoint"})

	TelemetryClientMessagesSent = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "telemetry_client_messages_sent",
		Help: "Number of telemetry messages sent to the telemetry ingress server",
	}, []string{"endpoint", "telemetry_type"})

	TelemetryClientMessagesSendErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "telemetry_client_messages_send_errors",
		Help: "Number of telemetry messages that failed to send to the telemetry ingress server",
	}, []string{"endpoint", "telemetry_type"})

	TelemetryClientMessagesDropped = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "telemetry_client_messages_dropped",
		Help: "Number of telemetry messages dropped because the client's send buffer was full",
	}, []string{"endpoint", "telemetry_type"})

	TelemetryClientWorkers = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "telemetry_client_workers",
		Help: "Total number of telemetry workers created",
	}, []string{"endpoint", "telemetry_type"})
)
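Because these collectors are created with promauto, they register against the default Prometheus registry, so any promhttp endpoint the process serves will expose them. A minimal standalone sketch of scraping them; the port, the endpoint label, and the import path of this package are assumptions for illustration:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
)

func main() {
	// Touch one series so the gauge exists before the first scrape.
	synchronization.TelemetryClientConnectionStatus.
		WithLabelValues("wss://example-ingress").Set(0)

	// promauto registered the collectors on the default registry,
	// so the stock handler serves all of them.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9100", nil))
}
```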
37 changes: 36 additions & 1 deletion core/services/synchronization/telemetry_ingress_batch_client.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/smartcontractkit/wsrpc"
 	"github.com/smartcontractkit/wsrpc/examples/simple/keys"
+	"google.golang.org/grpc/connectivity"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
 	"github.com/smartcontractkit/chainlink-common/pkg/services"
@@ -57,9 +58,11 @@ type telemetryIngressBatchClient struct {
 	telemSendTimeout time.Duration
 
 	workers      map[string]*telemetryIngressBatchWorker
-	workersMutex sync.Mutex
+	workersMutex sync.RWMutex
 
 	useUniConn bool
+
+	healthMonitorCancel context.CancelFunc
 }
 
 // NewTelemetryIngressBatchClient returns a client backed by wsrpc that
@@ -127,14 +130,43 @@ func (tc *telemetryIngressBatchClient) start(ctx context.Context) error {
 			}
 			tc.telemClient = telemPb.NewTelemClient(conn)
 			tc.closeFn = func() error { conn.Close(); return nil }
+			tc.startHealthMonitoring(conn)
 		}
 	}
 
 	return nil
 }
 
+// startHealthMonitoring starts a goroutine that polls the connection state and
+// updates the connection-status metric every 5 seconds; it runs until close()
+// cancels it or the engine shuts down
+func (tc *telemetryIngressBatchClient) startHealthMonitoring(conn *wsrpc.ClientConn) {
+	// The monitor must outlive the start ctx, which may be cancelled once
+	// startup completes, so derive its lifetime from Background and hand
+	// ownership to close() via the cancel func.
+	monitorCtx, cancel := context.WithCancel(context.Background())
+	tc.healthMonitorCancel = cancel
+
+	tc.eng.Go(func(ctx context.Context) {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				// Report 1 while the wsrpc connection is Ready, 0 otherwise
+				connected := float64(0)
+				if conn.GetState() == connectivity.Ready {
+					connected = float64(1)
+				}
+				TelemetryClientConnectionStatus.WithLabelValues(tc.url.String()).Set(connected)
+			case <-monitorCtx.Done():
+				// Stopped explicitly from close()
+				return
+			case <-ctx.Done():
+				return
+			}
+		}
+	})
+}
+
 // Close disconnects the wsrpc client from the ingress server and waits for all workers to exit
 func (tc *telemetryIngressBatchClient) close() error {
+	if tc.healthMonitorCancel != nil {
+		tc.healthMonitorCancel()
+	}
 	if (tc.useUniConn && tc.connected.Load()) || !tc.useUniConn {
 		return tc.closeFn()
 	}
@@ -197,11 +229,14 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload)
 			payload.TelemType,
 			tc.eng,
 			tc.logging,
+			tc.url.String(),
 		)
 		tc.eng.GoTick(timeutil.NewTicker(func() time.Duration {
 			return tc.telemSendInterval
 		}), worker.Send)
 		tc.workers[workerKey] = worker
+
+		TelemetryClientWorkers.WithLabelValues(tc.url.String(), string(payload.TelemType)).Inc()
 	}
 
 	return worker
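A quick way to sanity-check the new gauge is prometheus's testutil package, which reads the current value of a single series. A sketch under the assumption that the collectors above are exported from this package at the import path shown; the endpoint label is hypothetical:

```go
package synchronization_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"

	"github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
)

func TestConnectionStatusGauge(t *testing.T) {
	const endpoint = "wss://example-ingress" // hypothetical endpoint label

	// Simulate one tick of the health-monitoring loop marking us connected.
	synchronization.TelemetryClientConnectionStatus.WithLabelValues(endpoint).Set(1)

	// testutil.ToFloat64 extracts the value of a single labelled series.
	got := testutil.ToFloat64(synchronization.TelemetryClientConnectionStatus.WithLabelValues(endpoint))
	assert.Equal(t, float64(1), got)
}
```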
core/services/synchronization/telemetry_ingress_batch_worker.go
@@ -25,6 +25,9 @@ type telemetryIngressBatchWorker struct {
 	logging          bool
 	lggr             logger.Logger
 	dropMessageCount atomic.Uint32
+
+	// endpointURL is used for reporting metrics
+	endpointURL string
 }
 
 // NewTelemetryIngressBatchWorker returns a worker for a given contractID that can send
@@ -38,6 +41,7 @@ func NewTelemetryIngressBatchWorker(
 	telemType TelemetryType,
 	lggr logger.Logger,
 	logging bool,
+	endpointURL string,
 ) *telemetryIngressBatchWorker {
 	return &telemetryIngressBatchWorker{
 		telemSendTimeout: telemSendTimeout,
@@ -48,6 +52,7 @@
 		telemType:   telemType,
 		logging:     logging,
 		lggr:        logger.Named(lggr, "TelemetryIngressBatchWorker"),
+		endpointURL: endpointURL,
 	}
 }

@@ -65,8 +70,10 @@ func (tw *telemetryIngressBatchWorker) Send(ctx context.Context) {

 	if err != nil {
 		tw.lggr.Warnf("Could not send telemetry: %v", err)
+		TelemetryClientMessagesSendErrors.WithLabelValues(tw.endpointURL, string(tw.telemType)).Inc()
 		return
 	}
+	TelemetryClientMessagesSent.WithLabelValues(tw.endpointURL, string(tw.telemType)).Inc()
 	if tw.logging {
 		tw.lggr.Debugw("Successfully sent telemetry to ingress server", "contractID", telemBatchReq.ContractId, "telemType", telemBatchReq.TelemetryType, "telemetry", telemBatchReq.Telemetry)
 	}
@@ -86,6 +93,8 @@
 // etc...
 func (tw *telemetryIngressBatchWorker) logBufferFullWithExpBackoff(payload TelemPayload) {
 	count := tw.dropMessageCount.Add(1)
+	TelemetryClientMessagesDropped.WithLabelValues(tw.endpointURL, string(tw.telemType)).Inc()
+
 	if count > 0 && (count%100 == 0 || count&(count-1) == 0) {
 		tw.lggr.Warnw("telemetry ingress client buffer full, dropping message", "telemetry", payload.Telemetry, "droppedCount", count)
 	}
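The gating expression deserves a note: `count&(count-1) == 0` is the standard power-of-two test, so the warning fires on drops 1, 2, 4, 8, ... and, independently, on every 100th drop. A standalone sketch of just that condition (names are illustrative):

```go
package main

import "fmt"

// shouldLog mirrors the gate in logBufferFullWithExpBackoff: true for every
// power of two (count&(count-1) == 0) and for every 100th dropped message.
func shouldLog(count uint32) bool {
	return count > 0 && (count%100 == 0 || count&(count-1) == 0)
}

func main() {
	for count := uint32(1); count <= 300; count++ {
		if shouldLog(count) {
			fmt.Print(count, " ") // 1 2 4 8 16 32 64 100 128 200 256 300
		}
	}
	fmt.Println()
}
```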
core/services/synchronization/telemetry_ingress_batch_worker_test.go
@@ -28,6 +28,7 @@ func TestTelemetryIngressWorker_BuildTelemBatchReq(t *testing.T) {
 		synchronization.OCR,
 		logger.TestLogger(t),
 		false,
+		"test-endpoint",
 	)
 
 	chTelemetry <- telemPayload
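In the same spirit, the send-path counters can be asserted by reading them before and after the code under test runs. A hedged sketch; the labels and the direct Inc() standing in for a successful worker.Send are assumptions:

```go
package synchronization_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"

	"github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
)

func TestMessagesSentCounter(t *testing.T) {
	sent := synchronization.TelemetryClientMessagesSent.WithLabelValues("test-endpoint", "ocr")

	// Counters are cumulative and shared across tests, so assert on the delta.
	before := testutil.ToFloat64(sent)
	sent.Inc() // stand-in for a successful worker.Send
	assert.Equal(t, before+1, testutil.ToFloat64(sent))
}
```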
