Skip to content

Commit

Permalink
EVM ORM Refactors Pt.2 (#12189)
Browse files Browse the repository at this point in the history
* Initial commit

* Refactor headtracker orm

* Remove unused loggers

* Remove comments

* Add timeout

* Refactor log_poller ORM

* Refactor logpoller ORM

* Fix logpoller tests

* Update logpoller orm

* Use EventSig

* update logpoller orm

* Update orm.go

* Update logpoller_wrapper_test.go

* Update log_poller_test.go

* Remove query

* Remove ORM timeouts

* Add context

* Use testutils for context

* Use testutils context

* Use testutils context

* Use ctx

* Refactor forwarder ORM

* Generate tidy

* Fix logpoller mocks

* Remove pg dependency

* Fix mock calls

* Fix mock calls

* Fix mock calls

* Use request context

* Update context

* Update contexts

* Fix mock call args

* Unexport orm

* Fix arg name

* Extract pg from log broadcaster

* Remove pg from helpers test

* update logpoller

* unexport orm

* Use query

* fix tests

* fix imports

* Use pkgerrors

* Use registry ctx

* Use context

* Use ctx

* Use ctx

* Update orm.go

* Use context

* Use context

* Use context

* Propagate context

* Propagate context

* Update listener_test.go

* Fix context

* Export DbORM struct

* Update orm.go

* Pass context

* Pass context

* Update orm.go

* Use testcontext

* Initialize context

* Draft changes

* Refactor evm_tx_store

* Update evm_tx_store.go

* Fix tests

* Add context to tests

* Fix mocks

* lint

* Enable read only transactions

* Update context

* Propagate context

* core/services/chainlink: start using sqlutil.DB instead of pg.Q (#12386)

* Check bind errors

* Remove default timeout

* lint

* Add changeset

* Remove pg dependency

* Use sqlutil DataSource

* Handle bind errors

* Update core/chains/evm/txmgr/evm_tx_store.go

Co-authored-by: Jordan Krage <jmank88@gmail.com>

* Use sqlutil batching

* Set longer timeout

* Update broadcaster.go

* Update core/chains/evm/log/broadcaster.go

Co-authored-by: Jordan Krage <jmank88@gmail.com>

---------

Co-authored-by: Dylan Tinianov <dylan.tinianov@smartcontract.com>
Co-authored-by: Dylan Tinianov <dylantinianov@gmail.com>
Co-authored-by: Jordan Krage <jmank88@gmail.com>
  • Loading branch information
4 people authored Mar 20, 2024
1 parent 93762cc commit 79db120
Show file tree
Hide file tree
Showing 62 changed files with 975 additions and 1,008 deletions.
5 changes: 5 additions & 0 deletions .changeset/selfish-timers-matter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Refactor Log and TxStore ORMs
49 changes: 25 additions & 24 deletions core/chains/evm/log/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
pkgerrors "github.com/pkg/errors"
Expand All @@ -23,7 +25,6 @@ import (
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
evmutils "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

//go:generate mockery --quiet --name Broadcaster --output ./mocks/ --case=underscore --structname Broadcaster --filename broadcaster.go
Expand Down Expand Up @@ -58,11 +59,11 @@ type (
IsConnected() bool
Register(listener Listener, opts ListenerOpts) (unsubscribe func())

WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error)
MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error
WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error)
MarkConsumed(ctx context.Context, lb Broadcast) error

// MarkManyConsumed marks all the provided log broadcasts as consumed.
MarkManyConsumed(lbs []Broadcast, qopts ...pg.QOpt) error
MarkManyConsumed(ctx context.Context, lbs []Broadcast) error

// NOTE: WasAlreadyConsumed, MarkConsumed and MarkManyConsumed MUST be used within a single goroutine in order for WasAlreadyConsumed to be accurate
}
Expand Down Expand Up @@ -398,7 +399,7 @@ func (b *broadcaster) reinitialize() (backfillStart *int64, abort bool) {

evmutils.RetryWithBackoff(ctx, func() bool {
var err error
backfillStart, err = b.orm.Reinitialize(pg.WithParentCtx(ctx))
backfillStart, err = b.orm.Reinitialize(ctx)
if err != nil {
b.logger.Errorw("Failed to reinitialize database", "err", err)
return true
Expand Down Expand Up @@ -494,12 +495,12 @@ func (b *broadcaster) onReplayRequest(replayReq replayRequest) {
b.backfillBlockNumber.Int64 = replayReq.fromBlock
b.backfillBlockNumber.Valid = true
if replayReq.forceBroadcast {
ctx, cancel := b.chStop.NewCtx()
ctx, cancel := b.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Minute))
ctx = sqlutil.WithoutDefaultTimeout(ctx)
defer cancel()

// Use a longer timeout in the event that a very large amount of logs need to be marked
// as consumed.
err := b.orm.MarkBroadcastsUnconsumed(replayReq.fromBlock, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout())
err := b.orm.MarkBroadcastsUnconsumed(ctx, replayReq.fromBlock)
if err != nil {
b.logger.Errorw("Error marking broadcasts as unconsumed",
"err", err, "fromBlock", replayReq.fromBlock)
Expand Down Expand Up @@ -541,7 +542,7 @@ func (b *broadcaster) onNewLog(log types.Log) {
ctx, cancel := b.chStop.NewCtx()
defer cancel()
blockNumber := int64(log.BlockNumber)
if err := b.orm.SetPendingMinBlock(&blockNumber, pg.WithParentCtx(ctx)); err != nil {
if err := b.orm.SetPendingMinBlock(ctx, &blockNumber); err != nil {
b.logger.Errorw("Failed to set pending broadcasts number", "blockNumber", log.BlockNumber, "err", err)
}
}
Expand Down Expand Up @@ -586,30 +587,30 @@ func (b *broadcaster) onNewHeads() {
if b.registrations.highestNumConfirmations == 0 {
logs, lowest, highest := b.logPool.getAndDeleteAll()
if len(logs) > 0 {
broadcasts, err := b.orm.FindBroadcasts(lowest, highest)
broadcasts, err := b.orm.FindBroadcasts(ctx, lowest, highest)
if err != nil {
b.logger.Errorf("Failed to query for log broadcasts, %v", err)
return
}
b.registrations.sendLogs(logs, *latestHead, broadcasts, b.orm)
if err := b.orm.SetPendingMinBlock(nil, pg.WithParentCtx(ctx)); err != nil {
b.registrations.sendLogs(ctx, logs, *latestHead, broadcasts, b.orm)
if err := b.orm.SetPendingMinBlock(ctx, nil); err != nil {
b.logger.Errorw("Failed to set pending broadcasts number null", "err", err)
}
}
} else {
logs, minBlockNum := b.logPool.getLogsToSend(latestBlockNum)

if len(logs) > 0 {
broadcasts, err := b.orm.FindBroadcasts(minBlockNum, latestBlockNum)
broadcasts, err := b.orm.FindBroadcasts(ctx, minBlockNum, latestBlockNum)
if err != nil {
b.logger.Errorf("Failed to query for log broadcasts, %v", err)
return
}

b.registrations.sendLogs(logs, *latestHead, broadcasts, b.orm)
b.registrations.sendLogs(ctx, logs, *latestHead, broadcasts, b.orm)
}
newMin := b.logPool.deleteOlderLogs(keptDepth)
if err := b.orm.SetPendingMinBlock(newMin); err != nil {
if err := b.orm.SetPendingMinBlock(ctx, newMin); err != nil {
b.logger.Errorw("Failed to set pending broadcasts number", "blockNumber", keptDepth, "err", err)
}
}
Expand Down Expand Up @@ -688,17 +689,17 @@ func (b *broadcaster) maybeWarnOnLargeBlockNumberDifference(logBlockNumber int64
}

// WasAlreadyConsumed reports whether the given consumer had already consumed the given log
func (b *broadcaster) WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error) {
return b.orm.WasBroadcastConsumed(lb.RawLog().BlockHash, lb.RawLog().Index, lb.JobID(), qopts...)
func (b *broadcaster) WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error) {
return b.orm.WasBroadcastConsumed(ctx, lb.RawLog().BlockHash, lb.RawLog().Index, lb.JobID())
}

// MarkConsumed marks the log as having been successfully consumed by the subscriber
func (b *broadcaster) MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error {
return b.orm.MarkBroadcastConsumed(lb.RawLog().BlockHash, lb.RawLog().BlockNumber, lb.RawLog().Index, lb.JobID(), qopts...)
func (b *broadcaster) MarkConsumed(ctx context.Context, lb Broadcast) error {
return b.orm.MarkBroadcastConsumed(ctx, lb.RawLog().BlockHash, lb.RawLog().BlockNumber, lb.RawLog().Index, lb.JobID())
}

// MarkManyConsumed marks the logs as having been successfully consumed by the subscriber
func (b *broadcaster) MarkManyConsumed(lbs []Broadcast, qopts ...pg.QOpt) (err error) {
func (b *broadcaster) MarkManyConsumed(ctx context.Context, lbs []Broadcast) (err error) {
var (
blockHashes = make([]common.Hash, len(lbs))
blockNumbers = make([]uint64, len(lbs))
Expand All @@ -711,7 +712,7 @@ func (b *broadcaster) MarkManyConsumed(lbs []Broadcast, qopts ...pg.QOpt) (err e
logIndexes[i] = lbs[i].RawLog().Index
jobIDs[i] = lbs[i].JobID()
}
return b.orm.MarkBroadcastsConsumed(blockHashes, blockNumbers, logIndexes, jobIDs, qopts...)
return b.orm.MarkBroadcastsConsumed(ctx, blockHashes, blockNumbers, logIndexes, jobIDs)
}

// test only
Expand Down Expand Up @@ -775,13 +776,13 @@ func (n *NullBroadcaster) BackfillBlockNumber() sql.NullInt64 {
func (n *NullBroadcaster) TrackedAddressesCount() uint32 {
return 0
}
func (n *NullBroadcaster) WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error) {
func (n *NullBroadcaster) WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error) {
return false, pkgerrors.New(n.ErrMsg)
}
func (n *NullBroadcaster) MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error {
func (n *NullBroadcaster) MarkConsumed(ctx context.Context, lb Broadcast) error {
return pkgerrors.New(n.ErrMsg)
}
func (n *NullBroadcaster) MarkManyConsumed(lbs []Broadcast, qopts ...pg.QOpt) error {
func (n *NullBroadcaster) MarkManyConsumed(ctx context.Context, lbs []Broadcast) error {
return pkgerrors.New(n.ErrMsg)
}

Expand Down
20 changes: 9 additions & 11 deletions core/chains/evm/log/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

Expand Down Expand Up @@ -93,7 +92,7 @@ func newBroadcasterHelperWithEthClient(t *testing.T, ethClient evmclient.Client,
mailMon := servicetest.Run(t, mailboxtest.NewMonitor(t))

db := pgtest.NewSqlxDB(t)
orm := log.NewORM(db, lggr, config.Database(), cltest.FixtureChainID)
orm := log.NewORM(db, cltest.FixtureChainID)
lb := log.NewTestBroadcaster(orm, ethClient, config.EVM(), lggr, highestSeenHead, mailMon)
kst := cltest.NewKeyStore(t, db, globalConfig.Database())

Expand Down Expand Up @@ -247,7 +246,6 @@ func (rec *received) logsOnBlocks() []logOnBlock {
type simpleLogListener struct {
name string
lggr logger.SugaredLogger
cfg pg.QConfig
received *received
t *testing.T
db *sqlx.DB
Expand All @@ -272,7 +270,6 @@ func (helper *broadcasterHelper) newLogListenerWithJob(name string) *simpleLogLi
return &simpleLogListener{
db: db,
lggr: logger.Sugared(logger.Test(t)),
cfg: helper.config.Database(),
name: name,
received: &rec,
t: t,
Expand Down Expand Up @@ -326,16 +323,17 @@ func (listener *simpleLogListener) requireAllReceived(t *testing.T, expectedStat

func (listener *simpleLogListener) handleLogBroadcast(lb log.Broadcast) bool {
t := listener.t
consumed, err := listener.WasAlreadyConsumed(lb)
ctx := testutils.Context(t)
consumed, err := listener.WasAlreadyConsumed(ctx, lb)
if !assert.NoError(t, err) {
return false
}
if !consumed && !listener.skipMarkingConsumed.Load() {

err = listener.MarkConsumed(lb)
err = listener.MarkConsumed(ctx, lb)
if assert.NoError(t, err) {

consumed2, err := listener.WasAlreadyConsumed(lb)
consumed2, err := listener.WasAlreadyConsumed(ctx, lb)
if assert.NoError(t, err) {
assert.True(t, consumed2)
}
Expand All @@ -344,12 +342,12 @@ func (listener *simpleLogListener) handleLogBroadcast(lb log.Broadcast) bool {
return consumed
}

func (listener *simpleLogListener) WasAlreadyConsumed(broadcast log.Broadcast) (bool, error) {
return log.NewORM(listener.db, listener.lggr, listener.cfg, cltest.FixtureChainID).WasBroadcastConsumed(broadcast.RawLog().BlockHash, broadcast.RawLog().Index, listener.jobID)
func (listener *simpleLogListener) WasAlreadyConsumed(ctx context.Context, broadcast log.Broadcast) (bool, error) {
return log.NewORM(listener.db, cltest.FixtureChainID).WasBroadcastConsumed(ctx, broadcast.RawLog().BlockHash, broadcast.RawLog().Index, listener.jobID)
}

func (listener *simpleLogListener) MarkConsumed(broadcast log.Broadcast) error {
return log.NewORM(listener.db, listener.lggr, listener.cfg, cltest.FixtureChainID).MarkBroadcastConsumed(broadcast.RawLog().BlockHash, broadcast.RawLog().BlockNumber, broadcast.RawLog().Index, listener.jobID)
func (listener *simpleLogListener) MarkConsumed(ctx context.Context, broadcast log.Broadcast) error {
return log.NewORM(listener.db, cltest.FixtureChainID).MarkBroadcastConsumed(ctx, broadcast.RawLog().BlockHash, broadcast.RawLog().BlockNumber, broadcast.RawLog().Index, listener.jobID)
}

type mockListener struct {
Expand Down
31 changes: 14 additions & 17 deletions core/chains/evm/log/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func TestBroadcaster_ReplaysLogs(t *testing.T) {
func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
contract1 := newMockContract(t)
contract2 := newMockContract(t)
ctx := testutils.Context(t)

blocks := cltest.NewBlocks(t, 10)
const (
Expand All @@ -268,8 +269,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
helper := newBroadcasterHelper(t, 0, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
lggr := logger.Test(t)
orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID)
orm := log.NewORM(helper.db, cltest.FixtureChainID)

listener := helper.newLogListenerWithJob("one")
listener.SkipMarkingConsumed(true)
Expand All @@ -282,7 +282,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
chRawLogs.TrySend(log2)
})
// Pool min block in DB and neither listener received a broadcast
blockNum, err := orm.GetPendingMinBlock()
blockNum, err := orm.GetPendingMinBlock(ctx)
require.NoError(t, err)
require.NotNil(t, blockNum)
require.Equal(t, int64(log1.BlockNumber), *blockNum)
Expand All @@ -294,8 +294,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
helper := newBroadcasterHelper(t, 2, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
lggr := logger.Test(t)
orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID)
orm := log.NewORM(helper.db, cltest.FixtureChainID)

listener := helper.newLogListenerWithJob("one")
listener.SkipMarkingConsumed(true)
Expand All @@ -305,58 +304,56 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(2, 5), orm, &expBlock, nil)

// Pool min block in DB and one listener received but didn't consume
blockNum, err := orm.GetPendingMinBlock()
blockNum, err := orm.GetPendingMinBlock(ctx)
require.NoError(t, err)
require.NotNil(t, blockNum)
require.Equal(t, int64(log2.BlockNumber), *blockNum)
require.NotEmpty(t, listener.getUniqueLogs())
require.Empty(t, listener2.getUniqueLogs())
c, err := orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID())
c, err := orm.WasBroadcastConsumed(ctx, log1.BlockHash, log1.Index, listener.JobID())
require.NoError(t, err)
require.False(t, c)
})
t.Run("backfill pool and broadcast two, but only consume one", func(t *testing.T) {
helper := newBroadcasterHelper(t, 4, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
lggr := logger.Test(t)
orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID)
orm := log.NewORM(helper.db, cltest.FixtureChainID)

listener := helper.newLogListenerWithJob("one")
listener2 := helper.newLogListenerWithJob("two")
listener2.SkipMarkingConsumed(true)
helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(5, 8), orm, nil, nil)

// Pool empty and one consumed but other didn't
blockNum, err := orm.GetPendingMinBlock()
blockNum, err := orm.GetPendingMinBlock(ctx)
require.NoError(t, err)
require.Nil(t, blockNum)
require.NotEmpty(t, listener.getUniqueLogs())
require.NotEmpty(t, listener2.getUniqueLogs())
c, err := orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID())
c, err := orm.WasBroadcastConsumed(ctx, log1.BlockHash, log1.Index, listener.JobID())
require.NoError(t, err)
require.True(t, c)
c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID())
c, err = orm.WasBroadcastConsumed(ctx, log2.BlockHash, log2.Index, listener2.JobID())
require.NoError(t, err)
require.False(t, c)
})
t.Run("backfill pool, broadcast and consume one", func(t *testing.T) {
helper := newBroadcasterHelper(t, 7, 1, logs[1:], func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
lggr := logger.Test(t)
orm := log.NewORM(helper.db, lggr, helper.config.Database(), cltest.FixtureChainID)
orm := log.NewORM(helper.db, cltest.FixtureChainID)
listener := helper.newLogListenerWithJob("one")
listener2 := helper.newLogListenerWithJob("two")
helper.simulateHeads(t, listener, listener2, contract1, contract2, confs, blocks.Slice(8, 9), orm, nil, nil)

// Pool empty, one broadcasted and consumed
blockNum, err := orm.GetPendingMinBlock()
blockNum, err := orm.GetPendingMinBlock(ctx)
require.NoError(t, err)
require.Nil(t, blockNum)
require.Empty(t, listener.getUniqueLogs())
require.NotEmpty(t, listener2.getUniqueLogs())
c, err := orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID())
c, err := orm.WasBroadcastConsumed(ctx, log2.BlockHash, log2.Index, listener2.JobID())
require.NoError(t, err)
require.True(t, c)
})
Expand All @@ -381,7 +378,7 @@ func (helper *broadcasterHelper) simulateHeads(t *testing.T, listener, listener2
<-headsDone

require.Eventually(t, func() bool {
blockNum, err := orm.GetPendingMinBlock()
blockNum, err := orm.GetPendingMinBlock(testutils.Context(t))
if !assert.NoError(t, err) {
return false
}
Expand Down
Loading

0 comments on commit 79db120

Please sign in to comment.