[vms/avm] Cleanup GetTx + remove state pruning logic #2826

Merged: 8 commits, Mar 8, 2024
Changes from 6 commits
16 changes: 0 additions & 16 deletions vms/avm/state/mock_state.go

Some generated files are not rendered by default.

309 changes: 2 additions & 307 deletions vms/avm/state/state.go
@@ -7,12 +7,9 @@ import (
"bytes"
"errors"
"fmt"
"math"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/cache/metercacher"
@@ -22,8 +19,6 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/vms/avm/block"
"github.com/ava-labs/avalanchego/vms/avm/txs"
"github.com/ava-labs/avalanchego/vms/components/avax"
@@ -34,11 +29,6 @@ const (
txCacheSize = 8192
blockIDCacheSize = 8192
blockCacheSize = 2048

pruneCommitLimit = 1024
pruneCommitSleepMultiplier = 5
pruneCommitSleepCap = 10 * time.Second
pruneUpdateFrequency = 30 * time.Second
)

var (
@@ -106,19 +96,6 @@ type State interface {
// pending changes to the base database.
CommitBatch() (database.Batch, error)

// Asynchronously removes unneeded state from disk.
//
// Specifically, this removes:
// - All transaction statuses
// - All non-accepted transactions
// - All UTXOs that were consumed by accepted transactions
//
// [lock] is the AVM's context lock and is assumed to be unlocked when this
// method is called.
//
// TODO: remove after v1.11.x is activated
Prune(lock sync.Locker, log logging.Logger) error

// Checksums returns the current TxChecksum and UTXOChecksum.
Checksums() (txChecksum ids.ID, utxoChecksum ids.ID)

@@ -150,9 +127,8 @@ type state struct {
utxoDB database.Database
utxoState avax.UTXOState

statusesPruned bool
statusCache cache.Cacher[ids.ID, *choices.Status] // cache of id -> choices.Status. If the entry is nil, it is not in the database
statusDB database.Database
statusCache cache.Cacher[ids.ID, *choices.Status] // cache of id -> choices.Status. If the entry is nil, it is not in the database
statusDB database.Database

addedTxs map[ids.ID]*txs.Tx // map of txID -> *txs.Tx
txCache cache.Cacher[ids.ID, *txs.Tx] // cache of txID -> *txs.Tx. If the entry is nil, it is not in the database
@@ -281,69 +257,7 @@ func (s *state) DeleteUTXO(utxoID ids.ID) {
s.modifiedUTXOs[utxoID] = nil
}

// TODO: After v1.11.x has activated we can rename [getTx] to [GetTx] and delete
// [getStatus].
func (s *state) GetTx(txID ids.ID) (*txs.Tx, error) {
tx, err := s.getTx(txID)
if err != nil {
return nil, err
}

// Before the linearization, transactions were persisted before they were
// marked as Accepted. However, this function aims to only return accepted
// transactions.
status, err := s.getStatus(txID)
if err == database.ErrNotFound {
// If the status wasn't persisted, then the transaction was written
// after the linearization, and is accepted.
return tx, nil
}
if err != nil {
return nil, err
}

// If the status was persisted, then the transaction was written before the
// linearization. If it wasn't marked as accepted, then we treat it as if it
// doesn't exist.
if status != choices.Accepted {
return nil, database.ErrNotFound
}
return tx, nil
}

func (s *state) getStatus(id ids.ID) (choices.Status, error) {
if s.statusesPruned {
return choices.Unknown, database.ErrNotFound
}

if _, ok := s.addedTxs[id]; ok {
return choices.Unknown, database.ErrNotFound
}
if status, found := s.statusCache.Get(id); found {
if status == nil {
return choices.Unknown, database.ErrNotFound
}
return *status, nil
}

val, err := database.GetUInt32(s.statusDB, id[:])
if err == database.ErrNotFound {
s.statusCache.Put(id, nil)
return choices.Unknown, database.ErrNotFound
}
if err != nil {
return choices.Unknown, err
}

status := choices.Status(val)
if err := status.Valid(); err != nil {
return choices.Unknown, err
}
s.statusCache.Put(id, &status)
return status, nil
}

func (s *state) getTx(txID ids.ID) (*txs.Tx, error) {
if tx, exists := s.addedTxs[txID]; exists {
return tx, nil
}
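
With the status bookkeeping gone, the PR can promote [getTx] to [GetTx], as the old TODO suggested. A minimal sketch of the post-cleanup lookup, assuming the cache and database fields keep the names shown above (the merged commit may differ in detail):

// GetTx returns the transaction with the given ID. After this change, every
// tx persisted in txDB is accepted, so no status check is required.
func (s *state) GetTx(txID ids.ID) (*txs.Tx, error) {
	// Transactions added in the current batch but not yet committed.
	if tx, exists := s.addedTxs[txID]; exists {
		return tx, nil
	}
	// Cached lookups; a nil entry records a known miss.
	if tx, exists := s.txCache.Get(txID); exists {
		if tx == nil {
			return nil, database.ErrNotFound
		}
		return tx, nil
	}
	// Fall back to disk.
	txBytes, err := s.txDB.Get(txID[:])
	if err == database.ErrNotFound {
		s.txCache.Put(txID, nil)
		return nil, database.ErrNotFound
	}
	if err != nil {
		return nil, err
	}
	tx, err := s.parser.ParseGenesisTx(txBytes)
	if err != nil {
		return nil, err
	}
	s.txCache.Put(txID, tx)
	return tx, nil
}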
@@ -618,225 +532,6 @@ func (s *state) writeMetadata() error {
return nil
}

func (s *state) Prune(lock sync.Locker, log logging.Logger) error {
lock.Lock()
// It is possible that more txs are added after grabbing this iterator. No
// new txs will write a status, so we don't need to check those txs.
statusIter := s.statusDB.NewIterator()
// Releasing is done using a closure to ensure that updating statusIter will
// result in having the most recent iterator released when executing the
// deferred function.
defer func() {
statusIter.Release()
}()

if !statusIter.Next() {
// If there are no statuses on disk, pruning was previously run and
// finished.
lock.Unlock()

log.Info("state already pruned")

return statusIter.Error()
}

startTxIDBytes := statusIter.Key()
txIter := s.txDB.NewIteratorWithStart(startTxIDBytes)
// Releasing is done using a closure to ensure that updating txIter will
// result in having the most recent iterator released when executing the
// deferred function.
defer func() {
txIter.Release()
}()

// While we are pruning the disk, we disable caching of the data we are
// modifying. Caching is re-enabled when pruning finishes.
//
// Note: If an unexpected error occurs the caches are never re-enabled.
// That's fine as the node is going to be in an unhealthy state regardless.
oldTxCache := s.txCache
s.statusCache = &cache.Empty[ids.ID, *choices.Status]{}
s.txCache = &cache.Empty[ids.ID, *txs.Tx]{}
lock.Unlock()

startTime := time.Now()
lastCommit := startTime
lastUpdate := startTime
startProgress := timer.ProgressFromHash(startTxIDBytes)

startStatusBytes := statusIter.Value()
if err := s.cleanupTx(lock, startTxIDBytes, startStatusBytes, txIter); err != nil {
return err
}

numPruned := 1
for statusIter.Next() {
txIDBytes := statusIter.Key()
statusBytes := statusIter.Value()
if err := s.cleanupTx(lock, txIDBytes, statusBytes, txIter); err != nil {
return err
}

numPruned++

if numPruned%pruneCommitLimit == 0 {
// We must hold the lock during committing to make sure we don't
// attempt to commit to disk while a block is concurrently being
// accepted.
lock.Lock()
err := utils.Err(
s.Commit(),
statusIter.Error(),
txIter.Error(),
)
lock.Unlock()
if err != nil {
return err
}

// We release the iterators here to allow the underlying database to
// clean up deleted state.
statusIter.Release()
txIter.Release()

now := time.Now()
if now.Sub(lastUpdate) > pruneUpdateFrequency {
lastUpdate = now

progress := timer.ProgressFromHash(txIDBytes)
eta := timer.EstimateETA(
startTime,
progress-startProgress,
math.MaxUint64-startProgress,
)
log.Info("committing state pruning",
zap.Int("numPruned", numPruned),
zap.Duration("eta", eta),
)
}

// We take the minimum here because it's possible that the node is
// currently bootstrapping. In that case, grabbing the lock could take
// an extremely long time, and we should not delay processing for that
// long.
pruneDuration := now.Sub(lastCommit)
sleepDuration := min(
pruneCommitSleepMultiplier*pruneDuration,
pruneCommitSleepCap,
)
time.Sleep(sleepDuration)

// Make sure not to include the sleep duration into the next prune
// duration.
lastCommit = time.Now()

// We shouldn't need to grab the lock here, but doing so ensures
// that we see a consistent view across both the statusDB and the
// txDB.
lock.Lock()
statusIter = s.statusDB.NewIteratorWithStart(txIDBytes)
txIter = s.txDB.NewIteratorWithStart(txIDBytes)
lock.Unlock()
}
}

lock.Lock()
defer lock.Unlock()

err := utils.Err(
s.Commit(),
statusIter.Error(),
txIter.Error(),
)

// Make sure we flush the original cache before re-enabling it to prevent
// surfacing any stale data.
oldTxCache.Flush()
s.statusesPruned = true
s.txCache = oldTxCache

log.Info("finished state pruning",
zap.Int("numPruned", numPruned),
zap.Duration("duration", time.Since(startTime)),
)

return err
}

// Assumes [lock] is unlocked.
func (s *state) cleanupTx(lock sync.Locker, txIDBytes []byte, statusBytes []byte, txIter database.Iterator) error {
// After the linearization, we write txs to disk without statuses to mark
// them as accepted. This means that there may be more txs than statuses and
// we need to skip over them.
//
// Note: We do not need to remove UTXOs consumed after the linearization, as
// those UTXOs are guaranteed to have already been deleted.
if err := skipTo(txIter, txIDBytes); err != nil {
return err
}
// txIter.Key() is now `txIDBytes`

statusInt, err := database.ParseUInt32(statusBytes)
if err != nil {
return err
}
status := choices.Status(statusInt)

if status == choices.Accepted {
txBytes := txIter.Value()
tx, err := s.parser.ParseGenesisTx(txBytes)
if err != nil {
return err
}

utxos := tx.Unsigned.InputUTXOs()

// Locking is done here to make sure that any concurrent verification is
// performed with a valid view of the state.
lock.Lock()
defer lock.Unlock()

// Remove all the UTXOs consumed by the accepted tx. Technically we only
// need to remove UTXOs consumed by operations, but it's easy to just
// remove all of them.
for _, UTXO := range utxos {
if err := s.utxoState.DeleteUTXO(UTXO.InputID()); err != nil {
return err
}
}
} else {
lock.Lock()
defer lock.Unlock()

// This tx wasn't accepted, so we can remove it entirely from disk.
if err := s.txDB.Delete(txIDBytes); err != nil {
return err
}
}
// By removing the status, we will treat the tx as accepted if it is still
// on disk.
return s.statusDB.Delete(txIDBytes)
}

// skipTo advances [iter] until its key is equal to [targetKey]. If [iter] does
// not contain [targetKey] an error will be returned.
//
// Note: [iter.Next()] will always be called at least once.
func skipTo(iter database.Iterator, targetKey []byte) error {
for {
if !iter.Next() {
return fmt.Errorf("%w: 0x%x", database.ErrNotFound, targetKey)
}
key := iter.Key()
switch bytes.Compare(targetKey, key) {
case -1:
return fmt.Errorf("%w: 0x%x", database.ErrNotFound, targetKey)
case 0:
return nil
}
}
}

func (s *state) Checksums() (ids.ID, ids.ID) {
return s.txChecksum, s.utxoState.Checksum()
}
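
The deleted [skipTo] helper relied on every status key also existing in the tx database. A small illustrative harness for that contract, written against avalanchego's memdb (hypothetical test code, not part of the PR):

package state

import (
	"testing"

	"github.com/ava-labs/avalanchego/database"
	"github.com/ava-labs/avalanchego/database/memdb"
	"github.com/stretchr/testify/require"
)

func TestSkipTo(t *testing.T) {
	require := require.New(t)

	db := memdb.New()
	require.NoError(db.Put([]byte{1}, nil))
	require.NoError(db.Put([]byte{3}, nil))

	// skipTo advances past {1} and stops exactly on {3}.
	iter := db.NewIterator()
	defer iter.Release()
	require.NoError(skipTo(iter, []byte{3}))
	require.Equal([]byte{3}, iter.Key())

	// A key that was never written, like {2}, surfaces ErrNotFound once the
	// iterator steps past it.
	iter2 := db.NewIterator()
	defer iter2.Release()
	require.ErrorIs(skipTo(iter2, []byte{2}), database.ErrNotFound)
}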