Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ledger/blockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func blockCompleteCatchup(tx *sql.Tx) (err error) {
return nil
}

// TODO: unused, either actually implement cleanup on catchpoint failure, or delete this
func blockAbortCatchup(tx *sql.Tx) error {
// delete the old catchpointblocks table, if there is such.
for _, stmt := range blockResetExprs {
Expand Down
1 change: 1 addition & 0 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func (c *CatchpointCatchupAccessorImpl) FinishBlocks(ctx context.Context, applyC
if applyChanges {
return blockCompleteCatchup(tx)
}
// TODO: unused, either actually implement cleanup on catchpoint failure, or delete this
return blockAbortCatchup(tx)
})
ledgerCatchpointFinishblocksMicros.AddMicrosecondsSince(start, nil)
Expand Down
282 changes: 255 additions & 27 deletions ledger/catchupaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,43 @@ import (
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
)

func createTestingEncodedChunks(accountsCount uint64) (encodedAccountChunks [][]byte, last64KIndex int) {
// pre-create all encoded chunks.
accounts := uint64(0)
encodedAccountChunks = make([][]byte, 0, accountsCount/BalancesPerCatchpointFileChunk+1)
last64KIndex = -1
for accounts < accountsCount {
// generate a chunk;
chunkSize := accountsCount - accounts
if chunkSize > BalancesPerCatchpointFileChunk {
chunkSize = BalancesPerCatchpointFileChunk
}
if accounts >= accountsCount-64*1024 && last64KIndex == -1 {
last64KIndex = len(encodedAccountChunks)
}
var balances catchpointFileBalancesChunk
balances.Balances = make([]encodedBalanceRecord, chunkSize)
for i := uint64(0); i < chunkSize; i++ {
var randomAccount encodedBalanceRecord
accountData := basics.AccountData{}
accountData.MicroAlgos.Raw = crypto.RandUint63()
randomAccount.AccountData = protocol.Encode(&accountData)
crypto.RandBytes(randomAccount.Address[:])
binary.LittleEndian.PutUint64(randomAccount.Address[:], accounts+i)
balances.Balances[i] = randomAccount
}
encodedAccountChunks = append(encodedAccountChunks, protocol.Encode(&balances))
accounts += chunkSize
}
return
}

func benchmarkRestoringFromCatchpointFileHelper(b *testing.B) {
genesisInitState, _ := testGenerateInitState(b, protocol.ConsensusCurrentVersion, 100)
const inMem = false
Expand Down Expand Up @@ -74,35 +106,9 @@ func benchmarkRestoringFromCatchpointFileHelper(b *testing.B) {
require.NoError(b, err)

// pre-create all encoded chunks.
accounts := uint64(0)
encodedAccountChunks := make([][]byte, 0, accountsCount/BalancesPerCatchpointFileChunk+1)
last64KIndex := -1
for accounts < accountsCount {
// generate a chunk;
chunkSize := accountsCount - accounts
if chunkSize > BalancesPerCatchpointFileChunk {
chunkSize = BalancesPerCatchpointFileChunk
}
if accounts >= accountsCount-64*1024 && last64KIndex == -1 {
last64KIndex = len(encodedAccountChunks)
}
var balances catchpointFileBalancesChunk
balances.Balances = make([]encodedBalanceRecord, chunkSize)
for i := uint64(0); i < chunkSize; i++ {
var randomAccount encodedBalanceRecord
accountData := basics.AccountData{}
accountData.MicroAlgos.Raw = crypto.RandUint63()
randomAccount.AccountData = protocol.Encode(&accountData)
crypto.RandBytes(randomAccount.Address[:])
binary.LittleEndian.PutUint64(randomAccount.Address[:], accounts+i)
balances.Balances[i] = randomAccount
}
encodedAccountChunks = append(encodedAccountChunks, protocol.Encode(&balances))
accounts += chunkSize
}
encodedAccountChunks, last64KIndex := createTestingEncodedChunks(accountsCount)

b.ResetTimer()
accounts = uint64(0)
var last64KStart time.Time
for len(encodedAccountChunks) > 0 {
encodedAccounts := encodedAccountChunks[0]
Expand Down Expand Up @@ -131,3 +137,225 @@ func BenchmarkRestoringFromCatchpointFile(b *testing.B) {
})
}
}

func TestCatchupAcessorFoo(t *testing.T) {
log := logging.TestingLog(t)
dbBaseFileName := t.Name()
const inMem = true
genesisInitState, _ /* initKeys */ := testGenerateInitState(t, protocol.ConsensusCurrentVersion, 100)
cfg := config.GetDefaultLocal()
l, err := OpenLedger(log, dbBaseFileName, inMem, genesisInitState, cfg)
require.NoError(t, err, "could not open ledger")
defer func() {
l.Close()
}()
catchpointAccessor := MakeCatchpointCatchupAccessor(l, log)
err = catchpointAccessor.ResetStagingBalances(context.Background(), true)
require.NoError(t, err, "ResetStagingBalances")

// TODO: GetState/SetState/GetLabel/SetLabel but setup for an error? (disconnected db?)

err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateInactive)
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLedgerDownload)
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateLastestBlockDownload)
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateBlocksDownload)
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), CatchpointCatchupStateSwitch)
require.NoError(t, err, "catchpointAccessor.SetState")
err = catchpointAccessor.SetState(context.Background(), catchpointCatchupStateLast+1)
require.Error(t, err, "catchpointAccessor.SetState")

state, err := catchpointAccessor.GetState(context.Background())
require.NoError(t, err, "catchpointAccessor.GetState")
require.Equal(t, CatchpointCatchupState(CatchpointCatchupStateSwitch), state)
t.Logf("catchpoint state %#v", state)

// invalid label
err = catchpointAccessor.SetLabel(context.Background(), "wat")
require.Error(t, err, "catchpointAccessor.SetLabel")

// ok
calabel := "98#QGMCMMUPV74AXXVKSNPRN73XMJG44ZJTZHU25HDG7JH5OHMM6N3Q"
err = catchpointAccessor.SetLabel(context.Background(), calabel)
require.NoError(t, err, "catchpointAccessor.SetLabel")

label, err := catchpointAccessor.GetLabel(context.Background())
require.NoError(t, err, "catchpointAccessor.GetLabel")
require.Equal(t, calabel, label)
t.Logf("catchpoint label %#v", label)

err = catchpointAccessor.ResetStagingBalances(context.Background(), false)
require.NoError(t, err, "ResetStagingBalances")
}

func TestBuildMerkleTrie(t *testing.T) {
// setup boilerplate
log := logging.TestingLog(t)
dbBaseFileName := t.Name()
const inMem = true
genesisInitState, initKeys := testGenerateInitState(t, protocol.ConsensusCurrentVersion, 100)
cfg := config.GetDefaultLocal()
l, err := OpenLedger(log, dbBaseFileName, inMem, genesisInitState, cfg)
require.NoError(t, err, "could not open ledger")
defer func() {
l.Close()
}()
catchpointAccessor := MakeCatchpointCatchupAccessor(l, log)

progressCallCount := 0
progressNop := func(uint64) {
progressCallCount++
}

ctx := context.Background()

// actual testing...
// insufficient setup, it should fail:
err = catchpointAccessor.BuildMerkleTrie(ctx, progressNop)
require.Error(t, err)

// from reset it's okay, but it doesn't do anything
err = catchpointAccessor.ResetStagingBalances(ctx, true)
require.NoError(t, err, "ResetStagingBalances")
err = catchpointAccessor.BuildMerkleTrie(ctx, progressNop)
require.NoError(t, err)
require.True(t, progressCallCount > 0)

// process some data...
progressCallCount = 0
err = catchpointAccessor.ResetStagingBalances(ctx, true)
require.NoError(t, err, "ResetStagingBalances")
// TODO: catchpointAccessor.ProgressStagingBalances() like in ledgerFetcher.downloadLedger(cs.ctx, peer, round) like catchup/catchpointService.go which is the real usage of BuildMerkleTrie()
var blob []byte = nil // TODO: content!
var progress CatchpointCatchupAccessorProgress
err = catchpointAccessor.ProgressStagingBalances(ctx, "ignoredContent", blob, &progress)
require.NoError(t, err)
// this shouldn't work yet
err = catchpointAccessor.ProgressStagingBalances(ctx, "balances.FAKE.msgpack", blob, &progress)
require.Error(t, err)
// this needs content
err = catchpointAccessor.ProgressStagingBalances(ctx, "content.msgpack", blob, &progress)
require.Error(t, err)

// content.msgpack from this:
accountsCount := uint64(len(initKeys))
fileHeader := CatchpointFileHeader{
Version: catchpointFileVersion,
BalancesRound: basics.Round(0),
BlocksRound: basics.Round(0),
Totals: ledgercore.AccountTotals{},
TotalAccounts: accountsCount,
TotalChunks: (accountsCount + BalancesPerCatchpointFileChunk - 1) / BalancesPerCatchpointFileChunk,
Catchpoint: "",
BlockHeaderDigest: crypto.Digest{},
}
encodedFileHeader := protocol.Encode(&fileHeader)
err = catchpointAccessor.ProgressStagingBalances(ctx, "content.msgpack", encodedFileHeader, &progress)
require.NoError(t, err)
// shouldn't work a second time
err = catchpointAccessor.ProgressStagingBalances(ctx, "content.msgpack", encodedFileHeader, &progress)
require.Error(t, err)

// This should still fail, but slightly different coverage path
err = catchpointAccessor.ProgressStagingBalances(ctx, "balances.FAKE.msgpack", blob, &progress)
require.Error(t, err)

// create some catchpoint data
encodedAccountChunks, _ := createTestingEncodedChunks(accountsCount)

for _, encodedAccounts := range encodedAccountChunks {

err = catchpointAccessor.ProgressStagingBalances(context.Background(), "balances.XX.msgpack", encodedAccounts, &progress)
require.NoError(t, err)
}

err = catchpointAccessor.BuildMerkleTrie(ctx, progressNop)
require.NoError(t, err)
require.True(t, progressCallCount > 0)

blockRound, err := catchpointAccessor.GetCatchupBlockRound(ctx)
require.NoError(t, err)
require.Equal(t, basics.Round(0), blockRound)
}

// blockdb.go code
// TODO: blockStartCatchupStaging called from StoreFirstBlock()
// TODO: blockCompleteCatchup called from FinishBlocks()
// TODO: blockAbortCatchup called from FinishBlocks()
// TODO: blockPutStaging called from StoreBlock()
// TODO: blockEnsureSingleBlock called from EnsureFirstBlock()

func TestCatchupAccessorBlockdb(t *testing.T) {
// setup boilerplate
log := logging.TestingLog(t)
dbBaseFileName := t.Name()
const inMem = true
genesisInitState, _ /*initKeys*/ := testGenerateInitState(t, protocol.ConsensusCurrentVersion, 100)
cfg := config.GetDefaultLocal()
l, err := OpenLedger(log, dbBaseFileName, inMem, genesisInitState, cfg)
require.NoError(t, err, "could not open ledger")
defer func() {
l.Close()
}()
catchpointAccessor := MakeCatchpointCatchupAccessor(l, log)
ctx := context.Background()
progressCallCount := 0
progressNop := func(uint64) {
progressCallCount++
}

// actual testing...
err = catchpointAccessor.BuildMerkleTrie(ctx, progressNop)
require.Error(t, err)
}

func TestVerifyCatchpoint(t *testing.T) {
// setup boilerplate
log := logging.TestingLog(t)
dbBaseFileName := t.Name()
const inMem = true
genesisInitState, _ /*initKeys*/ := testGenerateInitState(t, protocol.ConsensusCurrentVersion, 100)
cfg := config.GetDefaultLocal()
l, err := OpenLedger(log, dbBaseFileName, inMem, genesisInitState, cfg)
require.NoError(t, err, "could not open ledger")
defer func() {
l.Close()
}()
catchpointAccessor := MakeCatchpointCatchupAccessor(l, log)

ctx := context.Background()

// actual testing...
var blk bookkeeping.Block
err = catchpointAccessor.VerifyCatchpoint(ctx, &blk)
require.Error(t, err)

err = catchpointAccessor.ResetStagingBalances(ctx, true)
require.NoError(t, err, "ResetStagingBalances")

err = catchpointAccessor.VerifyCatchpoint(ctx, &blk)
require.Error(t, err)
// TODO: verify a catchpoint block that is valid

// StoreBalancesRound assumes things are valid, so just does the db put
err = catchpointAccessor.StoreBalancesRound(ctx, &blk)
require.NoError(t, err)
// StoreFirstBlock is a dumb wrapper on some db logic
err = catchpointAccessor.StoreFirstBlock(ctx, &blk)
require.NoError(t, err)

_, err = catchpointAccessor.EnsureFirstBlock(ctx)
require.NoError(t, err)

blk.BlockHeader.Round++
err = catchpointAccessor.StoreBlock(ctx, &blk)
require.NoError(t, err)

// TODO: write a case with working no-err
err = catchpointAccessor.CompleteCatchup(ctx)
require.Error(t, err)
//require.NoError(t, err)
}