Skip to content

Commit 021f4f7

Browse files
authored
vms/platformvm: Prune mempool periodically (#2566)
1 parent d825ec2 commit 021f4f7

File tree

5 files changed

+163
-4
lines changed

5 files changed

+163
-4
lines changed

vms/platformvm/block/builder/builder.go

+30-3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ type Builder interface {
6060

6161
// BuildBlock can be called to attempt to create a new block
6262
BuildBlock(context.Context) (snowman.Block, error)
63+
64+
// PackBlockTxs returns an array of txs that can fit into a valid block of
65+
// size [targetBlockSize]. The returned txs are all verified against the
66+
// preferred state.
67+
//
68+
// Note: This function does not call the consensus engine.
69+
PackBlockTxs(targetBlockSize int) ([]*txs.Tx, error)
6370
}
6471

6572
// builder implements a simple builder to convert txs into valid blocks
@@ -232,6 +239,24 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
232239
return b.blkManager.NewBlock(statelessBlk), nil
233240
}
234241

242+
func (b *builder) PackBlockTxs(targetBlockSize int) ([]*txs.Tx, error) {
243+
preferredID := b.blkManager.Preferred()
244+
preferredState, ok := b.blkManager.GetState(preferredID)
245+
if !ok {
246+
return nil, fmt.Errorf("%w: %s", errMissingPreferredState, preferredID)
247+
}
248+
249+
return packBlockTxs(
250+
preferredID,
251+
preferredState,
252+
b.Mempool,
253+
b.txExecutorBackend,
254+
b.blkManager,
255+
b.txExecutorBackend.Clk.Time(),
256+
targetBlockSize,
257+
)
258+
}
259+
235260
// [timestamp] is min(max(now, parent timestamp), next staker change time)
236261
func buildBlock(
237262
builder *builder,
@@ -264,6 +289,7 @@ func buildBlock(
264289
builder.txExecutorBackend,
265290
builder.blkManager,
266291
timestamp,
292+
targetBlockSize,
267293
)
268294
if err != nil {
269295
return nil, fmt.Errorf("failed to pack block txs: %w", err)
@@ -286,6 +312,7 @@ func buildBlock(
286312
builder.txExecutorBackend,
287313
builder.blkManager,
288314
timestamp,
315+
targetBlockSize,
289316
)
290317
if err != nil {
291318
return nil, fmt.Errorf("failed to pack block txs: %w", err)
@@ -313,6 +340,7 @@ func packBlockTxs(
313340
backend *txexecutor.Backend,
314341
manager blockexecutor.Manager,
315342
timestamp time.Time,
343+
remainingSize int,
316344
) ([]*txs.Tx, error) {
317345
stateDiff, err := state.NewDiffOn(parentState)
318346
if err != nil {
@@ -327,9 +355,8 @@ func packBlockTxs(
327355
stateDiff.SetTimestamp(timestamp)
328356

329357
var (
330-
blockTxs []*txs.Tx
331-
inputs set.Set[ids.ID]
332-
remainingSize = targetBlockSize
358+
blockTxs []*txs.Tx
359+
inputs set.Set[ids.ID]
333360
)
334361

335362
for {

vms/platformvm/config/execution_config.go

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package config
55

66
import (
77
"encoding/json"
8+
"time"
89

910
"github.com/ava-labs/avalanchego/utils/units"
1011
"github.com/ava-labs/avalanchego/vms/platformvm/network"
@@ -21,6 +22,7 @@ var DefaultExecutionConfig = ExecutionConfig{
2122
BlockIDCacheSize: 8192,
2223
FxOwnerCacheSize: 4 * units.MiB,
2324
ChecksumsEnabled: false,
25+
MempoolPruneFrequency: 30 * time.Minute,
2426
}
2527

2628
// ExecutionConfig provides execution parameters of PlatformVM
@@ -35,6 +37,7 @@ type ExecutionConfig struct {
3537
BlockIDCacheSize int `json:"block-id-cache-size"`
3638
FxOwnerCacheSize int `json:"fx-owner-cache-size"`
3739
ChecksumsEnabled bool `json:"checksums-enabled"`
40+
MempoolPruneFrequency time.Duration `json:"mempool-prune-frequency"`
3841
}
3942

4043
// GetExecutionConfig returns an ExecutionConfig

vms/platformvm/config/execution_config_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package config
55

66
import (
77
"testing"
8+
"time"
89

910
"github.com/stretchr/testify/require"
1011

@@ -61,7 +62,8 @@ func TestExecutionConfigUnmarshal(t *testing.T) {
6162
"chain-db-cache-size": 7,
6263
"block-id-cache-size": 8,
6364
"fx-owner-cache-size": 9,
64-
"checksums-enabled": true
65+
"checksums-enabled": true,
66+
"mempool-prune-frequency": 60000000000
6567
}`)
6668
ec, err := GetExecutionConfig(b)
6769
require.NoError(err)
@@ -87,6 +89,7 @@ func TestExecutionConfigUnmarshal(t *testing.T) {
8789
BlockIDCacheSize: 8,
8890
FxOwnerCacheSize: 9,
8991
ChecksumsEnabled: true,
92+
MempoolPruneFrequency: time.Minute,
9093
}
9194
require.Equal(expected, ec)
9295
})
@@ -135,6 +138,7 @@ func TestExecutionConfigUnmarshal(t *testing.T) {
135138
BlockIDCacheSize: 8,
136139
FxOwnerCacheSize: 9,
137140
ChecksumsEnabled: true,
141+
MempoolPruneFrequency: 30 * time.Minute,
138142
}
139143
require.Equal(expected, ec)
140144
})

vms/platformvm/vm.go

+46
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"math"
1011
"net/http"
1112
"sync"
13+
"time"
1214

1315
"github.com/gorilla/rpc/v2"
1416

@@ -247,6 +249,10 @@ func (vm *VM) Initialize(
247249
return err
248250
}
249251

252+
// Incrementing [awaitShutdown] would cause a deadlock since
253+
// [periodicallyPruneMempool] grabs the context lock.
254+
go vm.periodicallyPruneMempool(execConfig.MempoolPruneFrequency)
255+
250256
shouldPrune, err := vm.state.ShouldPrune()
251257
if err != nil {
252258
return fmt.Errorf(
@@ -274,6 +280,46 @@ func (vm *VM) Initialize(
274280
return nil
275281
}
276282

283+
func (vm *VM) periodicallyPruneMempool(frequency time.Duration) {
284+
ticker := time.NewTicker(frequency)
285+
defer ticker.Stop()
286+
287+
for {
288+
select {
289+
case <-vm.onShutdownCtx.Done():
290+
return
291+
case <-ticker.C:
292+
if err := vm.pruneMempool(); err != nil {
293+
vm.ctx.Log.Debug("pruning mempool failed",
294+
zap.Error(err),
295+
)
296+
}
297+
}
298+
}
299+
}
300+
301+
func (vm *VM) pruneMempool() error {
302+
vm.ctx.Lock.Lock()
303+
defer vm.ctx.Lock.Unlock()
304+
305+
blockTxs, err := vm.Builder.PackBlockTxs(math.MaxInt)
306+
if err != nil {
307+
return err
308+
}
309+
310+
for _, tx := range blockTxs {
311+
if err := vm.Builder.Add(tx); err != nil {
312+
vm.ctx.Log.Debug(
313+
"failed to reissue tx",
314+
zap.Stringer("txID", tx.ID()),
315+
zap.Error(err),
316+
)
317+
}
318+
}
319+
320+
return nil
321+
}
322+
277323
// Create all chains that exist that this node validates.
278324
func (vm *VM) initBlockchains() error {
279325
if vm.Config.PartialSyncPrimaryNetwork {

vms/platformvm/vm_test.go

+79
Original file line numberDiff line numberDiff line change
@@ -2271,3 +2271,82 @@ func TestBaseTx(t *testing.T) {
22712271
require.NoError(baseTxBlock.Accept(context.Background()))
22722272
require.NoError(vm.SetPreference(context.Background(), vm.manager.LastAccepted()))
22732273
}
2274+
2275+
func TestPruneMempool(t *testing.T) {
2276+
require := require.New(t)
2277+
vm, _, _ := defaultVM(t, latestFork)
2278+
vm.ctx.Lock.Lock()
2279+
defer func() {
2280+
require.NoError(vm.Shutdown(context.Background()))
2281+
vm.ctx.Lock.Unlock()
2282+
}()
2283+
2284+
// Create a tx that will be valid regardless of timestamp.
2285+
sendAmt := uint64(100000)
2286+
changeAddr := ids.ShortEmpty
2287+
2288+
baseTx, err := vm.txBuilder.NewBaseTx(
2289+
sendAmt,
2290+
secp256k1fx.OutputOwners{
2291+
Threshold: 1,
2292+
Addrs: []ids.ShortID{
2293+
keys[1].Address(),
2294+
},
2295+
},
2296+
[]*secp256k1.PrivateKey{keys[0]},
2297+
changeAddr,
2298+
)
2299+
require.NoError(err)
2300+
2301+
vm.ctx.Lock.Unlock()
2302+
require.NoError(vm.issueTx(context.Background(), baseTx))
2303+
vm.ctx.Lock.Lock()
2304+
2305+
// [baseTx] should be in the mempool.
2306+
baseTxID := baseTx.ID()
2307+
_, ok := vm.Builder.Get(baseTxID)
2308+
require.True(ok)
2309+
2310+
// Create a tx that will be invalid after time advancement.
2311+
var (
2312+
startTime = vm.clock.Time()
2313+
endTime = startTime.Add(vm.MinStakeDuration)
2314+
)
2315+
2316+
addValidatorTx, err := vm.txBuilder.NewAddValidatorTx(
2317+
defaultMinValidatorStake,
2318+
uint64(startTime.Unix()),
2319+
uint64(endTime.Unix()),
2320+
ids.GenerateTestNodeID(),
2321+
keys[2].Address(),
2322+
20000,
2323+
[]*secp256k1.PrivateKey{keys[1]},
2324+
ids.ShortEmpty,
2325+
)
2326+
require.NoError(err)
2327+
2328+
vm.ctx.Lock.Unlock()
2329+
require.NoError(vm.issueTx(context.Background(), addValidatorTx))
2330+
vm.ctx.Lock.Lock()
2331+
2332+
// Advance clock to [endTime], making [addValidatorTx] invalid.
2333+
vm.clock.Set(endTime)
2334+
2335+
// [addValidatorTx] and [baseTx] should still be in the mempool.
2336+
addValidatorTxID := addValidatorTx.ID()
2337+
_, ok = vm.Builder.Get(addValidatorTxID)
2338+
require.True(ok)
2339+
_, ok = vm.Builder.Get(baseTxID)
2340+
require.True(ok)
2341+
2342+
vm.ctx.Lock.Unlock()
2343+
require.NoError(vm.pruneMempool())
2344+
vm.ctx.Lock.Lock()
2345+
2346+
// [addValidatorTx] should be ejected from the mempool.
2347+
// [baseTx] should still be in the mempool.
2348+
_, ok = vm.Builder.Get(addValidatorTxID)
2349+
require.False(ok)
2350+
_, ok = vm.Builder.Get(baseTxID)
2351+
require.True(ok)
2352+
}

0 commit comments

Comments
 (0)