Skip to content

Commit bd1129e

Browse files
authored
ledger: fix possible dbRound sync issue in trackers and the registry (#3910)
The trackerRegistry.dbRound value and the cached dbRound values stored in the trackers might go out of sync. Although they are updated under the same lock, scheduleCommit read dbRound and then released the lock before invoking produceCommittingTask, so produceCommittingTask might be given an outdated dbRound while the trackers had already moved to the updated state. The fix is simple: read dbRound and invoke produceCommittingTask under the same lock.
1 parent e58954d commit bd1129e

File tree

3 files changed

+183
-8
lines changed

3 files changed

+183
-8
lines changed

ledger/applications_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,18 @@ import (
3333
"github.com/algorand/go-algorand/test/partitiontest"
3434
)
3535

36+
// commitRound schedules a commit for known offset and dbRound
37+
// and waits for completion
3638
func commitRound(offset uint64, dbRound basics.Round, l *Ledger) {
39+
commitRoundLookback(dbRound+basics.Round(offset), l)
40+
}
41+
42+
func commitRoundLookback(lookback basics.Round, l *Ledger) {
3743
l.trackers.mu.Lock()
3844
l.trackers.lastFlushTime = time.Time{}
3945
l.trackers.mu.Unlock()
4046

41-
l.trackers.scheduleCommit(l.Latest(), l.Latest()-(dbRound+basics.Round(offset)))
47+
l.trackers.scheduleCommit(l.Latest(), l.Latest()-lookback)
4248
// wait for the operation to complete. Once it does complete, the tr.lastFlushTime is going to be updated, so we can
4349
// use that as an indicator.
4450
for {
@@ -49,7 +55,6 @@ func commitRound(offset uint64, dbRound basics.Round, l *Ledger) {
4955
break
5056
}
5157
time.Sleep(time.Millisecond)
52-
5358
}
5459
}
5560

ledger/tracker.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -322,16 +322,15 @@ func (tr *trackerRegistry) committedUpTo(rnd basics.Round) basics.Round {
322322
}
323323

324324
func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round) {
325-
tr.mu.RLock()
326-
dbRound := tr.dbRound
327-
tr.mu.RUnlock()
328-
329325
dcc := &deferredCommitContext{
330326
deferredCommitRange: deferredCommitRange{
331327
lookback: maxLookback,
332328
},
333329
}
334330
cdr := &dcc.deferredCommitRange
331+
332+
tr.mu.RLock()
333+
dbRound := tr.dbRound
335334
for _, lt := range tr.trackers {
336335
base := cdr.oldBase
337336
offset := cdr.offset
@@ -351,8 +350,6 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round)
351350
} else {
352351
dcc = nil
353352
}
354-
355-
tr.mu.RLock()
356353
// If we recently flushed, wait to aggregate some more blocks.
357354
// ( unless we're creating a catchpoint, in which case we want to flush it right away
358355
// so that all the instances of the catchpoint would contain exactly the same data )

ledger/tracker_test.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,32 @@ package ledger
1818

1919
import (
2020
"bytes"
21+
"context"
22+
"database/sql"
23+
"sync"
2124
"testing"
2225

2326
"github.com/stretchr/testify/require"
2427

28+
"github.com/algorand/go-algorand/agreement"
2529
"github.com/algorand/go-algorand/config"
30+
"github.com/algorand/go-algorand/crypto"
2631
"github.com/algorand/go-algorand/data/basics"
32+
"github.com/algorand/go-algorand/data/bookkeeping"
2733
"github.com/algorand/go-algorand/ledger/ledgercore"
2834
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
2935
"github.com/algorand/go-algorand/logging"
3036
"github.com/algorand/go-algorand/protocol"
3137
"github.com/algorand/go-algorand/test/partitiontest"
3238
)
3339

40+
// commitRoundNext schedules a commit with as many rounds as possible
41+
func commitRoundNext(l *Ledger) {
42+
// maxAcctLookback := l.trackers.cfg.MaxAcctLookback
43+
maxAcctLookback := 320
44+
commitRoundLookback(basics.Round(maxAcctLookback), l)
45+
}
46+
3447
// TestTrackerScheduleCommit checks catchpointTracker.produceCommittingTask does not increase commit offset relative
3548
// to the value set by accountUpdates
3649
func TestTrackerScheduleCommit(t *testing.T) {
@@ -123,3 +136,163 @@ func TestTrackerScheduleCommit(t *testing.T) {
123136
dc := <-ml.trackers.deferredCommits
124137
a.Equal(expectedOffset, dc.offset)
125138
}
139+
140+
type producePrepareBlockingTracker struct {
141+
produceReleaseLock chan struct{}
142+
prepareCommitEntryLock chan struct{}
143+
prepareCommitReleaseLock chan struct{}
144+
cancelTasks bool
145+
}
146+
147+
// loadFromDisk is not implemented in the blockingTracker.
148+
func (bt *producePrepareBlockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
149+
return nil
150+
}
151+
152+
// newBlock is not implemented in the blockingTracker.
153+
func (bt *producePrepareBlockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
154+
}
155+
156+
// committedUpTo is a no-op in the blockingTracker: it ignores the committed round and returns zero for both minRound and lookback.
157+
func (bt *producePrepareBlockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
158+
return 0, basics.Round(0)
159+
}
160+
161+
func (bt *producePrepareBlockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
162+
if bt.cancelTasks {
163+
return nil
164+
}
165+
166+
<-bt.produceReleaseLock
167+
return dcr
168+
}
169+
170+
// prepareCommit signals its entry on prepareCommitEntryLock and then blocks until prepareCommitReleaseLock is released.
171+
func (bt *producePrepareBlockingTracker) prepareCommit(*deferredCommitContext) error {
172+
bt.prepareCommitEntryLock <- struct{}{}
173+
<-bt.prepareCommitReleaseLock
174+
return nil
175+
}
176+
177+
// commitRound is not used by the blockingTracker
178+
func (bt *producePrepareBlockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error {
179+
return nil
180+
}
181+
182+
func (bt *producePrepareBlockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
183+
}
184+
185+
// postCommitUnlocked is a no-op in the blockingTracker.
186+
func (bt *producePrepareBlockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
187+
}
188+
189+
// handleUnorderedCommit is not used by the blockingTracker
190+
func (bt *producePrepareBlockingTracker) handleUnorderedCommit(*deferredCommitContext) {
191+
}
192+
193+
// close is not used by the blockingTracker
194+
func (bt *producePrepareBlockingTracker) close() {
195+
}
196+
197+
func (bt *producePrepareBlockingTracker) reset() {
198+
bt.prepareCommitEntryLock = make(chan struct{})
199+
bt.prepareCommitReleaseLock = make(chan struct{})
200+
bt.prepareCommitReleaseLock = make(chan struct{})
201+
bt.cancelTasks = false
202+
}
203+
204+
// TestTrackerDbRoundDataRace checks for dbRound data race
205+
// when commit scheduling relies on dbRound from the tracker registry but tracker's deltas
206+
// are used in calculations
207+
// 1. Add say 128 + MaxAcctLookback (MaxLookback) blocks and commit
208+
// 2. Add 2*MaxLookback blocks without committing
209+
// 3. Set a block in prepareCommit, and initiate the commit
210+
// 4. Set a block in produceCommittingTask, add a new block and resume the commit
211+
// 5. Resume produceCommittingTask
212+
// 6. The data race and panic happen in the block queue syncer thread
213+
func TestTrackerDbRoundDataRace(t *testing.T) {
214+
partitiontest.PartitionTest(t)
215+
216+
t.Skip("For manual run when touching ledger locking")
217+
218+
a := require.New(t)
219+
220+
genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 1)
221+
const inMem = true
222+
log := logging.TestingLog(t)
223+
log.SetLevel(logging.Warn)
224+
cfg := config.GetDefaultLocal()
225+
ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
226+
a.NoError(err, "could not open ledger")
227+
defer ledger.Close()
228+
229+
stallingTracker := &producePrepareBlockingTracker{
230+
// produceEntryLock: make(chan struct{}, 10),
231+
produceReleaseLock: make(chan struct{}),
232+
prepareCommitEntryLock: make(chan struct{}, 10),
233+
prepareCommitReleaseLock: make(chan struct{}),
234+
}
235+
ledger.trackerMu.Lock()
236+
ledger.trackers.mu.Lock()
237+
ledger.trackers.trackers = append([]ledgerTracker{stallingTracker}, ledger.trackers.trackers...)
238+
ledger.trackers.mu.Unlock()
239+
ledger.trackerMu.Unlock()
240+
241+
close(stallingTracker.produceReleaseLock)
242+
close(stallingTracker.prepareCommitReleaseLock)
243+
244+
targetRound := basics.Round(128) * 5
245+
blk := genesisInitState.Block
246+
for i := basics.Round(0); i < targetRound-1; i++ {
247+
blk.BlockHeader.Round++
248+
blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000)
249+
err := ledger.AddBlock(blk, agreement.Certificate{})
250+
a.NoError(err)
251+
}
252+
blk.BlockHeader.Round++
253+
blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000)
254+
err = ledger.AddBlock(blk, agreement.Certificate{})
255+
a.NoError(err)
256+
commitRoundNext(ledger)
257+
ledger.trackers.waitAccountsWriting()
258+
lookback := 320
259+
// lookback := cfg.MaxAcctLookback
260+
a.Equal(targetRound-basics.Round(lookback), ledger.trackers.dbRound)
261+
262+
// build up some non-committed queue
263+
stallingTracker.cancelTasks = true
264+
for i := targetRound; i < 2*targetRound; i++ {
265+
blk.BlockHeader.Round++
266+
blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000)
267+
err := ledger.AddBlock(blk, agreement.Certificate{})
268+
a.NoError(err)
269+
}
270+
ledger.WaitForCommit(2*targetRound - 1)
271+
272+
stallingTracker.reset()
273+
var wg sync.WaitGroup
274+
wg.Add(1)
275+
go func() {
276+
commitRoundNext(ledger)
277+
wg.Done()
278+
}()
279+
280+
<-stallingTracker.prepareCommitEntryLock
281+
stallingTracker.produceReleaseLock = make(chan struct{})
282+
283+
blk.BlockHeader.Round++
284+
blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000)
285+
err = ledger.AddBlock(blk, agreement.Certificate{})
286+
a.NoError(err)
287+
// the notifyCommit -> committedUpTo -> scheduleCommit chain
288+
// is called right after the cond var, so wait until that moment
289+
ledger.WaitForCommit(2 * targetRound)
290+
291+
// let the commit complete
292+
close(stallingTracker.prepareCommitReleaseLock)
293+
wg.Wait()
294+
295+
// unblock the notifyCommit (scheduleCommit) goroutine
296+
stallingTracker.cancelTasks = true
297+
close(stallingTracker.produceReleaseLock)
298+
}

0 commit comments

Comments
 (0)