Skip to content

Commit 14e889e

Browse files
authored
ledger: perform the catchpoint writing outside the trackers lock. (#3311)
## Summary This PR moves the catchpoint file writing to be performed outside of the trackers lock. This resolves the issue where a long catchpoint file writing blocks the agreement from validating and propagating votes. ## Test Plan * [x] Test manually & use existing tests. * [x] Implement a unit test * [x] Deploy a local network where the catchpoint writing takes a long time and verify it doesn't get blocked during catchpoint writing.
1 parent 3520336 commit 14e889e

File tree

8 files changed

+231
-0
lines changed

8 files changed

+231
-0
lines changed

ledger/acctupdates.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,9 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon
12411241
}
12421242
}
12431243

1244+
func (au *accountUpdates) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
1245+
}
1246+
12441247
// compactCreatableDeltas takes an array of creatables map deltas ( one array entry per round ), and compacts the array into a single
12451248
// map that contains all the deltas changes. While doing that, the function eliminates any intermediate changes.
12461249
// It counts the number of changes per round by specifying it in the ndeltas field of the modifiedCreatable.

ledger/bulletin.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ func (b *bulletin) commitRound(context.Context, *sql.Tx, *deferredCommitContext)
120120
func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) {
121121
}
122122

123+
func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
124+
}
125+
123126
func (b *bulletin) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
124127
}
125128
func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {

ledger/catchpointtracker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,12 +393,15 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit
393393
ct.roundDigest = ct.roundDigest[dcc.offset:]
394394

395395
ct.catchpointsMu.Unlock()
396+
}
396397

398+
func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
397399
if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" {
398400
// generate the catchpoint file. This needs to be done inline so that it will block any new accounts from being written.
399401
// the generateCatchpoint expects that the accounts data would not be modified in the background during its execution.
400402
ct.generateCatchpoint(ctx, basics.Round(dcc.offset)+dcc.oldBase+dcc.lookback, dcc.catchpointLabel, dcc.committedRoundDigest, dcc.updatingBalancesDuration)
401403
}
404+
402405
// in scheduleCommit, we expect this function to update the catchpointWriting when
403406
// it's on a catchpoint round and it's an archival ledger. Doing this in a deferred function
404407
// here would prevent us from "forgetting" to update this variable later on.

ledger/catchpointtracker_test.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"os"
2525
"path/filepath"
2626
"runtime"
27+
"sync/atomic"
2728
"testing"
2829
"time"
2930

@@ -33,8 +34,10 @@ import (
3334
"github.com/algorand/go-algorand/crypto"
3435
"github.com/algorand/go-algorand/data/basics"
3536
"github.com/algorand/go-algorand/data/bookkeeping"
37+
"github.com/algorand/go-algorand/data/transactions"
3638
"github.com/algorand/go-algorand/ledger/ledgercore"
3739
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
40+
"github.com/algorand/go-algorand/logging"
3841
"github.com/algorand/go-algorand/protocol"
3942
"github.com/algorand/go-algorand/test/partitiontest"
4043
)
@@ -413,3 +416,202 @@ func TestCatchpointTrackerPrepareCommit(t *testing.T) {
413416
}
414417
}
415418
}
419+
420+
// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
421+
type blockingTracker struct {
422+
postCommitUnlockedEntryLock chan struct{}
423+
postCommitUnlockedReleaseLock chan struct{}
424+
postCommitEntryLock chan struct{}
425+
postCommitReleaseLock chan struct{}
426+
committedUpToRound int64
427+
}
428+
429+
// loadFromDisk is not implemented in the blockingTracker.
430+
func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
431+
return nil
432+
}
433+
434+
// newBlock is not implemented in the blockingTracker.
435+
func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
436+
}
437+
438+
// committedUpTo in the blockingTracker just stores the committed round.
439+
func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
440+
atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
441+
return committedRnd, basics.Round(0)
442+
}
443+
444+
// produceCommittingTask is not used by the blockingTracker
445+
func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
446+
return dcr
447+
}
448+
449+
// prepareCommit is not used by the blockingTracker
450+
func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
451+
return nil
452+
}
453+
454+
// commitRound is not used by the blockingTracker
455+
func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error {
456+
return nil
457+
}
458+
459+
// postCommit implements entry/exit blockers, designed for testing.
460+
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
461+
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
462+
bt.postCommitEntryLock <- struct{}{}
463+
<-bt.postCommitReleaseLock
464+
}
465+
}
466+
467+
// postCommitUnlocked implements entry/exit blockers, designed for testing.
468+
func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
469+
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
470+
bt.postCommitUnlockedEntryLock <- struct{}{}
471+
<-bt.postCommitUnlockedReleaseLock
472+
}
473+
}
474+
475+
// handleUnorderedCommit is not used by the blockingTracker
476+
func (bt *blockingTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
477+
}
478+
479+
// close is not used by the blockingTracker
480+
func (bt *blockingTracker) close() {
481+
}
482+
483+
func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
484+
partitiontest.PartitionTest(t)
485+
486+
genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 10)
487+
const inMem = true
488+
log := logging.TestingLog(t)
489+
log.SetLevel(logging.Warn)
490+
cfg := config.GetDefaultLocal()
491+
cfg.Archival = true
492+
cfg.CatchpointInterval = 2
493+
ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
494+
require.NoError(t, err, "could not open ledger")
495+
defer ledger.Close()
496+
497+
writeStallingTracker := &blockingTracker{
498+
postCommitUnlockedEntryLock: make(chan struct{}),
499+
postCommitUnlockedReleaseLock: make(chan struct{}),
500+
postCommitEntryLock: make(chan struct{}),
501+
postCommitReleaseLock: make(chan struct{}),
502+
}
503+
ledger.trackerMu.Lock()
504+
ledger.trackers.mu.Lock()
505+
ledger.trackers.trackers = append(ledger.trackers.trackers, writeStallingTracker)
506+
ledger.trackers.mu.Unlock()
507+
ledger.trackerMu.Unlock()
508+
509+
proto := config.Consensus[protocol.ConsensusCurrentVersion]
510+
511+
// create the first MaxBalLookback blocks
512+
for rnd := ledger.Latest() + 1; rnd <= basics.Round(proto.MaxBalLookback); rnd++ {
513+
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
514+
require.NoError(t, err)
515+
}
516+
517+
// make sure to get to a catchpoint round, and block the writing there.
518+
for {
519+
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
520+
require.NoError(t, err)
521+
if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
522+
// release the entry lock for postCommit
523+
<-writeStallingTracker.postCommitEntryLock
524+
525+
// release the exit lock for postCommit
526+
writeStallingTracker.postCommitReleaseLock <- struct{}{}
527+
528+
// wait until we're blocked by the stalling tracker.
529+
<-writeStallingTracker.postCommitUnlockedEntryLock
530+
break
531+
}
532+
}
533+
534+
// write additional block, so that the block queue would trigger that too
535+
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
536+
require.NoError(t, err)
537+
// wait for committedUpTo to be called with the correct round number.
538+
for {
539+
committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
540+
if basics.Round(committedUpToRound) == ledger.Latest() {
541+
break
542+
}
543+
time.Sleep(1 * time.Millisecond)
544+
}
545+
546+
lookupDone := make(chan struct{})
547+
// now that we've blocked the tracker, try to call LookupAgreement and confirm it returns almost immediately
548+
go func() {
549+
defer close(lookupDone)
550+
ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
551+
}()
552+
553+
select {
554+
case <-lookupDone:
555+
// we expect it not to get stuck, even when the postCommitUnlocked is stuck.
556+
case <-time.After(25 * time.Second):
557+
require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
558+
}
559+
// let the goroutines complete.
560+
// release the exit lock for postCommitUnlocked
561+
writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
562+
563+
// test false positive : we want to ensure that without releasing the postCommit lock, the LookupAgreement would not be able to return within 5 seconds.
564+
565+
// make sure to get to a catchpoint round, and block the writing there.
566+
for {
567+
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
568+
require.NoError(t, err)
569+
if uint64(ledger.Latest())%cfg.CatchpointInterval == 0 {
570+
// release the entry lock for postCommit
571+
<-writeStallingTracker.postCommitEntryLock
572+
break
573+
}
574+
}
575+
// write additional block, so that the block queue would trigger that too
576+
err = ledger.addBlockTxns(t, genesisInitState.Accounts, []transactions.SignedTxn{}, transactions.ApplyData{})
577+
require.NoError(t, err)
578+
// wait for committedUpTo to be called with the correct round number.
579+
for {
580+
committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
581+
if basics.Round(committedUpToRound) == ledger.Latest() {
582+
break
583+
}
584+
time.Sleep(1 * time.Millisecond)
585+
}
586+
587+
lookupDone = make(chan struct{})
588+
// now that we've blocked the tracker, try to call LookupAgreement and confirm it's not returning within 5 seconds.
589+
go func() {
590+
defer close(lookupDone)
591+
ledger.LookupAgreement(ledger.Latest(), genesisInitState.Block.FeeSink)
592+
}()
593+
594+
select {
595+
case <-lookupDone:
596+
require.FailNow(t, "The LookupAgreement wasn't getting blocked as expected by the blocked tracker")
597+
case <-time.After(5 * time.Second):
598+
// this one was "stuck" for over five seconds ( as expected )
599+
}
600+
// let the goroutines complete.
601+
// release the exit lock for postCommit
602+
writeStallingTracker.postCommitReleaseLock <- struct{}{}
603+
604+
// wait until we're blocked by the stalling tracker.
605+
<-writeStallingTracker.postCommitUnlockedEntryLock
606+
// release the blocker.
607+
writeStallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
608+
609+
// confirm that we get released quickly.
610+
select {
611+
case <-lookupDone:
612+
// now that all the blockers have been removed, we should be able to complete
613+
// the LookupAgreement call.
614+
case <-time.After(30 * time.Second):
615+
require.FailNow(t, "The LookupAgreement wasn't getting release as expected by the blocked tracker")
616+
}
617+
}

ledger/metrics.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ func (mt *metricsTracker) commitRound(context.Context, *sql.Tx, *deferredCommitC
6565
func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
6666
}
6767

68+
func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
69+
}
70+
6871
func (mt *metricsTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
6972
}
7073
func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {

ledger/notifier.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ func (bn *blockNotifier) commitRound(context.Context, *sql.Tx, *deferredCommitCo
125125
func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitContext) {
126126
}
127127

128+
func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
129+
}
130+
128131
func (bn *blockNotifier) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
129132
}
130133

ledger/tracker.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ type ledgerTracker interface {
109109
// has completed. An optional context is provided for long-running operations.
110110
postCommit(context.Context, *deferredCommitContext)
111111

112+
// postCommitUnlocked is called only on a successful commitRound. In that case, each of the trackers has
113+
// the chance to make changes that aren't state-dependent.
114+
// An optional context is provided for long-running operations.
115+
postCommitUnlocked(context.Context, *deferredCommitContext)
116+
112117
// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
113118
// Tracker might update own state in this case. For example, account updates tracker cancels
114119
// scheduled catchpoint writing that deferred commit.
@@ -323,6 +328,8 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
323328
}
324329
if cdr != nil {
325330
dcc.deferredCommitRange = *cdr
331+
} else {
332+
dcc = nil
326333
}
327334

328335
tr.mu.RLock()
@@ -472,6 +479,10 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) {
472479
tr.lastFlushTime = dcc.flushTime
473480
tr.mu.Unlock()
474481

482+
for _, lt := range tr.trackers {
483+
lt.postCommitUnlocked(tr.ctx, dcc)
484+
}
485+
475486
}
476487

477488
// initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).

ledger/txtail.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ func (t *txTail) commitRound(context.Context, *sql.Tx, *deferredCommitContext) e
168168
func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) {
169169
}
170170

171+
func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
172+
}
173+
171174
func (t *txTail) handleUnorderedCommit(uint64, basics.Round, basics.Round) {
172175
}
173176

0 commit comments

Comments
 (0)