Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix telemetry manager health report #11397

Merged
merged 8 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/services/synchronization/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

// NewTestTelemetryIngressClient calls NewTelemetryIngressClient and injects telemClient.
func NewTestTelemetryIngressClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient) TelemetryService {
tc := NewTelemetryIngressClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100)
tc := NewTelemetryIngressClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, "test", "test")
tc.(*telemetryIngressClient).telemClient = telemClient
return tc
}

// NewTestTelemetryIngressBatchClient calls NewTelemetryIngressBatchClient and injects telemClient.
func NewTestTelemetryIngressBatchClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient, sendInterval time.Duration, uniconn bool) TelemetryService {
tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval, time.Second, uniconn)
tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval, time.Second, uniconn, "test", "test")
tc.(*telemetryIngressBatchClient).close = func() error { return nil }
tc.(*telemetryIngressBatchClient).telemClient = telemClient
return tc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type telemetryIngressBatchClient struct {

// NewTelemetryIngressBatchClient returns a client backed by wsrpc that
// can send telemetry to the telemetry ingress server
func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool) TelemetryService {
func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool, network string, chainID string) TelemetryService {
return &telemetryIngressBatchClient{
telemBufferSize: telemBufferSize,
telemMaxBatchSize: telemMaxBatchSize,
Expand All @@ -77,7 +77,7 @@ func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks key
serverPubKeyHex: serverPubKeyHex,
globalLogger: lggr,
logging: logging,
lggr: lggr.Named("TelemetryIngressBatchClient"),
lggr: lggr.Named("TelemetryIngressBatchClient").Named(network).Named(chainID),
chDone: make(chan struct{}),
workers: make(map[string]*telemetryIngressBatchWorker),
useUniConn: useUniconn,
Expand Down
4 changes: 2 additions & 2 deletions core/services/synchronization/telemetry_ingress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type telemetryIngressClient struct {

// NewTelemetryIngressClient returns a client backed by wsrpc that
// can send telemetry to the telemetry ingress server
func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint) TelemetryService {
func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, network string, chainID string) TelemetryService {
return &telemetryIngressClient{
url: url,
ks: ks,
serverPubKeyHex: serverPubKeyHex,
logging: logging,
lggr: lggr.Named("TelemetryIngressClient"),
lggr: lggr.Named("TelemetryIngressClient").Named(network).Named(chainID),
chTelemetry: make(chan TelemPayload, telemBufferSize),
chDone: make(services.StopChan),
}
Expand Down
13 changes: 5 additions & 8 deletions core/services/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package telemetry

import (
"context"
"fmt"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -68,7 +67,6 @@ func (l *legacyEndpointConfig) URL() *url.URL {
}

type telemetryEndpoint struct {
services.StateMachine
ChainID string
Network string
URL *url.URL
Expand Down Expand Up @@ -140,11 +138,10 @@ func (m *Manager) Name() string {
}

func (m *Manager) HealthReport() map[string]error {
hr := make(map[string]error)
hr[m.lggr.Name()] = m.Healthy()
hr := map[string]error{m.Name(): m.Healthy()}

for _, e := range m.endpoints {
name := fmt.Sprintf("%s.%s.%s", m.lggr.Name(), e.Network, e.ChainID)
hr[name] = e.Healthy()
services.CopyHealth(hr, e.client.HealthReport())
}
return hr
}
Expand Down Expand Up @@ -190,9 +187,9 @@ func (m *Manager) addEndpoint(e config.TelemetryIngressEndpoint) error {

var tClient synchronization.TelemetryService
if m.useBatchSend {
tClient = synchronization.NewTelemetryIngressBatchClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize, m.maxBatchSize, m.sendInterval, m.sendTimeout, m.uniConn)
tClient = synchronization.NewTelemetryIngressBatchClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize, m.maxBatchSize, m.sendInterval, m.sendTimeout, m.uniConn, e.Network(), e.ChainID())
} else {
tClient = synchronization.NewTelemetryIngressClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize)
tClient = synchronization.NewTelemetryIngressClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize, e.Network(), e.ChainID())
}

te := telemetryEndpoint{
Expand Down
8 changes: 3 additions & 5 deletions core/services/telemetry/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -246,10 +245,9 @@ func TestCorrectEndpointRouting(t *testing.T) {
})

tm.endpoints[i] = &telemetryEndpoint{
StateMachine: services.StateMachine{},
ChainID: e.chainID,
Network: e.network,
client: clientMock,
ChainID: e.chainID,
Network: e.network,
client: clientMock,
}

}
Expand Down
3 changes: 3 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [dev]

### Fixed
- Fixed a bug that caused the Telemetry Manager to report incorrect health

### Added

- Added a tracker component to the txmgr for tracking and gracefully handling abandoned transactions. Abandoned transactions occur when a fromAddress is removed from the keystore by a node operator. The tracker gives abandoned transactions a chance to be finalized on chain, or marks them as fatal_error if they are not finalized within a specified time to live (default 6hrs).
Expand Down
Loading