Skip to content

Commit

Permalink
Add 'stream' to allowed job type list; fix issue adding large numbers…
Browse files Browse the repository at this point in the history
… of channels (#13692)

* Add 'stream' to allowed job type list

- Improve streams logging; add "Verbose" option
- Bump chainlink-data-streams
- Fix some LLO bugs

* Add changeset

* Add changeset tag

* fix linter

* Latest chainlink-data-streams master
  • Loading branch information
samsondav authored Jul 8, 2024
1 parent bf9122a commit 5f3d58b
Show file tree
Hide file tree
Showing 38 changed files with 242 additions and 79 deletions.
14 changes: 14 additions & 0 deletions .changeset/cuddly-toys-warn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"chainlink": patch
---

Add "VerboseLogging" option to mercury

Off by default, can be enabled like so:

```toml
[Mercury]
VerboseLogging = true
```

#updated
5 changes: 5 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,11 @@ env = 'test' # Example

[Mercury]

# VerboseLogging enables detailed logging of mercury/LLO operations. These logs
# can be expensive since they may serialize large structs, so they are disabled
# by default.
VerboseLogging = false # Default

# Mercury.Cache controls settings for the price retrieval cache querying a mercury server
[Mercury.Cache]
# LatestReportTTL controls how "stale" we will allow a price to be e.g. if
Expand Down
1 change: 1 addition & 0 deletions core/config/mercury_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type Mercury interface {
Cache() MercuryCache
TLS() MercuryTLS
Transmitter() MercuryTransmitter
VerboseLogging() bool
}
10 changes: 7 additions & 3 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,15 +1324,19 @@ func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) {
}

type Mercury struct {
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
Cache MercuryCache `toml:",omitempty"`
TLS MercuryTLS `toml:",omitempty"`
Transmitter MercuryTransmitter `toml:",omitempty"`
VerboseLogging *bool `toml:",omitempty"`
}

func (m *Mercury) setFrom(f *Mercury) {
m.Cache.setFrom(&f.Cache)
m.TLS.setFrom(&f.TLS)
m.Transmitter.setFrom(&f.Transmitter)
if v := f.VerboseLogging; v != nil {
m.VerboseLogging = v
}
}

func (m *Mercury) ValidateConfig() (err error) {
Expand Down
2 changes: 2 additions & 0 deletions core/internal/testutils/configtest/general_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func overrides(c *chainlink.Config, s *chainlink.Secrets) {
c.JobPipeline.ReaperInterval = commonconfig.MustNewDuration(0)
c.JobPipeline.VerboseLogging = ptr(true)

c.Mercury.VerboseLogging = ptr(true)

c.P2P.V2.Enabled = ptr(false)

c.WebServer.SessionTimeout = commonconfig.MustNewDuration(2 * time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ require (
github.com/shirou/gopsutil/v3 v3.24.3 // indirect
github.com/smartcontractkit/chain-selectors v1.0.10 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702144807-761f63e7b527 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240708130426-294b81e4afe7 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240625074951-06ab5e670dba // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1211,8 +1211,8 @@ github.com/smartcontractkit/chainlink-common v0.1.7-0.20240703234618-dc1fbe45acc
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240703234618-dc1fbe45acc0/go.mod h1:fh9eBbrReCmv31bfz52ENCAMa7nTKQbdhb2B3+S2VGo=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 h1:TMOoYaeSDkkI3jkCH7lKHOZaLkeDuxFTNC+XblD6M0M=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702144807-761f63e7b527 h1:Vs6myS+bpPwb8chUY7XxveJyhvejknhOmhDTddgsK5I=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702144807-761f63e7b527/go.mod h1:KRK7KlAEpmORi+nJgT0vxQVWvlLEBQ6zgzXziZuKvUM=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1 h1:dsTmitRaVizHxoYFoGz4+y/zVa8XnvKUiTaZdx+6t9M=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240702145022-37a2c3a742d1/go.mod h1:6DgCnHMGdBaIh0bLs1dK0MtdeMZfeNhc/nvBUN6KIUg=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917 h1:MD80ZRCTvxxJ8PBmhtrKoTnky8cVNYrCrIBLVRbrOM0=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917/go.mod h1:jwVxhctE6BgLOSSsVq9wbREpZ8Ev34H+UBxeUhESZRs=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240708130426-294b81e4afe7 h1:tBJo/Rn0Ur7XtzqTEgIhLYDDWdcXBf985AF0PegsDk8=
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (m *mercuryConfig) TLS() config.MercuryTLS {
func (m *mercuryConfig) Transmitter() config.MercuryTransmitter {
return &mercuryTransmitterConfig{c: m.c.Transmitter}
}

func (m *mercuryConfig) VerboseLogging() bool {
return *m.c.VerboseLogging
}
3 changes: 3 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ func TestConfig_Marshal(t *testing.T) {
TransmitQueueMaxSize: ptr(uint32(123)),
TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second),
},
VerboseLogging: ptr(true),
}

for _, tt := range []struct {
Expand Down Expand Up @@ -1186,6 +1187,8 @@ URL = 'http://stark.node'
APIKey = 'key'
`},
{"Mercury", Config{Core: toml.Core{Mercury: full.Mercury}}, `[Mercury]
VerboseLogging = true
[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ Mode = 'tls'
TLSCertPath = ''

[Mercury]
VerboseLogging = false

[Mercury.Cache]
LatestReportTTL = '1s'
MaxStaleAge = '1h0m0s'
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ env = 'dev'
test = 'load'

[Mercury]
VerboseLogging = true

[Mercury.Cache]
LatestReportTTL = '1m40s'
MaxStaleAge = '1m41s'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ Mode = 'tls'
TLSCertPath = ''

[Mercury]
VerboseLogging = false

[Mercury.Cache]
LatestReportTTL = '1s'
MaxStaleAge = '1h0m0s'
Expand Down
22 changes: 21 additions & 1 deletion core/services/llo/bm/dummy_transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink-common/pkg/services"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

Expand Down Expand Up @@ -57,8 +59,26 @@ func (t *transmitter) Transmit(
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
sigs []types.AttributedOnchainSignature,
) error {
lggr := t.lggr
switch report.Info.ReportFormat {
case llotypes.ReportFormatJSON:
r, err := (llo.JSONReportCodec{}).Decode(report.Report)
if err != nil {
lggr.Debugw("Failed to decode JSON report", "err", err)
}
lggr = lggr.With(
"report.Report.ConfigDigest", r.ConfigDigest,
"report.Report.ChainSelector", r.ChainSelector,
"report.Report.SeqNr", r.SeqNr,
"report.Report.ChannelID", r.ChannelID,
"report.Report.ValidAfterSeconds", r.ValidAfterSeconds,
"report.Report.ValidUntilSeconds", r.ValidUntilSeconds,
"report.Report.Values", r.Values,
"report.Report.Specimen", r.Specimen,
)
}
transmitSuccessCount.Inc()
t.lggr.Debugw("Transmit", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs)
lggr.Infow("Transmit (dummy)", "digest", digest, "seqNr", seqNr, "report.Report", report.Report, "report.Info", report.Info, "sigs", sigs)
return nil
}

Expand Down
134 changes: 94 additions & 40 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"context"
"fmt"
"math/big"
"sort"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)

Expand All @@ -31,16 +34,35 @@ var (
)
)

type ErrMissingStream struct {
id string
}

type Registry interface {
Get(streamID streams.StreamID) (strm streams.Stream, exists bool)
}

func (e ErrMissingStream) Error() string {
return fmt.Sprintf("missing stream definition for: %q", e.id)
type ErrObservationFailed struct {
inner error
reason string
streamID streams.StreamID
run *pipeline.Run
}

func (e *ErrObservationFailed) Error() string {
s := fmt.Sprintf("StreamID: %d; Reason: %s", e.streamID, e.reason)
if e.inner != nil {
s += fmt.Sprintf("; Err: %v", e.inner)
}
if e.run != nil {
// NOTE: Could log more info about the run here if necessary
s += fmt.Sprintf("; RunID: %d; RunErrors: %v", e.run.ID, e.run.AllErrors)
}
return s
}

func (e *ErrObservationFailed) String() string {
return e.Error()
}

func (e *ErrObservationFailed) Unwrap() error {
return e.inner
}

var _ llo.DataSource = &dataSource{}
Expand All @@ -54,54 +76,86 @@ func newDataSource(lggr logger.Logger, registry Registry) llo.DataSource {
return &dataSource{lggr.Named("DataSource"), registry}
}

// Observe looks up all streams in the registry and returns a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamIDs map[llotypes.StreamID]struct{}) (llo.StreamValues, error) {
// Observe looks up all streams in the registry and populates a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
var wg sync.WaitGroup
wg.Add(len(streamIDs))
sv := make(llo.StreamValues)
var mu sync.Mutex

d.lggr.Debugw("Observing streams", "streamIDs", streamIDs)
wg.Add(len(streamValues))
var svmu sync.Mutex
var errors []ErrObservationFailed
var errmu sync.Mutex

if opts.VerboseLogging() {
streamIDs := make([]streams.StreamID, 0, len(streamValues))
for streamID := range streamValues {
streamIDs = append(streamIDs, streamID)
}
sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] })
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "seqNr", opts.SeqNr())
}

for streamID := range streamIDs {
for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()

var res llo.ObsResult[*big.Int]
var val *big.Int

stream, exists := d.registry.Get(streamID)
if exists {
run, trrs, err := stream.Run(ctx)
if err != nil {
var runID int64
if run != nil {
runID = run.ID
}
d.lggr.Debugw("Observation failed for stream", "err", err, "streamID", streamID, "runID", runID)
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
} else {
// TODO: support types other than *big.Int
// https://smartcontract-it.atlassian.net/browse/MERC-3525
val, err := streams.ExtractBigInt(trrs)
if err == nil {
res.Val = val
res.Valid = true
}
}
} else {
d.lggr.Errorw(fmt.Sprintf("Missing stream: %q", streamID), "streamID", streamID)
if !exists {
errmu.Lock()
errors = append(errors, ErrObservationFailed{streamID: streamID, reason: fmt.Sprintf("missing stream: %d", streamID)})
errmu.Unlock()
promMissingStreamCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
run, trrs, err := stream.Run(ctx)
if err != nil {
errmu.Lock()
errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "pipeline run failed"})
errmu.Unlock()
promObservationErrorCount.WithLabelValues(fmt.Sprintf("%d", streamID)).Inc()
return
}
// TODO: support types other than *big.Int
// https://smartcontract-it.atlassian.net/browse/MERC-3525
val, err = streams.ExtractBigInt(trrs)
if err != nil {
errmu.Lock()
errors = append(errors, ErrObservationFailed{inner: err, run: run, streamID: streamID, reason: "failed to extract big.Int"})
errmu.Unlock()
return
}

mu.Lock()
defer mu.Unlock()
sv[streamID] = res
if val != nil {
svmu.Lock()
defer svmu.Unlock()
streamValues[streamID] = val
}
}(streamID)
}

wg.Wait()

d.lggr.Debugw("Observed streams", "streamIDs", streamIDs, "values", sv)
// Failed observations are always logged at warn level
var failedStreamIDs []streams.StreamID
if len(errors) > 0 {
sort.Slice(errors, func(i, j int) bool { return errors[i].streamID < errors[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errors))
for i, e := range errors {
failedStreamIDs[i] = e.streamID
}
d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errors", errors, "seqNr", opts.SeqNr())
}

if opts.VerboseLogging() {
successes := make([]streams.StreamID, 0, len(streamValues))
for strmID, res := range streamValues {
if res != nil {
successes = append(successes, strmID)
}
}
sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] })
d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "values", streamValues, "seqNr", opts.SeqNr())
}

return sv, nil
return nil
}
Loading

0 comments on commit 5f3d58b

Please sign in to comment.