Skip to content

Commit fd35c4e

Browse files
committed
Exclude potentially long catchpoint generation from trackerRegistry lock
1 parent 6330862 commit fd35c4e

File tree

4 files changed

+22
-49
lines changed

4 files changed

+22
-49
lines changed

ledger/acctupdates.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ type accountUpdates struct {
133133
// i.e., totals is one longer than deltas.
134134
roundTotals []ledgercore.AccountTotals
135135

136+
// roundDigest stores the digest of the block for every round starting with dbRound and every round after it.
137+
roundDigest []crypto.Digest
138+
136139
// log copied from ledger
137140
log logging.Logger
138141

@@ -717,6 +720,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou
717720
au.accounts = make(map[basics.Address]modifiedAccount)
718721
au.creatables = make(map[basics.CreatableIndex]ledgercore.ModifiedCreatable)
719722
au.deltasAccum = []int{0}
723+
au.roundDigest = nil
720724

721725
au.baseAccounts.init(au.log, baseAccountsPendingAccountsBufferSize, baseAccountsPendingAccountsWarnThreshold)
722726
return
@@ -738,6 +742,7 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S
738742
au.deltas = append(au.deltas, delta.Accts)
739743
au.versions = append(au.versions, blk.CurrentProtocol)
740744
au.creatableDeltas = append(au.creatableDeltas, delta.Creatables)
745+
au.roundDigest = append(au.roundDigest, blk.Digest())
741746
au.deltasAccum = append(au.deltasAccum, delta.Accts.Len()+au.deltasAccum[len(au.deltasAccum)-1])
742747

743748
au.baseAccounts.flushPendingWrites()
@@ -1085,6 +1090,10 @@ func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error {
10851090
return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions")
10861091
}
10871092

1093+
if dcc.isCatchpointRound {
1094+
dcc.committedRoundDigest = au.roundDigest[offset+uint64(dcc.lookback)-1]
1095+
}
1096+
10881097
// compact all the deltas - when we're trying to persist multiple rounds, we might have the same account
10891098
// being updated multiple times. When that happen, we can safely omit the intermediate updates.
10901099
dcc.compactAccountDeltas = makeCompactAccountDeltas(dcc.deltas, au.baseAccounts)
@@ -1216,6 +1225,7 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon
12161225

12171226
au.deltas = au.deltas[offset:]
12181227
au.deltasAccum = au.deltasAccum[offset:]
1228+
au.roundDigest = au.roundDigest[offset:]
12191229
au.versions = au.versions[offset:]
12201230
au.roundTotals = au.roundTotals[offset:]
12211231
au.creatableDeltas = au.creatableDeltas[offset:]

ledger/catchpointtracker.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,6 @@ type catchpointTracker struct {
110110

111111
// catchpointsMu is the synchronization mutex for accessing the various non-static variables.
112112
catchpointsMu deadlock.RWMutex
113-
114-
// roundDigest stores the digest of the block for every round starting with dbRound and every round after it.
115-
roundDigest []crypto.Digest
116113
}
117114

118115
// initialize initializes the catchpointTracker structure
@@ -159,7 +156,6 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, lastBalancesRound
159156
ct.log = l.trackerLog()
160157
ct.dbs = l.trackerDB()
161158

162-
ct.roundDigest = nil
163159
ct.catchpointWriting = 0
164160
// keep these channel closed if we're not generating catchpoint
165161
ct.catchpointSlowWriting = make(chan struct{}, 1)
@@ -220,9 +216,6 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, lastBalancesRound
220216
// newBlock informs the tracker of a new block from round
221217
// rnd and a given ledgercore.StateDelta as produced by BlockEvaluator.
222218
func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
223-
ct.catchpointsMu.Lock()
224-
defer ct.catchpointsMu.Unlock()
225-
ct.roundDigest = append(ct.roundDigest, blk.Digest())
226219
}
227220

228221
// committedUpTo implements the ledgerTracker interface for catchpointTracker.
@@ -294,11 +287,6 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
294287
// prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data.
295288
// If an error returned the process is aborted.
296289
func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {
297-
ct.catchpointsMu.RLock()
298-
defer ct.catchpointsMu.RUnlock()
299-
if dcc.isCatchpointRound {
300-
dcc.committedRoundDigest = ct.roundDigest[dcc.offset+uint64(dcc.lookback)-1]
301-
}
302290
return nil
303291
}
304292

@@ -380,20 +368,16 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit
380368
}
381369

382370
if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
371+
ct.catchpointsMu.Lock()
383372
ct.lastCatchpointLabel = dcc.catchpointLabel
373+
ct.catchpointsMu.Unlock()
384374
}
385375
dcc.updatingBalancesDuration = time.Since(dcc.flushTime)
386376

387377
if dcc.updateStats {
388378
dcc.stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano())
389379
}
390380

391-
ct.catchpointsMu.Lock()
392-
393-
ct.roundDigest = ct.roundDigest[dcc.offset:]
394-
395-
ct.catchpointsMu.Unlock()
396-
397381
if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" {
398382
// generate the catchpoint file. This need to be done inline so that it will block any new accounts that from being written.
399383
// the generateCatchpoint expects that the accounts data would not be modified in the background during it's execution.

ledger/catchpointtracker_test.go

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -374,9 +374,7 @@ func TestReproducibleCatchpointLabels(t *testing.T) {
374374
}
375375

376376
// test to see that after loadFromDisk, all the tracker content is lost ( as expected )
377-
require.NotZero(t, len(ct.roundDigest))
378377
require.NoError(t, ct.loadFromDisk(ml, ml.Latest()))
379-
require.Zero(t, len(ct.roundDigest))
380378
require.Zero(t, ct.catchpointWriting)
381379
select {
382380
case _, closed := <-ct.catchpointSlowWriting:
@@ -385,31 +383,3 @@ func TestReproducibleCatchpointLabels(t *testing.T) {
385383
require.FailNow(t, "The catchpointSlowWriting should have been a closed channel; it seems to be a nil ?!")
386384
}
387385
}
388-
389-
func TestCatchpointTrackerPrepareCommit(t *testing.T) {
390-
partitiontest.PartitionTest(t)
391-
392-
ct := &catchpointTracker{}
393-
const maxOffset = 40
394-
const maxLookback = 320
395-
ct.roundDigest = make([]crypto.Digest, maxOffset+maxLookback)
396-
for i := 0; i < len(ct.roundDigest); i++ {
397-
ct.roundDigest[i] = crypto.Hash([]byte{byte(i), byte(i / 256)})
398-
}
399-
dcc := &deferredCommitContext{}
400-
for offset := uint64(1); offset < maxOffset; offset++ {
401-
dcc.offset = offset
402-
for lookback := basics.Round(0); lookback < maxLookback; lookback += 20 {
403-
dcc.lookback = lookback
404-
for _, isCatchpointRound := range []bool{false, true} {
405-
dcc.isCatchpointRound = isCatchpointRound
406-
require.NoError(t, ct.prepareCommit(dcc))
407-
if isCatchpointRound {
408-
expectedRound := offset + uint64(lookback) - 1
409-
expectedHash := crypto.Hash([]byte{byte(expectedRound), byte(expectedRound / 256)})
410-
require.Equal(t, expectedHash[:], dcc.committedRoundDigest[:])
411-
}
412-
}
413-
}
414-
}
415-
}

ledger/tracker.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,14 +464,23 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) {
464464
return
465465
}
466466

467+
var ctt ledgerTracker
467468
tr.mu.Lock()
468469
tr.dbRound = newBase
469470
for _, lt := range tr.trackers {
470-
lt.postCommit(tr.ctx, dcc)
471+
if ct, ok := lt.(*catchpointTracker); ok {
472+
ctt = ct
473+
} else {
474+
lt.postCommit(tr.ctx, dcc)
475+
}
471476
}
472477
tr.lastFlushTime = dcc.flushTime
473478
tr.mu.Unlock()
474479

480+
if ctt != nil {
481+
// run catchpoint tracker's postCommit without a lock as potentially long operation
482+
ctt.postCommit(tr.ctx, dcc)
483+
}
475484
}
476485

477486
// initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).

0 commit comments

Comments
 (0)