Skip to content

Commit

Permalink
refactor: mempool use context.Context (backport #14266) (#14269)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Marko <marbar3778@yahoo.com>
  • Loading branch information
3 people authored Dec 13, 2022
1 parent 159c868 commit b27353d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 23 deletions.
10 changes: 6 additions & 4 deletions types/mempool/mempool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mempool

import (
"context"
"errors"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -9,11 +10,12 @@ import (
type Mempool interface {
// Insert attempts to insert a Tx into the app-side mempool returning
// an error upon failure.
Insert(sdk.Context, sdk.Tx) error
Insert(context.Context, sdk.Tx) error

// Select returns an Iterator over the app-side mempool. If txs are specified, then they shall be incorporated
// into the Iterator. The Iterator must be closed by the caller.
Select(sdk.Context, [][]byte) Iterator
// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator must
// closed by the caller.
Select(context.Context, [][]byte) Iterator

// CountTx returns the number of transactions currently in the mempool.
CountTx() int
Expand Down
56 changes: 37 additions & 19 deletions types/mempool/sender_nonce.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mempool

import (
"context"
crand "crypto/rand" // #nosec // crypto/rand is used for seed generation
"encoding/binary"
"fmt"
Expand All @@ -19,14 +20,16 @@ var (

var DefaultMaxTx = 0

// senderNonceMempool is a mempool that prioritizes transactions within a sender by nonce, the lowest first,
// but selects a random sender on each iteration. The mempool is iterated by:
// senderNonceMempool is a mempool that prioritizes transactions within a sender
// by nonce, the lowest first, but selects a random sender on each iteration.
// The mempool is iterated by:
//
// 1) Maintaining a separate list of nonce ordered txs per sender
// 2) For each select iteration, randomly choose a sender and pick the next nonce ordered tx from their list
// 3) Repeat 1,2 until the mempool is exhausted
//
// Note that PrepareProposal could choose to stop iteration before reaching the end if maxBytes is reached.
// Note that PrepareProposal could choose to stop iteration before reaching the
// end if maxBytes is reached.
type senderNonceMempool struct {
senders map[string]*skiplist.SkipList
rnd *rand.Rand
Expand All @@ -41,7 +44,8 @@ type txKey struct {
nonce uint64
}

// NewSenderNonceMempool creates a new mempool that prioritizes transactions by nonce, the lowest first.
// NewSenderNonceMempool creates a new mempool that prioritizes transactions by
// nonce, the lowest first, picking a random sender on each iteration.
func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool {
senderMap := make(map[string]*skiplist.SkipList)
existingTx := make(map[txKey]bool)
Expand All @@ -56,6 +60,7 @@ func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool {
if err != nil {
panic(err)
}

snp.setSeed(seed)

for _, opt := range opts {
Expand All @@ -65,19 +70,25 @@ func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool {
return snp
}

// SenderNonceSeedOpt Option To add a Seed for random type when calling the constructor NewSenderNonceMempool
// SenderNonceSeedOpt Option To add a Seed for random type when calling the
// constructor NewSenderNonceMempool.
//
// Example:
// > random_seed := int64(1000)
// > NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed))
//
// random_seed := int64(1000)
// NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed))
func SenderNonceSeedOpt(seed int64) SenderNonceOptions {
return func(snp *senderNonceMempool) {
snp.setSeed(seed)
}
}

// SenderNonceMaxTxOpt Option To set limit of max tx when calling the constructor NewSenderNonceMempool
// SenderNonceMaxTxOpt Option To set limit of max tx when calling the constructor
// NewSenderNonceMempool.
//
// Example:
// > NewSenderNonceMempool(SenderNonceMaxTxOpt(100))
//
// NewSenderNonceMempool(SenderNonceMaxTxOpt(100))
func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions {
return func(snp *senderNonceMempool) {
snp.maxTx = maxTx
Expand All @@ -89,15 +100,16 @@ func (snm *senderNonceMempool) setSeed(seed int64) {
snm.rnd = rand.New(s1) //#nosec // math/rand is seeded from crypto/rand by default
}

// Insert adds a tx to the mempool. It returns an error if the tx does not have at least one signer.
// priority is ignored.
func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
// Insert adds a tx to the mempool. It returns an error if the tx does not have
// at least one signer. Note, priority is ignored.
func (snm *senderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx {
return ErrMempoolTxMaxCapacity
}
if snm.maxTx < 0 {
return nil
}

sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
Expand All @@ -109,22 +121,27 @@ func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
sig := sigs[0]
sender := sig.PubKey.Address().String()
nonce := sig.Sequence

senderTxs, found := snm.senders[sender]
if !found {
senderTxs = skiplist.New(skiplist.Uint64)
snm.senders[sender] = senderTxs
}

senderTxs.Set(nonce, tx)

key := txKey{nonce: nonce, address: sender}
snm.existingTx[key] = true

return nil
}

// Select returns an iterator ordering transactions the mempool with the lowest nonce of a random selected sender first.
func (snm *senderNonceMempool) Select(_ sdk.Context, _ [][]byte) Iterator {
// Select returns an iterator ordering transactions the mempool with the lowest
// nonce of a random selected sender first.
func (snm *senderNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
var senders []string
senderCursors := make(map[string]*skiplist.Element)

senderCursors := make(map[string]*skiplist.Element)
orderedSenders := skiplist.New(skiplist.String)

// #nosec
Expand Down Expand Up @@ -154,8 +171,8 @@ func (snm *senderNonceMempool) CountTx() int {
return len(snm.existingTx)
}

// Remove removes a tx from the mempool. It returns an error if the tx does not have at least one signer or the tx
// was not found in the pool.
// Remove removes a tx from the mempool. It returns an error if the tx does not
// have at least one signer or the tx was not found in the pool.
func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
Expand All @@ -168,6 +185,7 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
sig := sigs[0]
sender := sig.PubKey.Address().String()
nonce := sig.Sequence

senderTxs, found := snm.senders[sender]
if !found {
return ErrTxNotFound
Expand Down Expand Up @@ -195,8 +213,8 @@ type senderNonceMepoolIterator struct {
senderCursors map[string]*skiplist.Element
}

// Next returns the next iterator state which will contain a tx with the next smallest nonce of a randomly
// selected sender.
// Next returns the next iterator state which will contain a tx with the next
// smallest nonce of a randomly selected sender.
func (i *senderNonceMepoolIterator) Next() Iterator {
for len(i.senders) > 0 {
senderIndex := i.rnd.Intn(len(i.senders))
Expand Down

0 comments on commit b27353d

Please sign in to comment.