[Access] Adjust logging in checkpoint loading #4797

Merged · 1 commit · Oct 5, 2023
41 changes: 33 additions & 8 deletions storage/pebble/bootstrap.go
@@ -5,9 +5,11 @@ import (
 	"errors"
 	"fmt"
 	"path/filepath"
+	"time"
 
 	"github.com/cockroachdb/pebble"
 	"github.com/rs/zerolog"
+	"go.uber.org/atomic"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/onflow/flow-go/ledger/common/convert"
@@ -18,12 +20,13 @@ import (
 var ErrAlreadyBootstrapped = errors.New("found latest key set on badger instance, DB is already bootstrapped")
 
 type RegisterBootstrap struct {
-	checkpointDir      string
-	checkpointFileName string
 	log                zerolog.Logger
 	db                 *pebble.DB
+	checkpointDir      string
+	checkpointFileName string
 	leafNodeChan       chan *wal.LeafNode
 	rootHeight         uint64
+	registerCount      *atomic.Uint64
 }
 
 // NewRegisterBootstrap creates the bootstrap object for reading checkpoint data and the height tracker in pebble
@@ -45,21 +48,24 @@ func NewRegisterBootstrap(
 		// key detected, attempt to run bootstrap on corrupt or already bootstrapped data
 		return nil, ErrAlreadyBootstrapped
 	}
+
 	checkpointDir, checkpointFileName := filepath.Split(checkpointFile)
 	return &RegisterBootstrap{
-		checkpointDir:      checkpointDir,
-		checkpointFileName: checkpointFileName,
 		log:                log.With().Str("module", "register_bootstrap").Logger(),
 		db:                 db,
+		checkpointDir:      checkpointDir,
+		checkpointFileName: checkpointFileName,
 		leafNodeChan:       make(chan *wal.LeafNode, checkpointLeafNodeBufSize),
 		rootHeight:         rootHeight,
+		registerCount:      atomic.NewUint64(0),
 	}, nil
 }
 
 func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error {
-	b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
 	batch := b.db.NewBatch()
 	defer batch.Close()
+
+	b.log.Trace().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
Member:

I believe this is to monitor the progress.

Actually, OpenAndReadLeafNodesFromCheckpointV6 is already doing that, since it logs the percentage of the trie that has been read, and it blocks if the database is not storing the registers fast enough. So if the log shows the reading percentage going up, leaf nodes are being indexed properly.

Contributor Author:

Yes, exactly. The checkpoint loader also logs based on % progress, resulting in far fewer messages.

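The backpressure described in this thread comes from the bounded leafNodeChan buffer: the checkpoint reader blocks as soon as the buffer fills, so its own percentage log only advances while the indexing workers keep up. Below is a minimal standalone sketch of that pattern, not flow-go code; the buffer size, loop bounds, and sleep are made-up values.

```go
// Toy illustration of bounded-channel backpressure: a producer standing in
// for OpenAndReadLeafNodesFromCheckpointV6 blocks whenever the consumer
// (standing in for indexCheckpointFileWorker) falls behind, so the producer's
// progress output implicitly tracks indexing progress.
package main

import (
	"fmt"
	"time"
)

func main() {
	leafNodes := make(chan int, 2) // stands in for leafNodeChan with a small buffer

	go func() {
		for i := 1; i <= 10; i++ {
			leafNodes <- i // blocks once the buffer is full
			fmt.Printf("reader: %d%% of checkpoint read\n", i*10)
		}
		close(leafNodes)
	}()

	for n := range leafNodes {
		time.Sleep(20 * time.Millisecond) // slow "batch commit"
		_ = n
	}
}
```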
 	for _, register := range leafNodes {
 		payload := register.Payload
 		key, err := payload.Key()
@@ -78,17 +84,22 @@ func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error
return fmt.Errorf("failed to set key: %w", err)
}
}

err := batch.Commit(pebble.Sync)
if err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

b.registerCount.Add(uint64(len(leafNodes)))

return nil
}

// indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir
// with wal.OpenAndReadLeafNodesFromCheckpointV6
func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error {
b.log.Info().Msg("started checkpoint index worker")
b.log.Debug().Msg("started checkpoint index worker")

// collect leaf nodes to batch index until the channel is closed
batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen)
for leafNode := range b.leafNodeChan {
@@ -106,6 +117,7 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error
 			}
 		}
 	}
+
 	// index the remaining registers if didn't reach a batch length.
 	err := b.batchIndexRegisters(batch)
 	if err != nil {
@@ -118,24 +130,37 @@ func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error
 func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error {
 	cct, cancel := context.WithCancel(ctx)
 	defer cancel()
+
 	g, gCtx := errgroup.WithContext(cct)
-	b.log.Info().Msg("indexing checkpoint file for pebble register store")
+
+	start := time.Now()
+	b.log.Info().Msg("indexing registers from checkpoint")
 	for i := 0; i < pebbleBootstrapWorkerCount; i++ {
 		g.Go(func() error {
 			return b.indexCheckpointFileWorker(gCtx)
 		})
 	}
+
 	err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log)
 	if err != nil {
 		return fmt.Errorf("error reading leaf node: %w", err)
 	}
+
 	if err = g.Wait(); err != nil {
 		return fmt.Errorf("failed to index checkpoint file: %w", err)
 	}
-	b.log.Info().Msg("checkpoint indexing complete")
+
 	err = initHeights(b.db, b.rootHeight)
 	if err != nil {
 		return fmt.Errorf("could not index latest height: %w", err)
 	}
+
+	b.log.Info().
+		Uint64("root_height", b.rootHeight).
+		Uint64("register_count", b.registerCount.Load()).
+		// note: not using Dur() since default units are ms and this duration is long
+		Str("duration", fmt.Sprintf("%v", time.Since(start))).
+		Msg("checkpoint indexing complete")
+
 	return nil
 }
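On the "not using Dur()" note in the final log call: zerolog's Dur() converts the duration using zerolog.DurationFieldUnit, which defaults to milliseconds, so an hours-long bootstrap would show up as a large raw number, while formatting the time.Duration yourself keeps Go's human-readable form. A small standalone sketch of the difference (illustrative only, not flow-go code):

```go
// Compare zerolog's Dur() field (milliseconds by default) with a Str() field
// carrying the duration's String() form, as used in the diff above.
package main

import (
	"os"
	"time"

	"github.com/rs/zerolog"
)

func main() {
	log := zerolog.New(os.Stdout)
	elapsed := 90 * time.Minute

	log.Info().Dur("duration", elapsed).Msg("with Dur")          // duration rendered as a millisecond count
	log.Info().Str("duration", elapsed.String()).Msg("with Str") // duration rendered as "1h30m0s"
}
```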