diff --git a/pkg/kv/kvserver/flow_control_replica_integration.go b/pkg/kv/kvserver/flow_control_replica_integration.go index 7f73a23d4e8b..c3f0db3e8a4d 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration.go +++ b/pkg/kv/kvserver/flow_control_replica_integration.go @@ -455,19 +455,9 @@ func (n noopReplicaFlowControlIntegration) handle() (kvflowcontrol.Handle, bool) type replicaForRACv2 Replica -var _ replica_rac2.Replica = &replicaForRACv2{} +var _ replica_rac2.ReplicaForTesting = &replicaForRACv2{} -// RaftMuAssertHeld implements replica_rac2.Replica. -func (r *replicaForRACv2) RaftMuAssertHeld() { - r.raftMu.AssertHeld() -} - -// MuAssertHeld implements replica_rac2.Replica. -func (r *replicaForRACv2) MuAssertHeld() { - r.mu.AssertHeld() -} - -// IsScratchRange implements replica_rac2.Replica. +// IsScratchRange implements replica_rac2.ReplicaForTesting. func (r *replicaForRACv2) IsScratchRange() bool { return (*Replica)(r).IsScratchRange() } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 1dd2fb8af8f4..995390afa537 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -26,17 +26,8 @@ import ( "github.com/cockroachdb/errors" ) -// Replica abstracts kvserver.Replica. It exposes internal implementation -// details of Replica, specifically the locking behavior, since it is -// essential to reason about correctness. -// -// TODO(sumeer): because the mutex assertions are hidden behind an interface, -// they are not free for production builds. Fix, and then add more assertions. -type Replica interface { - // RaftMuAssertHeld asserts that Replica.raftMu is held. - RaftMuAssertHeld() - // MuAssertHeld asserts that Replica.mu is held. - MuAssertHeld() +// ReplicaForTesting abstracts kvserver.Replica for testing. +type ReplicaForTesting interface { // IsScratchRange returns true if this is range is a scratch range (i.e. // overlaps with the scratch span and has a start key <= // keys.ScratchRangeMin). @@ -142,6 +133,37 @@ type RangeControllerFactory interface { New(ctx context.Context, state rangeControllerInitState) rac2.RangeController } +// ReplicaMutexAsserter must only be used to assert that mutexes are held. +// This is a concrete struct so that the assertions can be compiled away in +// production code. +type ReplicaMutexAsserter struct { + raftMu *syncutil.Mutex + replicaMu *syncutil.RWMutex +} + +func MakeReplicaMutexAsserter( + raftMu *syncutil.Mutex, replicaMu *syncutil.RWMutex, +) ReplicaMutexAsserter { + return ReplicaMutexAsserter{ + raftMu: raftMu, + replicaMu: replicaMu, + } +} + +// RaftMuAssertHeld asserts that Replica.raftMu is held. +// +// gcassert:inline +func (rmu ReplicaMutexAsserter) RaftMuAssertHeld() { + rmu.raftMu.AssertHeld() +} + +// ReplicaMuAssertHeld asserts that Replica.mu is held for writing. +// +// gcassert:inline +func (rmu ReplicaMutexAsserter) ReplicaMuAssertHeld() { + rmu.replicaMu.AssertHeld() +} + // ProcessorOptions are specified when creating a new Processor. type ProcessorOptions struct { // Various constant fields that are duplicated from Replica, since we @@ -154,7 +176,8 @@ type ProcessorOptions struct { RangeID roachpb.RangeID ReplicaID roachpb.ReplicaID - Replica Replica + ReplicaForTesting ReplicaForTesting + ReplicaMutexAsserter ReplicaMutexAsserter RaftScheduler RaftScheduler AdmittedPiggybacker AdmittedPiggybacker ACWorkQueue ACWorkQueue @@ -519,8 +542,8 @@ func (p *processorImpl) isLeaderUsingV2ProcLocked() bool { func (p *processorImpl) InitRaftLocked( ctx context.Context, rn rac2.RaftInterface, logMark rac2.LogMark, ) { - p.opts.Replica.RaftMuAssertHeld() - p.opts.Replica.MuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld() if p.desc.replicas != nil { log.Fatalf(ctx, "initializing RaftNode after replica is initialized") } @@ -530,7 +553,7 @@ func (p *processorImpl) InitRaftLocked( // OnDestroyRaftMuLocked implements Processor. func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() p.destroyed = true p.closeLeaderStateRaftMuLocked(ctx) // Release some memory. @@ -541,7 +564,7 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) { func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked( ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel, state RaftNodeBasicState, ) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() if p.destroyed || p.enabledWhenLeader >= level { return } @@ -572,8 +595,8 @@ func descToReplicaSet(desc *roachpb.RangeDescriptor) rac2.ReplicaSet { func (p *processorImpl) OnDescChangedLocked( ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID, ) { - p.opts.Replica.RaftMuAssertHeld() - p.opts.Replica.MuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld() initialization := p.desc.replicas == nil if initialization { // Replica is initialized, in that we now have a descriptor. @@ -754,7 +777,7 @@ func (p *processorImpl) createLeaderStateRaftMuLocked( func (p *processorImpl) HandleRaftReadyRaftMuLocked( ctx context.Context, state RaftNodeBasicState, e rac2.RaftEvent, ) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() // Register all snapshots / log appends without exception. If the replica is // being destroyed, this should be a no-op, but there is no harm in // registering the write just in case. @@ -786,7 +809,7 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked( p.maybeSendAdmittedRaftMuLocked(ctx) if rc := p.leader.rc; rc != nil { if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges || - p.opts.Replica.IsScratchRange() { + p.opts.ReplicaForTesting.IsScratchRange() { if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil { log.Errorf(ctx, "error handling raft event: %v", err) } @@ -873,6 +896,7 @@ func (p *processorImpl) registerStorageAppendRaftMuLocked(ctx context.Context, e // AdmitRaftEntriesRaftMuLocked implements Processor. func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2.RaftEvent) bool { + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() // Return false only if we're not destroyed and not using V2. if p.destroyed || !p.isLeaderUsingV2ProcLocked() { return p.destroyed @@ -1003,7 +1027,7 @@ func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader( // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor. func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() if p.destroyed { return } @@ -1040,7 +1064,7 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte func (p *processorImpl) SideChannelForPriorityOverrideAtFollowerRaftMuLocked( info SideChannelInfoUsingRaftMessageRequest, ) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() if p.destroyed { return } @@ -1090,7 +1114,7 @@ func (p *processorImpl) AdmittedState() rac2.AdmittedVector { func (p *processorImpl) AdmitRaftMuLocked( ctx context.Context, replicaID roachpb.ReplicaID, av rac2.AdmittedVector, ) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() // NB: rc is always updated while raftMu is held. if rc := p.leader.rc; rc != nil { rc.AdmitRaftMuLocked(ctx, replicaID, av) @@ -1099,7 +1123,7 @@ func (p *processorImpl) AdmitRaftMuLocked( // MaybeSendPingsRaftMuLocked implements Processor. func (p *processorImpl) MaybeSendPingsRaftMuLocked() { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() if rc := p.leader.rc; rc != nil { rc.MaybeSendPingsRaftMuLocked() } @@ -1107,8 +1131,8 @@ func (p *processorImpl) MaybeSendPingsRaftMuLocked() { // HoldsSendTokensLocked implements Processor. func (p *processorImpl) HoldsSendTokensLocked() bool { - p.opts.Replica.RaftMuAssertHeld() - p.opts.Replica.MuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld() if rc := p.leader.rc; rc != nil { return rc.HoldsSendTokensLocked() } @@ -1142,7 +1166,7 @@ func (p *processorImpl) AdmitForEval( func (p *processorImpl) ProcessSchedulerEventRaftMuLocked( ctx context.Context, mode rac2.RaftMsgAppMode, logSnapshot raft.LogSnapshot, ) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() if p.destroyed { return } @@ -1153,7 +1177,7 @@ func (p *processorImpl) ProcessSchedulerEventRaftMuLocked( // InspectRaftMuLocked implements Processor. func (p *processorImpl) InspectRaftMuLocked(ctx context.Context) (kvflowinspectpb.Handle, bool) { - p.opts.Replica.RaftMuAssertHeld() + p.opts.ReplicaMutexAsserter.RaftMuAssertHeld() p.leader.rcReferenceUpdateMu.RLock() defer p.leader.rcReferenceUpdateMu.RUnlock() if p.leader.rc == nil { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 1d4db9902eb1..73c275fb301e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -35,14 +35,13 @@ import ( ) type testReplica struct { - mu syncutil.RWMutex raftNode *testRaftNode b *strings.Builder leaseholder roachpb.ReplicaID } -var _ Replica = &testReplica{} +var _ ReplicaForTesting = &testReplica{} func newTestReplica(b *strings.Builder) *testReplica { return &testReplica{b: b} @@ -57,14 +56,6 @@ func (r *testReplica) initRaft(stable rac2.LogMark) { } } -func (r *testReplica) RaftMuAssertHeld() { - fmt.Fprintf(r.b, " Replica.RaftMuAssertHeld\n") -} - -func (r *testReplica) MuAssertHeld() { - fmt.Fprintf(r.b, " Replica.MuAssertHeld\n") -} - func (r *testReplica) IsScratchRange() bool { return true } @@ -249,6 +240,28 @@ func (c *testRangeController) SendStreamStats(stats *rac2.RangeSendStreamStats) fmt.Fprintf(c.b, " RangeController.SendStreamStats\n") } +func makeTestMutexAsserter() ReplicaMutexAsserter { + var raftMu syncutil.Mutex + var replicaMu syncutil.RWMutex + return MakeReplicaMutexAsserter(&raftMu, &replicaMu) +} + +func LockRaftMuAndReplicaMu(mu *ReplicaMutexAsserter) (unlockFunc func()) { + mu.raftMu.Lock() + mu.replicaMu.Lock() + return func() { + mu.replicaMu.Unlock() + mu.raftMu.Unlock() + } +} + +func LockRaftMu(mu *ReplicaMutexAsserter) (unlockFunc func()) { + mu.raftMu.Lock() + return func() { + mu.raftMu.Unlock() + } +} + func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -264,6 +277,7 @@ func TestProcessorBasic(t *testing.T) { var rcFactory testRangeControllerFactory var p *processorImpl tenantID := roachpb.MustMakeTenantID(4) + muAsserter := makeTestMutexAsserter() reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) { b.Reset() r = newTestReplica(&b) @@ -276,7 +290,8 @@ func TestProcessorBasic(t *testing.T) { StoreID: 2, RangeID: 3, ReplicaID: replicaID, - Replica: r, + ReplicaForTesting: r, + ReplicaMutexAsserter: muAsserter, RaftScheduler: &sched, AdmittedPiggybacker: &piggybacker, ACWorkQueue: &q, @@ -311,10 +326,10 @@ func TestProcessorBasic(t *testing.T) { var mark rac2.LogMark d.ScanArgs(t, "log-term", &mark.Term) d.ScanArgs(t, "log-index", &mark.Index) - r.mu.Lock() r.initRaft(mark) + unlockFunc := LockRaftMuAndReplicaMu(&muAsserter) p.InitRaftLocked(ctx, r.raftNode, r.raftNode.mark) - r.mu.Unlock() + unlockFunc() return builderStr() case "set-raft-state": @@ -357,7 +372,9 @@ func TestProcessorBasic(t *testing.T) { return builderStr() case "on-destroy": + unlockFunc := LockRaftMu(&muAsserter) p.OnDestroyRaftMuLocked(ctx) + unlockFunc() return builderStr() case "set-enabled-level": @@ -372,7 +389,9 @@ func TestProcessorBasic(t *testing.T) { Leaseholder: r.leaseholder, } } + unlockFunc := LockRaftMu(&muAsserter) p.SetEnabledWhenLeaderRaftMuLocked(ctx, enabledLevel, state) + unlockFunc() return builderStr() case "get-enabled-level": @@ -382,7 +401,9 @@ func TestProcessorBasic(t *testing.T) { case "on-desc-changed": desc := parseRangeDescriptor(t, d) + unlockFunc := LockRaftMuAndReplicaMu(&muAsserter) p.OnDescChangedLocked(ctx, &desc, tenantID) + unlockFunc() return builderStr() case "handle-raft-ready-and-admit": @@ -409,6 +430,7 @@ func TestProcessorBasic(t *testing.T) { Leaseholder: r.leaseholder, } } + unlockFunc := LockRaftMu(&muAsserter) p.HandleRaftReadyRaftMuLocked(ctx, state, event) fmt.Fprintf(&b, ".....\n") if len(event.Entries) > 0 { @@ -417,6 +439,7 @@ func TestProcessorBasic(t *testing.T) { fmt.Fprintf(&b, "destroyed-or-leader-using-v2: %t\n", destroyedOrV2) printLogTracker() } + unlockFunc() return builderStr() case "enqueue-piggybacked-admitted": @@ -440,7 +463,9 @@ func TestProcessorBasic(t *testing.T) { return builderStr() case "process-piggybacked-admitted": + unlockFunc := LockRaftMu(&muAsserter) p.ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx) + unlockFunc() return builderStr() case "side-channel": @@ -464,7 +489,9 @@ func TestProcessorBasic(t *testing.T) { Last: last, LowPriOverride: lowPriOverride, } + unlockFunc := LockRaftMu(&muAsserter) p.SideChannelForPriorityOverrideAtFollowerRaftMuLocked(info) + unlockFunc() return builderStr() case "admitted-log-entry": @@ -502,7 +529,9 @@ func TestProcessorBasic(t *testing.T) { return builderStr() case "inspect": + unlockFunc := LockRaftMu(&muAsserter) p.InspectRaftMuLocked(ctx) + unlockFunc() return builderStr() case "send-stream-stats": diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 9bf6748d3390..5f28570d93dc 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -12,7 +12,6 @@ enabled-level: not-enabled on-destroy ---- - Replica.RaftMuAssertHeld # AdmitForEval returns false since there is no RangeController. admit-for-eval pri=low-pri @@ -28,7 +27,6 @@ admitted: false err: handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... reset @@ -42,7 +40,6 @@ enabled-level: not-enabled # Can use v1 encoding and v2 protocol, if become leader. set-enabled-level enabled-level=v1-encoding ---- - Replica.RaftMuAssertHeld get-enabled-level ---- @@ -52,8 +49,6 @@ enabled-level: v1-encoding # of the log. The leader and leaseholder are both on replica-id 10. init-raft log-term=40 log-index=23 ---- - Replica.RaftMuAssertHeld - Replica.MuAssertHeld set-raft-state term=50 leader=10 leaseholder=10 ---- @@ -63,15 +58,12 @@ Raft: term: 50 leader: 10 leaseholder: 10 mark: {Term:40 Index:23} next-unstable handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... # The processor is provided a range descriptor that does not include itself or # the leader. This is acceptable. on-desc-changed replicas=n11/s11/11 ---- - Replica.RaftMuAssertHeld - Replica.MuAssertHeld # Raft is about to send us a newly appended entry 24. set-raft-state log-term=50 log-index=24 next-unstable-index=25 @@ -83,7 +75,6 @@ Raft: term: 50 leader: 10 leaseholder: 10 mark: {Term:50 Index:24} next-unstable handle-raft-ready-and-admit entries=v1/i24/t45/pri0/time2/len100 leader-term=50 ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... AdmitRaftEntries: destroyed-or-leader-using-v2: false @@ -97,14 +88,12 @@ Raft: term: 50 leader: 10 leaseholder: 10 mark: {Term:50 Index:25} next-unstable # Told that the leader is using v2. And that [25,25] has no low-pri override. side-channel v2 leader-term=50 first=25 last=25 ---- - Replica.RaftMuAssertHeld # The index 25 entry is v1 encoded, so by default it is low-pri. Admitted vector # does not advance, but its initial value is sent to the new leader. handle-raft-ready-and-admit entries=v1/i25/t45/pri0/time2/len100 leader-term=50 ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... AdmitRaftEntries: ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:25} Priority:LowPri}}) = true @@ -131,7 +120,6 @@ LowPri: {Term:50 Index:25} handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[24 25 25 25]) ..... @@ -143,13 +131,11 @@ Raft: term: 50 leader: 11 leaseholder: 10 mark: {Term:50 Index:26} next-unstable # Side channel for entries [26, 26] with no low-pri override. side-channel v2 leader-term=50 first=26 last=26 ---- - Replica.RaftMuAssertHeld # The index 26 entry uses v2 and is using pri=2, which is AboveNormalPri. handle-raft-ready-and-admit entries=v2/i26/t45/pri2/time2/len100 leader-term=50 ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... AdmitRaftEntries: ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:user-high-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:26} Priority:AboveNormalPri}}) = true @@ -162,7 +148,6 @@ AboveNormalPri: {Term:50 Index:26} handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... # Stable index is advanced, which should allow some priorities to advance @@ -179,7 +164,6 @@ AboveNormalPri: {Term:50 Index:26} handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[24 26 25 26]) ..... @@ -195,7 +179,6 @@ AboveNormalPri: {Term:50 Index:26} handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[26 26 25 26]) ..... @@ -207,13 +190,11 @@ Raft: term: 50 leader: 11 leaseholder: 10 mark: {Term:50 Index:27} next-unstable # Side channel for entries [27,27] indicate a low-pri override. side-channel v2 leader-term=50 first=27 last=27 low-pri ---- - Replica.RaftMuAssertHeld # The index 27 entry is marked AboveNormalPri, but will be treated as LowPri. handle-raft-ready-and-admit entries=v2/i27/t45/pri2/time2/len100 leader-term=50 ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... AdmitRaftEntries: ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:27} Priority:LowPri}}) = true @@ -237,7 +218,6 @@ LowPri: {Term:50 Index:27} handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[26 26 26 26]) ..... @@ -252,7 +232,6 @@ Raft: term: 51 leader: 11 leaseholder: 10 mark: {Term:50 Index:27} next-unstable # mind. side-channel v1 leader-term=51 first=27 last=27 ---- - Replica.RaftMuAssertHeld # Stable index advanced to 27, as well as all admitted indices except LowPri. synced-log term=50 index=27 @@ -264,7 +243,6 @@ LowPri: {Term:50 Index:27} handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... # A new entry at index 27 overwrites the previous one, and regresses the stable @@ -272,7 +250,6 @@ HandleRaftReady: handle-raft-ready-and-admit entries=v1/i27/t46/pri0/time2/len100 leader-term=51 ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... AdmitRaftEntries: destroyed-or-leader-using-v2: false @@ -286,13 +263,11 @@ LogTracker [+dirty]: mark:{Term:51 Index:27}, stable:26, admitted:[26 26 26 26] # Same leader switches to v2. side-channel v2 leader-term=51 first=27 last=27 ---- - Replica.RaftMuAssertHeld # Admitted vector is now sent to the leader. handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld Piggybacker.Add(n11, [r3,s11,5->11] admitted=t51/[26 26 26 26]) ..... @@ -303,7 +278,6 @@ enqueue-piggybacked-admitted from=25 to=5 term=50 index=24 pri=0 # Noop. process-piggybacked-admitted ---- - Replica.RaftMuAssertHeld # Local replica is becoming the leader. set-raft-state term=52 leader=5 @@ -312,8 +286,6 @@ Raft: term: 52 leader: 5 leaseholder: 10 mark: {Term:50 Index:27} next-unstable: on-desc-changed replicas=n11/s11/11,n1/s2/5 ---- - Replica.RaftMuAssertHeld - Replica.MuAssertHeld RaftScheduler.EnqueueRaftReady(rangeID=3) set-raft-state log-term=51 log-index=28 next-unstable-index=29 @@ -324,7 +296,6 @@ Raft: term: 52 leader: 5 leaseholder: 10 mark: {Term:51 Index:28} next-unstable: handle-raft-ready-and-admit entries=v1/i28/t46/pri0/time2/len100 leader-term=52 ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=28) RangeController.AdmitRaftMuLocked(5, term:52, admitted:[LowPri:26,NormalPri:26,AboveNormalPri:26,HighPri:26]) RangeController.HandleRaftEventRaftMuLocked([28]) @@ -377,7 +348,6 @@ LogTracker: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26] handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -391,7 +361,6 @@ LogTracker [+dirty]: mark:{Term:52 Index:28}, stable:28, admitted:[28 28 28 28] handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.AdmitRaftMuLocked(5, term:52, admitted:[LowPri:28,NormalPri:28,AboveNormalPri:28,HighPri:28]) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -407,13 +376,11 @@ enqueue-piggybacked-admitted from=25 to=5 term=52 index=25 pri=2 # Process it. process-piggybacked-admitted ---- - Replica.RaftMuAssertHeld RangeController.AdmitRaftMuLocked(25, term:52, admitted:[LowPri:24,NormalPri:0,AboveNormalPri:25,HighPri:0]) # Noop. process-piggybacked-admitted ---- - Replica.RaftMuAssertHeld # We are still the leader, now at a new term. set-raft-state term=53 @@ -424,7 +391,6 @@ Raft: term: 53 leader: 5 leaseholder: 10 mark: {Term:51 Index:28} next-unstable: handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.CloseRaftMuLocked RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=29) RangeController.HandleRaftEventRaftMuLocked([]) @@ -432,14 +398,11 @@ HandleRaftReady: on-desc-changed replicas=n11/s11/11,n1/s2/5,n13/s13/13 ---- - Replica.RaftMuAssertHeld - Replica.MuAssertHeld RaftScheduler.EnqueueRaftReady(rangeID=3) handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.SetReplicasRaftMuLocked([(n1,s2):5,(n11,s11):11,(n13,s13):13]) RangeController.SetLeaseholderRaftMuLocked(10) RangeController.HandleRaftEventRaftMuLocked([]) @@ -447,20 +410,17 @@ HandleRaftReady: on-destroy ---- - Replica.RaftMuAssertHeld RangeController.CloseRaftMuLocked # Noop, since destroyed. handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... # Noop, since destroyed. set-enabled-level enabled-level=v2-encoding ---- - Replica.RaftMuAssertHeld # Noop. get-enabled-level @@ -474,18 +434,15 @@ enqueue-piggybacked-admitted from=25 to=5 term=52 index=24 pri=0 # Noop. process-piggybacked-admitted ---- - Replica.RaftMuAssertHeld # Noop. side-channel v2 leader-term=53 first=29 last=29 ---- - Replica.RaftMuAssertHeld # Noop. handle-raft-ready-and-admit entries=v1/i29/t45/pri0/time2/len100 leader-term=53 ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... AdmitRaftEntries: destroyed-or-leader-using-v2: true @@ -507,8 +464,6 @@ enabled-level: not-enabled init-raft log-term=50 log-index=24 ---- - Replica.RaftMuAssertHeld - Replica.MuAssertHeld set-raft-state term=50 leader=5 leaseholder=5 ---- @@ -518,19 +473,15 @@ Raft: term: 50 leader: 5 leaseholder: 5 mark: {Term:50 Index:24} next-unstable: handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... # Descriptor does not contain self, but that is ok. on-desc-changed replicas=n11/s11/11,n13/s13/13 ---- - Replica.RaftMuAssertHeld - Replica.MuAssertHeld handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... set-raft-state log-term=50 log-index=25 next-unstable-index=26 @@ -541,7 +492,6 @@ Raft: term: 50 leader: 5 leaseholder: 5 mark: {Term:50 Index:25} next-unstable: handle-raft-ready-and-admit entries=v1/i25/t45/pri0/time2/len100 leader-term=50 ---- HandleRaftReady: - Replica.RaftMuAssertHeld ..... AdmitRaftEntries: destroyed-or-leader-using-v2: false @@ -550,7 +500,6 @@ LogTracker: mark:{Term:50 Index:25}, stable:24, admitted:[24 24 24 24] # RangeController is created. set-enabled-level enabled-level=v1-encoding ---- - Replica.RaftMuAssertHeld RangeControllerFactory.New(replicaSet=[(n11,s11):11,(n13,s13):13], leaseholder=5, nextRaftIndex=26) set-raft-state log-term=50 log-index=26 next-unstable-index=27 @@ -561,7 +510,6 @@ Raft: term: 50 leader: 5 leaseholder: 5 mark: {Term:50 Index:26} next-unstable: handle-raft-ready-and-admit entries=v1/i26/t45/pri0/time2/len100 leader-term=50 ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.HandleRaftEventRaftMuLocked([26]) ..... AdmitRaftEntries: @@ -579,7 +527,6 @@ LogTracker: mark:{Term:50 Index:26}, stable:24, admitted:[24 24 24 24] handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -591,7 +538,6 @@ LogTracker [+dirty]: mark:{Term:50 Index:26}, stable:26, admitted:[26 26 26 26] handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.AdmitRaftMuLocked(5, term:50, admitted:[LowPri:26,NormalPri:26,AboveNormalPri:26,HighPri:26]) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -604,7 +550,6 @@ Raft: term: 50 leader: 5 leaseholder: 5 mark: {Term:50 Index:27} next-unstable: handle-raft-ready-and-admit entries=none/i27/t45/pri0/time2/len100 leader-term=50 ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.HandleRaftEventRaftMuLocked([27]) ..... AdmitRaftEntries: @@ -619,14 +564,12 @@ LogTracker [+dirty]: mark:{Term:50 Index:27}, stable:27, admitted:[27 27 27 27] handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.AdmitRaftMuLocked(5, term:50, admitted:[LowPri:27,NormalPri:27,AboveNormalPri:27,HighPri:27]) RangeController.HandleRaftEventRaftMuLocked([]) ..... inspect ---- - Replica.RaftMuAssertHeld RangeController.InspectRaftMuLocked send-stream-stats @@ -641,6 +584,5 @@ Raft: term: 51 leader: 0 leaseholder: 5 mark: {Term:50 Index:27} next-unstable: handle-raft-ready-and-admit ---- HandleRaftReady: - Replica.RaftMuAssertHeld RangeController.CloseRaftMuLocked ..... diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 193f98f9e57c..f58f8f9a60ee 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -232,11 +233,13 @@ func newUninitializedReplicaWithoutRaftGroup( r.raftMu.msgAppScratchForFlowControl = map[roachpb.ReplicaID][]raftpb.Message{} r.raftMu.replicaStateScratchForFlowControl = map[roachpb.ReplicaID]rac2.ReplicaStateInfo{} r.flowControlV2 = replica_rac2.NewProcessor(replica_rac2.ProcessorOptions{ - NodeID: store.NodeID(), - StoreID: r.StoreID(), - RangeID: r.RangeID, - ReplicaID: r.replicaID, - Replica: (*replicaForRACv2)(r), + NodeID: store.NodeID(), + StoreID: r.StoreID(), + RangeID: r.RangeID, + ReplicaID: r.replicaID, + ReplicaForTesting: (*replicaForRACv2)(r), + ReplicaMutexAsserter: replica_rac2.MakeReplicaMutexAsserter( + &r.raftMu.Mutex, (*syncutil.RWMutex)(&r.mu.ReplicaMutex)), RaftScheduler: r.store.scheduler, AdmittedPiggybacker: r.store.cfg.KVFlowAdmittedPiggybacker, ACWorkQueue: r.store.cfg.KVAdmissionController, diff --git a/pkg/testutils/lint/gcassert_paths.txt b/pkg/testutils/lint/gcassert_paths.txt index 2bdfac932b5b..6bc7b25228e2 100644 --- a/pkg/testutils/lint/gcassert_paths.txt +++ b/pkg/testutils/lint/gcassert_paths.txt @@ -5,6 +5,7 @@ kv/kvclient/kvcoord kv/kvclient/rangecache kv/kvpb kv/kvserver/intentresolver +kv/kvserver/kvflowcontrol/replica_rac2 kv/kvserver/rangefeed roachpb sql/catalog/descs