Skip to content

Commit

Permalink
feat(MERC-6807): added orphaned transmission cleanup (#16432)
Browse files Browse the repository at this point in the history
* feat(MERC-6807): added orphaned transmission cleanup

* added limit when deleting orphaned transmissions

* switched to using DB time when reaping stale transmissions

* cleanup of the reap function and test covering an existing ocr2 oracle spec with donId

* lintfix
  • Loading branch information
msuchacz-cll authored Feb 20, 2025
1 parent 8d2b90c commit a510315
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 31 deletions.
90 changes: 62 additions & 28 deletions core/services/llo/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,54 +86,53 @@ func (t *transmissionReaper) runLoop(ctx context.Context) {
// make a final effort to clear the database that goes into
// overtime
overtimeCtx, cancel := context.WithTimeout(context.Background(), OvertimeDeleteTimeout)
if n, err := t.reapStale(overtimeCtx, TransmissionReaperBatchSize); err != nil {
if n, err := t.reap(overtimeCtx, TransmissionReaperBatchSize, "stale"); err != nil {
t.lggr.Errorw("Failed to reap stale transmissions on exit", "err", err)
} else if n > 0 {
t.lggr.Infow("Reaped stale transmissions on exit", "nDeleted", n)
}
cancel()
return
case <-ticker.C:
// TODO: Could also automatically reap orphaned transmissions
// that don't have a job with a matching DON ID (from job
// deletion)
//
// https://smartcontract-it.atlassian.net/browse/MERC-6807
// TODO: Should also reap other LLO garbage that can be left
// behind e.g. channel definitions etc
n, err := t.reapStale(ctx, TransmissionReaperBatchSize)
if err != nil {
t.lggr.Errorw("Failed to reap", "err", err)
continue
}
if n > 0 {
t.lggr.Infow("Reaped stale transmissions", "nDeleted", n)
}
t.reapAndLog(ctx, TransmissionReaperBatchSize, "stale")
t.reapAndLog(ctx, TransmissionReaperBatchSize, "orphaned")
}
}
}

func (t *transmissionReaper) reapStale(ctx context.Context, batchSize int) (rowsDeleted int64, err error) {
// reapAndLog runs one reap pass of the given reapType via t.reap and reports
// the outcome through the reaper's logger: an error entry when the pass
// fails, an info entry when at least one row was deleted, and nothing when
// the pass succeeded without deleting anything.
func (t *transmissionReaper) reapAndLog(ctx context.Context, batchSize int, reapType string) {
	deleted, reapErr := t.reap(ctx, batchSize, reapType)
	switch {
	case reapErr != nil:
		t.lggr.Errorw("Failed to reap", "type", reapType, "err", reapErr)
	case deleted > 0:
		t.lggr.Infow("Reaped transmissions", "type", reapType, "nDeleted", deleted)
	}
}

func (t *transmissionReaper) reap(ctx context.Context, batchSize int, reapType string) (rowsDeleted int64, err error) {
for {
var res sql.Result
res, err = t.ds.ExecContext(ctx, `
DELETE FROM llo_mercury_transmit_queue AS q
USING (
SELECT transmission_hash
FROM llo_mercury_transmit_queue
WHERE inserted_at < $1
ORDER BY inserted_at ASC
LIMIT $2
) AS to_delete
WHERE q.transmission_hash = to_delete.transmission_hash;
`, time.Now().Add(-t.maxAge), batchSize)
switch reapType {
case "stale":
res, err = t.reapStale(ctx, batchSize)
case "orphaned":
res, err = t.reapOrphaned(ctx, batchSize)
default:
return 0, fmt.Errorf("transmissionReaper: unknown reap type: %s", reapType)
}

if err != nil {
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to delete stale transmissions: %w", err)
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to delete %s transmissions: %w", reapType, err)
}

var rowsAffected int64
rowsAffected, err = res.RowsAffected()
if err != nil {
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to get rows affected: %w", err)
return rowsDeleted, fmt.Errorf("transmissionReaper: failed to get %s rows affected: %w", reapType, err)
}
if rowsAffected == 0 {
break
Expand All @@ -142,3 +141,38 @@ WHERE q.transmission_hash = to_delete.transmission_hash;
}
return rowsDeleted, nil
}

// reapStale executes a single DELETE against llo_mercury_transmit_queue that
// removes at most batchSize rows, oldest first, whose inserted_at is older
// than t.maxAge. The cutoff is derived from the database clock (NOW()), with
// maxAge handed to the query as a whole number of microseconds. The raw
// sql.Result is returned so the caller can inspect the affected row count.
func (t *transmissionReaper) reapStale(ctx context.Context, batchSize int) (sql.Result, error) {
	const deleteStaleBatch = `
DELETE FROM llo_mercury_transmit_queue AS q
USING (
SELECT transmission_hash
FROM llo_mercury_transmit_queue
WHERE inserted_at < NOW() - ($1 * INTERVAL '1 MICROSECOND')
ORDER BY inserted_at ASC
LIMIT $2
) AS to_delete
WHERE q.transmission_hash = to_delete.transmission_hash;
`
	maxAgeMicros := t.maxAge.Microseconds()
	return t.ds.ExecContext(ctx, deleteStaleBatch, maxAgeMicros, batchSize)
}

// reapOrphaned executes a single DELETE against llo_mercury_transmit_queue
// that removes at most batchSize rows, oldest first, whose don_id does not
// appear among the DON IDs referenced by ocr2_oracle_specs. A spec counts as
// referencing a DON when its relay_config carries a non-null, non-empty
// lloDonID, which is cast to bigint for the comparison. When no spec carries
// such a value the set of active DON IDs is empty, so every queued row
// matches. The raw sql.Result is returned so the caller can inspect the
// affected row count.
func (t *transmissionReaper) reapOrphaned(ctx context.Context, batchSize int) (sql.Result, error) {
	const deleteOrphanedBatch = `
WITH activeDonIds AS (
SELECT DISTINCT cast(relay_config->>'lloDonID' as bigint) as don_id
FROM ocr2_oracle_specs
WHERE
relay_config->>'lloDonID' IS NOT NULL
AND relay_config->>'lloDonID' <> ''
)
DELETE FROM llo_mercury_transmit_queue as q
USING (
SELECT transmission_hash
FROM llo_mercury_transmit_queue
WHERE don_id NOT IN (SELECT don_id FROM activeDonIds)
ORDER BY inserted_at ASC
LIMIT $1
) AS to_delete
WHERE q.transmission_hash = to_delete.transmission_hash;
`
	return t.ds.ExecContext(ctx, deleteOrphanedBatch, batchSize)
}
38 changes: 35 additions & 3 deletions core/services/llo/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Test_Cleanup(t *testing.T) {
})
}

func Test_TransmissionReaper(t *testing.T) {
func Test_StaleTransmissionReaper(t *testing.T) {
ds := pgtest.NewSqlxDB(t)
lggr := logger.TestLogger(t)
tr := &transmissionReaper{ds: ds, lggr: lggr, maxAge: 24 * time.Hour}
Expand All @@ -133,13 +133,45 @@ WHERE transmission_hash IN (
`)

// test batching
d, err := tr.reapStale(ctx, n/3)
d, err := tr.reap(ctx, n/3, "stale")
require.NoError(t, err)
assert.Equal(t, int64(5), d)

pgtest.MustExec(t, ds, "UPDATE llo_mercury_transmit_queue SET inserted_at = NOW() - INTERVAL '48 hours'")

d, err = tr.reapStale(ctx, n/3)
d, err = tr.reap(ctx, n/3, "stale")
require.NoError(t, err)
assert.Equal(t, int64(n-5), d)
}

// Test_OrphanedTransmissionReaper exercises the "orphaned" reap type against
// a database holding one ocr2 oracle spec whose relay_config has lloDonID 2.
// Transmissions inserted through an ORM built for DON 1 must all be reaped;
// the same transmissions inserted through an ORM built for DON 2 must all be
// left in place.
func Test_OrphanedTransmissionReaper(t *testing.T) {
	const n = 13

	db := pgtest.NewSqlxDB(t)
	reaper := &transmissionReaper{ds: db, lggr: logger.TestLogger(t), maxAge: 24 * time.Hour}
	ctx := testutils.Context(t)

	// The only job spec in the database points at DON 2.
	pgtest.MustExec(t, db, `
INSERT INTO ocr2_oracle_specs (contract_id, p2pv2_bootstrappers, contract_config_confirmations, created_at,
updated_at, relay, relay_config, plugin_config, plugin_type, onchain_signing_strategy, allow_no_bootstrappers
) VALUES ('0x','{}', 0, NOW(), NOW(), 'evm', '{"chainID": 421614, "lloDonID": 2}', '{"donID": 2}', 'llo', '{}', FALSE);`)

	sample := makeSampleTransmissions(n)

	// Queue the sample under DON 1, which no spec references: all n rows
	// are expected to be deleted.
	require.NoError(t, mercurytransmitter.NewORM(db, 1).Insert(testutils.Context(t), sample))

	deleted, reapErr := reaper.reap(ctx, n, "orphaned")
	require.NoError(t, reapErr)
	assert.Equal(t, int64(n), deleted)

	// Queue the same sample under DON 2, which the spec references: nothing
	// is expected to be deleted.
	require.NoError(t, mercurytransmitter.NewORM(db, 2).Insert(testutils.Context(t), sample))

	deleted, reapErr = reaper.reap(ctx, n, "orphaned")
	require.NoError(t, reapErr)
	assert.Equal(t, int64(0), deleted)
}

0 comments on commit a510315

Please sign in to comment.