Skip to content

Commit

Permalink
Merge pull request #44 from andyzhang2023/impr/add-some-metrics-txpool
Browse files Browse the repository at this point in the history
Impr/add some metrics txpool
  • Loading branch information
andyzhang2023 authored Jan 4, 2024
2 parents 23d7cb5 + adc0523 commit 95d5458
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 2 deletions.
68 changes: 68 additions & 0 deletions core/txpool/invalid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package txpool

import (
"github.com/ethereum/go-ethereum/metrics"
)

const (
AlreadyKnown = "AlreadyKnown"
TypeNotSupportDeposit = "TypeNotSupportDeposit"
TypeNotSupport1559 = "TypeNotSupport1559"
TypeNotSupport2718 = "TypeNotSupport2718"
MissingTransaction = "MissingTransaction"
OversizedData = "OversizedData"
MaxInitCodeSizeExceeded = "MaxInitCodeSizeExceeded"
NegativeValue = "NegativeValue"
GasLimit = "GasLimit"
FeeCapVeryHigh = "FeeCapVeryHigh"
TipVeryHigh = "TipVeryHigh"
TipAboveFeeCap = "TipAboveFeeCap"
InvalidSender = "InvalidSender"
Underpriced = "Underpriced"
NonceTooLow = "NonceTooLow"
InsufficientFunds = "InsufficientFunds"
Overdraft = "Overdraft"
IntrinsicGas = "IntrinsicGas"
Throttle = "Throttle"
Overflow = "Overflow"
FutureReplacePending = "FutureReplacePending"
ReplaceUnderpriced = "ReplaceUnderpriced"
QueuedDiscard = "QueueDiscard"
GasUnitOverflow = "GasUnitOverflow"
)

func meter(err string) metrics.Meter {
return metrics.GetOrRegisterMeter("txpool/invalid/"+err, nil)
}

func init() {
// init the metrics
for _, err := range []string{
AlreadyKnown,
TypeNotSupportDeposit,
TypeNotSupport1559,
TypeNotSupport2718,
MissingTransaction,
OversizedData,
MaxInitCodeSizeExceeded,
NegativeValue,
GasLimit,
FeeCapVeryHigh,
TipVeryHigh,
TipAboveFeeCap,
InvalidSender,
Underpriced,
NonceTooLow,
InsufficientFunds,
Overdraft,
IntrinsicGas,
Throttle,
Overflow,
FutureReplacePending,
ReplaceUnderpriced,
QueuedDiscard,
GasUnitOverflow,
} {
meter(err).Mark(0)
}
}
27 changes: 27 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,55 +653,68 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// This is for spam protection, not consensus,
// as the external engine-API user authenticates deposits.
if tx.Type() == types.DepositTxType {
meter(TypeNotSupportDeposit).Mark(1)
return core.ErrTxTypeNotSupported
}
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718 && tx.Type() != types.LegacyTxType {
meter(TypeNotSupport2718).Mark(1)
return core.ErrTxTypeNotSupported
}
// Reject dynamic fee transactions until EIP-1559 activates.
if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType {
meter(TypeNotSupport1559).Mark(1)
return core.ErrTxTypeNotSupported
}
// Reject transactions over defined size to prevent DOS attacks
if tx.Size() > txMaxSize {
meter(OversizedData).Mark(1)
return ErrOversizedData
}
// Check whether the init code size has been exceeded.
if pool.shanghai && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
meter(MaxInitCodeSizeExceeded).Mark(1)
return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
meter(NegativeValue).Mark(1)
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() {
meter(GasLimit).Mark(1)
return ErrGasLimit
}
// Sanity check for extremely large numbers
if tx.GasFeeCap().BitLen() > 256 {
meter(FeeCapVeryHigh).Mark(1)
return core.ErrFeeCapVeryHigh
}
if tx.GasTipCap().BitLen() > 256 {
meter(TipVeryHigh).Mark(1)
return core.ErrTipVeryHigh
}
// Ensure gasFeeCap is greater than or equal to gasTipCap.
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
meter(TipAboveFeeCap).Mark(1)
return core.ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly.
from, err := types.Sender(pool.signer, tx)
if err != nil {
meter(InvalidSender).Mark(1)
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
meter(Underpriced).Mark(1)
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
meter(NonceTooLow).Mark(1)
return core.ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
Expand All @@ -712,6 +725,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
balance := pool.currentState.GetBalance(from)
if balance.Cmp(cost) < 0 {
meter(InsufficientFunds).Mark(1)
return core.ErrInsufficientFunds
}

Expand All @@ -729,16 +743,19 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
if balance.Cmp(sum) < 0 {
log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum)
meter(Overdraft).Mark(1)
return ErrOverdraft
}
}

// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul, pool.shanghai)
if err != nil {
meter(GasUnitOverflow).Mark(1)
return err
}
if tx.Gas() < intrGas {
meter(IntrinsicGas).Mark(1)
return core.ErrIntrinsicGas
}
return nil
Expand All @@ -757,6 +774,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
knownTxMeter.Mark(1)
meter(AlreadyKnown).Mark(1)
return false, ErrAlreadyKnown
}
// Make the local flag. If it's from local source or it's from the network but
Expand All @@ -779,6 +797,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
meter(Underpriced).Mark(1)
return false, ErrUnderpriced
}

Expand All @@ -788,6 +807,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
// replacements to 25% of the slots
if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) {
throttleTxMeter.Mark(1)
meter(Throttle).Mark(1)
return false, ErrTxPoolOverflow
}

Expand All @@ -800,6 +820,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
if !isLocal && !success {
log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
meter(Overflow).Mark(1)
return false, ErrTxPoolOverflow
}

Expand All @@ -819,6 +840,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.priced.Put(dropTx, false)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
meter(FutureReplacePending).Mark(1)
return false, ErrFutureReplacePending
}
}
Expand All @@ -838,6 +860,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardMeter.Mark(1)
meter(ReplaceUnderpriced).Mark(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
Expand Down Expand Up @@ -903,6 +926,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
meter(QueuedDiscard).Mark(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
Expand All @@ -917,6 +941,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
if pool.all.Get(hash) == nil && !addAll {
meter(MissingTransaction).Mark(1)
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
}
if addAll {
Expand Down Expand Up @@ -1035,6 +1060,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
if pool.all.Get(tx.Hash()) != nil {
errs[i] = ErrAlreadyKnown
knownTxMeter.Mark(1)
meter(AlreadyKnown).Mark(1)
continue
}
// Exclude transactions with invalid signatures as soon as
Expand All @@ -1044,6 +1070,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
if err != nil {
errs[i] = ErrInvalidSender
invalidTxMeter.Mark(1)
meter(InvalidSender).Mark(1)
continue
}
// Accumulate all unknown transactions for deeper processing
Expand Down
6 changes: 6 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,17 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
directPeers++
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
log.Trace("Transaction broadcast bodies", "txs", len(hashes),
"peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(),
)
}
for peer, hashes := range annos {
annoPeers++
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
log.Trace("Transaction broadcast hashes", "txs", len(hashes),
"peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(),
)
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
Expand Down
21 changes: 19 additions & 2 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eth

import (
"math/big"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/gopool"
Expand Down Expand Up @@ -61,6 +62,22 @@ func (p *Peer) broadcastBlocks() {
}
}

func collectHashes(txs []*types.Transaction) []common.Hash {
hashes := make([]common.Hash, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
}
return hashes
}

func concat(hashes []common.Hash) string {
strslice := make([]string, len(hashes))
for i, hash := range hashes {
strslice[i] = hash.String()
}
return strings.Join(strslice, ",")
}

// broadcastTransactions is a write loop that schedules transaction broadcasts
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
Expand Down Expand Up @@ -98,7 +115,7 @@ func (p *Peer) broadcastTransactions() {
return
}
close(done)
p.Log().Trace("Sent transactions", "count", len(txs))
p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String(), "hashes", concat(collectHashes(txs)))
})
}
}
Expand Down Expand Up @@ -176,7 +193,7 @@ func (p *Peer) announceTransactions() {
}
}
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending))
p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String(), "hashes", concat(pending))
})
}
}
Expand Down

0 comments on commit 95d5458

Please sign in to comment.