Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 6 additions & 18 deletions vms/platformvm/block/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/avalanchego/utils/units"
Expand All @@ -40,7 +39,11 @@ var (

type Builder interface {
mempool.Mempool
mempool.BlockTimer

// ResetBlockTimer schedules a timer to notify the consensus engine once
// there is a block ready to be built. If a block is ready to be built when
// this function is called, the engine will be notified directly.
ResetBlockTimer()

// BuildBlock is called on timer clock to attempt to create
// next block
Expand All @@ -58,9 +61,6 @@ type builder struct {
txExecutorBackend *txexecutor.Backend
blkManager blockexecutor.Manager

// channel to send messages to the consensus engine
toEngine chan<- common.Message

// This timer goes off when it is time for the next validator to add/leave
// the validator set. When it goes off ResetTimer() is called, potentially
// triggering creation of a new block.
Expand All @@ -72,14 +72,12 @@ func New(
txBuilder txbuilder.Builder,
txExecutorBackend *txexecutor.Backend,
blkManager blockexecutor.Manager,
toEngine chan<- common.Message,
) Builder {
builder := &builder{
Mempool: mempool,
txBuilder: txBuilder,
txExecutorBackend: txExecutorBackend,
blkManager: blkManager,
toEngine: toEngine,
}

builder.timer = timer.NewTimer(builder.setNextBuildBlockTime)
Expand Down Expand Up @@ -192,7 +190,7 @@ func (b *builder) setNextBuildBlockTime() {

if _, err := b.buildBlock(); err == nil {
// We can build a block now
b.notifyBlockReady()
b.Mempool.RequestBuildBlock()
return
}

Expand Down Expand Up @@ -229,16 +227,6 @@ func (b *builder) setNextBuildBlockTime() {
b.timer.SetTimeoutIn(waitTime)
}

// notifyBlockReady tells the consensus engine that a new block is ready to be
// created
func (b *builder) notifyBlockReady() {
select {
case b.toEngine <- common.PendingTxs:
default:
b.txExecutorBackend.Ctx.Log.Debug("dropping message to consensus engine")
}
}

// [timestamp] is min(max(now, parent timestamp), next staker change time)
func buildBlock(
builder *builder,
Expand Down
4 changes: 2 additions & 2 deletions vms/platformvm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestPreviouslyDroppedTxsCanBeReAddedToMempool(t *testing.T) {
txID := tx.ID()

// A tx simply added to mempool is obviously not marked as dropped
require.NoError(env.mempool.Add(tx))
require.NoError(env.mempool.Add([]*txs.Tx{tx}))
require.True(env.mempool.Has(txID))
reason := env.mempool.GetDropReason(txID)
require.NoError(reason)
Expand All @@ -94,7 +94,7 @@ func TestPreviouslyDroppedTxsCanBeReAddedToMempool(t *testing.T) {
// A previously dropped tx, popped then re-added to mempool,
// is not dropped anymore
env.mempool.Remove([]*txs.Tx{tx})
require.NoError(env.mempool.Add(tx))
require.NoError(env.mempool.Add([]*txs.Tx{tx}))

require.True(env.mempool.Has(txID))
reason = env.mempool.GetDropReason(txID)
Expand Down
3 changes: 1 addition & 2 deletions vms/platformvm/block/builder/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func newEnvironment(t *testing.T) *environment {
metrics, err := metrics.New("", registerer)
require.NoError(err)

res.mempool, err = mempool.New("mempool", registerer, res)
res.mempool, err = mempool.New("mempool", registerer, nil)
require.NoError(err)

res.blkManager = blockexecutor.NewManager(
Expand All @@ -193,7 +193,6 @@ func newEnvironment(t *testing.T) *environment {
res.txBuilder,
&res.backend,
res.blkManager,
nil, // toEngine,
)

res.blkManager.SetPreference(genesisID)
Expand Down
2 changes: 1 addition & 1 deletion vms/platformvm/block/builder/standard_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestAtomicTxImports(t *testing.T) {
)
require.NoError(err)

require.NoError(env.Builder.Add(tx))
require.NoError(env.Builder.Add([]*txs.Tx{tx}))
b, err := env.Builder.BuildBlock(context.Background())
require.NoError(err)
// Test multiple verify calls work
Expand Down
8 changes: 1 addition & 7 deletions vms/platformvm/block/executor/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ const (
)

var (
_ mempool.BlockTimer = (*environment)(nil)

defaultMinStakingDuration = 24 * time.Hour
defaultMaxStakingDuration = 365 * 24 * time.Hour
defaultGenesisTime = time.Date(1997, 1, 1, 0, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -131,10 +129,6 @@ type environment struct {
backend *executor.Backend
}

func (*environment) ResetBlockTimer() {
// dummy call, do nothing for now
}

func newEnvironment(t *testing.T, ctrl *gomock.Controller) *environment {
res := &environment{
isBootstrapped: &utils.Atomic[bool]{},
Expand Down Expand Up @@ -199,7 +193,7 @@ func newEnvironment(t *testing.T, ctrl *gomock.Controller) *environment {
metrics := metrics.Noop

var err error
res.mempool, err = mempool.New("mempool", registerer, res)
res.mempool, err = mempool.New("mempool", registerer, nil)
if err != nil {
panic(fmt.Errorf("failed to create mempool: %w", err))
}
Expand Down
15 changes: 6 additions & 9 deletions vms/platformvm/block/executor/rejector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,12 @@ func (r *rejector) rejectBlock(b block.Block, blockType string) error {
return nil
}

for _, tx := range b.Txs() {
if err := r.Mempool.Add(tx); err != nil {
r.ctx.Log.Debug(
"failed to reissue tx",
zap.Stringer("txID", tx.ID()),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
}
if err := r.Mempool.Add(b.Txs()); err != nil {
r.ctx.Log.Debug(
"failed to reissue txs from rejected block",
zap.Stringer("blkID", blkID),
zap.Error(err),
)
}

return nil
Expand Down
4 changes: 1 addition & 3 deletions vms/platformvm/block/executor/rejector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ func TestRejectBlock(t *testing.T) {
}

// Set expected calls on dependencies.
for _, tx := range blk.Txs() {
mempool.EXPECT().Add(tx).Return(nil).Times(1)
}
mempool.EXPECT().Add(blk.Txs()).Return(nil).Times(1)

require.NoError(tt.rejectFunc(rejector, blk))
// Make sure block and its parent are removed from the state map.
Expand Down
2 changes: 1 addition & 1 deletion vms/platformvm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (n *network) issueTx(tx *txs.Tx) error {
return nil
}

if err := n.mempool.Add(tx); err != nil {
if err := n.mempool.Add([]*txs.Tx{tx}); err != nil {
n.ctx.Log.Debug("tx failed to be added to the mempool",
zap.Stringer("txID", txID),
zap.Error(err),
Expand Down
110 changes: 60 additions & 50 deletions vms/platformvm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/units"
"github.com/ava-labs/avalanchego/vms/platformvm/txs"
Expand All @@ -35,26 +36,20 @@ const (
var (
_ Mempool = (*mempool)(nil)

errClosedMempool = errors.New("txs not added because mempool is closed")
errDuplicateTx = errors.New("duplicate tx")
errTxTooLarge = errors.New("tx too large")
errMempoolFull = errors.New("mempool is full")
errConflictsWithOtherTx = errors.New("tx conflicts with other tx")
)

type BlockTimer interface {
// ResetBlockTimer schedules a timer to notify the consensus engine once
// there is a block ready to be built. If a block is ready to be built when
// this function is called, the engine will be notified directly.
ResetBlockTimer()
}

type Mempool interface {
// we may want to be able to stop valid transactions
// from entering the mempool, e.g. during blocks creation
EnableAdding()
DisableAdding()

Add(tx *txs.Tx) error
Add(txs []*txs.Tx) error
Has(txID ids.ID) bool
Get(txID ids.ID) *txs.Tx
Remove(txs []*txs.Tx)
Expand All @@ -73,6 +68,10 @@ type Mempool interface {
// It's guaranteed that the returned tx, if not nil, is a StakerTx.
PeekStakerTx() *txs.Tx

// RequestBuildBlock notifies the consensus engine that a block should be
// built.
RequestBuildBlock()

// Note: dropped txs are added to droppedTxIDs but are not evicted from
// unissued decision/staker txs. This allows previously dropped txs to be
// possibly reissued.
Expand All @@ -98,13 +97,13 @@ type mempool struct {

consumedUTXOs set.Set[ids.ID]

blkTimer BlockTimer
toEngine chan<- common.Message
}

func New(
namespace string,
registerer prometheus.Registerer,
blkTimer BlockTimer,
toEngine chan<- common.Message,
) (Mempool, error) {
bytesAvailableMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Expand Down Expand Up @@ -142,7 +141,7 @@ func New(
droppedTxIDs: &cache.LRU[ids.ID, error]{Size: droppedTxIDsCacheSize},
consumedUTXOs: set.NewSet[ids.ID](initialConsumedUTXOsSize),
dropIncoming: false, // enable tx adding by default
blkTimer: blkTimer,
toEngine: toEngine,
}, nil
}

Expand All @@ -154,54 +153,58 @@ func (m *mempool) DisableAdding() {
m.dropIncoming = true
}

func (m *mempool) Add(tx *txs.Tx) error {
func (m *mempool) Add(txs []*txs.Tx) error {
Copy link
Contributor

@dhrubabasu dhrubabasu Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't add any of the later txs in txs if one in the middle fails a check. I think we should retain the single tx Add instead of moving to an array

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spotting this. Fixed while keeping the addition of a tx slice. I really like the simmetry with Remove

if m.dropIncoming {
return fmt.Errorf("tx %s not added because mempool is closed", tx.ID())
return errClosedMempool
}

// Note: a previously dropped tx can be re-added
txID := tx.ID()
if m.Has(txID) {
return fmt.Errorf("%w: %s", errDuplicateTx, txID)
}
for _, tx := range txs {
// Note: a previously dropped tx can be re-added
txID := tx.ID()
if m.Has(txID) {
return fmt.Errorf("%w: %s", errDuplicateTx, txID)
}

txSize := len(tx.Bytes())
if txSize > MaxTxSize {
return fmt.Errorf("%w: %s size (%d) > max size (%d)",
errTxTooLarge,
txID,
txSize,
MaxTxSize,
)
}
if txSize > m.bytesAvailable {
return fmt.Errorf("%w: %s size (%d) > available space (%d)",
errMempoolFull,
txID,
txSize,
m.bytesAvailable,
)
}
txSize := len(tx.Bytes())
if txSize > MaxTxSize {
return fmt.Errorf("%w: %s size (%d) > max size (%d)",
errTxTooLarge,
txID,
txSize,
MaxTxSize,
)
}
if txSize > m.bytesAvailable {
return fmt.Errorf("%w: %s size (%d) > available space (%d)",
errMempoolFull,
txID,
txSize,
m.bytesAvailable,
)
}

inputs := tx.Unsigned.InputIDs()
if m.consumedUTXOs.Overlaps(inputs) {
return fmt.Errorf("%w: %s", errConflictsWithOtherTx, txID)
}
inputs := tx.Unsigned.InputIDs()
if m.consumedUTXOs.Overlaps(inputs) {
return fmt.Errorf("%w: %s", errConflictsWithOtherTx, txID)
}

if err := tx.Unsigned.Visit(&issuer{
m: m,
tx: tx,
}); err != nil {
return err
}
if err := tx.Unsigned.Visit(&issuer{
m: m,
tx: tx,
}); err != nil {
return err
}

// Mark these UTXOs as consumed in the mempool
m.consumedUTXOs.Union(inputs)

// Mark these UTXOs as consumed in the mempool
m.consumedUTXOs.Union(inputs)
// An explicitly added tx must not be marked as dropped.
m.droppedTxIDs.Evict(txID)
}

// An explicitly added tx must not be marked as dropped.
m.droppedTxIDs.Evict(txID)
// notify engine that we are ready to build a block
m.RequestBuildBlock()

m.blkTimer.ResetBlockTimer()
return nil
}

Expand Down Expand Up @@ -307,6 +310,13 @@ func (m *mempool) deregister(tx *txs.Tx) {
m.consumedUTXOs.Difference(inputs)
}

func (m *mempool) RequestBuildBlock() {
select {
case m.toEngine <- common.PendingTxs:
default:
}
}

// Drops all [txs.Staker] transactions whose [StartTime] is before
// [minStartTime] from [mempool]. The dropped tx ids are returned.
//
Expand Down
Loading