diff --git a/docs/docs/building-apps/02-app-mempool.md b/docs/docs/building-apps/02-app-mempool.md index b9e7dfbe410b..4eb150e8b085 100644 --- a/docs/docs/building-apps/02-app-mempool.md +++ b/docs/docs/building-apps/02-app-mempool.md @@ -51,3 +51,13 @@ Now that we have walked through the `PrepareProposal` & `ProcessProposal`, we ca There are countless designs that an application developer can write for a mempool, the core team opted to provide a simple implementation of a nonce mempool. The nonce mempool is a mempool that keeps transactions from an sorted by nonce in order to avoid the issues with nonces. It works by storing the transation in a list sorted by the transaction nonce. When the proposer asks for transactions to be included in a block it randomly selects a sender and gets the first transaction in the list. It repeats this until the mempool is empty or the block is full. + +### Configurations + +#### MaxTxs + +Its an integer value that sets the mempool in one of three modes, bounded, unbounded, or disabled. + +- **negative**: Disabled, mempool does not insert new tx and return early. +- **zero**: Unbounded mempool has no tx limit and will never fail with ErrMempoolTxMaxCapacity. +- **positive**: Bounded, it fails with ErrMempoolTxMaxCapacity when maxTx value is the same as CountTx() diff --git a/server/config/config.go b/server/config/config.go index 5775acc96c40..420aa6df7df1 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -166,6 +166,15 @@ type StateSyncConfig struct { SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"` } +// MempoolConfig defines the configurations for the appside mempool +type MempoolConfig struct { + // MaxTxs defines the behavior of the mempool. A negative value indicates + // the mempool is disabled entirely, zero indicates that the mempool is + // unbounded in how many txs it may contain, and a positive value indicates + // the maximum amount of txs it may contain. + MaxTxs int +} + type ( // StoreConfig defines application configuration for state streaming and other // storage related operations. @@ -200,6 +209,7 @@ type Config struct { StateSync StateSyncConfig `mapstructure:"state-sync"` Store StoreConfig `mapstructure:"store"` Streamers StreamersConfig `mapstructure:"streamers"` + Mempool MempoolConfig `mapstructure:"mempool"` } // SetMinGasPrices sets the validator's minimum gas prices. @@ -278,6 +288,9 @@ func DefaultConfig() *Config { Keys: []string{"*"}, }, }, + Mempool: MempoolConfig{ + MaxTxs: 0, + }, } } diff --git a/server/config/toml.go b/server/config/toml.go index be79dbaeec2f..c23ea1c0fdae 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -212,6 +212,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}] keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}] write_dir = "{{ .Streamers.File.WriteDir }}" prefix = "{{ .Streamers.File.Prefix }}" + +############################################################################### +### Mempool ### +############################################################################### + +[mempool] +max-txs = "{{ .Mempool.MaxTxs }}" + ` var configTemplate *template.Template diff --git a/server/start.go b/server/start.go index 9ecb0cce6560..516a627ce9ed 100644 --- a/server/start.go +++ b/server/start.go @@ -28,6 +28,7 @@ import ( "github.com/cosmos/cosmos-sdk/server/types" pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types" "github.com/cosmos/cosmos-sdk/telemetry" + "github.com/cosmos/cosmos-sdk/types/mempool" ) const ( @@ -73,6 +74,9 @@ const ( flagGRPCAddress = "grpc.address" flagGRPCWebEnable = "grpc-web.enable" flagGRPCWebAddress = "grpc-web.address" + + // mempool flags + FlagMempoolMaxTxs = "mempool.max-txs" ) // StartCmd runs the service passed in, either stand-alone or in-process with @@ -184,6 +188,8 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree") + cmd.Flags().Int(FlagMempoolMaxTxs, mempool.DefaultMaxTx, "Sets MaxTx value for the app side mempool") + // add support for all Tendermint-specific command line options tcmd.AddNodeFlags(cmd) return cmd diff --git a/simapp/simd/cmd/root.go b/simapp/simd/cmd/root.go index f46f0d6fa055..859c53dba360 100644 --- a/simapp/simd/cmd/root.go +++ b/simapp/simd/cmd/root.go @@ -34,6 +34,7 @@ import ( snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types" simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" authcmd "github.com/cosmos/cosmos-sdk/x/auth/client/cli" "github.com/cosmos/cosmos-sdk/x/auth/types" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" @@ -307,6 +308,7 @@ func newApp( baseapp.SetSnapshot(snapshotStore, snapshotOptions), baseapp.SetIAVLCacheSize(cast.ToInt(appOpts.Get(server.FlagIAVLCacheSize))), baseapp.SetIAVLDisableFastNode(cast.ToBool(appOpts.Get(server.FlagDisableIAVLFastNode))), + baseapp.SetMempool(mempool.NewSenderNonceMempool(mempool.SenderNonceMaxTxOpt(cast.ToInt(appOpts.Get(server.FlagMempoolMaxTxs))))), ) } diff --git a/types/mempool/mempool.go b/types/mempool/mempool.go index 6c957807675d..3413d7cc9f8f 100644 --- a/types/mempool/mempool.go +++ b/types/mempool/mempool.go @@ -36,4 +36,7 @@ type Iterator interface { Tx() sdk.Tx } -var ErrTxNotFound = errors.New("tx not found in mempool") +var ( + ErrTxNotFound = errors.New("tx not found in mempool") + ErrMempoolTxMaxCapacity = errors.New("pool reached max tx capacity") +) diff --git a/types/mempool/sender_nonce.go b/types/mempool/sender_nonce.go index 60d4608b93ca..956e2cce2fab 100644 --- a/types/mempool/sender_nonce.go +++ b/types/mempool/sender_nonce.go @@ -17,6 +17,8 @@ var ( _ Iterator = (*senderNonceMepoolIterator)(nil) ) +var DefaultMaxTx = 0 + // senderNonceMempool is a mempool that prioritizes transactions within a sender by nonce, the lowest first, // but selects a random sender on each iteration. The mempool is iterated by: // @@ -26,15 +28,27 @@ var ( // // Note that PrepareProposal could choose to stop iteration before reaching the end if maxBytes is reached. type senderNonceMempool struct { - senders map[string]*skiplist.SkipList - rnd *rand.Rand + senders map[string]*skiplist.SkipList + rnd *rand.Rand + maxTx int + existingTx map[txKey]bool +} + +type SenderNonceOptions func(mp *senderNonceMempool) + +type txKey struct { + address string + nonce uint64 } // NewSenderNonceMempool creates a new mempool that prioritizes transactions by nonce, the lowest first. -func NewSenderNonceMempool() Mempool { +func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool { senderMap := make(map[string]*skiplist.SkipList) + existingTx := make(map[txKey]bool) snp := &senderNonceMempool{ - senders: senderMap, + senders: senderMap, + maxTx: DefaultMaxTx, + existingTx: existingTx, } var seed int64 @@ -44,17 +58,30 @@ func NewSenderNonceMempool() Mempool { } snp.setSeed(seed) + for _, opt := range opts { + opt(snp) + } + return snp } -// NewSenderNonceMempoolWithSeed creates a new mempool that prioritizes transactions by nonce, the lowest first and sets the random seed. -func NewSenderNonceMempoolWithSeed(seed int64) Mempool { - senderMap := make(map[string]*skiplist.SkipList) - snp := &senderNonceMempool{ - senders: senderMap, +// SenderNonceSeedOpt Option To add a Seed for random type when calling the constructor NewSenderNonceMempool +// Example: +// > random_seed := int64(1000) +// > NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed)) +func SenderNonceSeedOpt(seed int64) SenderNonceOptions { + return func(snp *senderNonceMempool) { + snp.setSeed(seed) + } +} + +// SenderNonceMaxTxOpt Option To set limit of max tx when calling the constructor NewSenderNonceMempool +// Example: +// > NewSenderNonceMempool(SenderNonceMaxTxOpt(100)) +func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions { + return func(snp *senderNonceMempool) { + snp.maxTx = maxTx } - snp.setSeed(seed) - return snp } func (snm *senderNonceMempool) setSeed(seed int64) { @@ -65,6 +92,12 @@ func (snm *senderNonceMempool) setSeed(seed int64) { // Insert adds a tx to the mempool. It returns an error if the tx does not have at least one signer. // priority is ignored. func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error { + if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx { + return ErrMempoolTxMaxCapacity + } + if snm.maxTx < 0 { + return nil + } sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() if err != nil { return err @@ -82,7 +115,8 @@ func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error { snm.senders[sender] = senderTxs } senderTxs.Set(nonce, tx) - + key := txKey{nonce: nonce, address: sender} + snm.existingTx[key] = true return nil } @@ -117,14 +151,7 @@ func (snm *senderNonceMempool) Select(_ sdk.Context, _ [][]byte) Iterator { // CountTx returns the total count of txs in the mempool. func (snm *senderNonceMempool) CountTx() int { - count := 0 - - // Disable gosec here since we need neither strong randomness nor deterministic iteration. - // #nosec - for _, value := range snm.senders { - count += value.Len() - } - return count + return len(snm.existingTx) } // Remove removes a tx from the mempool. It returns an error if the tx does not have at least one signer or the tx @@ -154,6 +181,10 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error { if senderTxs.Len() == 0 { delete(snm.senders, sender) } + + key := txKey{nonce: nonce, address: sender} + delete(snm.existingTx, key) + return nil } diff --git a/types/mempool/sender_nonce_property_test.go b/types/mempool/sender_nonce_property_test.go index e0bf716edcfa..28e98a64d6ea 100644 --- a/types/mempool/sender_nonce_property_test.go +++ b/types/mempool/sender_nonce_property_test.go @@ -1,7 +1,6 @@ package mempool_test import ( - "math/rand" "sort" "pgregory.net/rapid" @@ -9,7 +8,6 @@ import ( cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" sdk "github.com/cosmos/cosmos-sdk/types" mempool "github.com/cosmos/cosmos-sdk/types/mempool" - simtypes "github.com/cosmos/cosmos-sdk/types/simulation" "github.com/cosmos/cosmos-sdk/x/auth/signing" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/log" @@ -27,18 +25,20 @@ var ( // same elements input on the mempool should be in the output except for sender nonce duplicates, which are overwritten by the later duplicate entries. // for every sender transaction tx_n, tx_0.nonce < tx_1.nonce ... < tx_n.nonce -var genAddress = rapid.Custom(func(t *rapid.T) simtypes.Account { - accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(rapid.Int64().Draw(t, "seed for account"))), 1) - return accounts[0] -}) +func AddressGenerator(t *rapid.T) *rapid.Generator[sdk.AccAddress] { + return rapid.Custom(func(t *rapid.T) sdk.AccAddress { + pkBz := rapid.SliceOfN(rapid.Byte(), 20, 20).Draw(t, "hex") + return sdk.AccAddress(pkBz) + }) +} func testMempoolProperties(t *rapid.T) { ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) mp := mempool.NewSenderNonceMempool() - genMultipleAddress := rapid.SliceOfNDistinct(genAddress, 1, 10, func(acc simtypes.Account) string { - return acc.Address.String() + genMultipleAddress := rapid.SliceOfNDistinct(AddressGenerator(t), 1, 10, func(acc sdk.AccAddress) string { + return acc.String() }) accounts := genMultipleAddress.Draw(t, "address") @@ -46,10 +46,10 @@ func testMempoolProperties(t *rapid.T) { return testTx{ priority: rapid.Int64Range(0, 1000).Draw(t, "priority"), nonce: rapid.Uint64().Draw(t, "nonce"), - address: rapid.SampledFrom(accounts).Draw(t, "acc").Address, + address: rapid.SampledFrom(accounts).Draw(t, "acc"), } }) - genMultipleTX := rapid.SliceOfN(genTx, 1, 500) + genMultipleTX := rapid.SliceOfN(genTx, 1, 5000) txs := genMultipleTX.Draw(t, "txs") senderTxRaw := getSenderTxMap(txs) @@ -61,6 +61,7 @@ func testMempoolProperties(t *rapid.T) { iter := mp.Select(ctx, nil) orderTx := fetchAllTxs(iter) + require.Equal(t, len(orderTx), mp.CountTx()) senderTxOrdered := getSenderTxMap(orderTx) for key := range senderTxOrdered { ordered, found := senderTxOrdered[key] diff --git a/types/mempool/sender_nonce_test.go b/types/mempool/sender_nonce_test.go index 9b4a52926eef..b001b5cb1fa8 100644 --- a/types/mempool/sender_nonce_test.go +++ b/types/mempool/sender_nonce_test.go @@ -114,7 +114,7 @@ func (s *MempoolTestSuite) TestTxOrder() { } for i, tt := range tests { t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - pool := mempool.NewSenderNonceMempoolWithSeed(tt.seed) + pool := mempool.NewSenderNonceMempool(mempool.SenderNonceSeedOpt(tt.seed)) // create test txs and insert into mempool for i, ts := range tt.txs { tx := testTx{id: i, priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a} @@ -137,3 +137,60 @@ func (s *MempoolTestSuite) TestTxOrder() { }) } } + +func (s *MempoolTestSuite) TestMaxTx() { + t := s.T() + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1) + mp := mempool.NewSenderNonceMempool(mempool.SenderNonceMaxTxOpt(1)) + + tx := testTx{ + nonce: 0, + address: accounts[0].Address, + priority: rand.Int63(), + } + tx2 := testTx{ + nonce: 1, + address: accounts[0].Address, + priority: rand.Int63(), + } + + // empty mempool behavior + require.Equal(t, 0, s.mempool.CountTx()) + itr := mp.Select(ctx, nil) + require.Nil(t, itr) + + ctx = ctx.WithPriority(tx.priority) + err := mp.Insert(ctx, tx) + require.NoError(t, err) + ctx = ctx.WithPriority(tx.priority) + err = mp.Insert(ctx, tx2) + require.Equal(t, mempool.ErrMempoolTxMaxCapacity, err) + +} + +func (s *MempoolTestSuite) TestTxNotFoundOnSender() { + t := s.T() + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1) + mp := mempool.NewSenderNonceMempool() + + txSender := testTx{ + nonce: 0, + address: accounts[0].Address, + priority: rand.Int63(), + } + + tx := testTx{ + nonce: 1, + address: accounts[0].Address, + priority: rand.Int63(), + } + + ctx = ctx.WithPriority(tx.priority) + err := mp.Insert(ctx, txSender) + require.NoError(t, err) + err = mp.Remove(tx) + require.Equal(t, mempool.ErrTxNotFound, err) + +}