Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove GetTx from the DAGVM interface #1642

Merged
merged 28 commits into from
Jun 22, 2023
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
36988a4
Remove DAG based tx issuance logic
StephenButtolph Jun 19, 2023
f4ae592
Remove dagState and GetUTXOFromID
StephenButtolph Jun 19, 2023
d924932
Fix unit tests
StephenButtolph Jun 19, 2023
7d1dda4
Merge branch 'remove-get-utxo-from-id' into remove-avm-tx-batcher
StephenButtolph Jun 19, 2023
4855439
wip make tests compile
StephenButtolph Jun 19, 2023
c517ea9
nit lint
StephenButtolph Jun 20, 2023
30485cd
fix linting
StephenButtolph Jun 20, 2023
4bc87bd
Merge branch 'dev' into remove-get-utxo-from-id
StephenButtolph Jun 20, 2023
2ff0186
wip cleanup tests
StephenButtolph Jun 20, 2023
23c038d
Merge branch 'remove-get-utxo-from-id' into remove-avm-tx-batcher
StephenButtolph Jun 20, 2023
ad086ba
wip fixup service tests
StephenButtolph Jun 20, 2023
724d878
more cleanup
StephenButtolph Jun 20, 2023
26f86e0
fix tests
StephenButtolph Jun 21, 2023
97950f1
keep test
StephenButtolph Jun 21, 2023
4c2e286
nit
StephenButtolph Jun 21, 2023
168aed9
nit
StephenButtolph Jun 21, 2023
b03a622
nits
StephenButtolph Jun 21, 2023
e4aa1c9
Remove PendingTxs from the DAGVM interface
StephenButtolph Jun 21, 2023
fc2df6e
lint
StephenButtolph Jun 21, 2023
37ae15c
Remove GetTx from the DAGVM interface
StephenButtolph Jun 21, 2023
9ad9511
lint
StephenButtolph Jun 21, 2023
3db4590
clenaup
StephenButtolph Jun 21, 2023
ca82e41
remove code changes
StephenButtolph Jun 21, 2023
208c7d9
merged
StephenButtolph Jun 21, 2023
55d91dc
merged
StephenButtolph Jun 21, 2023
c4838d8
Merge branch 'remove-pending-txs' into remove-get-tx
StephenButtolph Jun 21, 2023
c512d15
oops
StephenButtolph Jun 21, 2023
cc9f8fe
merged
StephenButtolph Jun 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove code changes
  • Loading branch information
StephenButtolph committed Jun 21, 2023
commit ca82e41cc67732f0ffe343292e075cd008772df8
157 changes: 117 additions & 40 deletions vms/avm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"reflect"
"time"

stdjson "encoding/json"

Expand All @@ -32,6 +33,7 @@ import (
"github.com/ava-labs/avalanchego/utils/json"
"github.com/ava-labs/avalanchego/utils/linkedhashmap"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/version"
Expand All @@ -55,6 +57,8 @@ import (
)

const (
batchTimeout = time.Second
batchSize = 30
assetToFxCacheSize = 1024
txDeduplicatorSize = 8192
)
Expand Down Expand Up @@ -106,6 +110,12 @@ type VM struct {
// Asset ID --> Bit set with fx IDs the asset supports
assetToFxCache *cache.LRU[ids.ID, set.Bits64]

// Transaction issuing
timer *timer.Timer
batchTimeout time.Duration
txs []snowstorm.Tx
toEngine chan<- common.Message

baseDB database.Database
db *versiondb.Database

Expand Down Expand Up @@ -152,7 +162,7 @@ func (vm *VM) Initialize(
genesisBytes []byte,
_ []byte,
configBytes []byte,
_ chan<- common.Message,
toEngine chan<- common.Message,
fxs []*common.Fx,
appSender common.AppSender,
) error {
Expand Down Expand Up @@ -187,6 +197,7 @@ func (vm *VM) Initialize(

db := dbManager.Current().Database
vm.ctx = ctx
vm.toEngine = toEngine
vm.appSender = appSender
vm.baseDB = db
vm.db = versiondb.New(db)
Expand Down Expand Up @@ -237,6 +248,15 @@ func (vm *VM) Initialize(
return err
}

vm.timer = timer.NewTimer(func() {
ctx.Lock.Lock()
defer ctx.Lock.Unlock()

vm.FlushTxs()
})
go ctx.Log.RecoverAndPanic(vm.timer.Dispatch)
vm.batchTimeout = batchTimeout

vm.uniqueTxs = &cache.EvictableLRU[ids.ID, *UniqueTx]{
Size: txDeduplicatorSize,
}
Expand Down Expand Up @@ -319,10 +339,16 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error {
}

func (vm *VM) Shutdown(context.Context) error {
if vm.state == nil {
if vm.timer == nil {
return nil
}

// There is a potential deadlock if the timer is about to execute a timeout.
// So, the lock must be released before stopping the timer.
vm.ctx.Lock.Unlock()
vm.timer.Stop()
vm.ctx.Lock.Lock()

errs := wrappers.Errs{}
errs.Add(
vm.state.Close(),
Expand Down Expand Up @@ -454,34 +480,16 @@ func (vm *VM) Linearize(_ context.Context, stopVertexID ids.ID, toEngine chan<-
return nil
}

func (*VM) PendingTxs(context.Context) []snowstorm.Tx {
return nil
}

func (vm *VM) ParseTx(_ context.Context, bytes []byte) (snowstorm.Tx, error) {
rawTx, err := vm.parser.ParseTx(bytes)
if err != nil {
return nil, err
}
func (vm *VM) PendingTxs(context.Context) []snowstorm.Tx {
vm.timer.Cancel()

tx := &UniqueTx{
TxCachedState: &TxCachedState{
Tx: rawTx,
},
vm: vm,
txID: rawTx.ID(),
}
if err := tx.SyntacticVerify(); err != nil {
return nil, err
}

if tx.Status() == choices.Unknown {
vm.state.AddTx(tx.Tx)
tx.setStatus(choices.Processing)
return tx, vm.state.Commit()
}
txs := vm.txs
vm.txs = nil
return txs
}

return tx, nil
func (vm *VM) ParseTx(_ context.Context, b []byte) (snowstorm.Tx, error) {
return vm.parseTx(b)
}

func (vm *VM) GetTx(_ context.Context, txID ids.ID) (snowstorm.Tx, error) {
Expand All @@ -505,29 +513,62 @@ func (vm *VM) GetTx(_ context.Context, txID ids.ID) (snowstorm.Tx, error) {
// either accepted or rejected with the appropriate status. This function will
// go out of scope when the transaction is removed from memory.
func (vm *VM) IssueTx(b []byte) (ids.ID, error) {
if !vm.bootstrapped || vm.Builder == nil {
if !vm.bootstrapped {
return ids.ID{}, errBootstrapping
}

tx, err := vm.parser.ParseTx(b)
if err != nil {
vm.ctx.Log.Debug("failed to parse tx",
zap.Error(err),
)
return ids.ID{}, err
// If the chain has been linearized, issue the tx to the network.
if vm.Builder != nil {
tx, err := vm.parser.ParseTx(b)
if err != nil {
vm.ctx.Log.Debug("failed to parse tx",
zap.Error(err),
)
return ids.ID{}, err
}

err = vm.network.IssueTx(context.TODO(), tx)
if err != nil {
vm.ctx.Log.Debug("failed to add tx to mempool",
zap.Error(err),
)
return ids.ID{}, err
}

return tx.ID(), nil
}

err = vm.network.IssueTx(context.TODO(), tx)
// TODO: After the chain is linearized, remove the following code.
tx, err := vm.parseTx(b)
if err != nil {
vm.ctx.Log.Debug("failed to add tx to mempool",
zap.Error(err),
)
return ids.ID{}, err
}

if err := tx.verifyWithoutCacheWrites(); err != nil {
return ids.ID{}, err
}
vm.issueTx(tx)
return tx.ID(), nil
}

/*
******************************************************************************
********************************** Timer API *********************************
******************************************************************************
*/

// FlushTxs into consensus
func (vm *VM) FlushTxs() {
vm.timer.Cancel()
if len(vm.txs) != 0 {
select {
case vm.toEngine <- common.PendingTxs:
default:
vm.ctx.Log.Debug("dropping message to engine due to contention")
vm.timer.SetTimeoutIn(vm.batchTimeout)
}
}
}

/*
******************************************************************************
********************************** Helpers ***********************************
Expand Down Expand Up @@ -597,6 +638,42 @@ func (vm *VM) initState(tx *txs.Tx) {
}
}

func (vm *VM) parseTx(bytes []byte) (*UniqueTx, error) {
rawTx, err := vm.parser.ParseTx(bytes)
if err != nil {
return nil, err
}

tx := &UniqueTx{
TxCachedState: &TxCachedState{
Tx: rawTx,
},
vm: vm,
txID: rawTx.ID(),
}
if err := tx.SyntacticVerify(); err != nil {
return nil, err
}

if tx.Status() == choices.Unknown {
vm.state.AddTx(tx.Tx)
tx.setStatus(choices.Processing)
return tx, vm.state.Commit()
}

return tx, nil
}

func (vm *VM) issueTx(tx snowstorm.Tx) {
vm.txs = append(vm.txs, tx)
switch {
case len(vm.txs) == batchSize:
vm.FlushTxs()
case len(vm.txs) == 1:
vm.timer.SetTimeoutIn(vm.batchTimeout)
}
}

// LoadUser returns:
// 1) The UTXOs that reference one or more addresses controlled by the given user
// 2) A keychain that contains this user's keys
Expand Down