core/txpool/blobpool: post-crash cleanup and addition/removal metrics (#28914)

* core/txpool/blobpool: clean up resurrected junk after a crash

* core/txpool/blobpool: track transaction insertions and rejections

* core/txpool/blobpool: linnnnnnnt
karalabe authored Feb 2, 2024
1 parent 06a8711 commit 62affdc
Showing 3 changed files with 158 additions and 18 deletions.
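Before the diffs, a short illustration of the failure mode behind the first bullet. This is a hypothetical sketch rather than code from the commit; it reuses only the store.Put/store.Delete calls that also appear in the diffs below and elides how the Billy store is opened.

// Billy, the slot store backing the blob pool, does not journal deletes.
id, _ := store.Put(blob) // persist a blob transaction, obtaining its slot id
_ = store.Delete(id)     // later drop it again, e.g. once it was included

// If the process crashes at this point instead of shutting down cleanly, the
// un-journaled delete can be lost: on the next startup the old slot is indexed
// again and the "deleted" transaction resurfaces, potentially colliding with a
// live entry on hash (test case 8 below) or on nonce (test case 9 below).
// The new guards in parseTransaction and recheck detect and delete such junk.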
74 changes: 68 additions & 6 deletions core/txpool/blobpool/blobpool.go
@@ -386,6 +386,8 @@ func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.Addr

if len(fails) > 0 {
log.Warn("Dropping invalidated blob transactions", "ids", fails)
dropInvalidMeter.Mark(int64(len(fails)))

for _, id := range fails {
if err := p.store.Delete(id); err != nil {
p.Close()
@@ -467,7 +469,13 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
}

meta := newBlobTxMeta(id, size, tx)

if _, exists := p.lookup[meta.hash]; exists {
// This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get
// partially resurrected.
log.Error("Rejecting duplicate blob pool entry", "id", id, "hash", tx.Hash())
return errors.New("duplicate blob entry")
}
sender, err := p.signer.Sender(tx)
if err != nil {
// This path is impossible unless the signature validity changes across
@@ -537,8 +545,10 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6

if gapped {
log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
dropDanglingMeter.Mark(int64(len(ids)))
} else {
log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
dropFilledMeter.Mark(int64(len(ids)))
}
for _, id := range ids {
if err := p.store.Delete(id); err != nil {
@@ -569,6 +579,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
txs = txs[1:]
}
log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
dropOverlappedMeter.Mark(int64(len(ids)))

for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -600,10 +612,30 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
}
continue
}
// Sanity check that there's no double nonce. This case would be a coding
// error, but better know about it
// Sanity check that there's no double nonce. This case would generally
// be a coding error, so better know about it.
//
// Also, Billy behind the blobpool does not journal deletes. A process
// crash would result in previously deleted entities being resurrected.
// That could potentially cause a duplicate nonce to appear.
if txs[i].nonce == txs[i-1].nonce {
log.Error("Duplicate nonce blob transaction", "from", addr, "nonce", txs[i].nonce)
id := p.lookup[txs[i].hash]

log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id)
dropRepeatedMeter.Mark(1)

p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
p.stored -= uint64(txs[i].size)
delete(p.lookup, txs[i].hash)

if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}
txs = append(txs[:i], txs[i+1:]...)
p.index[addr] = txs

i--
continue
}
// Otherwise if there's a nonce gap evict all later transactions
var (
@@ -621,6 +653,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
txs = txs[:i]

log.Error("Dropping gapped blob transactions", "from", addr, "missing", txs[i-1].nonce+1, "drop", nonces, "ids", ids)
dropGappedMeter.Mark(int64(len(ids)))

for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -665,6 +699,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
p.index[addr] = txs
}
log.Warn("Dropping overdrafted blob transactions", "from", addr, "balance", balance, "spent", spent, "drop", nonces, "ids", ids)
dropOverdraftedMeter.Mark(int64(len(ids)))

for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -695,6 +731,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
p.index[addr] = txs

log.Warn("Dropping overcapped blob transactions", "from", addr, "kept", len(txs), "drop", nonces, "ids", ids)
dropOvercappedMeter.Mark(int64(len(ids)))

for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
@@ -952,7 +990,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
return err
}

// Update the indixes and metrics
// Update the indices and metrics
meta := newBlobTxMeta(id, p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
@@ -1019,6 +1057,8 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
}
// Clear out the transactions from the data store
log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids)
dropUnderpricedMeter.Mark(int64(len(ids)))

for _, id := range ids {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete dropped transaction", "id", id, "err", err)
@@ -1198,13 +1238,30 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
// Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
switch {
case errors.Is(err, txpool.ErrUnderpriced):
addUnderpricedMeter.Mark(1)
case errors.Is(err, core.ErrNonceTooLow):
addStaleMeter.Mark(1)
case errors.Is(err, core.ErrNonceTooHigh):
addGappedMeter.Mark(1)
case errors.Is(err, core.ErrInsufficientFunds):
addOverdraftedMeter.Mark(1)
case errors.Is(err, txpool.ErrAccountLimitExceeded):
addOvercappedMeter.Mark(1)
case errors.Is(err, txpool.ErrReplaceUnderpriced):
addNoreplaceMeter.Mark(1)
default:
addInvalidMeter.Mark(1)
}
return err
}
// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
from, _ := types.Sender(p.signer, tx) // already validated above
if _, ok := p.index[from]; !ok {
if err := p.reserve(from, true); err != nil {
addNonExclusiveMeter.Mark(1)
return err
}
defer func() {
@@ -1244,6 +1301,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
}
if len(p.index[from]) > offset {
// Transaction replaces a previously queued one
dropReplacedMeter.Mark(1)

prev := p.index[from][offset]
if err := p.store.Delete(prev.id); err != nil {
// Shitty situation, but try to recover gracefully instead of going boom
@@ -1322,6 +1381,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
}
p.updateStorageMetrics()

addValidMeter.Mark(1)
return nil
}

@@ -1371,7 +1431,9 @@ func (p *BlobPool) drop() {
}
}
// Remove the transaction from the data store
log.Warn("Evicting overflown blob transaction", "from", from, "evicted", drop.nonce, "id", drop.id)
log.Debug("Evicting overflown blob transaction", "from", from, "evicted", drop.nonce, "id", drop.id)
dropOverflownMeter.Mark(1)

if err := p.store.Delete(drop.id); err != nil {
log.Error("Failed to drop evicted transaction", "id", drop.id, "err", err)
}
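The add-path classification above is worth reading in isolation: every validateTx failure increments exactly one add/* meter, with anything not matched explicitly falling into the generic invalid bucket. The helper below merely restates the diff's switch standalone for reference; the name classifyAddError is hypothetical and not part of the commit.

// classifyAddError marks the rejection meter matching a validation failure.
// Unrecognised errors land in addInvalidMeter, so each failed validation is
// counted exactly once.
func classifyAddError(err error) {
	switch {
	case errors.Is(err, txpool.ErrUnderpriced):
		addUnderpricedMeter.Mark(1)
	case errors.Is(err, core.ErrNonceTooLow):
		addStaleMeter.Mark(1)
	case errors.Is(err, core.ErrNonceTooHigh):
		addGappedMeter.Mark(1)
	case errors.Is(err, core.ErrInsufficientFunds):
		addOverdraftedMeter.Mark(1)
	case errors.Is(err, txpool.ErrAccountLimitExceeded):
		addOvercappedMeter.Mark(1)
	case errors.Is(err, txpool.ErrReplaceUnderpriced):
		addNoreplaceMeter.Mark(1)
	default:
		addInvalidMeter.Mark(1)
	}
}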
71 changes: 61 additions & 10 deletions core/txpool/blobpool/blobpool_test.go
@@ -305,7 +305,16 @@ func verifyPoolInternals(t *testing.T, pool *BlobPool) {
// - 1. A transaction that cannot be decoded must be dropped
// - 2. A transaction that cannot be recovered (bad signature) must be dropped
// - 3. All transactions after a nonce gap must be dropped
// - 4. All transactions after an underpriced one (including it) must be dropped
// - 4. All transactions after an already included nonce must be dropped
// - 5. All transactions after an underpriced one (including it) must be dropped
// - 6. All transactions after an overdrafting sequence must be dropped
// - 7. All transactions exceeding the per-account limit must be dropped
//
// Furthermore, some strange corner-cases can also occur after a crash, as Billy's
// simplicity also allows it to resurrect past deleted entities:
//
// - 8. Fully duplicate transactions (matching hash) must be dropped
// - 9. Duplicate nonces from the same account must be dropped
func TestOpenDrops(t *testing.T) {
log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))

@@ -338,7 +347,7 @@ func TestOpenDrops(t *testing.T) {
badsig, _ := store.Put(blob)

// Insert a sequence of transactions with a nonce gap in between to verify
// that anything gapped will get evicted (case 3)
// that anything gapped will get evicted (case 3).
var (
gapper, _ = crypto.GenerateKey()

@@ -357,7 +366,7 @@ func TestOpenDrops(t *testing.T) {
}
}
// Insert a sequence of transactions with a gapped starting nonce to verify
// that the entire set will get dropped.
// that the entire set will get dropped (case 3).
var (
dangler, _ = crypto.GenerateKey()
dangling = make(map[uint64]struct{})
@@ -370,7 +379,7 @@ func TestOpenDrops(t *testing.T) {
dangling[id] = struct{}{}
}
// Insert a sequence of transactions with already passed nonces to verify
// that the entire set will get dropped.
// that the entire set will get dropped (case 4).
var (
filler, _ = crypto.GenerateKey()
filled = make(map[uint64]struct{})
@@ -383,7 +392,7 @@ func TestOpenDrops(t *testing.T) {
filled[id] = struct{}{}
}
// Insert a sequence of transactions with partially passed nonces to verify
// that the included part of the set will get dropped
// that the included part of the set will get dropped (case 4).
var (
overlapper, _ = crypto.GenerateKey()
overlapped = make(map[uint64]struct{})
@@ -400,7 +409,7 @@ func TestOpenDrops(t *testing.T) {
}
}
// Insert a sequence of transactions with an underpriced first to verify that
// the entire set will get dropped (case 4).
// the entire set will get dropped (case 5).
var (
underpayer, _ = crypto.GenerateKey()
underpaid = make(map[uint64]struct{})
@@ -419,7 +428,7 @@ func TestOpenDrops(t *testing.T) {
}

// Insert a sequence of transactions with an underpriced in between to verify
// that it and anything newly gapped will get evicted (case 4).
// that it and anything newly gapped will get evicted (case 5).
var (
outpricer, _ = crypto.GenerateKey()
outpriced = make(map[uint64]struct{})
@@ -441,7 +450,7 @@ func TestOpenDrops(t *testing.T) {
}
}
// Insert a sequence of transactions fully overdrafted to verify that the
// entire set will get invalidated.
// entire set will get invalidated (case 6).
var (
exceeder, _ = crypto.GenerateKey()
exceeded = make(map[uint64]struct{})
@@ -459,7 +468,7 @@ func TestOpenDrops(t *testing.T) {
exceeded[id] = struct{}{}
}
// Insert a sequence of transactions partially overdrafted to verify that part
// of the set will get invalidated.
// of the set will get invalidated (case 6).
var (
overdrafter, _ = crypto.GenerateKey()
overdrafted = make(map[uint64]struct{})
@@ -481,7 +490,7 @@ func TestOpenDrops(t *testing.T) {
}
}
// Insert a sequence of transactions overflowing the account cap to verify
// that part of the set will get invalidated.
// that part of the set will get invalidated (case 7).
var (
overcapper, _ = crypto.GenerateKey()
overcapped = make(map[uint64]struct{})
@@ -496,6 +505,42 @@ func TestOpenDrops(t *testing.T) {
overcapped[id] = struct{}{}
}
}
// Insert a batch of duplicated transactions to verify that only one of each
// version will remain (case 8).
var (
duplicater, _ = crypto.GenerateKey()
duplicated = make(map[uint64]struct{})
)
for _, nonce := range []uint64{0, 1, 2} {
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, duplicater))

for i := 0; i < int(nonce)+1; i++ {
id, _ := store.Put(blob)
if i == 0 {
valids[id] = struct{}{}
} else {
duplicated[id] = struct{}{}
}
}
}
// Insert a batch of duplicated nonces to verify that only one of each will
// remain (case 9).
var (
repeater, _ = crypto.GenerateKey()
repeated = make(map[uint64]struct{})
)
for _, nonce := range []uint64{0, 1, 2} {
for i := 0; i < int(nonce)+1; i++ {
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater))

id, _ := store.Put(blob)
if i == 0 {
valids[id] = struct{}{}
} else {
repeated[id] = struct{}{}
}
}
}
store.Close()

// Create a blob pool out of the pre-seeded data
@@ -511,6 +556,8 @@ func TestOpenDrops(t *testing.T) {
statedb.AddBalance(crypto.PubkeyToAddress(exceeder.PublicKey), uint256.NewInt(1000000))
statedb.AddBalance(crypto.PubkeyToAddress(overdrafter.PublicKey), uint256.NewInt(1000000))
statedb.AddBalance(crypto.PubkeyToAddress(overcapper.PublicKey), uint256.NewInt(10000000))
statedb.AddBalance(crypto.PubkeyToAddress(duplicater.PublicKey), uint256.NewInt(1000000))
statedb.AddBalance(crypto.PubkeyToAddress(repeater.PublicKey), uint256.NewInt(1000000))
statedb.Commit(0, true)

chain := &testBlockChain{
@@ -554,6 +601,10 @@ func TestOpenDrops(t *testing.T) {
t.Errorf("partially overdrafted transaction remained in storage: %d", tx.id)
} else if _, ok := overcapped[tx.id]; ok {
t.Errorf("overcapped transaction remained in storage: %d", tx.id)
} else if _, ok := duplicated[tx.id]; ok {
t.Errorf("duplicated transaction remained in storage: %d", tx.id)
} else if _, ok := repeated[tx.id]; ok {
t.Errorf("repeated nonce transaction remained in storage: %d", tx.id)
} else {
alive[tx.id] = struct{}{}
}
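TestOpenDrops above verifies the new cases purely through the surviving storage contents. A possible follow-up, sketched here only and not part of the commit, would be to also assert the new drop meters; the test file lives in the blobpool package, so the package-level meters are directly visible. Two caveats are noted in the comments: the meters are process-global, and they are no-ops unless the metrics subsystem is enabled.

// Hypothetical extra assertion around opening the pool in TestOpenDrops.
// Meters are process-global, so compare a delta rather than an absolute
// count; with the metrics subsystem disabled (the usual default in unit
// tests) the registered meters are no-ops and the delta stays zero, so this
// is only meaningful with metrics collection turned on.
before := dropRepeatedMeter.Snapshot().Count()
// ... open the pool as in the test above ...
if delta := dropRepeatedMeter.Snapshot().Count() - before; delta != int64(len(repeated)) {
	t.Errorf("repeated-nonce drop meter delta: have %d, want %d", delta, len(repeated))
}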
31 changes: 29 additions & 2 deletions core/txpool/blobpool/metrics.go
@@ -65,8 +65,8 @@ var (
pooltipGauge = metrics.NewRegisteredGauge("blobpool/pooltip", nil)

// addwait/time, resetwait/time and getwait/time track the rough health of
// the pool and whether or not it's capable of keeping up with the load from
// the network.
// the pool and whether it's capable of keeping up with the load from the
// network.
addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015))
addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015))
getwaitHist = metrics.NewRegisteredHistogram("blobpool/getwait", nil, metrics.NewExpDecaySample(1028, 0.015))
@@ -75,4 +75,31 @@ var (
pendtimeHist = metrics.NewRegisteredHistogram("blobpool/pendtime", nil, metrics.NewExpDecaySample(1028, 0.015))
resetwaitHist = metrics.NewRegisteredHistogram("blobpool/resetwait", nil, metrics.NewExpDecaySample(1028, 0.015))
resettimeHist = metrics.NewRegisteredHistogram("blobpool/resettime", nil, metrics.NewExpDecaySample(1028, 0.015))

// The below metrics track various cases where transactions are dropped out
// of the pool. Most are exceptional, some are chain progression and some
// threshold cappings.
dropInvalidMeter = metrics.NewRegisteredMeter("blobpool/drop/invalid", nil) // Invalid transaction, consensus change or bugfix, neutral-ish
dropDanglingMeter = metrics.NewRegisteredMeter("blobpool/drop/dangling", nil) // First nonce gapped, bad
dropFilledMeter = metrics.NewRegisteredMeter("blobpool/drop/filled", nil) // State full-overlap, chain progress, ok
dropOverlappedMeter = metrics.NewRegisteredMeter("blobpool/drop/overlapped", nil) // State partial-overlap, chain progress, ok
dropRepeatedMeter = metrics.NewRegisteredMeter("blobpool/drop/repeated", nil) // Repeated nonce, bad
dropGappedMeter = metrics.NewRegisteredMeter("blobpool/drop/gapped", nil) // Non-first nonce gapped, bad
dropOverdraftedMeter = metrics.NewRegisteredMeter("blobpool/drop/overdrafted", nil) // Balance exceeded, bad
dropOvercappedMeter = metrics.NewRegisteredMeter("blobpool/drop/overcapped", nil) // Per-account cap exceeded, bad
dropOverflownMeter = metrics.NewRegisteredMeter("blobpool/drop/overflown", nil) // Global disk cap exceeded, neutral-ish
dropUnderpricedMeter = metrics.NewRegisteredMeter("blobpool/drop/underpriced", nil) // Gas tip changed, neutral
dropReplacedMeter = metrics.NewRegisteredMeter("blobpool/drop/replaced", nil) // Transaction replaced, neutral

// The below metrics track various outcomes of transactions being added to
// the pool.
addInvalidMeter = metrics.NewRegisteredMeter("blobpool/add/invalid", nil) // Invalid transaction, reject, neutral
addUnderpricedMeter = metrics.NewRegisteredMeter("blobpool/add/underpriced", nil) // Gas tip too low, neutral
addStaleMeter = metrics.NewRegisteredMeter("blobpool/add/stale", nil) // Nonce already filled, reject, bad-ish
addGappedMeter = metrics.NewRegisteredMeter("blobpool/add/gapped", nil) // Nonce gapped, reject, bad-ish
addOverdraftedMeter = metrics.NewRegisteredMeter("blobpool/add/overdrafted", nil) // Balance exceeded, reject, neutral
addOvercappedMeter = metrics.NewRegisteredMeter("blobpool/add/overcapped", nil) // Per-account cap exceeded, reject, neutral
addNoreplaceMeter = metrics.NewRegisteredMeter("blobpool/add/noreplace", nil) // Replacement fees or tips too low, neutral
addNonExclusiveMeter = metrics.NewRegisteredMeter("blobpool/add/nonexclusive", nil) // Plain transaction from same account exists, reject, neutral
addValidMeter = metrics.NewRegisteredMeter("blobpool/add/valid", nil) // Valid transaction, add, neutral
)
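Passing nil as the registry registers these meters on the metrics package's default registry, so they surface under the names above through whatever metrics backend the node runs with. As a rough usage sketch (the exact read API differs between metrics package versions, so treat the Snapshot().Count() call as an assumption), the post-crash cleanup can be spot-checked right after startup:

// Rough health check after Init: non-zero repeated-nonce drops mean that
// crash-resurrected entries were found and cleaned up. Sketch only; requires
// the metrics subsystem to be enabled, otherwise the meters are no-ops.
if n := dropRepeatedMeter.Snapshot().Count(); n > 0 {
	log.Warn("Blob pool removed resurrected entries", "repeated", n)
}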
