Skip to content

Commit

Permalink
linter: close cursor (#13298)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Jan 2, 2025
1 parent 85489c5 commit 7316b09
Show file tree
Hide file tree
Showing 19 changed files with 131 additions and 51 deletions.
5 changes: 5 additions & 0 deletions cl/persistence/beacon_indicies/indicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func PruneSignedHeaders(tx kv.RwTx, from uint64) error {
if err != nil {
return err
}
defer cursor.Close()
for k, _, err := cursor.Seek(base_encoding.Encode64ToBytes4(from)); err == nil && k != nil; k, _, err = cursor.Prev() {
if err != nil { //nolint:govet
return err
Expand All @@ -268,6 +269,7 @@ func RangeBlockRoots(ctx context.Context, tx kv.Tx, fromSlot, toSlot uint64, fn
if err != nil {
return err
}
defer cursor.Close()
for k, v, err := cursor.Seek(base_encoding.Encode64ToBytes4(fromSlot)); err == nil && k != nil && base_encoding.Decode64FromBytes4(k) <= toSlot; k, v, err = cursor.Next() {
if !fn(base_encoding.Decode64FromBytes4(k), libcommon.BytesToHash(v)) {
break
Expand All @@ -281,6 +283,7 @@ func PruneBlockRoots(ctx context.Context, tx kv.RwTx, fromSlot, toSlot uint64) e
if err != nil {
return err
}
defer cursor.Close()
for k, _, err := cursor.Seek(base_encoding.Encode64ToBytes4(fromSlot)); err == nil && k != nil && base_encoding.Decode64FromBytes4(k) <= toSlot; k, _, err = cursor.Next() {
if err := cursor.DeleteCurrent(); err != nil {
return err
Expand All @@ -296,6 +299,7 @@ func ReadBeaconBlockRootsInSlotRange(ctx context.Context, tx kv.Tx, fromSlot, co
if err != nil {
return nil, nil, err
}
defer cursor.Close()
currentCount := uint64(0)
for k, v, err := cursor.Seek(base_encoding.Encode64ToBytes4(fromSlot)); err == nil && k != nil && currentCount != count; k, v, err = cursor.Next() {
currentCount++
Expand Down Expand Up @@ -373,6 +377,7 @@ func PruneBlocks(ctx context.Context, tx kv.RwTx, to uint64) error {
if err != nil {
return err
}
defer cursor.Close()
for k, _, err := cursor.First(); err == nil && k != nil; k, _, err = cursor.Prev() {
if len(k) != 40 {
continue
Expand Down
1 change: 1 addition & 0 deletions cmd/evm/internal/t8ntool/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ func CalculateStateRoot(tx kv.RwTx) (*libcommon.Hash, error) {
if err != nil {
return nil, err
}
defer c.Close()
h := libcommon.NewHasher()
defer libcommon.ReturnHasherToPool(h)
domains, err := libstate.NewSharedDomains(tx, log.New())
Expand Down
4 changes: 4 additions & 0 deletions cmd/integration/commands/refetence_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func compareBuckets(ctx context.Context, tx kv.Tx, b string, refTx kv.Tx, refB s
if err != nil {
return err
}
defer c.Close()
k, v, e := c.First()
if e != nil {
return e
Expand All @@ -271,6 +272,7 @@ func compareBuckets(ctx context.Context, tx kv.Tx, b string, refTx kv.Tx, refB s
if err != nil {
return err
}
defer refC.Close()
refK, refV, revErr := refC.First()
if revErr != nil {
return revErr
Expand Down Expand Up @@ -386,6 +388,7 @@ MainLoop:
if err != nil {
return err
}
defer c.Close()

for {
if !fileScanner.Scan() {
Expand Down Expand Up @@ -419,6 +422,7 @@ MainLoop:
logger.Info("Progress", "bucket", bucket, "key", hex.EncodeToString(k))
}
}
c.Close()
err = fileScanner.Err()
if err != nil {
panic(err)
Expand Down
3 changes: 3 additions & 0 deletions cmd/verkle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func dump(ctx context.Context, cfg optionsCfg) error {
if err != nil {
return err
}
defer verkleCursor.Close()
for k, v, err := verkleCursor.First(); k != nil; k, v, err = verkleCursor.Next() {
if err != nil {
return err
Expand Down Expand Up @@ -324,6 +325,7 @@ func dump_acc_preimages(ctx context.Context, cfg optionsCfg) error {
if err != nil {
return err
}
defer stateCursor.Close()
num, err := stages.GetStageProgress(tx, stages.Execution)
if err != nil {
return err
Expand Down Expand Up @@ -380,6 +382,7 @@ func dump_storage_preimages(ctx context.Context, cfg optionsCfg, logger log.Logg
if err != nil {
return err
}
defer stateCursor.Close()
num, err := stages.GetStageProgress(tx, stages.Execution)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions erigon-lib/kv/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func backupTable(ctx context.Context, src kv.RoDB, srcTx kv.Tx, dst kv.RwDB, tab
if err != nil {
return err
}
defer srcC.Close()
total, _ = srcTx.Count(table)

if err := dst.Update(ctx, func(tx kv.RwTx) error {
Expand All @@ -128,6 +129,7 @@ func backupTable(ctx context.Context, src kv.RoDB, srcTx kv.Tx, dst kv.RwDB, tab
if err != nil {
return err
}
defer c.Close()
casted, isDupsort := c.(kv.RwCursorDupSort)
i := uint64(0)

Expand Down
58 changes: 29 additions & 29 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
db: db,
tx: tx,
readOnly: true,
id: db.leakDetector.Add(),
traceID: db.leakDetector.Add(),
}, nil
}

Expand Down Expand Up @@ -616,31 +616,32 @@ func (db *MdbxKV) beginRw(ctx context.Context, flags uint) (txn kv.RwTx, err err
}

return &MdbxTx{
db: db,
tx: tx,
ctx: ctx,
id: db.leakDetector.Add(),
db: db,
tx: tx,
ctx: ctx,
traceID: db.leakDetector.Add(),
}, nil
}

type MdbxTx struct {
tx *mdbx.Txn
id uint64 // set only if TRACE_TX=true
traceID uint64 // set only if TRACE_TX=true
db *MdbxKV
statelessCursors map[string]kv.RwCursor
readOnly bool
ctx context.Context

toCloseMap map[uint64]kv.Closer
ID uint64
cursorID uint64
}

type MdbxCursor struct {
tx *MdbxTx
toCloseMap map[uint64]kv.Closer
c *mdbx.Cursor
bucketName string
bucketCfg kv.TableCfgItem
isDupSort bool
id uint64
label kv.Label // marker to distinct db instances - one process may open many databases. for example to collect metrics of only 1 database
}

func (db *MdbxKV) Env() *mdbx.Env { return db.env }
Expand Down Expand Up @@ -872,7 +873,7 @@ func (tx *MdbxTx) Commit() error {
} else {
runtime.UnlockOSThread()
}
tx.db.leakDetector.Del(tx.id)
tx.db.leakDetector.Del(tx.traceID)
}()
tx.closeCursors()

Expand Down Expand Up @@ -923,7 +924,7 @@ func (tx *MdbxTx) Rollback() {
} else {
runtime.UnlockOSThread()
}
tx.db.leakDetector.Del(tx.id)
tx.db.leakDetector.Del(tx.traceID)
}()
tx.closeCursors()
//tx.printDebugInfo()
Expand Down Expand Up @@ -956,7 +957,7 @@ func (tx *MdbxTx) statelessCursor(bucket string) (kv.RwCursor, error) {
c, ok := tx.statelessCursors[bucket]
if !ok {
var err error
c, err = tx.RwCursor(bucket)
c, err = tx.RwCursor(bucket) //nolint:gocritic
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1105,9 +1106,8 @@ func (tx *MdbxTx) Cursor(bucket string) (kv.Cursor, error) {
}

func (tx *MdbxTx) stdCursor(bucket string) (kv.RwCursor, error) {
b := tx.db.buckets[bucket]
c := &MdbxCursor{bucketName: bucket, tx: tx, bucketCfg: b, id: tx.ID}
tx.ID++
c := &MdbxCursor{bucketName: bucket, toCloseMap: tx.toCloseMap, label: tx.db.opts.label, isDupSort: tx.db.buckets[bucket].Flags&mdbx.DupSort != 0, id: tx.cursorID}
tx.cursorID++

if tx.tx == nil {
panic("assert: tx.tx nil. seems this `tx` was Rollback'ed")
Expand Down Expand Up @@ -1220,7 +1220,7 @@ func (c *MdbxCursor) Delete(k []byte) error {
return err
}

if c.bucketCfg.Flags&mdbx.DupSort != 0 {
if c.isDupSort {
return c.c.Del(mdbx.AllDups)
}

Expand All @@ -1237,7 +1237,7 @@ func (c *MdbxCursor) PutNoOverwrite(k, v []byte) error { return c.c.Put(k, v, md

func (c *MdbxCursor) Put(key []byte, value []byte) error {
if err := c.c.Put(key, value, 0); err != nil {
return fmt.Errorf("label: %s, table: %s, err: %w", c.tx.db.opts.label, c.bucketName, err)
return fmt.Errorf("label: %s, table: %s, err: %w", c.label, c.bucketName, err)
}
return nil
}
Expand All @@ -1258,15 +1258,15 @@ func (c *MdbxCursor) SeekExact(key []byte) ([]byte, []byte, error) {
// Return error - if provided data will not sorted (or bucket have old records which mess with new in sorting manner).
func (c *MdbxCursor) Append(k []byte, v []byte) error {
if err := c.c.Put(k, v, mdbx.Append); err != nil {
return fmt.Errorf("label: %s, bucket: %s, %w", c.tx.db.opts.label, c.bucketName, err)
return fmt.Errorf("label: %s, bucket: %s, %w", c.label, c.bucketName, err)
}
return nil
}

func (c *MdbxCursor) Close() {
if c.c != nil {
c.c.Close()
delete(c.tx.toCloseMap, c.id)
delete(c.toCloseMap, c.id)
c.c = nil
}
}
Expand Down Expand Up @@ -1381,21 +1381,21 @@ func (c *MdbxDupSortCursor) LastDup() ([]byte, error) {

func (c *MdbxDupSortCursor) Append(k []byte, v []byte) error {
if err := c.c.Put(k, v, mdbx.Append|mdbx.AppendDup); err != nil {
return fmt.Errorf("label: %s, in Append: bucket=%s, %w", c.tx.db.opts.label, c.bucketName, err)
return fmt.Errorf("label: %s, in Append: bucket=%s, %w", c.label, c.bucketName, err)
}
return nil
}

func (c *MdbxDupSortCursor) AppendDup(k []byte, v []byte) error {
if err := c.c.Put(k, v, mdbx.AppendDup); err != nil {
return fmt.Errorf("label: %s, in AppendDup: bucket=%s, %w", c.tx.db.opts.label, c.bucketName, err)
return fmt.Errorf("label: %s, in AppendDup: bucket=%s, %w", c.label, c.bucketName, err)
}
return nil
}

func (c *MdbxDupSortCursor) PutNoDupData(k, v []byte) error {
if err := c.c.Put(k, v, mdbx.NoDupData); err != nil {
return fmt.Errorf("label: %s, in PutNoDupData: %w", c.tx.db.opts.label, err)
return fmt.Errorf("label: %s, in PutNoDupData: %w", c.label, err)
}

return nil
Expand All @@ -1404,7 +1404,7 @@ func (c *MdbxDupSortCursor) PutNoDupData(k, v []byte) error {
// DeleteCurrentDuplicates - delete all of the data items for the current key.
func (c *MdbxDupSortCursor) DeleteCurrentDuplicates() error {
if err := c.c.Del(mdbx.AllDups); err != nil {
return fmt.Errorf("label: %s,in DeleteCurrentDuplicates: %w", c.tx.db.opts.label, err)
return fmt.Errorf("label: %s,in DeleteCurrentDuplicates: %w", c.label, err)
}
return nil
}
Expand Down Expand Up @@ -1456,8 +1456,8 @@ func (tx *MdbxTx) Prefix(table string, prefix []byte) (stream.KV, error) {
}

func (tx *MdbxTx) Range(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) {
s := &cursor2iter{ctx: tx.ctx, tx: tx, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: asc, limit: int64(limit), id: tx.ID}
tx.ID++
s := &cursor2iter{ctx: tx.ctx, tx: tx, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: asc, limit: int64(limit), id: tx.cursorID}
tx.cursorID++
if tx.toCloseMap == nil {
tx.toCloseMap = make(map[uint64]kv.Closer)
}
Expand Down Expand Up @@ -1487,7 +1487,7 @@ func (s *cursor2iter) init(table string, tx kv.Tx) error {
if !s.orderAscend && s.fromPrefix != nil && s.toPrefix != nil && bytes.Compare(s.fromPrefix, s.toPrefix) <= 0 {
return fmt.Errorf("tx.Dual: %x must be lexicographicaly before %x", s.toPrefix, s.fromPrefix)
}
c, err := tx.Cursor(table)
c, err := tx.Cursor(table) //nolint:gocritic
if err != nil {
return err
}
Expand Down Expand Up @@ -1619,8 +1619,8 @@ func (s *cursor2iter) Next() (k, v []byte, err error) {
}

func (tx *MdbxTx) RangeDupSort(table string, key []byte, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) {
s := &cursorDup2iter{ctx: tx.ctx, tx: tx, key: key, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: bool(asc), limit: int64(limit), id: tx.ID}
tx.ID++
s := &cursorDup2iter{ctx: tx.ctx, tx: tx, key: key, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: bool(asc), limit: int64(limit), id: tx.cursorID}
tx.cursorID++
if tx.toCloseMap == nil {
tx.toCloseMap = make(map[uint64]kv.Closer)
}
Expand Down Expand Up @@ -1651,7 +1651,7 @@ func (s *cursorDup2iter) init(table string, tx kv.Tx) error {
if !s.orderAscend && s.fromPrefix != nil && s.toPrefix != nil && bytes.Compare(s.fromPrefix, s.toPrefix) <= 0 {
return fmt.Errorf("tx.Dual: %x must be lexicographicaly before %x", s.toPrefix, s.fromPrefix)
}
c, err := tx.CursorDupSort(table)
c, err := tx.CursorDupSort(table) //nolint:gocritic
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions erigon-lib/kv/membatchwithdb/memory_mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func initSequences(db kv.Tx, memTx kv.RwTx) error {
if err != nil {
return err
}
defer cursor.Close()
for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() {
if err != nil {
return err
Expand Down Expand Up @@ -169,7 +170,7 @@ func (m *MemoryMutation) statelessCursor(table string) (kv.RwCursor, error) {
c, ok := m.statelessCursors[table]
if !ok {
var err error
c, err = m.RwCursor(table)
c, err = m.RwCursor(table) // nolint:gocritic
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,11 +665,11 @@ func (m *MemoryMutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) {
c.table = bucket

var err error
c.cursor, err = m.db.CursorDupSort(bucket)
c.cursor, err = m.db.CursorDupSort(bucket) //nolint:gocritic
if err != nil {
return nil, err
}
c.memCursor, err = m.memTx.RwCursorDupSort(bucket)
c.memCursor, err = m.memTx.RwCursorDupSort(bucket) //nolint:gocritic
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions erigon-lib/kv/remotedb/kv_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (tx *tx) statelessCursor(bucket string) (kv.Cursor, error) {
c, ok := tx.statelessCursors[bucket]
if !ok {
var err error
c, err = tx.Cursor(bucket)
c, err = tx.Cursor(bucket) // nolint:gocritic
if err != nil {
return nil, err
}
Expand All @@ -282,6 +282,7 @@ func (tx *tx) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte)
if err != nil {
return err
}
defer it.Close()
for it.HasNext() {
k, v, err := it.Next()
if err != nil {
Expand Down Expand Up @@ -689,9 +690,9 @@ func (tx *tx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc or
func (tx *tx) Prefix(table string, prefix []byte) (stream.KV, error) {
nextPrefix, ok := kv.NextSubtree(prefix)
if !ok {
return tx.Range(table, prefix, nil, order.Asc, kv.Unlim)
return tx.Range(table, prefix, nil, order.Asc, kv.Unlim) //nolint:gocritic
}
return tx.Range(table, prefix, nextPrefix, order.Asc, kv.Unlim)
return tx.Range(table, prefix, nextPrefix, order.Asc, kv.Unlim) //nolint:gocritic
}

func (tx *tx) rangeOrderLimit(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) {
Expand Down
Loading

0 comments on commit 7316b09

Please sign in to comment.