Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 10 additions & 28 deletions db/integrity/rcache_no_duplicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"golang.org/x/sync/errgroup"

"github.com/erigontech/erigon/common"
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/rawdb"
Expand Down Expand Up @@ -67,35 +66,18 @@ func RCacheNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, tx kv.Tem
var _min, _max uint64
_min, _ = txNumsReader.Min(tx, fromBlock)
_max, _ = txNumsReader.Max(tx, fromBlock)
for txNum := fromTxNum; txNum <= toTxNum; txNum++ {
r, found, err := rawdb.ReadReceiptCacheV2(tx, rawdb.RCacheV2Query{
TxNum: txNum,
BlockNum: blockNum,
BlockHash: common.Hash{}, // don't care about blockHash/txnHash
TxnHash: common.Hash{},
DontCalcBloom: true, // we don't need bloom for this check
})

it, err := rawdb.ReceiptCacheV2Stream(tx, fromTxNum, toTxNum)
if err != nil {
return err
}
defer it.Close()

for it.HasNext() {
txNum, r, err := it.Next()
if err != nil {
return err
}
if !found {
if txNum == _max {
blockNum++
_min = _max + 1
_max, _ = txNumsReader.Max(tx, blockNum)
expectedFirstLogIdx = 0
prevCumUsedGas = -1
continue // skip system txs
}
if txNum == _min {
continue
}
if failFast {
return fmt.Errorf("[integrity] RCacheNoDups: missing receipt for block %d, txNum %d", blockNum, txNum)
}
log.Warn("[integrity] RCacheNoDups: missing receipt", "block", blockNum, "txNum", txNum)
continue
}

logIdx := r.FirstLogIndexWithinBlock
exactLogIdx := logIdx == expectedFirstLogIdx
Expand All @@ -119,7 +101,7 @@ func RCacheNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, tx kv.Tem
}
prevCumUsedGas = int(cumUsedGas)

if txNum == _max {
for txNum >= _max {
blockNum++
_min = _max + 1
_max, _ = txNumsReader.Max(tx, blockNum)
Expand Down
23 changes: 23 additions & 0 deletions db/kv/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,29 @@ func (m *TransformedDuo[K, v]) Close() {
}
}

// TransformedDuoV - analog `map` (in terms of map-filter-reduce pattern) but with different value type
type TransformedDuoV[K, V, VR any] struct {
it Duo[K, V]
transform func(K, V) (K, VR, error)
}

func TransformDuoV[K, V, VR any](it Duo[K, V], transform func(K, V) (K, VR, error)) *TransformedDuoV[K, V, VR] {
return &TransformedDuoV[K, V, VR]{it: it, transform: transform}
}
func (m *TransformedDuoV[K, V, VR]) HasNext() bool { return m.it.HasNext() }
func (m *TransformedDuoV[K, V, VR]) Next() (k K, vr VR, err error) {
k, v, err := m.it.Next()
if err != nil {
return k, vr, err
}
return m.transform(k, v)
}
func (m *TransformedDuoV[K, V, VR]) Close() {
if x, ok := m.it.(Closer); ok {
x.Close()
}
}

// FilteredDuo - analog `map` (in terms of map-filter-reduce pattern)
// please avoid reading from Disk/DB more elements and then filter them. Better
// push-down filter conditions to lower-level iterator to reduce disk reads amount.
Expand Down
23 changes: 23 additions & 0 deletions db/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/kv/dbutils"
"github.com/erigontech/erigon/db/kv/rawdbv3"
"github.com/erigontech/erigon/db/kv/stream"
"github.com/erigontech/erigon/db/rawdb/utils"
"github.com/erigontech/erigon/execution/rlp"
"github.com/erigontech/erigon/execution/types"
Expand Down Expand Up @@ -1222,6 +1223,28 @@ type RCacheV2Query struct {
DontCalcBloom bool // avoid calculating bloom (can be bottleneck)
}

// doesn't do DeriveFieldsV4ForCachedReceipt
func ReceiptCacheV2Stream(tx kv.TemporalTx, fromTxNum, toTxNum uint64) (stream.Duo[uint64, *types.Receipt], error) {
it, err := tx.Debug().KeyTrace(kv.RCacheDomain, receiptCacheKey, fromTxNum, toTxNum)
if err != nil {
return nil, err
}

return stream.TransformDuoV(it, func(txNum uint64, v []byte) (uint64, *types.Receipt, error) {
if len(v) == 0 {
return txNum, nil, nil
}

receipt := &types.ReceiptForStorage{}
if err := rlp.DecodeBytes(v, receipt); err != nil {
return txNum, nil, fmt.Errorf("%w, of txNum %d, len(v)=%d", err, txNum, len(v))
}
res := (*types.Receipt)(receipt)
//res.DeriveFieldsV4ForCachedReceipt(query.BlockHash, query.BlockNum, query.TxnHash, !query.DontCalcBloom)
return txNum, res, nil
}), nil
}

func ReadReceiptCacheV2(tx kv.TemporalTx, query RCacheV2Query) (*types.Receipt, bool, error) {
v, ok, err := tx.HistorySeek(kv.RCacheDomain, receiptCacheKey, query.TxNum+1 /*history storing value BEFORE-change*/)
if err != nil {
Expand Down