Skip to content

Commit 2fc0d3b

Browse files
Remove PendingTxs from the DAGVM interface (ava-labs#1641)
1 parent cc73cd5 commit 2fc0d3b

File tree

7 files changed

+40
-168
lines changed

7 files changed

+40
-168
lines changed

snow/engine/avalanche/vertex/mock_vm.go

-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

snow/engine/avalanche/vertex/test_vm.go

+4-17
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
)
1616

1717
var (
18-
errPending = errors.New("unexpectedly called Pending")
1918
errLinearize = errors.New("unexpectedly called Linearize")
2019

2120
_ LinearizableVM = (*TestVM)(nil)
@@ -24,18 +23,16 @@ var (
2423
type TestVM struct {
2524
block.TestVM
2625

27-
CantLinearize, CantPendingTxs, CantParse, CantGet bool
26+
CantLinearize, CantParse, CantGet bool
2827

29-
LinearizeF func(context.Context, ids.ID) error
30-
PendingTxsF func(context.Context) []snowstorm.Tx
31-
ParseTxF func(context.Context, []byte) (snowstorm.Tx, error)
32-
GetTxF func(context.Context, ids.ID) (snowstorm.Tx, error)
28+
LinearizeF func(context.Context, ids.ID) error
29+
ParseTxF func(context.Context, []byte) (snowstorm.Tx, error)
30+
GetTxF func(context.Context, ids.ID) (snowstorm.Tx, error)
3331
}
3432

3533
func (vm *TestVM) Default(cant bool) {
3634
vm.TestVM.Default(cant)
3735

38-
vm.CantPendingTxs = cant
3936
vm.CantParse = cant
4037
vm.CantGet = cant
4138
}
@@ -50,16 +47,6 @@ func (vm *TestVM) Linearize(ctx context.Context, stopVertexID ids.ID) error {
5047
return errLinearize
5148
}
5249

53-
func (vm *TestVM) PendingTxs(ctx context.Context) []snowstorm.Tx {
54-
if vm.PendingTxsF != nil {
55-
return vm.PendingTxsF(ctx)
56-
}
57-
if vm.CantPendingTxs && vm.T != nil {
58-
require.FailNow(vm.T, errPending.Error())
59-
}
60-
return nil
61-
}
62-
6350
func (vm *TestVM) ParseTx(ctx context.Context, b []byte) (snowstorm.Tx, error) {
6451
if vm.ParseTxF != nil {
6552
return vm.ParseTxF(ctx, b)

snow/engine/avalanche/vertex/vm.go

-3
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@ type DAGVM interface {
5858
block.ChainVM
5959
Getter
6060

61-
// Return any transactions that have not been sent to consensus yet
62-
PendingTxs(ctx context.Context) []snowstorm.Tx
63-
6461
// Convert a stream of bytes to a transaction or return an error
6562
ParseTx(ctx context.Context, txBytes []byte) (snowstorm.Tx, error)
6663
}

vms/avm/vm.go

+36-117
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"errors"
99
"fmt"
1010
"reflect"
11-
"time"
1211

1312
stdjson "encoding/json"
1413

@@ -33,7 +32,6 @@ import (
3332
"github.com/ava-labs/avalanchego/utils/json"
3433
"github.com/ava-labs/avalanchego/utils/linkedhashmap"
3534
"github.com/ava-labs/avalanchego/utils/set"
36-
"github.com/ava-labs/avalanchego/utils/timer"
3735
"github.com/ava-labs/avalanchego/utils/timer/mockable"
3836
"github.com/ava-labs/avalanchego/utils/wrappers"
3937
"github.com/ava-labs/avalanchego/version"
@@ -57,8 +55,6 @@ import (
5755
)
5856

5957
const (
60-
batchTimeout = time.Second
61-
batchSize = 30
6258
assetToFxCacheSize = 1024
6359
txDeduplicatorSize = 8192
6460
)
@@ -110,12 +106,6 @@ type VM struct {
110106
// Asset ID --> Bit set with fx IDs the asset supports
111107
assetToFxCache *cache.LRU[ids.ID, set.Bits64]
112108

113-
// Transaction issuing
114-
timer *timer.Timer
115-
batchTimeout time.Duration
116-
txs []snowstorm.Tx
117-
toEngine chan<- common.Message
118-
119109
baseDB database.Database
120110
db *versiondb.Database
121111

@@ -162,7 +152,7 @@ func (vm *VM) Initialize(
162152
genesisBytes []byte,
163153
_ []byte,
164154
configBytes []byte,
165-
toEngine chan<- common.Message,
155+
_ chan<- common.Message,
166156
fxs []*common.Fx,
167157
appSender common.AppSender,
168158
) error {
@@ -197,7 +187,6 @@ func (vm *VM) Initialize(
197187

198188
db := dbManager.Current().Database
199189
vm.ctx = ctx
200-
vm.toEngine = toEngine
201190
vm.appSender = appSender
202191
vm.baseDB = db
203192
vm.db = versiondb.New(db)
@@ -248,15 +237,6 @@ func (vm *VM) Initialize(
248237
return err
249238
}
250239

251-
vm.timer = timer.NewTimer(func() {
252-
ctx.Lock.Lock()
253-
defer ctx.Lock.Unlock()
254-
255-
vm.FlushTxs()
256-
})
257-
go ctx.Log.RecoverAndPanic(vm.timer.Dispatch)
258-
vm.batchTimeout = batchTimeout
259-
260240
vm.uniqueTxs = &cache.EvictableLRU[ids.ID, *UniqueTx]{
261241
Size: txDeduplicatorSize,
262242
}
@@ -339,16 +319,10 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error {
339319
}
340320

341321
func (vm *VM) Shutdown(context.Context) error {
342-
if vm.timer == nil {
322+
if vm.state == nil {
343323
return nil
344324
}
345325

346-
// There is a potential deadlock if the timer is about to execute a timeout.
347-
// So, the lock must be released before stopping the timer.
348-
vm.ctx.Lock.Unlock()
349-
vm.timer.Stop()
350-
vm.ctx.Lock.Lock()
351-
352326
errs := wrappers.Errs{}
353327
errs.Add(
354328
vm.state.Close(),
@@ -480,16 +454,30 @@ func (vm *VM) Linearize(_ context.Context, stopVertexID ids.ID, toEngine chan<-
480454
return nil
481455
}
482456

483-
func (vm *VM) PendingTxs(context.Context) []snowstorm.Tx {
484-
vm.timer.Cancel()
457+
func (vm *VM) ParseTx(_ context.Context, bytes []byte) (snowstorm.Tx, error) {
458+
rawTx, err := vm.parser.ParseTx(bytes)
459+
if err != nil {
460+
return nil, err
461+
}
485462

486-
txs := vm.txs
487-
vm.txs = nil
488-
return txs
489-
}
463+
tx := &UniqueTx{
464+
TxCachedState: &TxCachedState{
465+
Tx: rawTx,
466+
},
467+
vm: vm,
468+
txID: rawTx.ID(),
469+
}
470+
if err := tx.SyntacticVerify(); err != nil {
471+
return nil, err
472+
}
490473

491-
func (vm *VM) ParseTx(_ context.Context, b []byte) (snowstorm.Tx, error) {
492-
return vm.parseTx(b)
474+
if tx.Status() == choices.Unknown {
475+
vm.state.AddTx(tx.Tx)
476+
tx.setStatus(choices.Processing)
477+
return tx, vm.state.Commit()
478+
}
479+
480+
return tx, nil
493481
}
494482

495483
func (vm *VM) GetTx(_ context.Context, txID ids.ID) (snowstorm.Tx, error) {
@@ -513,60 +501,27 @@ func (vm *VM) GetTx(_ context.Context, txID ids.ID) (snowstorm.Tx, error) {
513501
// either accepted or rejected with the appropriate status. This function will
514502
// go out of scope when the transaction is removed from memory.
515503
func (vm *VM) IssueTx(b []byte) (ids.ID, error) {
516-
if !vm.bootstrapped {
504+
if !vm.bootstrapped || vm.Builder == nil {
517505
return ids.ID{}, errBootstrapping
518506
}
519507

520-
// If the chain has been linearized, issue the tx to the network.
521-
if vm.Builder != nil {
522-
tx, err := vm.parser.ParseTx(b)
523-
if err != nil {
524-
vm.ctx.Log.Debug("failed to parse tx",
525-
zap.Error(err),
526-
)
527-
return ids.ID{}, err
528-
}
529-
530-
err = vm.network.IssueTx(context.TODO(), tx)
531-
if err != nil {
532-
vm.ctx.Log.Debug("failed to add tx to mempool",
533-
zap.Error(err),
534-
)
535-
return ids.ID{}, err
536-
}
537-
538-
return tx.ID(), nil
539-
}
540-
541-
// TODO: After the chain is linearized, remove the following code.
542-
tx, err := vm.parseTx(b)
508+
tx, err := vm.parser.ParseTx(b)
543509
if err != nil {
510+
vm.ctx.Log.Debug("failed to parse tx",
511+
zap.Error(err),
512+
)
544513
return ids.ID{}, err
545514
}
546-
if err := tx.verifyWithoutCacheWrites(); err != nil {
515+
516+
err = vm.network.IssueTx(context.TODO(), tx)
517+
if err != nil {
518+
vm.ctx.Log.Debug("failed to add tx to mempool",
519+
zap.Error(err),
520+
)
547521
return ids.ID{}, err
548522
}
549-
vm.issueTx(tx)
550-
return tx.ID(), nil
551-
}
552-
553-
/*
554-
******************************************************************************
555-
********************************** Timer API *********************************
556-
******************************************************************************
557-
*/
558523

559-
// FlushTxs into consensus
560-
func (vm *VM) FlushTxs() {
561-
vm.timer.Cancel()
562-
if len(vm.txs) != 0 {
563-
select {
564-
case vm.toEngine <- common.PendingTxs:
565-
default:
566-
vm.ctx.Log.Debug("dropping message to engine due to contention")
567-
vm.timer.SetTimeoutIn(vm.batchTimeout)
568-
}
569-
}
524+
return tx.ID(), nil
570525
}
571526

572527
/*
@@ -638,42 +593,6 @@ func (vm *VM) initState(tx *txs.Tx) {
638593
}
639594
}
640595

641-
func (vm *VM) parseTx(bytes []byte) (*UniqueTx, error) {
642-
rawTx, err := vm.parser.ParseTx(bytes)
643-
if err != nil {
644-
return nil, err
645-
}
646-
647-
tx := &UniqueTx{
648-
TxCachedState: &TxCachedState{
649-
Tx: rawTx,
650-
},
651-
vm: vm,
652-
txID: rawTx.ID(),
653-
}
654-
if err := tx.SyntacticVerify(); err != nil {
655-
return nil, err
656-
}
657-
658-
if tx.Status() == choices.Unknown {
659-
vm.state.AddTx(tx.Tx)
660-
tx.setStatus(choices.Processing)
661-
return tx, vm.state.Commit()
662-
}
663-
664-
return tx, nil
665-
}
666-
667-
func (vm *VM) issueTx(tx snowstorm.Tx) {
668-
vm.txs = append(vm.txs, tx)
669-
switch {
670-
case len(vm.txs) == batchSize:
671-
vm.FlushTxs()
672-
case len(vm.txs) == 1:
673-
vm.timer.SetTimeoutIn(vm.batchTimeout)
674-
}
675-
}
676-
677596
// LoadUser returns:
678597
// 1) The UTXOs that reference one or more addresses controlled by the given user
679598
// 2) A keychain that contains this user's keys

vms/metervm/vertex_metrics.go

-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
)
1212

1313
type vertexMetrics struct {
14-
pending,
1514
parse,
1615
parseErr,
1716
get,
@@ -27,7 +26,6 @@ func (m *vertexMetrics) Initialize(
2726
reg prometheus.Registerer,
2827
) error {
2928
errs := wrappers.Errs{}
30-
m.pending = newAverager(namespace, "pending_txs", reg, &errs)
3129
m.parse = newAverager(namespace, "parse_tx", reg, &errs)
3230
m.parseErr = newAverager(namespace, "parse_tx_err", reg, &errs)
3331
m.get = newAverager(namespace, "get_tx", reg, &errs)

vms/metervm/vertex_vm.go

-8
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,6 @@ func (vm *vertexVM) Initialize(
7777
)
7878
}
7979

80-
func (vm *vertexVM) PendingTxs(ctx context.Context) []snowstorm.Tx {
81-
start := vm.clock.Time()
82-
txs := vm.LinearizableVMWithEngine.PendingTxs(ctx)
83-
end := vm.clock.Time()
84-
vm.vertexMetrics.pending.Observe(float64(end.Sub(start)))
85-
return txs
86-
}
87-
8880
func (vm *vertexVM) ParseTx(ctx context.Context, b []byte) (snowstorm.Tx, error) {
8981
start := vm.clock.Time()
9082
tx, err := vm.LinearizableVMWithEngine.ParseTx(ctx, b)

vms/tracedvm/vertex_vm.go

-7
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,6 @@ func (vm *vertexVM) Initialize(
6060
)
6161
}
6262

63-
func (vm *vertexVM) PendingTxs(ctx context.Context) []snowstorm.Tx {
64-
ctx, span := vm.tracer.Start(ctx, "vertexVM.PendingTxs")
65-
defer span.End()
66-
67-
return vm.LinearizableVMWithEngine.PendingTxs(ctx)
68-
}
69-
7063
func (vm *vertexVM) ParseTx(ctx context.Context, txBytes []byte) (snowstorm.Tx, error) {
7164
ctx, span := vm.tracer.Start(ctx, "vertexVM.ParseTx", oteltrace.WithAttributes(
7265
attribute.Int("txLen", len(txBytes)),

0 commit comments

Comments
 (0)