149 changes: 149 additions & 0 deletions ledger/acctupdates_test.go
@@ -1700,3 +1700,152 @@ func TestConsecutiveVersion(t *testing.T) {
protocol.ConsensusV21,
}
}

// This test covers the case where an accountUpdates.lookupX method:
// - can't find the requested address,
// - falls through the in-memory deltas and the LRU accounts cache,
// - and then hits the database (calling accountsDbQueries.lookup),
// only to discover that the round stored in the database (committed in accountUpdates.commitRound)
// is out of sync with accountUpdates.cachedDBRound (which is updated slightly later, in accountUpdates.postCommit).
//
// In that case the lookup waits on a condition variable and retries once
// commitSyncer/accountUpdates has advanced the cachedDBRound.
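//
// Roughly, that retry looks like the following (a paraphrased sketch with approximate names,
// not the exact code of accountUpdates.lookupWithoutRewards):
//
//	for {
//		currentDbRound := au.cachedDBRound
//		// ... address not found in the in-memory deltas or in the baseAccounts LRU cache ...
//		persistedData, _ := au.accountsq.lookup(addr) // accountsDbQueries.lookup, hits the DB
//		if persistedData.round == currentDbRound {
//			return persistedData // database and tracker agree on the round
//		}
//		// the database is ahead of cachedDBRound: wait for postCommit to advance it, then retry
//		for currentDbRound >= au.cachedDBRound {
//			au.accountsReadCond.Wait()
//		}
//	}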
func TestAcctUpdatesLookupRetry(t *testing.T) {
partitiontest.PartitionTest(t)

testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupRetry")
proto := config.Consensus[protocol.ConsensusCurrentVersion]
proto.MaxBalLookback = 10
config.Consensus[testProtocolVersion] = proto
defer func() {
delete(config.Consensus, testProtocolVersion)
}()

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
rewardsLevels := []uint64{0}

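// fund the rewards pool and the fee sink with large, non-participating balances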
pooldata := basics.AccountData{}
pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
pooldata.Status = basics.NotParticipating
accts[0][testPoolAddr] = pooldata

sinkdata := basics.AccountData{}
sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
sinkdata.Status = basics.NotParticipating
accts[0][testSinkAddr] = sinkdata

ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts)
defer ml.Close()

conf := config.GetDefaultLocal()
au := newAcctUpdates(t, ml, conf, ".")
defer au.close()

// cover 10 genesis blocks
rewardLevel := uint64(0)
for i := 1; i < 10; i++ {
accts = append(accts, accts[0])
rewardsLevels = append(rewardsLevels, rewardLevel)
}

checkAcctUpdates(t, au, 0, 9, accts, rewardsLevels, proto)

// lastCreatableID tracks the highest asset/app index used so far, so newly generated creatables don't collide
lastCreatableID := crypto.RandUint64() % 512
knownCreatables := make(map[basics.CreatableIndex]bool)

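// advance the ledger well past MaxBalLookback so that older rounds become eligible to be flushed to the account database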
for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ {
rewardLevelDelta := crypto.RandUint64() % 5
rewardLevel += rewardLevelDelta
var updates ledgercore.AccountDeltas
var totals map[basics.Address]basics.AccountData
base := accts[i-1]
updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID)
prevTotals, err := au.Totals(basics.Round(i - 1))
require.NoError(t, err)

newPool := totals[testPoolAddr]
newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta
updates.Upsert(testPoolAddr, newPool)
totals[testPoolAddr] = newPool

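// assemble a block carrying the new deltas and feed it to the accountUpdates tracker via newBlock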
blk := bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: basics.Round(i),
},
}
blk.RewardsLevel = rewardLevel
blk.CurrentProtocol = testProtocolVersion

delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0)
delta.Accts.MergeAccounts(updates)
delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
delta.Totals = accumulateTotals(t, testProtocolVersion, []map[basics.Address]basics.AccountData{totals}, rewardLevel)
au.newBlock(blk, delta)
accts = append(accts, totals)
rewardsLevels = append(rewardsLevels, rewardLevel)

checkAcctUpdates(t, au, 0, i, accts, rewardsLevels, proto)
}

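// flushRound asks the tracker registry to commit rounds up to i to the account database and waits for the write to finish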
flushRound := func(i basics.Round) {
// Clear the timer to ensure a flush
ml.trackers.lastFlushTime = time.Time{}

ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i)
ml.trackers.waitAccountsWriting()
}

// flush a couple of rounds (indirectly schedules commitSyncer)
flushRound(basics.Round(0))
flushRound(basics.Round(1))

// add stallingTracker to list of trackers
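// (with alwaysLock set, blockingTracker blocks on every postCommit/postCommitUnlocked call,
// not only on catchpoint rounds; see the catchpointtracker_test.go change below)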
stallingTracker := &blockingTracker{
postCommitUnlockedEntryLock: make(chan struct{}),
postCommitUnlockedReleaseLock: make(chan struct{}),
postCommitEntryLock: make(chan struct{}),
postCommitReleaseLock: make(chan struct{}),
alwaysLock: true,
}
ml.trackers.trackers = append([]ledgerTracker{stallingTracker}, ml.trackers.trackers...)

// kick off another round
go flushRound(basics.Round(2))

// let stallingTracker enter postCommit() and block (waiting on postCommitReleaseLock)
// this will prevent accountUpdates.postCommit() from updating au.cachedDBRound = newBase
<-stallingTracker.postCommitEntryLock

// prune the baseAccounts cache, so that lookup will fall through to the DB
au.accountsMu.Lock()
au.baseAccounts.prune(0)
au.accountsMu.Unlock()

rnd := basics.Round(2)

// grab any address and data to use for call to lookup
var addr basics.Address
var data basics.AccountData
for a, d := range accts[rnd] {
addr = a
data = d
break
}

// release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait()
go func() {
time.Sleep(200 * time.Millisecond)
stallingTracker.postCommitReleaseLock <- struct{}{}
}()

// issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound
d, validThrough, err := au.LookupWithoutRewards(rnd, addr)
require.NoError(t, err)
require.Equal(t, d, data)
require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd: %v", validThrough, rnd)

// allow the postCommitUnlocked() handler to go through
<-stallingTracker.postCommitUnlockedEntryLock
stallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
}
5 changes: 3 additions & 2 deletions ledger/catchpointtracker_test.go
@@ -427,6 +427,7 @@ type blockingTracker struct {
postCommitEntryLock chan struct{}
postCommitReleaseLock chan struct{}
committedUpToRound int64
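// alwaysLock makes postCommit/postCommitUnlocked block on every call, not only on catchpoint rounds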
alwaysLock bool
}

// loadFromDisk is not implemented in the blockingTracker.
@@ -461,15 +462,15 @@ func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommit

// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") {
bt.postCommitEntryLock <- struct{}{}
<-bt.postCommitReleaseLock
}
}

// postCommitUnlocked implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") {
bt.postCommitUnlockedEntryLock <- struct{}{}
<-bt.postCommitUnlockedReleaseLock
}