Skip to content

Commit 009edc6

Browse files
Reintroduce P-chain block reindexing (#3883)
1 parent da4b470 commit 009edc6

File tree

4 files changed

+367
-1
lines changed

4 files changed

+367
-1
lines changed

vms/platformvm/state/mock_state.go

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vms/platformvm/state/state.go

Lines changed: 182 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"context"
99
"errors"
1010
"fmt"
11+
"math"
12+
"sync"
1113
"time"
1214

1315
"github.com/google/btree"
@@ -23,14 +25,17 @@ import (
2325
"github.com/ava-labs/avalanchego/database/versiondb"
2426
"github.com/ava-labs/avalanchego/ids"
2527
"github.com/ava-labs/avalanchego/snow"
28+
"github.com/ava-labs/avalanchego/snow/choices"
2629
"github.com/ava-labs/avalanchego/snow/uptime"
2730
"github.com/ava-labs/avalanchego/snow/validators"
2831
"github.com/ava-labs/avalanchego/upgrade"
2932
"github.com/ava-labs/avalanchego/utils/constants"
3033
"github.com/ava-labs/avalanchego/utils/crypto/bls"
3134
"github.com/ava-labs/avalanchego/utils/hashing"
3235
"github.com/ava-labs/avalanchego/utils/iterator"
36+
"github.com/ava-labs/avalanchego/utils/logging"
3337
"github.com/ava-labs/avalanchego/utils/maybe"
38+
"github.com/ava-labs/avalanchego/utils/timer"
3439
"github.com/ava-labs/avalanchego/utils/wrappers"
3540
"github.com/ava-labs/avalanchego/vms/components/avax"
3641
"github.com/ava-labs/avalanchego/vms/components/gas"
@@ -97,6 +102,7 @@ var (
97102
LastAcceptedKey = []byte("last accepted")
98103
HeightsIndexedKey = []byte("heights indexed")
99104
InitializedKey = []byte("initialized")
105+
BlocksReindexedKey = []byte("blocks reindexed.3")
100106

101107
emptyL1ValidatorCache = &cache.Empty[ids.ID, maybe.Maybe[L1Validator]]{}
102108
)
@@ -215,6 +221,14 @@ type State interface {
215221
// Discard uncommitted changes to the database.
216222
Abort()
217223

224+
// ReindexBlocks converts any block indices using the legacy storage format
225+
// to the new format. If this database has already updated the indices,
226+
// this function will return immediately, without iterating over the
227+
// database.
228+
//
229+
// TODO: Remove after v1.14.x is activated
230+
ReindexBlocks(lock sync.Locker, log logging.Logger) error
231+
218232
// Commit changes to the base database.
219233
Commit() error
220234

@@ -227,6 +241,16 @@ type State interface {
227241
Close() error
228242
}
229243

244+
// Prior to https://github.com/ava-labs/avalanchego/pull/1719, blocks were
245+
// stored as a map from blkID to stateBlk. Nodes synced prior to this PR may
246+
// still have blocks partially stored using this legacy format.
247+
//
248+
// TODO: Remove after v1.14.x is activated
249+
type stateBlk struct {
250+
Bytes []byte `serialize:"true"`
251+
Status choices.Status `serialize:"true"`
252+
}
253+
230254
/*
231255
* VMDB
232256
* |-. validators
@@ -296,6 +320,7 @@ type State interface {
296320
* | '-- timestamp + validationID -> nil
297321
* '-. singletons
298322
* |-- initializedKey -> nil
323+
* |-- blocksReindexedKey -> nil
299324
* |-- timestampKey -> timestamp
300325
* |-- feeStateKey -> feeState
301326
* |-- l1ValidatorExcessKey -> l1ValidatorExcess
@@ -2258,7 +2283,7 @@ func (s *state) GetStatelessBlock(blockID ids.ID) (block.Block, error) {
22582283
return nil, err
22592284
}
22602285

2261-
blk, err := block.Parse(block.GenesisCodec, blkBytes)
2286+
blk, _, err := parseStoredBlock(blkBytes)
22622287
if err != nil {
22632288
return nil, err
22642289
}
@@ -3039,6 +3064,162 @@ func (s *state) writeMetadata() error {
30393064
return nil
30403065
}
30413066

3067+
// Returns the block and whether it is a [stateBlk].
3068+
// Invariant: blkBytes is safe to parse with blocks.GenesisCodec
3069+
//
3070+
// TODO: Remove after v1.14.x is activated
3071+
func parseStoredBlock(blkBytes []byte) (block.Block, bool, error) {
3072+
// Attempt to parse as blocks.Block
3073+
blk, err := block.Parse(block.GenesisCodec, blkBytes)
3074+
if err == nil {
3075+
return blk, false, nil
3076+
}
3077+
3078+
// Fallback to [stateBlk]
3079+
blkState := stateBlk{}
3080+
if _, err := block.GenesisCodec.Unmarshal(blkBytes, &blkState); err != nil {
3081+
return nil, false, err
3082+
}
3083+
3084+
blk, err = block.Parse(block.GenesisCodec, blkState.Bytes)
3085+
return blk, true, err
3086+
}
3087+
3088+
func (s *state) ReindexBlocks(lock sync.Locker, log logging.Logger) error {
3089+
has, err := s.singletonDB.Has(BlocksReindexedKey)
3090+
if err != nil {
3091+
return err
3092+
}
3093+
if has {
3094+
log.Info("blocks already reindexed")
3095+
return nil
3096+
}
3097+
3098+
// It is possible that new blocks are added after grabbing this iterator.
3099+
// New blocks are guaranteed to be persisted in the new format, so we don't
3100+
// need to check them.
3101+
blockIterator := s.blockDB.NewIterator()
3102+
// Releasing is done using a closure to ensure that updating blockIterator
3103+
// will result in having the most recent iterator released when executing
3104+
// the deferred function.
3105+
defer func() {
3106+
blockIterator.Release()
3107+
}()
3108+
3109+
log.Info("starting block reindexing")
3110+
3111+
var (
3112+
startTime = time.Now()
3113+
lastCommit = startTime
3114+
nextUpdate = startTime.Add(indexLogFrequency)
3115+
numIndicesChecked = 0
3116+
numIndicesUpdated = 0
3117+
)
3118+
3119+
for blockIterator.Next() {
3120+
valueBytes := blockIterator.Value()
3121+
blk, isStateBlk, err := parseStoredBlock(valueBytes)
3122+
if err != nil {
3123+
return fmt.Errorf("failed to parse block: %w", err)
3124+
}
3125+
3126+
blkID := blk.ID()
3127+
3128+
// This block was previously stored using the legacy format, update the
3129+
// index to remove the usage of stateBlk.
3130+
if isStateBlk {
3131+
blkBytes := blk.Bytes()
3132+
if err := s.blockDB.Put(blkID[:], blkBytes); err != nil {
3133+
return fmt.Errorf("failed to write block: %w", err)
3134+
}
3135+
3136+
numIndicesUpdated++
3137+
}
3138+
3139+
numIndicesChecked++
3140+
3141+
now := time.Now()
3142+
if now.After(nextUpdate) {
3143+
nextUpdate = now.Add(indexLogFrequency)
3144+
3145+
progress := timer.ProgressFromHash(blkID[:])
3146+
eta := timer.EstimateETA(
3147+
startTime,
3148+
progress,
3149+
math.MaxUint64,
3150+
)
3151+
3152+
log.Info("reindexing blocks",
3153+
zap.Int("numIndicesUpdated", numIndicesUpdated),
3154+
zap.Int("numIndicesChecked", numIndicesChecked),
3155+
zap.Duration("eta", eta),
3156+
)
3157+
}
3158+
3159+
if numIndicesChecked%indexIterationLimit == 0 {
3160+
// We must hold the lock during committing to make sure we don't
3161+
// attempt to commit to disk while a block is concurrently being
3162+
// accepted.
3163+
lock.Lock()
3164+
err := errors.Join(
3165+
s.Commit(),
3166+
blockIterator.Error(),
3167+
)
3168+
lock.Unlock()
3169+
if err != nil {
3170+
return err
3171+
}
3172+
3173+
// We release the iterator here to allow the underlying database to
3174+
// clean up deleted state.
3175+
blockIterator.Release()
3176+
3177+
// We take the minimum here because it's possible that the node is
3178+
// currently bootstrapping. This would mean that grabbing the lock
3179+
// could take an extremely long period of time; which we should not
3180+
// delay processing for.
3181+
indexDuration := now.Sub(lastCommit)
3182+
sleepDuration := min(
3183+
indexIterationSleepMultiplier*indexDuration,
3184+
indexIterationSleepCap,
3185+
)
3186+
time.Sleep(sleepDuration)
3187+
3188+
// Make sure not to include the sleep duration into the next index
3189+
// duration.
3190+
lastCommit = time.Now()
3191+
3192+
blockIterator = s.blockDB.NewIteratorWithStart(blkID[:])
3193+
}
3194+
}
3195+
3196+
// Ensure we fully iterated over all blocks before writing that indexing has
3197+
// finished.
3198+
//
3199+
// Note: This is needed because a transient read error could cause the
3200+
// iterator to stop early.
3201+
if err := blockIterator.Error(); err != nil {
3202+
return fmt.Errorf("failed to iterate over historical blocks: %w", err)
3203+
}
3204+
3205+
if err := s.singletonDB.Put(BlocksReindexedKey, nil); err != nil {
3206+
return fmt.Errorf("failed to put marked blocks as reindexed: %w", err)
3207+
}
3208+
3209+
// We must hold the lock during committing to make sure we don't attempt to
3210+
// commit to disk while a block is concurrently being accepted.
3211+
lock.Lock()
3212+
defer lock.Unlock()
3213+
3214+
log.Info("finished block reindexing",
3215+
zap.Int("numIndicesUpdated", numIndicesUpdated),
3216+
zap.Int("numIndicesChecked", numIndicesChecked),
3217+
zap.Duration("duration", time.Since(startTime)),
3218+
)
3219+
3220+
return s.Commit()
3221+
}
3222+
30423223
// GetUptime forwards to validatorState.GetUptime for vdrID, passing
// constants.PrimaryNetworkID as the second argument, and returns its three
// results unchanged.
func (s *state) GetUptime(vdrID ids.NodeID) (time.Duration, time.Time, error) {
	duration, timestamp, err := s.validatorState.GetUptime(vdrID, constants.PrimaryNetworkID)
	return duration, timestamp, err
}

0 commit comments

Comments
 (0)