Skip to content

Commit

Permalink
fix: lotus-shed: store processed tipset after backfilling events (#12335
Browse files Browse the repository at this point in the history
)
  • Loading branch information
akaladarshi authored and rjan90 committed Aug 12, 2024
1 parent 9a10add commit 436d96c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- https://github.com/filecoin-project/lotus/pull/12292: feat: p2p: allow overriding bootstrap nodes with environmemnt variable
- https://github.com/filecoin-project/lotus/pull/12319: feat: `lotus send CLI`: allow sending to ETH addresses
- https://github.com/filecoin-project/lotus/pull/12332: fix: ETH RPC: receipts: use correct txtype in receipts
- https://github.com/filecoin-project/lotus/pull/12335: fix: lotus-shed: store processed tipset after backfilling events

## New features

Expand Down
31 changes: 23 additions & 8 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (

const (
// same as in chain/events/index.go
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
)

func withCategory(cat string, cmd *cli.Command) *cli.Command {
Expand Down Expand Up @@ -176,6 +177,11 @@ var backfillEventsCmd = &cli.Command{
return err
}

stmtUpsertEventSeen, err := db.Prepare(upsertEventsSeen)
if err != nil {
return err
}

processHeight := func(ctx context.Context, cnt int, msgs []lapi.Message, receipts []*types.MessageReceipt) error {
var tx *sql.Tx
for {
Expand All @@ -196,6 +202,11 @@ var backfillEventsCmd = &cli.Command{
var eventsAffected int64
var entriesAffected int64

tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
}

// loop over each message receipt and backfill the events
for idx, receipt := range receipts {
msg := msgs[idx]
Expand Down Expand Up @@ -225,11 +236,6 @@ var backfillEventsCmd = &cli.Command{
addressLookups[event.Emitter] = addr
}

tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
}

// select the highest event id that exists in database, or null if none exists
var entryID sql.NullInt64
err = tx.Stmt(stmtEventExists).QueryRow(
Expand Down Expand Up @@ -299,6 +305,15 @@ var backfillEventsCmd = &cli.Command{
}
}

// mark the tipset as processed
_, err = tx.Stmt(stmtUpsertEventSeen).Exec(
currTs.Height(),
tsKeyCid.Bytes(),
)
if err != nil {
return xerrors.Errorf("exec upsert events seen: %w", err)
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
Expand Down

0 comments on commit 436d96c

Please sign in to comment.