Implement batching for the logPoller (#7155)
RensR authored Aug 8, 2022
1 parent 374972a commit e57ae37
Showing 3 changed files with 65 additions and 26 deletions.
47 changes: 24 additions & 23 deletions core/chains/evm/logpoller/log_poller.go
@@ -263,9 +263,6 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) int64 {
// Retry forever to save logs,
// unblocked by resolving db connectivity issues.
utils.RetryWithBackoff(ctx, func() bool {
// Note the insert param limit is 65535 and we have 10 columns per log.
// Thus the maximum number of logs we can insert in a given block is 6500.
// TODO (https://app.shortcut.com/chainlinklabs/story/48377/support-arbitrary-number-of-logs-per-block)
if err := lp.orm.InsertLogs(convertLogs(lp.ec.ChainID(), logs), pg.WithParentCtx(ctx)); err != nil {
lp.lggr.Warnw("Unable to insert logs logs, retrying", "err", err, "from", from, "to", to)
return true
@@ -290,7 +287,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// If not, there was a reorg, so we need to rewind.
expectedParent, err1 := lp.orm.SelectBlockByNumber(currentBlockNumber-1, pg.WithParentCtx(ctx))
if err1 != nil && !errors.Is(err1, sql.ErrNoRows) {
// If err is not a no rows error, assume transient db issue and retry
// If err is not a 'no rows' error, assume transient db issue and retry
lp.lggr.Warnw("Unable to read latestBlockNumber currentBlock saved", "err", err1, "currentBlockNumber", currentBlockNumber)
return nil, errors.New("Unable to read latestBlockNumber currentBlock saved")
}
@@ -316,7 +313,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// We could preserve the logs for forensics, since its possible
// that applications see them and take action upon it, however that
// results in significantly slower reads since we must then compute
// the canonical set per read. Typically if an application took action on a log
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. eth_txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
@@ -346,11 +343,13 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
}

// pollAndSaveLogs On startup/crash current is the first block after the last processed block.
// currentBlockNumber is the block from where new logs are to be polled & saved. Under normal
// conditions this would be equal to lastProcessed.BlockNumber + 1.
func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) {
lp.lggr.Infow("Polling for logs", "currentBlockNumber", currentBlockNumber)
latestBlock, err1 := lp.ec.HeaderByNumber(ctx, nil)
if err1 != nil {
lp.lggr.Warnw("Unable to get latestBlockNumber block", "err", err1, "currentBlockNumber", currentBlockNumber)
latestBlock, err := lp.ec.HeaderByNumber(ctx, nil)
if err != nil {
lp.lggr.Warnw("Unable to get latestBlockNumber block", "err", err, "currentBlockNumber", currentBlockNumber)
return
}
latestBlockNumber := latestBlock.Number.Int64()
@@ -362,11 +361,11 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
}
// Possibly handle a reorg. For example if we crash, we'll be in the middle of processing unfinalized blocks.
// Returns (currentBlock || LCA+1 if reorg detected, error)
currentBlock, err1 := lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber)
if err1 != nil {
currentBlock, err := lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err1)
lp.lggr.Errorw("Unable to get current block", "err", err)
return
}
currentBlockNumber = currentBlock.Number.Int64()
@@ -375,29 +374,31 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
// E.g. 1<-2<-3(currentBlockNumber)<-4<-5<-6<-7(latestBlockNumber), finality is 2. So 3,4 can be batched.
// Although 5 is finalized, we still need to save it to the db for reorg detection if 6 is a reorg.
// start = currentBlockNumber = 3, end = latestBlockNumber - finality - 1 = 7-2-1 = 4 (inclusive range).
if (latestBlockNumber - currentBlockNumber) >= (lp.finalityDepth + 1) {
lp.lggr.Infow("Backfilling logs", "start", currentBlockNumber, "end", latestBlockNumber-lp.finalityDepth-1)
currentBlockNumber = lp.backfill(ctx, currentBlockNumber, latestBlockNumber-lp.finalityDepth-1)
lastSafeBackfillBlock := latestBlockNumber - lp.finalityDepth - 1
if lastSafeBackfillBlock >= currentBlockNumber {
lp.lggr.Infow("Backfilling logs", "start", currentBlockNumber, "end", lastSafeBackfillBlock)
currentBlockNumber = lp.backfill(ctx, currentBlockNumber, lastSafeBackfillBlock)
}

for currentBlockNumber <= latestBlockNumber {
// Same reorg detection on unfinalized blocks.
currentBlock, err2 := lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber)
if err2 != nil {
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err1)
lp.lggr.Errorw("Unable to get current block", "err", err)
return
}
currentBlockNumber = currentBlock.Number.Int64()
h := currentBlock.Hash()
logs, err2 := lp.ec.FilterLogs(ctx, lp.filter(nil, nil, &h))
if err2 != nil {
lp.lggr.Warnw("Unable query for logs, retrying", "err", err2, "block", currentBlockNumber)
var logs []types.Log
logs, err = lp.ec.FilterLogs(ctx, lp.filter(nil, nil, &h))
if err != nil {
lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber)
return
}
lp.lggr.Infow("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash())
err2 = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
err = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
if err3 := lp.orm.InsertBlock(currentBlock.Hash(), currentBlockNumber, pg.WithQueryer(tx)); err3 != nil {
return err3
}
Expand All @@ -406,8 +407,8 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
}
return lp.orm.InsertLogs(convertLogs(lp.ec.ChainID(), logs), pg.WithQueryer(tx))
})
if err2 != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err2, "block", currentBlockNumber)
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
return
}
currentBlockNumber++
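The hunk above replaces the inline `(latestBlockNumber - currentBlockNumber) >= (lp.finalityDepth + 1)` check with an explicit `lastSafeBackfillBlock`. A minimal standalone sketch of that range calculation, using the numbers from the comment in the diff (head 7, current block 3, finality depth 2); the function name and layout here are illustrative, not code from this commit:

```go
package main

import "fmt"

// safeBackfillRange mirrors the arithmetic in pollAndSaveLogs: everything up
// to latest - finalityDepth - 1 can be bulk backfilled, while the remaining
// near-head blocks are polled one by one so reorgs can still be detected.
func safeBackfillRange(currentBlockNumber, latestBlockNumber, finalityDepth int64) (start, end int64, ok bool) {
	lastSafeBackfillBlock := latestBlockNumber - finalityDepth - 1
	if lastSafeBackfillBlock < currentBlockNumber {
		return 0, 0, false // nothing is old enough to backfill yet
	}
	return currentBlockNumber, lastSafeBackfillBlock, true
}

func main() {
	// Numbers from the comment in the diff: head 7, current block 3,
	// finality depth 2 -> backfill 3..4, then poll 5..7 individually.
	start, end, ok := safeBackfillRange(3, 7, 2)
	fmt.Println(start, end, ok) // 3 4 true
}
```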
24 changes: 24 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -64,6 +64,30 @@ func assertHaveCanonical(t *testing.T, start, end int, ec *backends.SimulatedBac
}
}

func TestLogPoller_Batching(t *testing.T) {
lggr := logger.TestLogger(t)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)
require.NoError(t, utils.JustError(db.Exec(`SET CONSTRAINTS log_poller_blocks_evm_chain_id_fkey DEFERRED`)))
require.NoError(t, utils.JustError(db.Exec(`SET CONSTRAINTS logs_evm_chain_id_fkey DEFERRED`)))
o := logpoller.NewORM(chainID, db, lggr, pgtest.NewPGCfg(true))
event1 := EmitterABI.Events["Log1"].ID
address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb")

var logs []logpoller.Log
// Inserts are limited to 65535 parameters. A log being 10 parameters this results in
// a maximum of 6553 log inserts per tx. As inserting more than 6553 would result in
// an error without batching, this test makes sure batching is enabled.
for i := 0; i < 15000; i++ {
logs = append(logs, GenLog(chainID, int64(i+1), 1, "0x3", event1[:], address1))
}
require.NoError(t, o.InsertLogs(logs))
lgs, err := o.SelectLogsByBlockRange(1, 1)
require.NoError(t, err)
// Make sure all logs are inserted
require.Equal(t, len(logs), len(lgs))
}

func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
// The log poller's blocks table should remain synchronized
// with the canonical chain of geth's despite arbitrary mixes of mining and reorgs.
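The comment in TestLogPoller_Batching states that 65535 bind parameters at 10 per log caps a single statement at 6553 rows. A quick illustrative check of that arithmetic; the constants here simply restate the comment, they are not values read from the driver:

```go
package main

import "fmt"

func main() {
	// Postgres limits a single statement to 65535 bind parameters and each
	// log row in this schema binds 10 of them, per the test comment above.
	const maxParams = 65535
	const paramsPerLog = 10

	maxLogsPerStatement := maxParams / paramsPerLog
	fmt.Println(maxLogsPerStatement)         // 6553
	fmt.Println(15000 > maxLogsPerStatement) // true: the test's 15000 logs need batching
}
```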
20 changes: 17 additions & 3 deletions core/chains/evm/logpoller/orm.go
@@ -99,10 +99,24 @@ func (o *ORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
}
}
q := o.q.WithOpts(qopts...)
_, err := q.NamedExec(`INSERT INTO logs

batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(logs) {
end = len(logs)
}

_, err := q.NamedExec(`INSERT INTO logs
(evm_chain_id, log_index, block_hash, block_number, address, event_sig, topics, tx_hash, data, created_at) VALUES
(:evm_chain_id, :log_index, :block_hash, :block_number, :address, :event_sig, :topics, :tx_hash, :data, NOW()) ON CONFLICT DO NOTHING`, logs)
return err
(:evm_chain_id, :log_index, :block_hash, :block_number, :address, :event_sig, :topics, :tx_hash, :data, NOW()) ON CONFLICT DO NOTHING`, logs[start:end])

if err != nil {
return err
}
}

return nil
}

func (o *ORM) selectLogsByBlockRange(start, end int64) ([]Log, error) {
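The loop added to InsertLogs above is a plain chunked insert: slice the rows into windows of batchInsertSize and run one NamedExec per window, aborting on the first error. A self-contained sketch of the same pattern with the database call stubbed out; insertInBatches, insertFn, and the int rows are placeholders for illustration, not the ORM's types:

```go
package main

import "fmt"

// insertInBatches runs insertFn over rows in windows of batchSize, mirroring
// the chunking loop added to InsertLogs. insertFn stands in for the real
// NamedExec call; a failed batch aborts the whole insert, as in the ORM.
func insertInBatches(rows []int, batchSize int, insertFn func(batch []int) error) error {
	for i := 0; i < len(rows); i += batchSize {
		end := i + batchSize
		if end > len(rows) {
			end = len(rows) // final batch may be shorter
		}
		if err := insertFn(rows[i:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	rows := make([]int, 15000) // same row count the new test inserts
	batches := 0
	_ = insertInBatches(rows, 4000, func(batch []int) error {
		batches++
		return nil
	})
	fmt.Println(batches) // 4 statements: 4000, 4000, 4000, 3000 rows, each under the parameter cap
}
```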
