Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: UX Mempool Tweaks (backport #15121) #15122

Merged
merged 3 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 67 additions & 39 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ import (
)

var (
_ Mempool = (*priorityNonceMempool)(nil)
_ Iterator = (*priorityNonceIterator)(nil)
_ Mempool = (*PriorityNonceMempool)(nil)
_ Iterator = (*PriorityNonceIterator)(nil)
)

// priorityNonceMempool is a mempool implementation that stores txs
// PriorityNonceMempool is a mempool implementation that stores txs
// in a partially ordered set by 2 dimensions: priority, and sender-nonce
// (sequence number). Internally it uses one priority ordered skip list and one
// skip list per sender ordered by sender-nonce (sequence number). When there
// are multiple txs from the same sender, they are not always comparable by
// priority to other sender txs and must be partially ordered by both sender-nonce
// and priority.
type priorityNonceMempool struct {
type PriorityNonceMempool struct {
priorityIndex *skiplist.SkipList
priorityCounts map[int64]int
senderIndices map[string]*skiplist.SkipList
Expand All @@ -33,12 +33,12 @@ type priorityNonceMempool struct {
maxTx int
}

type priorityNonceIterator struct {
type PriorityNonceIterator struct {
senderCursors map[string]*skiplist.Element
nextPriority int64
sender string
priorityNode *skiplist.Element
mempool *priorityNonceMempool
mempool *PriorityNonceMempool
}

// txMeta stores transaction metadata used in indices
Expand Down Expand Up @@ -67,15 +67,17 @@ func txMetaLess(a, b any) int {
return res
}

// weight is used as a tiebreaker for transactions with the same priority. weight is calculated in a single
// pass in .Select(...) and so will be 0 on .Insert(...)
// Weight is used as a tiebreaker for transactions with the same priority.
// Weight is calculated in a single pass in .Select(...) and so will be 0
// on .Insert(...).
res = skiplist.Int64.Compare(keyA.weight, keyB.weight)
if res != 0 {
return res
}

// Because weight will be 0 on .Insert(...), we must also compare sender and nonce to resolve priority collisions.
// If we didn't then transactions with the same priority would overwrite each other in the priority index.
// Because weight will be 0 on .Insert(...), we must also compare sender and
// nonce to resolve priority collisions. If we didn't then transactions with
// the same priority would overwrite each other in the priority index.
res = skiplist.String.Compare(keyA.sender, keyB.sender)
if res != 0 {
return res
Expand All @@ -84,30 +86,33 @@ func txMetaLess(a, b any) int {
return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce)
}

type PriorityNonceMempoolOption func(*priorityNonceMempool)
type PriorityNonceMempoolOption func(*PriorityNonceMempool)

// PriorityNonceWithOnRead sets a callback to be called when a tx is read from the mempool.
// PriorityNonceWithOnRead sets a callback to be called when a tx is read from
// the mempool.
func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
return func(mp *PriorityNonceMempool) {
mp.onRead = onRead
}
}

// PriorityNonceWithTxReplacement sets a callback to be called when duplicated transaction nonce detected during mempool insert.
// Application can define a transaction replacement rule based on tx priority or certain transaction fields.
// PriorityNonceWithTxReplacement sets a callback to be called when duplicated
// transaction nonce detected during mempool insert. An application can define a
// transaction replacement rule based on tx priority or certain transaction fields.
func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
return func(mp *PriorityNonceMempool) {
mp.txReplacement = txReplacementRule
}
}

// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the mempool with the semantics:
// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the
// mempool with the semantics:
//
// <0: disabled, `Insert` is a no-op
// 0: unlimited
// >0: maximum number of transactions allowed
func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
return func(mp *PriorityNonceMempool) {
mp.maxTx = maxTx
}
}
Expand All @@ -119,8 +124,8 @@ func DefaultPriorityMempool() Mempool {

// NewPriorityMempool returns the SDK's default mempool implementation which
// returns txs in a partial order by 2 dimensions; priority, and sender-nonce.
func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool {
mp := &priorityNonceMempool{
func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool {
mp := &PriorityNonceMempool{
priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)),
priorityCounts: make(map[int64]int),
senderIndices: make(map[string]*skiplist.SkipList),
Expand All @@ -134,6 +139,19 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool {
return mp
}

// NextSenderTx returns the next transaction for a given sender by nonce order,
// i.e. the next valid transaction for the sender. If no such transaction exists,
// nil will be returned.
func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
senderIndex, ok := mp.senderIndices[sender]
if !ok {
return nil
}

cursor := senderIndex.Front()
return cursor.Value.(sdk.Tx)
}

// Insert attempts to insert a Tx into the app-side mempool in O(log n) time,
// returning an error if unsuccessful. Sender and nonce are derived from the
// transaction's first signature.
Expand All @@ -143,7 +161,7 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) Mempool {
//
// Inserting a duplicate tx with a different priority overwrites the existing tx,
// changing the total order of the mempool.
func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
return ErrMempoolTxMaxCapacity
} else if mp.maxTx < 0 {
Expand All @@ -161,7 +179,7 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
sdkContext := sdk.UnwrapSDKContext(ctx)
priority := sdkContext.Priority()
sig := sigs[0]
sender := sig.PubKey.Address().String()
sender := sdk.AccAddress(sig.PubKey.Address()).String()
nonce := sig.Sequence
key := txMeta{nonce: nonce, priority: priority, sender: sender}

Expand Down Expand Up @@ -215,7 +233,7 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
return nil
}

func (i *priorityNonceIterator) iteratePriority() Iterator {
func (i *PriorityNonceIterator) iteratePriority() Iterator {
// beginning of priority iteration
if i.priorityNode == nil {
i.priorityNode = i.mempool.priorityIndex.Front()
Expand All @@ -240,7 +258,7 @@ func (i *priorityNonceIterator) iteratePriority() Iterator {
return i.Next()
}

func (i *priorityNonceIterator) Next() Iterator {
func (i *PriorityNonceIterator) Next() Iterator {
if i.priorityNode == nil {
return nil
}
Expand All @@ -258,14 +276,16 @@ func (i *priorityNonceIterator) Next() Iterator {
if cursor == nil {
return i.iteratePriority()
}

key := cursor.Key().(txMeta)

// we've reached a transaction with a priority lower than the next highest priority in the pool
// We've reached a transaction with a priority lower than the next highest
// priority in the pool.
if key.priority < i.nextPriority {
return i.iteratePriority()
} else if key.priority == i.nextPriority {
// weight is incorporated into the priority index key only (not sender index) so we must fetch it here
// from the scores map.
// Weight is incorporated into the priority index key only (not sender index)
// so we must fetch it here from the scores map.
weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight
if weight < i.priorityNode.Next().Key().(txMeta).weight {
return i.iteratePriority()
Expand All @@ -276,7 +296,7 @@ func (i *priorityNonceIterator) Next() Iterator {
return i
}

func (i *priorityNonceIterator) Tx() sdk.Tx {
func (i *PriorityNonceIterator) Tx() sdk.Tx {
return i.senderCursors[i.sender].Value.(sdk.Tx)
}

Expand All @@ -286,14 +306,14 @@ func (i *priorityNonceIterator) Tx() sdk.Tx {
//
// The maxBytes parameter defines the maximum number of bytes of transactions to
// return.
func (mp *priorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator {
if mp.priorityIndex.Len() == 0 {
return nil
}

mp.reorderPriorityTies()

iterator := &priorityNonceIterator{
iterator := &PriorityNonceIterator{
mempool: mp,
senderCursors: make(map[string]*skiplist.Element),
}
Expand All @@ -307,8 +327,9 @@ type reorderKey struct {
tx sdk.Tx
}

func (mp *priorityNonceMempool) reorderPriorityTies() {
func (mp *PriorityNonceMempool) reorderPriorityTies() {
node := mp.priorityIndex.Front()

var reordering []reorderKey
for node != nil {
key := node.Key().(txMeta)
Expand All @@ -317,6 +338,7 @@ func (mp *priorityNonceMempool) reorderPriorityTies() {
newKey.weight = senderWeight(key.senderElement)
reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)})
}

node = node.Next()
}

Expand All @@ -328,33 +350,37 @@ func (mp *priorityNonceMempool) reorderPriorityTies() {
}
}

// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is defined as the first (nonce-wise)
// same sender tx with a priority not equal to t. It is used to resolve priority collisions, that is when 2 or more
// txs from different senders have the same priority.
// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is
// defined as the first (nonce-wise) same sender tx with a priority not equal to
// t. It is used to resolve priority collisions, that is when 2 or more txs from
// different senders have the same priority.
func senderWeight(senderCursor *skiplist.Element) int64 {
if senderCursor == nil {
return 0
}

weight := senderCursor.Key().(txMeta).priority
senderCursor = senderCursor.Next()
for senderCursor != nil {
p := senderCursor.Key().(txMeta).priority
if p != weight {
weight = p
}

senderCursor = senderCursor.Next()
}

return weight
}

// CountTx returns the number of transactions in the mempool.
func (mp *priorityNonceMempool) CountTx() int {
func (mp *PriorityNonceMempool) CountTx() int {
return mp.priorityIndex.Len()
}

// Remove removes a transaction from the mempool in O(log n) time, returning an error if unsuccessful.
func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error {
// Remove removes a transaction from the mempool in O(log n) time, returning an
// error if unsuccessful.
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
Expand All @@ -364,7 +390,7 @@ func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error {
}

sig := sigs[0]
sender := sig.PubKey.Address().String()
sender := sdk.AccAddress(sig.PubKey.Address()).String()
nonce := sig.Sequence

scoreKey := txMeta{nonce: nonce, sender: sender}
Expand All @@ -388,7 +414,7 @@ func (mp *priorityNonceMempool) Remove(tx sdk.Tx) error {
}

func IsEmpty(mempool Mempool) error {
mp := mempool.(*priorityNonceMempool)
mp := mempool.(*PriorityNonceMempool)
if mp.priorityIndex.Len() != 0 {
return fmt.Errorf("priorityIndex not empty")
}
Expand All @@ -397,6 +423,7 @@ func IsEmpty(mempool Mempool) error {
for k := range mp.priorityCounts {
countKeys = append(countKeys, k)
}

for _, k := range countKeys {
if mp.priorityCounts[k] != 0 {
return fmt.Errorf("priorityCounts not zero at %v, got %v", k, mp.priorityCounts[k])
Expand All @@ -407,6 +434,7 @@ func IsEmpty(mempool Mempool) error {
for k := range mp.senderIndices {
senderKeys = append(senderKeys, k)
}

for _, k := range senderKeys {
if mp.senderIndices[k].Len() != 0 {
return fmt.Errorf("senderIndex not empty for sender %v", k)
Expand Down
Loading