diff --git a/storage/pebble/bootstrap.go b/storage/pebble/bootstrap.go
index ecb18ec6b43..c6f15b27922 100644
--- a/storage/pebble/bootstrap.go
+++ b/storage/pebble/bootstrap.go
@@ -5,9 +5,11 @@ import (
 	"errors"
 	"fmt"
 	"path/filepath"
+	"time"
 
 	"github.com/cockroachdb/pebble"
 	"github.com/rs/zerolog"
+	"go.uber.org/atomic"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/onflow/flow-go/ledger/common/convert"
@@ -18,12 +20,13 @@ import (
 var ErrAlreadyBootstrapped = errors.New("found latest key set on badger instance, DB is already bootstrapped")
 
 type RegisterBootstrap struct {
-	checkpointDir      string
-	checkpointFileName string
 	log                zerolog.Logger
 	db                 *pebble.DB
+	checkpointDir      string
+	checkpointFileName string
 	leafNodeChan       chan *wal.LeafNode
 	rootHeight         uint64
+	registerCount      *atomic.Uint64 // leaf nodes in committed batches, accumulated across all index workers
 }
 
 // NewRegisterBootstrap creates the bootstrap object for reading checkpoint data and the height tracker in pebble
@@ -45,21 +48,24 @@ func NewRegisterBootstrap(
 		// key detected, attempt to run bootstrap on corrupt or already bootstrapped data
 		return nil, ErrAlreadyBootstrapped
 	}
+
 	checkpointDir, checkpointFileName := filepath.Split(checkpointFile)
 	return &RegisterBootstrap{
-		checkpointDir:      checkpointDir,
-		checkpointFileName: checkpointFileName,
 		log:                log.With().Str("module", "register_bootstrap").Logger(),
 		db:                 db,
+		checkpointDir:      checkpointDir,
+		checkpointFileName: checkpointFileName,
 		leafNodeChan:       make(chan *wal.LeafNode, checkpointLeafNodeBufSize),
 		rootHeight:         rootHeight,
+		registerCount:      atomic.NewUint64(0),
 	}, nil
 }
 
 func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error {
-	b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
 	batch := b.db.NewBatch()
 	defer batch.Close()
+
+	b.log.Trace().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
 	for _, register := range leafNodes {
 		payload := register.Payload
 		key, err := payload.Key()
@@ -78,17 +84,22 @@ func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error
 			return fmt.Errorf("failed to set key: %w", err)
 		}
 	}
+
 	err := batch.Commit(pebble.Sync)
 	if err != nil {
 		return fmt.Errorf("failed to commit batch: %w", err)
 	}
+
+	b.registerCount.Add(uint64(len(leafNodes)))
+
 	return nil
 }
 
 // indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir
 // with wal.OpenAndReadLeafNodesFromCheckpointV6
 func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error {
-	b.log.Info().Msg("started checkpoint index worker")
+	b.log.Debug().Msg("started checkpoint index worker")
+
 	// collect leaf nodes to batch index until the channel is closed
 	batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen)
 	for leafNode := range b.leafNodeChan {
@@ -106,6 +117,7 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error
 			}
 		}
 	}
+
 	// index the remaining registers if didn't reach a batch length.
 	err := b.batchIndexRegisters(batch)
 	if err != nil {
@@ -118,24 +130,37 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error
 func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error {
 	cct, cancel := context.WithCancel(ctx)
 	defer cancel()
+
 	g, gCtx := errgroup.WithContext(cct)
-	b.log.Info().Msg("indexing checkpoint file for pebble register store")
+
+	start := time.Now()
+	b.log.Info().Msg("indexing registers from checkpoint")
 	for i := 0; i < pebbleBootstrapWorkerCount; i++ {
 		g.Go(func() error {
 			return b.indexCheckpointFileWorker(gCtx)
 		})
 	}
+
 	err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log)
 	if err != nil {
 		return fmt.Errorf("error reading leaf node: %w", err)
 	}
+
 	if err = g.Wait(); err != nil {
 		return fmt.Errorf("failed to index checkpoint file: %w", err)
 	}
-	b.log.Info().Msg("checkpoint indexing complete")
+
 	err = initHeights(b.db, b.rootHeight)
 	if err != nil {
 		return fmt.Errorf("could not index latest height: %w", err)
 	}
+
+	b.log.Info().
+		Uint64("root_height", b.rootHeight).
+		Uint64("register_count", b.registerCount.Load()).
+		// note: logged as a string instead of Dur(), whose default unit is ms and is hard to read for a long run
+		Str("duration", time.Since(start).String()).
+		Msg("checkpoint indexing complete")
+
 	return nil
 }