Skip to content

Commit

Permalink
Merge pull request #4797 from onflow/petera/adjust-checkpoint-bootstr…
Browse files Browse the repository at this point in the history
…ap-logging

[Access] Adjust logging in checkpoint loading
  • Loading branch information
peterargue authored Oct 5, 2023
2 parents 2c6b2e3 + 0d60429 commit 9c6f9ff
Showing 1 changed file with 33 additions and 8 deletions.
41 changes: 33 additions & 8 deletions storage/pebble/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"errors"
"fmt"
"path/filepath"
"time"

"github.com/cockroachdb/pebble"
"github.com/rs/zerolog"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/onflow/flow-go/ledger/common/convert"
Expand All @@ -18,12 +20,13 @@ import (
var ErrAlreadyBootstrapped = errors.New("found latest key set on badger instance, DB is already bootstrapped")

type RegisterBootstrap struct {
checkpointDir string
checkpointFileName string
log zerolog.Logger
db *pebble.DB
checkpointDir string
checkpointFileName string
leafNodeChan chan *wal.LeafNode
rootHeight uint64
registerCount *atomic.Uint64
}

// NewRegisterBootstrap creates the bootstrap object for reading checkpoint data and the height tracker in pebble
Expand All @@ -45,21 +48,24 @@ func NewRegisterBootstrap(
// key detected, attempt to run bootstrap on corrupt or already bootstrapped data
return nil, ErrAlreadyBootstrapped
}

checkpointDir, checkpointFileName := filepath.Split(checkpointFile)
return &RegisterBootstrap{
checkpointDir: checkpointDir,
checkpointFileName: checkpointFileName,
log: log.With().Str("module", "register_bootstrap").Logger(),
db: db,
checkpointDir: checkpointDir,
checkpointFileName: checkpointFileName,
leafNodeChan: make(chan *wal.LeafNode, checkpointLeafNodeBufSize),
rootHeight: rootHeight,
registerCount: atomic.NewUint64(0),
}, nil
}

func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error {
b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
batch := b.db.NewBatch()
defer batch.Close()

b.log.Trace().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
for _, register := range leafNodes {
payload := register.Payload
key, err := payload.Key()
Expand All @@ -78,17 +84,22 @@ func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error
return fmt.Errorf("failed to set key: %w", err)
}
}

err := batch.Commit(pebble.Sync)
if err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

b.registerCount.Add(uint64(len(leafNodes)))

return nil
}

// indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir
// with wal.OpenAndReadLeafNodesFromCheckpointV6
func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error {
b.log.Info().Msg("started checkpoint index worker")
b.log.Debug().Msg("started checkpoint index worker")

// collect leaf nodes to batch index until the channel is closed
batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen)
for leafNode := range b.leafNodeChan {
Expand All @@ -106,6 +117,7 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error
}
}
}

// index the remaining registers if didn't reach a batch length.
err := b.batchIndexRegisters(batch)
if err != nil {
Expand All @@ -118,24 +130,37 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error
func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error {
cct, cancel := context.WithCancel(ctx)
defer cancel()

g, gCtx := errgroup.WithContext(cct)
b.log.Info().Msg("indexing checkpoint file for pebble register store")

start := time.Now()
b.log.Info().Msg("indexing registers from checkpoint")
for i := 0; i < pebbleBootstrapWorkerCount; i++ {
g.Go(func() error {
return b.indexCheckpointFileWorker(gCtx)
})
}

err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log)
if err != nil {
return fmt.Errorf("error reading leaf node: %w", err)
}

if err = g.Wait(); err != nil {
return fmt.Errorf("failed to index checkpoint file: %w", err)
}
b.log.Info().Msg("checkpoint indexing complete")

err = initHeights(b.db, b.rootHeight)
if err != nil {
return fmt.Errorf("could not index latest height: %w", err)
}

b.log.Info().
Uint64("root_height", b.rootHeight).
Uint64("register_count", b.registerCount.Load()).
// note: not using Dur() since default units are ms and this duration is long
Str("duration", fmt.Sprintf("%v", time.Since(start))).
Msg("checkpoint indexing complete")

return nil
}

0 comments on commit 9c6f9ff

Please sign in to comment.