Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,9 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon
}
}

func (au *accountUpdates) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// compactCreatableDeltas takes an array of creatables map deltas ( one array entry per round ), and compact the array into a single
// map that contains all the deltas changes. While doing that, the function eliminate any intermediate changes.
// It counts the number of changes per round by specifying it in the ndeltas field of the modifiedCreatable.
Expand Down
3 changes: 3 additions & 0 deletions ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func (b *bulletin) commitRound(context.Context, *sql.Tx, *deferredCommitContext)
func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}

func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (b *bulletin) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}
func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
3 changes: 3 additions & 0 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,15 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit
ct.roundDigest = ct.roundDigest[dcc.offset:]

ct.catchpointsMu.Unlock()
}

func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" {
// generate the catchpoint file. This need to be done inline so that it will block any new accounts that from being written.
// the generateCatchpoint expects that the accounts data would not be modified in the background during it's execution.
ct.generateCatchpoint(ctx, basics.Round(dcc.offset)+dcc.oldBase+dcc.lookback, dcc.catchpointLabel, dcc.committedRoundDigest, dcc.updatingBalancesDuration)
}

// in scheduleCommit, we expect that this function to update the catchpointWriting when
// it's on a catchpoint round and it's an archival ledger. Doing this in a deferred function
// here would prevent us from "forgetting" to update this variable later on.
Expand Down
202 changes: 202 additions & 0 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"runtime"
"sync/atomic"
"testing"
"time"

Expand All @@ -33,8 +34,10 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger/ledgercore"
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)
Expand Down Expand Up @@ -413,3 +416,202 @@ func TestCatchpointTrackerPrepareCommit(t *testing.T) {
}
}
}

// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
type blockingTracker struct {
postCommitUnlockedEntryLock chan struct{}
postCommitUnlockedReleaseLock chan struct{}
postCommitEntryLock chan struct{}
postCommitReleaseLock chan struct{}
committedUpToRound int64
}

// loadFromDisk is not implemented in the blockingTracker.
func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
return nil
}

// newBlock is not implemented in the blockingTracker.
func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
}

// committedUpTo in the blockingTracker just stores the committed round.
func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
return committedRnd, basics.Round(0)
}

// produceCommittingTask is not used by the blockingTracker
func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}

// prepareCommit, is not used by the blockingTracker
func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
return nil
}

// commitRound is not used by the blockingTracker
func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error {
return nil
}

// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
bt.postCommitEntryLock <- struct{}{}
<-bt.postCommitReleaseLock
}
}

// postCommitUnlocked implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
bt.postCommitUnlockedEntryLock <- struct{}{}
<-bt.postCommitUnlockedReleaseLock
}
}

// handleUnorderedCommit is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}

// close is not used by the blockingTracker
func (bt *blockingTracker) close() {
}

func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
partitiontest.PartitionTest(t)

genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10)
const inMem = true
log := logging.TestingLog(t)
log.SetLevel(logging.Warn)
cfg := config.GetDefaultLocal()
cfg.Archival = true
cfg.CatchpointInterval = 2
ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
require.NoError(t, err, "could not open ledger")
defer ledger.Close()

writeStallingTracker := &blockingTracker{
postCommitUnlockedEntryLock: make(chan struct{}),
postCommitUnlockedReleaseLock: make(chan struct{}),
postCommitEntryLock: make(chan struct{}),
postCommitReleaseLock: make(chan struct{}),
}
ledger.trackerMu.Lock()
ledger.trackers.mu.Lock()
ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker)
ledger.trackers.mu.Unlock()
ledger.trackerMu.Unlock()

proto := config.Consensus[protocol.ConsensusCurrentVersion]

// create the first MaxBalLookback blocks
for rnd := ledger.Latest() + 1; rnd <= basics.Round(proto.MaxBalLookback); rnd++ {
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
require.NoError(t, err)
}

// make sure to get to a catchpoint round, and block the writing there.
for {
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
require.NoError(t, err)
if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
// release the entry lock for postCommit
<-writeStallingTracker.postCommitEntryLock

// release the exit lock for postCommit
writeStallingTracker.postCommitReleaseLock <- struct{}{}

// wait until we're blocked by the stalling tracker.
<-writeStallingTracker.postCommitUnlockedEntryLock
break
}
}

// write additional block, so that the block queue would trigger that too
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
require.NoError(t, err)
// wait for the committedUpToRound to be called with the correct round number.
for {
committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
if basics.Round(committedUpToRound) == ledger.Latest() {
break
}
time.Sleep(1 * time.Millisecond)
}

lookupDone := make(chan struct{})
// now that we're blocked the tracker, try to call LookupAgreement and confirm it returns almost immediately
go func() {
defer close(lookupDone)
ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
}()

select {
case <-lookupDone:
// we expect it not to get stuck, even when the postCommitUnlocked is stuck.
case <-time.After(25 * time.Second):
require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
}
// let the goroutines complete.
// release the exit lock for postCommit
writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}

// test false positive : we want to ensure that without releasing the postCommit lock, the LookupAgreemnt would not be able to return within 1 second.

// make sure to get to a catchpoint round, and block the writing there.
for {
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
require.NoError(t, err)
if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
// release the entry lock for postCommit
<-writeStallingTracker.postCommitEntryLock
break
}
}
// write additional block, so that the block queue would trigger that too
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
require.NoError(t, err)
// wait for the committedUpToRound to be called with the correct round number.
for {
committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
if basics.Round(committedUpToRound) == ledger.Latest() {
break
}
time.Sleep(1 * time.Millisecond)
}

lookupDone = make(chan struct{})
// now that we're blocked the tracker, try to call LookupAgreement and confirm it's not returning within 1 second.
go func() {
defer close(lookupDone)
ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
}()

select {
case <-lookupDone:
require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
case <-time.After(5 * time.Second):
// this one was "stuck" for over five second ( as expected )
}
// let the goroutines complete.
// release the exit lock for postCommit
writeStallingTracker.postCommitReleaseLock <- struct{}{}

// wait until we're blocked by the stalling tracker.
<-writeStallingTracker.postCommitUnlockedEntryLock
// release the blocker.
writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}

// confirm that we get released quickly.
select {
case <-lookupDone:
// now that all the blocker have been removed, we should be able to complete
// the LookupAgreement call.
case <-time.After(30 * time.Second):
require.FailNow(t, "The LookupAgreement wasn't getting release as expected by the blocked tracker")
}
}
3 changes: 3 additions & 0 deletions ledger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (mt *metricsTracker) commitRound(context.Context, *sql.Tx, *deferredCommitC
func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}

func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (mt *metricsTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}
func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
3 changes: 3 additions & 0 deletions ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (bn *blockNotifier) commitRound(context.Context, *sql.Tx, *deferredCommitCo
func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}

func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (bn *blockNotifier) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}

Expand Down
11 changes: 11 additions & 0 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ type ledgerTracker interface {
// has completed. An optional context is provided for long-running operations.
postCommit(context.Context, *deferredCommitContext)

// postCommitUnlocked is called only on a successful commitRound. In that case, each of the trackers have
// the chance to make changes that aren't state-dependent.
// An optional context is provided for long-running operations.
postCommitUnlocked(context.Context, *deferredCommitContext)

// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
// Tracker might update own state in this case. For example, account updates tracker cancels
// scheduled catchpoint writing that deferred commit.
Expand Down Expand Up @@ -323,6 +328,8 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
}
if cdr != nil {
dcc.deferredCommitRange = *cdr
} else {
dcc = nil
}

tr.mu.RLock()
Expand Down Expand Up @@ -472,6 +479,10 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) {
tr.lastFlushTime = dcc.flushTime
tr.mu.Unlock()

for _, lt := range tr.trackers {
lt.postCommitUnlocked(tr.ctx, dcc)
}

}

// initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).
Expand Down
3 changes: 3 additions & 0 deletions ledger/txtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func (t *txTail) commitRound(context.Context, *sql.Tx, *deferredCommitContext) e
func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) {
}

func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (t *txTail) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
}

Expand Down