Skip to content

Optimize platformvm mempool peek operations (~20% less memory) #1455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions message/messages_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ var (
// $ go test -run=NONE -bench=BenchmarkMarshalVersion > /tmp/cpu.before.txt
// $ USE_BUILDER=true go test -run=NONE -bench=BenchmarkMarshalVersion > /tmp/cpu.after.txt
// $ benchcmp /tmp/cpu.before.txt /tmp/cpu.after.txt
// $ benchstat -alpha 0.03 -geomean /tmp/cpu.before.txt /tmp/cpu.after.txt
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag is deprecated btw

// $ benchstat -alpha 0.03 /tmp/cpu.before.txt /tmp/cpu.after.txt
//
// $ go test -run=NONE -bench=BenchmarkMarshalVersion -benchmem > /tmp/mem.before.txt
// $ USE_BUILDER=true go test -run=NONE -bench=BenchmarkMarshalVersion -benchmem > /tmp/mem.after.txt
// $ benchcmp /tmp/mem.before.txt /tmp/mem.after.txt
// $ benchstat -alpha 0.03 -geomean /tmp/mem.before.txt /tmp/mem.after.txt
// $ benchstat -alpha 0.03 /tmp/mem.before.txt /tmp/mem.after.txt
func BenchmarkMarshalVersion(b *testing.B) {
require := require.New(b)

Expand Down Expand Up @@ -90,12 +90,12 @@ func BenchmarkMarshalVersion(b *testing.B) {
// $ go test -run=NONE -bench=BenchmarkUnmarshalVersion > /tmp/cpu.before.txt
// $ USE_BUILDER=true go test -run=NONE -bench=BenchmarkUnmarshalVersion > /tmp/cpu.after.txt
// $ benchcmp /tmp/cpu.before.txt /tmp/cpu.after.txt
// $ benchstat -alpha 0.03 -geomean /tmp/cpu.before.txt /tmp/cpu.after.txt
// $ benchstat -alpha 0.03 /tmp/cpu.before.txt /tmp/cpu.after.txt
//
// $ go test -run=NONE -bench=BenchmarkUnmarshalVersion -benchmem > /tmp/mem.before.txt
// $ USE_BUILDER=true go test -run=NONE -bench=BenchmarkUnmarshalVersion -benchmem > /tmp/mem.after.txt
// $ benchcmp /tmp/mem.before.txt /tmp/mem.after.txt
// $ benchstat -alpha 0.03 -geomean /tmp/mem.before.txt /tmp/mem.after.txt
// $ benchstat -alpha 0.03 /tmp/mem.before.txt /tmp/mem.after.txt
func BenchmarkUnmarshalVersion(b *testing.B) {
require := require.New(b)

Expand Down
3 changes: 2 additions & 1 deletion vms/platformvm/api/static_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"errors"
"fmt"
stdmath "math"
"net/http"

"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -351,7 +352,7 @@ func (*StaticService) BuildGenesis(_ *http.Request, args *BuildGenesisArgs, repl
chains = append(chains, tx)
}

validatorTxs := vdrs.List()
validatorTxs, _ := vdrs.ListWithLimit(stdmath.MaxInt32)

// genesis holds the genesis state
g := genesis.Genesis{
Expand Down
15 changes: 5 additions & 10 deletions vms/platformvm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,12 @@ func (m *mempool) HasTxs() bool {
}

func (m *mempool) PeekTxs(maxTxsBytes int) []*txs.Tx {
txs := m.unissuedDecisionTxs.List()
txs = append(txs, m.unissuedStakerTxs.List()...)

size := 0
for i, tx := range txs {
size += len(tx.Bytes())
if size > maxTxsBytes {
return txs[:i]
}
txs, remaining := m.unissuedDecisionTxs.ListWithLimit(maxTxsBytes)
if remaining <= 0 {
return txs
}
return txs
txs2, _ := m.unissuedStakerTxs.ListWithLimit(remaining)
return append(txs, txs2...)
}

func (m *mempool) addDecisionTx(tx *txs.Tx) {
Expand Down
84 changes: 84 additions & 0 deletions vms/platformvm/txs/mempool/mempool_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package mempool

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

// Benchmarks mempool PeekTxs with limits.
//
// e.g.,
//
// $ go install -v golang.org/x/tools/cmd/benchcmp@latest
// $ go install -v golang.org/x/perf/cmd/benchstat@latest
//
// $ go test -run=NONE -bench=BenchmarkPeekTxs > /tmp/cpu.before.txt
// $ go test -run=NONE -bench=BenchmarkPeekTxs > /tmp/cpu.after.txt
// $ benchcmp /tmp/cpu.before.txt /tmp/cpu.after.txt
// $ benchstat -alpha 0.03 /tmp/cpu.before.txt /tmp/cpu.after.txt
//
// $ go test -run=NONE -bench=BenchmarkPeekTxs -benchmem > /tmp/mem.before.txt
// $ go test -run=NONE -bench=BenchmarkPeekTxs -benchmem > /tmp/mem.after.txt
// $ benchcmp /tmp/mem.before.txt /tmp/mem.after.txt
// $ benchstat -alpha 0.03 /tmp/mem.before.txt /tmp/mem.after.txt
func BenchmarkPeekTxs(b *testing.B) {
require := require.New(b)

registerer := prometheus.NewRegistry()
mpool, err := NewMempool("mempool", registerer, &noopBlkTimer{})
require.NoError(err)

total := 300
decisionTxs, err := createTestDecisionTxs(total)
require.NoError(err)
stakerTxs, err := createTestAddPermissionlessValidatorTxs(total)
require.NoError(err)

// txs must not already there before we start
require.False(mpool.HasTxs())

oneDecisionTxSize, totalDecisionTxSize := 0, 0
for _, tx := range decisionTxs {
require.False(mpool.Has(tx.ID()))
require.NoError(mpool.Add(tx))

size := tx.Size()
if oneDecisionTxSize != 0 {
// assume all txs have the same size for the purpose of testing
require.Equal(oneDecisionTxSize, size)
} else {
oneDecisionTxSize = size
}

totalDecisionTxSize += oneDecisionTxSize
}

oneStakerTxSize, totalStakerTxSize := 0, 0
for _, tx := range stakerTxs {
require.False(mpool.Has(tx.ID()))
require.NoError(mpool.Add(tx))

size := tx.Size()
if oneStakerTxSize != 0 {
// assume all txs have the same size for the purpose of testing
require.Equal(oneStakerTxSize, size)
} else {
oneStakerTxSize = size
}

totalStakerTxSize += oneStakerTxSize
}

// reasonable limit to both query decision txs + staker txs
maxTxBytes := totalDecisionTxSize + totalStakerTxSize/2

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = mpool.PeekTxs(maxTxBytes)
}
}
172 changes: 172 additions & 0 deletions vms/platformvm/txs/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/avalanchego/vms/components/avax"
"github.com/ava-labs/avalanchego/vms/platformvm/signer"
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
"github.com/ava-labs/avalanchego/vms/secp256k1fx"
)
Expand Down Expand Up @@ -172,6 +174,105 @@ func TestProposalTxsInMempool(t *testing.T) {
}
}

func TestPeekTxs(t *testing.T) {
require := require.New(t)

registerer := prometheus.NewRegistry()
mpool, err := NewMempool("mempool", registerer, &noopBlkTimer{})
require.NoError(err)

total := 100
decisionTxs, err := createTestDecisionTxs(total)
require.NoError(err)
stakerTxs, err := createTestAddPermissionlessValidatorTxs(total)
require.NoError(err)

// txs must not already there before we start
require.False(mpool.HasTxs())

oneDecisionTxSize, totalDecisionTxSize := 0, 0
for _, tx := range decisionTxs {
require.False(mpool.Has(tx.ID()))
require.NoError(mpool.Add(tx))

size := tx.Size()
if oneDecisionTxSize != 0 {
// assume all txs have the same size for the purpose of testing
require.Equal(oneDecisionTxSize, size)
} else {
oneDecisionTxSize = size
}

totalDecisionTxSize += oneDecisionTxSize
}

oneStakerTxSize, totalStakerTxSize := 0, 0
for _, tx := range stakerTxs {
require.False(mpool.Has(tx.ID()))
require.NoError(mpool.Add(tx))

size := tx.Size()
if oneStakerTxSize != 0 {
// assume all txs have the same size for the purpose of testing
require.Equal(oneStakerTxSize, size)
} else {
oneStakerTxSize = size
}

totalStakerTxSize += oneStakerTxSize
}

tt := []struct {
desc string
maxTxBytes int
expectedTxs int
}{
{
desc: "peek txs with zero max tx bytes should return none",
maxTxBytes: 0,
expectedTxs: 0,
},
{
desc: "peek txs with MaxInt should return all decision + staker txs",
maxTxBytes: math.MaxInt,
expectedTxs: 2 * total,
},
{
desc: "peek txs with totalDecisionTxSize + totalStakerTxSize should return all",
maxTxBytes: totalDecisionTxSize + totalStakerTxSize,
expectedTxs: 2 * total,
},
{
desc: "peek txs with totalDecisionTxSize should only return decision txs",
maxTxBytes: totalDecisionTxSize,
expectedTxs: total,
},
{
desc: "peek txs with totalDecisionTxSize - one decision tx size should return total - 1",
maxTxBytes: totalDecisionTxSize - oneDecisionTxSize,
expectedTxs: total - 1,
},
{
desc: "peek txs with half of totalDecisionTxSize should return total/2",
maxTxBytes: totalDecisionTxSize / 2,
expectedTxs: total / 2,
},
{
desc: "peek txs with totalDecisionTxSize + 3 staker tx size should return total + 3",
maxTxBytes: totalDecisionTxSize + 3*oneStakerTxSize,
expectedTxs: total + 3,
},
{
desc: "peek txs with totalDecisionTxSize + totalStakerTxSize/2 should return total + total/2",
maxTxBytes: totalDecisionTxSize + totalStakerTxSize/2,
expectedTxs: total + total/2,
},
}
for i, tv := range tt {
require.Lenf(mpool.PeekTxs(tv.maxTxBytes), tv.expectedTxs, "[%d] %s", i, tv.desc)
}
}

func createTestDecisionTxs(count int) ([]*txs.Tx, error) {
decisionTxs := make([]*txs.Tx, 0, count)
for i := uint32(0); i < uint32(count); i++ {
Expand Down Expand Up @@ -241,3 +342,74 @@ func createTestProposalTxs(count int) ([]*txs.Tx, error) {
}
return proposalTxs, nil
}

func createTestAddPermissionlessValidatorTxs(count int) ([]*txs.Tx, error) {
var (
networkID = uint32(1337)
chainID = ids.GenerateTestID()
)

// A BaseTx that passes syntactic verification.
validBaseTx := txs.BaseTx{
BaseTx: avax.BaseTx{
NetworkID: networkID,
BlockchainID: chainID,
},
}

blsSK, err := bls.NewSecretKey()
if err != nil {
return nil, err
}
blsPOP := signer.NewProofOfPossession(blsSK)

signers := [][]*secp256k1.PrivateKey{{secp256k1.TestKeys()[0]}}

var clk mockable.Clock
tss := make([]*txs.Tx, 0, count)
for i := uint32(0); i < uint32(count); i++ {
utx := &txs.AddPermissionlessValidatorTx{
BaseTx: validBaseTx,
Validator: txs.Validator{
NodeID: ids.GenerateTestNodeID(),
Start: uint64(clk.Time().Add(time.Duration(uint32(count)-i) * time.Second).Unix()),
},
Subnet: ids.GenerateTestID(),
Signer: blsPOP,
StakeOuts: []*avax.TransferableOutput{
{
Asset: avax.Asset{
ID: ids.GenerateTestID(),
},
Out: &secp256k1fx.TransferOutput{
Amt: 1,
},
},
{
Asset: avax.Asset{
ID: ids.GenerateTestID(),
},
Out: &secp256k1fx.TransferOutput{
Amt: 1,
},
},
},
ValidatorRewardsOwner: &secp256k1fx.OutputOwners{
Addrs: []ids.ShortID{ids.GenerateTestShortID()},
Threshold: 1,
},
DelegatorRewardsOwner: &secp256k1fx.OutputOwners{
Addrs: []ids.ShortID{ids.GenerateTestShortID()},
Threshold: 1,
},
DelegationShares: 20_000,
}

tx, err := txs.NewSigned(utx, txs.Codec, signers)
if err != nil {
return nil, err
}
tss = append(tss, tx)
}
return tss, nil
}
5 changes: 5 additions & 0 deletions vms/platformvm/txs/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func Parse(c codec.Manager, signedBytes []byte) (*Tx, error) {
return tx, nil
}

// Returns the size of the inner bytes.
func (tx *Tx) Size() int {
return len(tx.bytes)
}

func (tx *Tx) Bytes() []byte {
return tx.bytes
}
Expand Down
Loading