Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ type normalizedAccountBalance struct {
normalizedBalance uint64
// encodedResources provides the encoded form of the resources
encodedResources map[basics.CreatableIndex][]byte
// partial balance indicates that the original account balance was split into multiple parts in catchpoint creation time
partialBalance bool
}

// prepareNormalizedBalancesV5 converts an array of encodedBalanceRecordV5 into an equal size array of normalizedAccountBalances.
Expand Down Expand Up @@ -427,12 +429,22 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
normalizedAccountBalances[i].accountData.MicroAlgos,
proto)
normalizedAccountBalances[i].encodedAccountData = balance.AccountData
normalizedAccountBalances[i].accountHashes = make([][]byte, 1+len(balance.Resources))
normalizedAccountBalances[i].accountHashes[0] = accountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].accountData, balance.AccountData)
curHashIdx := 0
if balance.ExpectingMoreEntries {
// There is a single chunk in the catchpoint file with ExpectingMoreEntries
// set to false for this account. There may be multiple chunks with
// ExpectingMoreEntries set to true. In this case, we do not have to add the
// account's own hash to accountHashes.
normalizedAccountBalances[i].accountHashes = make([][]byte, len(balance.Resources))
normalizedAccountBalances[i].partialBalance = true
} else {
normalizedAccountBalances[i].accountHashes = make([][]byte, 1+len(balance.Resources))
normalizedAccountBalances[i].accountHashes[0] = accountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].accountData, balance.AccountData)
curHashIdx++
}
if len(balance.Resources) > 0 {
normalizedAccountBalances[i].resources = make(map[basics.CreatableIndex]resourcesData, len(balance.Resources))
normalizedAccountBalances[i].encodedResources = make(map[basics.CreatableIndex][]byte, len(balance.Resources))
resIdx := 0
for cidx, res := range balance.Resources {
var resData resourcesData
err = protocol.Decode(res, &resData)
Expand All @@ -447,10 +459,10 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
} else {
err = fmt.Errorf("unknown creatable for addr %s, aidx %d, data %v", balance.Address.String(), cidx, resData)
}
normalizedAccountBalances[i].accountHashes[resIdx+1] = resourcesHashBuilderV6(balance.Address, basics.CreatableIndex(cidx), ctype, resData.UpdateRound, res)
normalizedAccountBalances[i].accountHashes[curHashIdx] = resourcesHashBuilderV6(balance.Address, basics.CreatableIndex(cidx), ctype, resData.UpdateRound, res)
normalizedAccountBalances[i].resources[basics.CreatableIndex(cidx)] = resData
normalizedAccountBalances[i].encodedResources[basics.CreatableIndex(cidx)] = res
resIdx++
curHashIdx++
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions ledger/catchpointwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ func TestFullCatchpointWriter(t *testing.T) {
require.NoError(t, err)
}

err = accessor.BuildMerkleTrie(context.Background(), nil)
require.NoError(t, err)

err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err := applyCatchpointStagingBalances(ctx, tx, 0, 0)
return err
Expand Down Expand Up @@ -701,6 +704,9 @@ func TestFullCatchpointWriterOverflowAccounts(t *testing.T) {
require.NoError(t, err)
}

err = accessor.BuildMerkleTrie(context.Background(), nil)
require.NoError(t, err)

err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err := applyCatchpointStagingBalances(ctx, tx, 0, 0)
return err
Expand Down
121 changes: 75 additions & 46 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,46 @@ type CatchpointCatchupAccessor interface {
Ledger() (l CatchupAccessorClientLedger)
}

// CatchpointCatchupAccessorImpl is the concrete implementation of the CatchpointCatchupAccessor interface
type CatchpointCatchupAccessorImpl struct {
type stagingWriter interface {
writeBalances(context.Context, []normalizedAccountBalance) error
writeCreatables(context.Context, []normalizedAccountBalance) error
writeHashes(context.Context, []normalizedAccountBalance) error
isShared() bool
}

type stagingWriterImpl struct {
wdb db.Accessor
}

func (w *stagingWriterImpl) writeBalances(ctx context.Context, balances []normalizedAccountBalance) error {
return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
return writeCatchpointStagingBalances(ctx, tx, balances)
})
}

func (w *stagingWriterImpl) writeCreatables(ctx context.Context, balances []normalizedAccountBalance) error {
return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
return writeCatchpointStagingCreatable(ctx, tx, balances)
})
}

func (w *stagingWriterImpl) writeHashes(ctx context.Context, balances []normalizedAccountBalance) error {
return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err := writeCatchpointStagingHashes(ctx, tx, balances)
return err
})
}

func (w *stagingWriterImpl) isShared() bool {
return w.wdb.IsSharedCacheConnection()
}

// catchpointCatchupAccessorImpl is the concrete implementation of the CatchpointCatchupAccessor interface
type catchpointCatchupAccessorImpl struct {
ledger *Ledger

stagingWriter stagingWriter

// log copied from ledger
log logging.Logger

Expand Down Expand Up @@ -135,14 +171,15 @@ type CatchupAccessorClientLedger interface {

// MakeCatchpointCatchupAccessor creates a CatchpointCatchupAccessor given a ledger
func MakeCatchpointCatchupAccessor(ledger *Ledger, log logging.Logger) CatchpointCatchupAccessor {
return &CatchpointCatchupAccessorImpl{
ledger: ledger,
log: log,
return &catchpointCatchupAccessorImpl{
ledger: ledger,
stagingWriter: &stagingWriterImpl{wdb: ledger.trackerDB().Wdb},
log: log,
}
}

// GetState returns the current state of the catchpoint catchup
func (c *CatchpointCatchupAccessorImpl) GetState(ctx context.Context) (state CatchpointCatchupState, err error) {
func (c *catchpointCatchupAccessorImpl) GetState(ctx context.Context) (state CatchpointCatchupState, err error) {
var istate uint64
istate, err = readCatchpointStateUint64(ctx, c.ledger.trackerDB().Rdb.Handle, catchpointStateCatchupState)
if err != nil {
Expand All @@ -153,7 +190,7 @@ func (c *CatchpointCatchupAccessorImpl) GetState(ctx context.Context) (state Cat
}

// SetState set the state of the catchpoint catchup
func (c *CatchpointCatchupAccessorImpl) SetState(ctx context.Context, state CatchpointCatchupState) (err error) {
func (c *catchpointCatchupAccessorImpl) SetState(ctx context.Context, state CatchpointCatchupState) (err error) {
if state < CatchpointCatchupStateInactive || state > catchpointCatchupStateLast {
return fmt.Errorf("invalid catchpoint catchup state provided : %d", state)
}
Expand All @@ -165,7 +202,7 @@ func (c *CatchpointCatchupAccessorImpl) SetState(ctx context.Context, state Catc
}

// GetLabel returns the current catchpoint catchup label
func (c *CatchpointCatchupAccessorImpl) GetLabel(ctx context.Context) (label string, err error) {
func (c *catchpointCatchupAccessorImpl) GetLabel(ctx context.Context) (label string, err error) {
label, err = readCatchpointStateString(ctx, c.ledger.trackerDB().Rdb.Handle, catchpointStateCatchupLabel)
if err != nil {
return "", fmt.Errorf("unable to read catchpoint catchup state '%s': %v", catchpointStateCatchupLabel, err)
Expand All @@ -174,7 +211,7 @@ func (c *CatchpointCatchupAccessorImpl) GetLabel(ctx context.Context) (label str
}

// SetLabel set the catchpoint catchup label
func (c *CatchpointCatchupAccessorImpl) SetLabel(ctx context.Context, label string) (err error) {
func (c *catchpointCatchupAccessorImpl) SetLabel(ctx context.Context, label string) (err error) {
// verify it's parsable :
_, _, err = ledgercore.ParseCatchpointLabel(label)
if err != nil {
Expand All @@ -188,7 +225,7 @@ func (c *CatchpointCatchupAccessorImpl) SetLabel(ctx context.Context, label stri
}

// ResetStagingBalances resets the current staging balances, preparing for a new set of balances to be added
func (c *CatchpointCatchupAccessorImpl) ResetStagingBalances(ctx context.Context, newCatchup bool) (err error) {
func (c *catchpointCatchupAccessorImpl) ResetStagingBalances(ctx context.Context, newCatchup bool) (err error) {
wdb := c.ledger.trackerDB().Wdb
if !newCatchup {
c.ledger.setSynchronousMode(ctx, c.ledger.synchronousMode)
Expand Down Expand Up @@ -246,7 +283,7 @@ type CatchpointCatchupAccessorProgress struct {
}

// ProgressStagingBalances deserialize the given bytes as a temporary staging balances
func (c *CatchpointCatchupAccessorImpl) ProgressStagingBalances(ctx context.Context, sectionName string, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) {
func (c *catchpointCatchupAccessorImpl) ProgressStagingBalances(ctx context.Context, sectionName string, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) {
if sectionName == "content.msgpack" {
return c.processStagingContent(ctx, bytes, progress)
}
Expand All @@ -259,7 +296,7 @@ func (c *CatchpointCatchupAccessorImpl) ProgressStagingBalances(ctx context.Cont
}

// processStagingContent deserialize the given bytes as a temporary staging balances content
func (c *CatchpointCatchupAccessorImpl) processStagingContent(ctx context.Context, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) {
func (c *catchpointCatchupAccessorImpl) processStagingContent(ctx context.Context, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) {
if progress.SeenHeader {
return fmt.Errorf("CatchpointCatchupAccessorImpl::processStagingContent: content chunk already seen")
}
Expand Down Expand Up @@ -307,12 +344,11 @@ func (c *CatchpointCatchupAccessorImpl) processStagingContent(ctx context.Contex
}

// processStagingBalances deserialize the given bytes as a temporary staging balances
func (c *CatchpointCatchupAccessorImpl) processStagingBalances(ctx context.Context, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) {
func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Context, bytes []byte, progress *CatchpointCatchupAccessorProgress) (err error) {
if !progress.SeenHeader {
return fmt.Errorf("CatchpointCatchupAccessorImpl::processStagingBalances: content chunk was missing")
}

wdb := c.ledger.trackerDB().Wdb
start := time.Now()
ledgerProcessstagingbalancesCount.Inc(nil)

Expand Down Expand Up @@ -440,16 +476,13 @@ func (c *CatchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
wg.Add(1)
go func() {
defer wg.Done()
errBalances = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
start := time.Now()
err = writeCatchpointStagingBalances(ctx, tx, normalizedAccountBalances)
durBalances = time.Since(start)
return err
})
start := time.Now()
errBalances = c.stagingWriter.writeBalances(ctx, normalizedAccountBalances)
durBalances = time.Since(start)
}()

// on a in-memory database, wait for the writer to finish before starting the new writer
if wdb.IsSharedCacheConnection() {
if c.stagingWriter.isShared() {
wg.Wait()
}

Expand All @@ -467,30 +500,24 @@ func (c *CatchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
}
}
if hasCreatables {
errCreatables = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
start := time.Now()
err := writeCatchpointStagingCreatable(ctx, tx, normalizedAccountBalances)
durCreatables = time.Since(start)
return err
})
start := time.Now()
errCreatables = c.stagingWriter.writeCreatables(ctx, normalizedAccountBalances)
durCreatables = time.Since(start)
}
}()

// on a in-memory database, wait for the writer to finish before starting the new writer
if wdb.IsSharedCacheConnection() {
if c.stagingWriter.isShared() {
wg.Wait()
}

// start the accounts pending hashes writer
wg.Add(1)
go func() {
defer wg.Done()
errHashes = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
start := time.Now()
err := writeCatchpointStagingHashes(ctx, tx, normalizedAccountBalances)
durHashes = time.Since(start)
return err
})
start := time.Now()
errHashes = c.stagingWriter.writeHashes(ctx, normalizedAccountBalances)
durHashes = time.Since(start)
}()

wg.Wait()
Expand All @@ -510,10 +537,12 @@ func (c *CatchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
progress.HashesWriteDuration += durHashes

ledgerProcessstagingbalancesMicros.AddMicrosecondsSince(start, nil)
progress.ProcessedAccounts += uint64(len(normalizedAccountBalances))
progress.ProcessedBytes += uint64(len(bytes))
for _, acctBal := range normalizedAccountBalances {
progress.TotalAccountHashes += uint64(len(acctBal.accountHashes))
if !acctBal.partialBalance {
progress.ProcessedAccounts++
}
}

// not strictly required, but clean up the pointer when we're done.
Expand All @@ -529,7 +558,7 @@ func (c *CatchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
}

// BuildMerkleTrie would process the catchpointpendinghashes and insert all the items in it into the merkle trie
func (c *CatchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, progressUpdates func(uint64)) (err error) {
func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, progressUpdates func(uint64)) (err error) {
wdb := c.ledger.trackerDB().Wdb
rdb := c.ledger.trackerDB().Rdb
err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
Expand Down Expand Up @@ -708,7 +737,7 @@ func (c *CatchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro
}

// GetCatchupBlockRound returns the latest block round matching the current catchpoint
func (c *CatchpointCatchupAccessorImpl) GetCatchupBlockRound(ctx context.Context) (round basics.Round, err error) {
func (c *catchpointCatchupAccessorImpl) GetCatchupBlockRound(ctx context.Context) (round basics.Round, err error) {
var iRound uint64
iRound, err = readCatchpointStateUint64(ctx, c.ledger.trackerDB().Rdb.Handle, catchpointStateCatchupBlockRound)
if err != nil {
Expand All @@ -718,7 +747,7 @@ func (c *CatchpointCatchupAccessorImpl) GetCatchupBlockRound(ctx context.Context
}

// VerifyCatchpoint verifies that the catchpoint is valid by reconstructing the label.
func (c *CatchpointCatchupAccessorImpl) VerifyCatchpoint(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) VerifyCatchpoint(ctx context.Context, blk *bookkeeping.Block) (err error) {
rdb := c.ledger.trackerDB().Rdb
var balancesHash crypto.Digest
var blockRound basics.Round
Expand Down Expand Up @@ -780,7 +809,7 @@ func (c *CatchpointCatchupAccessorImpl) VerifyCatchpoint(ctx context.Context, bl

// StoreBalancesRound calculates the balances round based on the first block and the associated consensus parameters, and
// store that to the database
func (c *CatchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context, blk *bookkeeping.Block) (err error) {
// calculate the balances round and store it. It *should* be identical to the one in the catchpoint file header, but we don't want to
// trust the one in the catchpoint file header, so we'll calculate it ourselves.
catchpointLookback := config.Consensus[blk.CurrentProtocol].CatchpointLookback
Expand All @@ -803,7 +832,7 @@ func (c *CatchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context,
}

// StoreFirstBlock stores a single block to the blocks database.
func (c *CatchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerStorefirstblockCount.Inc(nil)
Expand All @@ -818,7 +847,7 @@ func (c *CatchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk
}

// StoreBlock stores a single block to the blocks database.
func (c *CatchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *bookkeeping.Block) (err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerCatchpointStoreblockCount.Inc(nil)
Expand All @@ -833,7 +862,7 @@ func (c *CatchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *boo
}

// FinishBlocks concludes the catchup of the blocks database.
func (c *CatchpointCatchupAccessorImpl) FinishBlocks(ctx context.Context, applyChanges bool) (err error) {
func (c *catchpointCatchupAccessorImpl) FinishBlocks(ctx context.Context, applyChanges bool) (err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerCatchpointFinishblocksCount.Inc(nil)
Expand All @@ -852,7 +881,7 @@ func (c *CatchpointCatchupAccessorImpl) FinishBlocks(ctx context.Context, applyC
}

// EnsureFirstBlock ensure that we have a single block in the staging block table, and returns that block
func (c *CatchpointCatchupAccessorImpl) EnsureFirstBlock(ctx context.Context) (blk bookkeeping.Block, err error) {
func (c *catchpointCatchupAccessorImpl) EnsureFirstBlock(ctx context.Context) (blk bookkeeping.Block, err error) {
blockDbs := c.ledger.blockDB()
start := time.Now()
ledgerCatchpointEnsureblock1Count.Inc(nil)
Expand All @@ -869,7 +898,7 @@ func (c *CatchpointCatchupAccessorImpl) EnsureFirstBlock(ctx context.Context) (b

// CompleteCatchup completes the catchpoint catchup process by switching the databases tables around
// and reloading the ledger.
func (c *CatchpointCatchupAccessorImpl) CompleteCatchup(ctx context.Context) (err error) {
func (c *catchpointCatchupAccessorImpl) CompleteCatchup(ctx context.Context) (err error) {
err = c.FinishBlocks(ctx, true)
if err != nil {
return err
Expand All @@ -883,7 +912,7 @@ func (c *CatchpointCatchupAccessorImpl) CompleteCatchup(ctx context.Context) (er
}

// finishBalances concludes the catchup of the balances(tracker) database.
func (c *CatchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err error) {
func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err error) {
wdb := c.ledger.trackerDB().Wdb
start := time.Now()
ledgerCatchpointFinishBalsCount.Inc(nil)
Expand Down Expand Up @@ -986,7 +1015,7 @@ func (c *CatchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err
}

// Ledger returns ledger instance as CatchupAccessorClientLedger interface
func (c *CatchpointCatchupAccessorImpl) Ledger() (l CatchupAccessorClientLedger) {
func (c *catchpointCatchupAccessorImpl) Ledger() (l CatchupAccessorClientLedger) {
return c.ledger
}

Expand Down
Loading